Last Updated: December 19, 2025
A Task Scheduler is a system that manages the execution of tasks at predefined times or intervals. It is commonly used in operating systems, distributed systems, and backend services to automate jobs like backups, notifications, report generation, and periodic cleanup tasks.
For example, a task might be scheduled to run once at 8:00 AM, every day at midnight, or 5 minutes after another task completes.
The scheduler must ensure these tasks run reliably and at the correct times, even under heavy load or failures.
In this chapter, we will explore the low-level design of a Task Scheduler.
Let’s start by clarifying the requirements:
Before starting the design, it's important to ask thoughtful questions to uncover hidden assumptions and better define the scope of the system.
Here is an example of how a conversation between the candidate and the interviewer might unfold:
Candidate: Should the scheduler support recurring tasks or only one-time scheduled tasks?
Interviewer: Good question. For this version, let’s support both one-time and recurring tasks (e.g., every 5 minutes).
Candidate: Should tasks be executed exactly on time, or is a small delay acceptable?
Interviewer: A small delay is acceptable. We're not aiming for real-time precision, but tasks should execute as close as possible to the scheduled time.
Candidate: Should the scheduler support retries if a task fails?
Interviewer: No retries for now. However, you can keep the design open to supporting retries later.
Candidate: Can a task have dependencies, such that one task should only start after another completes?
Interviewer: Let's not handle task dependencies in this version. All tasks are independent.
Candidate: Can multiple tasks run in parallel?
Interviewer: Yes. The system should be able to run multiple tasks concurrently using a configurable number of worker threads.
After gathering the details, we can summarize the key system requirements.
Core entities are the fundamental building blocks of our system. We identify them by analyzing the functional requirements and highlighting the key nouns and responsibilities that naturally map to object-oriented abstractions such as classes, enums, or interfaces.
Let’s walk through the functional requirements and extract the relevant entities:
This suggests we need a Task interface to represent executable logic. Users will implement this interface for each type of task (e.g., PrintMessageTask, DataBackupTask).
To differentiate how and when tasks should be executed (once vs repeatedly), we introduce a SchedulingStrategy abstraction that calculates the next execution time based on logic like “run once” or “run every N seconds.”
We need a ScheduledTask entity to encapsulate a task, its scheduling strategy, execution metadata (e.g., nextExecutionTime, lastExecutionTime), and a unique ID. This allows the scheduler to manage execution order and rescheduling.
We introduce a TaskExecutionObserver interface with hooks for task lifecycle events (e.g., started, completed, failed). Implementations like LoggingObserver can observe and log events such as task start, completion, and failure.
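This points to a main orchestrator class, the TaskSchedulerService. This service will be responsible for accepting new tasks, keeping them ordered by next execution time, dispatching them to worker threads when they are due, and notifying observers of lifecycle events.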
Task: An interface representing any executable job. It defines a single execute() method, decoupling the scheduler from the concrete work being done.
SchedulingStrategy: An interface (Strategy Pattern) that defines how to calculate the next execution time for a task. This allows for flexible and interchangeable scheduling logic (e.g., one-time, recurring).
ScheduledTask: A wrapper class that bundles a Task with its SchedulingStrategy and its current state (e.g., nextExecutionTime). This is the object that the scheduler's priority queue will manage.
TaskExecutionObserver: An interface (Observer Pattern) for objects that need to be notified about task lifecycle events, such as when a task starts, completes, or fails.
TaskSchedulerService: The central engine and Facade of the system. It manages a priority queue of ScheduledTasks and a pool of worker threads. It is responsible for dispatching tasks at the correct time, executing them, and notifying observers.
These core entities define the key abstractions of the task scheduler and will guide the structure of our low-level design and class diagrams.
Once the core entities have been identified, the next step is to design the system's class structure. This involves defining the attributes and behaviors of each class, establishing relationships among them, applying relevant object-oriented design patterns, and visualizing the overall architecture using a class diagram.
Represents a unit of work that can be scheduled and executed.
Acts as a wrapper or container that bundles a Task with its SchedulingStrategy and state.
This is the object that the scheduler's internal priority queue will store and manage. Its compareTo method is crucial for ensuring the queue is ordered by the next execution time.
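The ordering requirement can be illustrated with a minimal sketch. It assumes a simplified stand-in class (Entry, a hypothetical name that is not part of the design) that compares by next execution time, and uses Python's heapq module to show that the earliest entry always comes out first.

```python
import heapq
from datetime import datetime, timedelta


class Entry:
    """Hypothetical stand-in for ScheduledTask, ordered by next_execution_time."""

    def __init__(self, name: str, next_execution_time: datetime):
        self.name = name
        self.next_execution_time = next_execution_time

    def __lt__(self, other: "Entry") -> bool:
        # heapq only needs "<" to keep the earliest entry at the front.
        return self.next_execution_time < other.next_execution_time


def pop_in_order(entries):
    """Push all entries onto a heap and pop them back in execution order."""
    heap = []
    for entry in entries:
        heapq.heappush(heap, entry)
    return [heapq.heappop(heap).name for _ in range(len(heap))]


base = datetime(2025, 1, 1, 8, 0, 0)
order = pop_in_order([
    Entry("report", base + timedelta(minutes=30)),
    Entry("backup", base + timedelta(minutes=5)),
    Entry("cleanup", base + timedelta(minutes=10)),
])
print(order)  # ['backup', 'cleanup', 'report']
```

The entries are inserted out of order, yet the heap returns them by ascending execution time, which is the property the scheduler's queue relies on.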
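Attributes:
id: a unique identifier for the scheduled task.
task: the work to run, an implementation of the Task interface.
strategy: the SchedulingStrategy that computes the next run time.
nextExecutionTime and lastExecutionTime: execution metadata used for ordering and rescheduling.
Methods (excluding getters):
updateNextExecutionTime(): asks the strategy for the next run time.
updateLastExecutionTime(): sets lastExecutionTime to the task’s most recent run.
hasMoreExecutions(): returns true if the task still has future scheduled runs (e.g., recurring), otherwise false.
compareTo(): compares tasks by nextExecutionTime, enabling priority queue ordering.
Class Relationships:
ScheduledTask holds a reference to one Task and one SchedulingStrategy.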
This is the central engine of the system. It acts as a Facade, managing a thread pool and a priority queue to execute tasks according to their schedules.
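Attributes:
taskQueue: a priority queue of ScheduledTask objects, ordered by execution time.
observers: a list of TaskExecutionObserver instances that receive callbacks on task lifecycle events.
workers: the pool of worker threads that execute due tasks.
Methods:
schedule(): wraps a Task and its SchedulingStrategy in a ScheduledTask and adds it to the queue.
addObserver(): registers a TaskExecutionObserver.
shutdown(): stops the worker threads.
Class Relationships:
TaskSchedulerService manages many ScheduledTask objects and notifies many TaskExecutionObserver instances.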
The Task interface and its implementations (PrintMessageTask, etc.) embody the Command Pattern.
Each Task object encapsulates a request (an action to be performed) as an object. The TaskSchedulerService acts as the "invoker," which holds and executes these commands without needing to know the details of the specific action.
Class Relationship: PrintMessageTask and DataBackupTask implement the Task interface.
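As a small illustration of the invoker role, the sketch below runs a list of commands without knowing what each one does. CallableTask and run_all are hypothetical names introduced only for this example; the Task class is a minimal copy of the interface described above.

```python
from abc import ABC, abstractmethod
from typing import Callable, List


class Task(ABC):
    """Command interface, mirroring the Task abstraction described above."""

    @abstractmethod
    def execute(self) -> None:
        ...


class CallableTask(Task):
    """Hypothetical adapter: wraps any zero-argument callable as a Task."""

    def __init__(self, func: Callable[[], None]):
        self._func = func

    def execute(self) -> None:
        self._func()


def run_all(tasks: List[Task]) -> None:
    """Plays the invoker: calls execute() without inspecting the task."""
    for task in tasks:
        task.execute()


log: List[str] = []
run_all([
    CallableTask(lambda: log.append("send email")),
    CallableTask(lambda: log.append("rotate logs")),
])
print(log)  # ['send email', 'rotate logs']
```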
The SchedulingStrategy interface and its concrete implementations (OneTimeSchedulingStrategy, RecurringSchedulingStrategy) are a classic example of the Strategy Pattern.
This allows the algorithm for determining a task's schedule to be selected at runtime and encapsulated independently from the task itself.
Class Relationship: OneTimeSchedulingStrategy and RecurringSchedulingStrategy implement the SchedulingStrategy interface.
The TaskExecutionObserver interface, along with the TaskSchedulerService as the "subject," forms the Observer Pattern.
This allows multiple, disparate objects (like a logger) to subscribe to and be notified of task lifecycle events without being tightly coupled to the scheduler's execution logic.
Class Relationship: LoggingObserver implements the TaskExecutionObserver interface.
The TaskSchedulerService and its worker threads implement the Producer-Consumer pattern. The schedule() method acts as the "producer," adding ScheduledTask objects to the shared, thread-safe priority queue. The worker threads act as "consumers," taking tasks from the queue and processing them. The blocking queue handles the synchronization between them.
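The producer and consumer roles can be sketched in isolation with Python's queue.PriorityQueue. This is a simplified illustration, not the scheduler itself: the (priority, payload) tuples and the STOP sentinel are assumptions made for the example.

```python
import queue
import threading

work_queue = queue.PriorityQueue()
results = []
STOP = (float("inf"), "stop")  # sentinel that sorts after every real item


def consumer() -> None:
    while True:
        item = work_queue.get()  # blocks until an item is available
        if item is STOP:
            break
        results.append(item[1])


# Producer side: enqueue (priority, payload) pairs out of order.
for priority, payload in [(3, "c"), (1, "a"), (2, "b")]:
    work_queue.put((priority, payload))
work_queue.put(STOP)

# Consumer side: a single worker drains the queue in priority order.
worker = threading.Thread(target=consumer)
worker.start()
worker.join()
print(results)  # ['a', 'b', 'c']
```

The queue does the locking internally, so neither side needs explicit synchronization around put() and get().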
The TaskSchedulerService serves as a Facade. It provides a simple, high-level interface (schedule(), shutdown()) that hides the complex internal machinery of the priority queue, worker thread management, synchronization, and rescheduling logic.
The TaskSchedulerService uses the Singleton pattern to guarantee that there is only one instance of the scheduler in the application, which is essential for centralizing control over all scheduled tasks and shared resources like the thread pool.
Defines a Task interface that represents a unit of executable work.
from abc import ABC, abstractmethod
from datetime import datetime


class Task(ABC):
    """A unit of executable work."""

    @abstractmethod
    def execute(self):
        pass


class PrintMessageTask(Task):
    def __init__(self, message: str):
        self.message = message

    def execute(self):
        print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Executing PrintMessageTask: {self.message}")


class DataBackupTask(Task):
    def __init__(self, source: str, destination: str):
        self.source = source
        self.destination = destination

    def execute(self):
        # Simulated backup: only prints progress messages.
        print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Executing DataBackupTask: Backing up from {self.source} to {self.destination}...")
        print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] DataBackupTask: Backup complete.")

Each task must implement the execute() method. Two example implementations are provided:
PrintMessageTask: prints a message to the console.
DataBackupTask: simulates a file backup operation.
These classes demonstrate how tasks can encapsulate custom business logic.
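Because every task sits behind the same execute() method, extra behavior can be layered on by wrapping one task in another. The sketch below shows one possible way to keep the design open to retries, which the requirements defer to a later version. RetryingTask and FlakyTask are hypothetical names, and the scheduler in this chapter does not use them.

```python
from abc import ABC, abstractmethod


class Task(ABC):
    """Minimal copy of the Task interface so the sketch runs on its own."""

    @abstractmethod
    def execute(self) -> None:
        ...


class RetryingTask(Task):
    """Hypothetical wrapper: re-runs the inner task up to max_attempts times."""

    def __init__(self, inner: Task, max_attempts: int = 3):
        if max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")
        self.inner = inner
        self.max_attempts = max_attempts
        self.attempts_made = 0

    def execute(self) -> None:
        last_error = None
        for _ in range(self.max_attempts):
            self.attempts_made += 1
            try:
                self.inner.execute()
                return
            except Exception as error:
                last_error = error
        # Every attempt failed: surface the final error to the caller.
        raise last_error


class FlakyTask(Task):
    """Test double that fails a fixed number of times before succeeding."""

    def __init__(self, failures_before_success: int):
        self.remaining_failures = failures_before_success

    def execute(self) -> None:
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise RuntimeError("transient failure")


task = RetryingTask(FlakyTask(failures_before_success=2), max_attempts=3)
task.execute()
print(task.attempts_made)  # 3
```

The wrapper retries immediately and inside a single worker; a production design would more likely add a delay between attempts or reschedule through the queue.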
Represents a task scheduled for future execution. It wraps a Task instance with metadata like a unique ID, scheduling strategy, next and last execution times.
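import uuid


class ScheduledTask:
    def __init__(self, task: Task, strategy: 'SchedulingStrategy'):
        self.id = str(uuid.uuid4())
        self.task = task
        self.strategy = strategy
        self.next_execution_time = None
        self.last_execution_time = None
        self.update_next_execution_time()

    def update_next_execution_time(self):
        # The strategy returns None when no further runs are due.
        next_time = self.strategy.get_next_execution_time(self.last_execution_time)
        self.next_execution_time = next_time

    def update_last_execution_time(self):
        self.last_execution_time = self.next_execution_time

    def __lt__(self, other):
        # Earlier execution time means higher priority in the queue.
        return self.next_execution_time < other.next_execution_time

    def has_more_executions(self):
        return self.next_execution_time is not None

    # Getters used by the scheduler and observers
    def get_id(self):
        return self.id

    def get_task(self):
        return self.task

    def get_next_execution_time(self):
        return self.next_execution_time

The class is responsible for tracking its next and last execution times, reporting whether any runs remain, and ordering itself by next execution time inside the priority queue.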
Defines the strategy interface for calculating the next execution time of a task.
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional


class SchedulingStrategy(ABC):
    @abstractmethod
    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        pass


class OneTimeSchedulingStrategy(SchedulingStrategy):
    def __init__(self, execution_time: datetime):
        self.execution_time = execution_time

    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        # Once the task has run, there is no next execution.
        return self.execution_time if last_execution_time is None else None


class RecurringSchedulingStrategy(SchedulingStrategy):
    def __init__(self, interval: timedelta):
        self.interval = interval

    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        # The first run is one interval from now; later runs are one interval after the previous one.
        base_time = datetime.now() if last_execution_time is None else last_execution_time
        return base_time + self.interval

Two implementations are included:
OneTimeSchedulingStrategy: Schedules a task once at a specific datetime.
RecurringSchedulingStrategy: Schedules a task at regular intervals using a timedelta.
This abstraction allows the system to support additional scheduling logic (e.g., CRON) in the future without changing the core scheduling service.
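To illustrate that extensibility, here is a minimal sketch of a third strategy that runs a task once per day at a fixed wall-clock time. DailyAtTimeStrategy and its injectable clock parameter are assumptions made for this example and are not part of the original design. It uses naive datetimes and ignores time zones and daylight saving changes.

```python
from abc import ABC, abstractmethod
from datetime import datetime, time, timedelta
from typing import Callable, Optional


class SchedulingStrategy(ABC):
    """Minimal copy of the strategy interface so the sketch runs on its own."""

    @abstractmethod
    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        ...


class DailyAtTimeStrategy(SchedulingStrategy):
    """Hypothetical strategy: run once per day at a fixed time of day."""

    def __init__(self, time_of_day: time, clock: Callable[[], datetime] = datetime.now):
        self.time_of_day = time_of_day
        self.clock = clock  # injectable so the example is deterministic

    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        reference = last_execution_time or self.clock()
        candidate = datetime.combine(reference.date(), self.time_of_day)
        if candidate <= reference:
            # Today's slot has already passed (or just ran): move to tomorrow.
            candidate += timedelta(days=1)
        return candidate


fixed_now = datetime(2025, 1, 1, 9, 30)
strategy = DailyAtTimeStrategy(time(8, 0), clock=lambda: fixed_now)
first_run = strategy.get_next_execution_time(None)
second_run = strategy.get_next_execution_time(first_run)
print(first_run)   # 2025-01-02 08:00:00
print(second_run)  # 2025-01-03 08:00:00
```

Since the class implements the same interface, it could be passed to the scheduler wherever the other strategies are accepted.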
Introduces an observer interface to track the lifecycle of task execution.
import threading
from abc import ABC, abstractmethod
from datetime import datetime


class TaskExecutionObserver(ABC):
    @abstractmethod
    def on_task_started(self, task: ScheduledTask):
        pass

    @abstractmethod
    def on_task_completed(self, task: ScheduledTask):
        pass

    @abstractmethod
    def on_task_failed(self, task: ScheduledTask, exception: Exception):
        pass


class LoggingObserver(TaskExecutionObserver):
    def on_task_started(self, task: ScheduledTask):
        print(f"[LOG - {datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [{threading.current_thread().name}] Task {task.get_id()} started.")

    def on_task_completed(self, task: ScheduledTask):
        print(f"[LOG - {datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [{threading.current_thread().name}] Task {task.get_id()} completed successfully.")

    def on_task_failed(self, task: ScheduledTask, exception: Exception):
        print(f"[LOG - {datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [{threading.current_thread().name}] Task {task.get_id()} failed: {str(exception)}")

It has three callbacks:
on_task_started
on_task_completed
on_task_failed
LoggingObserver is a concrete implementation that logs task events to the console, but other observers (e.g., metrics, alerting) can be plugged in easily.
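As an example of plugging in another observer, the sketch below counts lifecycle events instead of logging them. MetricsObserver is a hypothetical class written for this illustration. The interface is repeated in minimal form so the block runs on its own, and plain strings stand in for ScheduledTask objects.

```python
import threading
from abc import ABC, abstractmethod
from collections import Counter


class TaskExecutionObserver(ABC):
    """Minimal copy of the observer interface so the sketch runs on its own."""

    @abstractmethod
    def on_task_started(self, task): ...

    @abstractmethod
    def on_task_completed(self, task): ...

    @abstractmethod
    def on_task_failed(self, task, exception: Exception): ...


class MetricsObserver(TaskExecutionObserver):
    """Hypothetical observer that counts lifecycle events."""

    def __init__(self):
        self.counts = Counter()
        self._lock = threading.Lock()  # workers may report concurrently

    def _record(self, event: str) -> None:
        with self._lock:
            self.counts[event] += 1

    def on_task_started(self, task):
        self._record("started")

    def on_task_completed(self, task):
        self._record("completed")

    def on_task_failed(self, task, exception: Exception):
        self._record("failed")


metrics = MetricsObserver()
metrics.on_task_started("task-1")
metrics.on_task_completed("task-1")
metrics.on_task_started("task-2")
metrics.on_task_failed("task-2", RuntimeError("disk full"))
print(dict(metrics.counts))  # {'started': 2, 'completed': 1, 'failed': 1}
```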
The core of the system — a thread-safe singleton class that schedules and manages tasks.
import queue
import threading
import time
from datetime import datetime


class TaskSchedulerService:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: only one instance is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(TaskSchedulerService, cls).__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every TaskSchedulerService() call, so set state only once.
        if not hasattr(self, '_initialized') or not self._initialized:
            self.task_queue = queue.PriorityQueue()
            self.observers = []
            self.workers = []
            self.running = True
            self._initialized = True

    @staticmethod
    def get_instance():
        return TaskSchedulerService()

    def initialize(self, worker_count: int):
        if worker_count <= 0:
            raise ValueError("Worker count must be >= 1")
        self.workers = []
        self.start_workers(worker_count)

    def schedule(self, task: Task, strategy: SchedulingStrategy) -> str:
        scheduled_task = ScheduledTask(task, strategy)
        self.task_queue.put(scheduled_task)
        return scheduled_task.get_id()

    def start_workers(self, worker_count: int):
        for i in range(worker_count):
            worker = threading.Thread(target=self.run_worker, name=f"WorkerThread-{i}", daemon=True)
            self.workers.append(worker)
            worker.start()

    def run_worker(self):
        while self.running:
            try:
                task = self.task_queue.get(timeout=1)
                now = datetime.now()
                wait_time = 0

                if task.get_next_execution_time() > now:
                    wait_time = (task.get_next_execution_time() - now).total_seconds()

                if wait_time > 0:
                    time.sleep(wait_time)

                # Check if a higher-priority task has arrived while we were sleeping
                try:
                    head = self.task_queue.get_nowait()
                    if head < task:
                        # An earlier task exists: put both back and start over.
                        self.task_queue.put(task)
                        self.task_queue.put(head)
                        continue
                    else:
                        self.task_queue.put(head)
                except queue.Empty:
                    pass

                self.execute(task)
                self.task_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Worker thread error: {e}")
                break

        print(f"{threading.current_thread().name} stopped.")

    def execute(self, task: ScheduledTask):
        for observer in self.observers:
            observer.on_task_started(task)

        try:
            task.get_task().execute()
            for observer in self.observers:
                observer.on_task_completed(task)
        except Exception as e:
            for observer in self.observers:
                observer.on_task_failed(task, e)
            print(f"Task {task.get_id()} failed with error: {str(e)}")
        finally:
            # Record the attempt even on failure. Otherwise the strategy would hand
            # back the same (already past) time and the task would retry in a loop.
            task.update_last_execution_time()
            task.update_next_execution_time()

            if task.has_more_executions():
                self.task_queue.put(task)
            else:
                print(f"Task {task.get_id()} has no more executions and will not be rescheduled.")

    def shutdown(self):
        self.running = False
        for worker in self.workers:
            worker.join(timeout=1)
        print("Scheduler shut down.")

    def add_observer(self, observer: TaskExecutionObserver):
        self.observers.append(observer)

It maintains a priority queue of scheduled tasks, runs a pool of worker threads that wait until each task is due, executes tasks while notifying observers, and puts recurring tasks back on the queue for their next run.
Demonstrates how to use the task scheduler in a real scenario.
import time
from datetime import datetime, timedelta


def main():
    # 1. Setup the facade and observers
    scheduler = TaskSchedulerService.get_instance()
    scheduler.add_observer(LoggingObserver())

    # 2. Initialize the scheduler
    scheduler.initialize(10)

    # 3. Define tasks and strategies
    # Scenario 1: One-time task, 1 second from now
    one_time_task = PrintMessageTask("This is a one-time task.")
    one_time_strategy = OneTimeSchedulingStrategy(datetime.now() + timedelta(seconds=1))

    # Scenario 2: Recurring task, every 2 seconds
    recurring_task = PrintMessageTask("This is a recurring task.")
    recurring_strategy = RecurringSchedulingStrategy(timedelta(seconds=2))

    # Scenario 3: A backup task, scheduled to run once in 3 seconds
    backup_task = DataBackupTask("/data/source", "/data/backup")
    backup_strategy = OneTimeSchedulingStrategy(datetime.now() + timedelta(seconds=3))

    # 4. Schedule the tasks using the facade
    print("Scheduling tasks...")
    scheduler.schedule(one_time_task, one_time_strategy)
    scheduler.schedule(recurring_task, recurring_strategy)
    scheduler.schedule(backup_task, backup_strategy)

    # 5. Let the demo run for a while
    print("Scheduler is running. Waiting for tasks to execute... (Demo will run for 6 seconds)")
    time.sleep(6)

    # 6. Shutdown the scheduler
    scheduler.shutdown()


if __name__ == "__main__":
    main()