Design a Task Scheduler

Last Updated: December 19, 2025


Ashish Pratap Singh

Difficulty: Hard

In this chapter, we will explore the low-level design of a Task Scheduler.

Let’s start by clarifying the requirements:

1. Clarifying Requirements

Before starting the design, it's important to ask thoughtful questions to uncover hidden assumptions and better define the scope of the system.

Here is an example of how a conversation between the candidate and the interviewer might unfold:

After gathering the details, we can summarize the key system requirements.

1.1 Functional Requirements

  • Support one-time tasks: The system must allow a user to schedule a task (a piece of code) to be executed once at a specific future time.
  • Support recurring tasks: The system must support scheduling a task that runs repeatedly at a fixed interval (e.g., every 5 seconds).
  • Schedule with a delay: The system must allow scheduling a task to run after an initial delay.
  • Execution: The scheduler must execute tasks at their designated times and allow concurrent execution using multiple worker threads.
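A delayed one-time task needs no new abstraction: it is a OneTimeSchedulingStrategy whose execution time is datetime.now() plus the delay, which is what the demo in Section 4.6 does. A recurring task with an initial delay is not covered by the two strategies in Section 4.3, so the sketch below assumes a hypothetical DelayedRecurringSchedulingStrategy that plugs into the same get_next_execution_time() interface.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional


class SchedulingStrategy(ABC):
    @abstractmethod
    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        ...


class DelayedRecurringSchedulingStrategy(SchedulingStrategy):
    """Hypothetical strategy: first run after initial_delay, then every interval."""

    def __init__(self, initial_delay: timedelta, interval: timedelta):
        self.initial_delay = initial_delay
        self.interval = interval

    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        if last_execution_time is None:
            # First run: measured from the moment the task is scheduled
            return datetime.now() + self.initial_delay
        # Later runs: fixed rate, measured from the previous scheduled time
        return last_execution_time + self.interval


strategy = DelayedRecurringSchedulingStrategy(timedelta(seconds=10), timedelta(seconds=5))
first = strategy.get_next_execution_time(None)
second = strategy.get_next_execution_time(first)
print(second - first)  # 0:00:05
```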

1.2 Non-Functional Requirements

  • Thread-Safety: The scheduler must be safe to use in a multi-threaded environment. Multiple threads might try to schedule tasks concurrently.
  • Efficiency: The scheduler should not burn CPU cycles while waiting for the next task. It should sleep or wait efficiently.
  • Robustness: A long-running task should not block the scheduler from executing other tasks.
  • Extensibility: The design should be open to adding new types of scheduling logic in the future (e.g., CRON-based schedules) without major refactoring.

2. Core Entities and Classes

Core entities are the fundamental building blocks of our system. We identify them by analyzing the functional requirements and highlighting the key nouns and responsibilities that naturally map to object-oriented abstractions such as classes, enums, or interfaces.

Let’s walk through the functional requirements and extract the relevant entities:

1. Users can schedule one-time and recurring tasks for future execution.

This suggests we need a Task interface to represent executable logic. Users will implement this interface for each type of task (e.g., PrintMessageTask, DataBackupTask).

To differentiate how and when tasks should be executed (once vs repeatedly), we introduce a SchedulingStrategy abstraction that calculates the next execution time based on logic like “run once” or “run every N seconds.”

2. The scheduler manages and executes scheduled tasks.

We need a ScheduledTask entity to encapsulate a task, its scheduling strategy, execution metadata (e.g., nextExecutionTime, lastExecutionTime), and a unique ID. This allows the scheduler to manage execution order and rescheduling.

3. Users should be notified when tasks start, succeed, or fail.

We introduce a TaskExecutionObserver interface with hooks for task lifecycle events (started, completed, failed). Implementations such as LoggingObserver subscribe to these hooks, for example to log each event.

4. The system needs a central component to manage the scheduling, queuing, and execution of all tasks.

This points to a main orchestrator class, the TaskSchedulerService. This service will be responsible for:

  1. Accepting new tasks and their strategies.
  2. Maintaining an internal, priority-ordered queue of ScheduledTask objects.
  3. Managing a pool of worker threads to execute tasks concurrently.
  4. Notifying all registered TaskExecutionObservers of lifecycle events.
  5. Handling the re-scheduling of recurring tasks.

These core entities define the key abstractions of the task scheduler and will guide the structure of our low-level design and class diagrams.

3. Designing Classes and Relationships

Once the core entities have been identified, the next step is to design the system's class structure. This involves defining the attributes and behaviors of each class, establishing relationships among them, applying relevant object-oriented design patterns, and visualizing the overall architecture using a class diagram.

3.1 Core Classes

Task

Represents a unit of work that can be scheduled and executed.

ScheduledTask

Acts as a wrapper or container that bundles a Task with its SchedulingStrategy and state.


This is the object that the scheduler's internal priority queue stores and manages. Its compareTo method (__lt__ in the Python implementation) is crucial: it keeps the queue ordered by next execution time.

Attributes:

  • id: A unique identifier (UUID) assigned to each scheduled task.
  • task (Task): The actual task logic to be executed, implementing the Task interface.
  • strategy (SchedulingStrategy): Defines how the next execution time is determined (one-time or recurring).
  • nextExecutionTime: The upcoming time at which the task is scheduled to run.
  • lastExecutionTime: The most recent time this task was executed (null if never executed).

Methods (excluding getters):

  • updateNextExecutionTime(): Uses the scheduling strategy to compute and update the task’s next run time.
  • updateLastExecutionTime(): Updates lastExecutionTime to the task’s most recent run.
  • hasMoreExecutions(): Returns true if the task still has future scheduled runs (e.g., recurring), otherwise false.
  • compareTo(ScheduledTask other): Compares tasks based on nextExecutionTime, enabling priority queue ordering.
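In the Python implementation, compareTo corresponds to __lt__, the operator that heapq (and therefore queue.PriorityQueue) uses to order elements. The minimal sketch below uses a stripped-down ScheduledTask with a hypothetical name field to show that tasks come out in execution-time order regardless of insertion order.

```python
import heapq
from datetime import datetime, timedelta


class ScheduledTask:
    """Stripped-down stand-in: only the fields needed for ordering."""

    def __init__(self, name: str, next_execution_time: datetime):
        self.name = name  # hypothetical field, used only to label the output
        self.next_execution_time = next_execution_time

    def __lt__(self, other: "ScheduledTask") -> bool:
        # Python's counterpart of compareTo: earlier time = higher priority
        return self.next_execution_time < other.next_execution_time


now = datetime.now()
heap = []
# Inserted out of order on purpose
for name, seconds in [("backup", 30), ("report", 5), ("cleanup", 15)]:
    heapq.heappush(heap, ScheduledTask(name, now + timedelta(seconds=seconds)))

order = [heapq.heappop(heap).name for _ in range(len(heap))]
print(order)  # ['report', 'cleanup', 'backup']
```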

Core Service

TaskSchedulerService

This is the central engine of the system. It acts as a Facade, managing a thread pool and a priority queue to execute tasks according to their schedules.


Attributes:

  • INSTANCE: Singleton instance to ensure only one scheduler exists in the application.
  • taskQueue (PriorityBlockingQueue): A thread-safe queue that stores ScheduledTask objects, ordered by execution time.
  • observers (List): A collection of TaskExecutionObserver instances that receive callbacks on task lifecycle events.
  • workers (Thread[]): The pool of worker threads responsible for executing scheduled tasks concurrently.
  • running (volatile boolean): A flag to indicate whether the scheduler is active or has been shut down.

Methods:

  • getInstance(): Returns the singleton instance of the scheduler.
  • initialize(int workerCount): Initializes the scheduler with a given number of worker threads and starts them.
  • schedule(Task task, SchedulingStrategy strategy): Adds a new task into the queue with its associated scheduling strategy.
  • startWorkers(): Creates and starts worker threads that will pull tasks from the queue.
  • runWorker(): The worker thread loop, which fetches tasks, waits until their scheduled time, and executes them.
  • execute(ScheduledTask task): Runs the task, notifies observers, updates execution times, and re-queues if necessary.
  • addObserver(TaskExecutionObserver observer): Registers an observer for tracking task lifecycle events (start, complete, fail).
  • shutdown(): Gracefully stops the scheduler by interrupting worker threads and halting execution.

3.2 Key Design Patterns

Command Pattern

The Task interface and its implementations (PrintMessageTask, etc.) embody the Command Pattern.


Each Task object encapsulates a request (an action to be performed) as an object. The TaskSchedulerService acts as the "invoker," which holds and executes these commands without needing to know the details of the specific action.
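As a minimal sketch of the invoker side, the hypothetical invoke_all() function below runs a mixed list of commands through the Task interface alone; IncrementCounterTask is an illustrative command, not part of the design.

```python
from abc import ABC, abstractmethod
from typing import List


class Task(ABC):
    @abstractmethod
    def execute(self):
        ...


class PrintMessageTask(Task):
    def __init__(self, message: str):
        self.message = message

    def execute(self):
        print(self.message)


class IncrementCounterTask(Task):
    """Hypothetical command that mutates shared state."""

    def __init__(self, counter: dict):
        self.counter = counter

    def execute(self):
        self.counter["value"] += 1


def invoke_all(commands: List[Task]) -> None:
    # The invoker only knows the Task interface, not what each command does
    for command in commands:
        command.execute()


counter = {"value": 0}
invoke_all([PrintMessageTask("hello"), IncrementCounterTask(counter), IncrementCounterTask(counter)])
print(counter["value"])  # 2
```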

Strategy Pattern

The SchedulingStrategy interface and its concrete implementations (OneTimeSchedulingStrategy, RecurringSchedulingStrategy) are a classic example of the Strategy Pattern.


This allows the algorithm for determining a task's schedule to be selected at runtime and encapsulated independently from the task itself.
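Runtime selection can be sketched with a hypothetical strategy_from_config() factory that builds a strategy from plain data (for example, a parsed config file). The two strategies are copied from Section 4.3; the config keys are assumptions for illustration.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional


class SchedulingStrategy(ABC):
    @abstractmethod
    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        ...


class OneTimeSchedulingStrategy(SchedulingStrategy):
    def __init__(self, execution_time: datetime):
        self.execution_time = execution_time

    def get_next_execution_time(self, last_execution_time):
        return self.execution_time if last_execution_time is None else None


class RecurringSchedulingStrategy(SchedulingStrategy):
    def __init__(self, interval: timedelta):
        self.interval = interval

    def get_next_execution_time(self, last_execution_time):
        base_time = datetime.now() if last_execution_time is None else last_execution_time
        return base_time + self.interval


def strategy_from_config(config: dict) -> SchedulingStrategy:
    """Hypothetical factory: the strategy is chosen from data at runtime."""
    kind = config["type"]
    if kind == "once":
        return OneTimeSchedulingStrategy(datetime.now() + timedelta(seconds=config["delay_seconds"]))
    if kind == "every":
        return RecurringSchedulingStrategy(timedelta(seconds=config["interval_seconds"]))
    raise ValueError(f"Unknown schedule type: {kind}")


once = strategy_from_config({"type": "once", "delay_seconds": 60})
every = strategy_from_config({"type": "every", "interval_seconds": 5})
first = once.get_next_execution_time(None)
print(once.get_next_execution_time(first))  # None: a one-time task never repeats
start = datetime(2025, 1, 1, 12, 0, 0)
print(every.get_next_execution_time(start))  # 2025-01-01 12:00:05
```

The scheduler itself never branches on the schedule type; adding a new type only touches the factory and a new strategy class.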

Observer Pattern

The TaskExecutionObserver interface, along with the TaskSchedulerService as the "subject," forms the Observer Pattern.


This allows multiple, disparate objects (like a logger) to subscribe to and be notified of task lifecycle events without being tightly coupled to the scheduler's execution logic.

Producer-Consumer Pattern

The TaskSchedulerService and its worker threads implement this pattern. The schedule() method acts as the "producer," adding ScheduledTask objects to the shared PriorityBlockingQueue. The worker threads act as "consumers," taking tasks from the queue and processing them. The blocking queue handles the synchronization between them.
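The part the plain producer-consumer picture leaves out is waiting for a task's due time without busy-waiting while still reacting to newly scheduled, earlier tasks. Python's queue module has no delay queue, so the sketch below assumes a hypothetical DelayQueue built on heapq and threading.Condition, similar in spirit to Java's DelayQueue: producers notify on every put, and consumers sleep with Condition.wait(timeout) until the head is due.

```python
import heapq
import itertools
import threading
import time


class DelayQueue:
    """Minimal sketch of a blocking delay queue (hypothetical class)."""

    def __init__(self):
        self._heap = []                # (due_time, seq, item), earliest first
        self._seq = itertools.count()  # tie-breaker so items themselves are never compared
        self._cond = threading.Condition()

    def put(self, item, delay: float) -> None:
        # Producer side: push, then wake consumers because the new head may be earlier
        with self._cond:
            heapq.heappush(self._heap, (time.monotonic() + delay, next(self._seq), item))
            self._cond.notify_all()

    def take(self):
        # Consumer side: block (without spinning) until the earliest item is due
        with self._cond:
            while True:
                if not self._heap:
                    self._cond.wait()
                    continue
                remaining = self._heap[0][0] - time.monotonic()
                if remaining <= 0:
                    return heapq.heappop(self._heap)[2]
                self._cond.wait(timeout=remaining)


q = DelayQueue()
results = []
consumer = threading.Thread(target=lambda: results.extend(q.take() for _ in range(2)))
consumer.start()
q.put("slow", delay=0.2)
q.put("fast", delay=0.05)  # added later but due earlier, so it is taken first
consumer.join()
print(results)  # ['fast', 'slow']
```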

Facade Pattern

The TaskSchedulerService serves as a Facade. It provides a simple, high-level interface (schedule(), shutdown()) that hides the complex internal machinery of the priority queue, worker thread management, synchronization, and rescheduling logic.

Singleton Pattern

The TaskSchedulerService uses this pattern to guarantee that there is only one instance of the scheduler in the application, which is essential for centralizing control over all scheduled tasks and shared resources like the thread pool.
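Because several threads may request the scheduler at the same time, the instance must be created under a lock. Below is a minimal double-checked-locking sketch with a hypothetical Singleton class (the same shape as TaskSchedulerService.__new__ in Section 4.5), plus a quick check that 20 concurrent callers all receive one object.

```python
import threading


class Singleton:
    """Minimal double-checked-locking singleton (hypothetical class name)."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:          # fast path: no locking once created
            with cls._lock:
                if cls._instance is None:  # re-check: another thread may have won the race
                    cls._instance = super().__new__(cls)
        return cls._instance


instances = []
threads = [threading.Thread(target=lambda: instances.append(Singleton())) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len({id(obj) for obj in instances}))  # 1
```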

3.3 Full Class Diagram


4. Implementation

4.1 Task

Defines a Task interface that represents a unit of executable work.

```python
from abc import ABC, abstractmethod
from datetime import datetime


class Task(ABC):
    @abstractmethod
    def execute(self):
        pass


class PrintMessageTask(Task):
    def __init__(self, message: str):
        self.message = message

    def execute(self):
        print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Executing PrintMessageTask: {self.message}")


class DataBackupTask(Task):
    def __init__(self, source: str, destination: str):
        self.source = source
        self.destination = destination

    def execute(self):
        # Simulated backup: only prints progress messages, no files are copied
        print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Executing DataBackupTask: Backing up from {self.source} to {self.destination}...")
        print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] DataBackupTask: Backup complete.")
```

Each task must implement the execute() method. Two example implementations are provided:

  • PrintMessageTask: prints a message to the console.
  • DataBackupTask: simulates a file backup operation.

These classes demonstrate how tasks can encapsulate custom business logic.
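Tasks are not limited to dedicated subclasses. As a sketch, a hypothetical CallableTask adapter can turn any zero-argument function into a Task, which is handy for one-off jobs.

```python
from abc import ABC, abstractmethod
from typing import Callable


class Task(ABC):
    @abstractmethod
    def execute(self):
        ...


class CallableTask(Task):
    """Hypothetical adapter: turns any zero-argument function into a Task."""

    def __init__(self, fn: Callable[[], None], name: str = "callable"):
        self.fn = fn
        self.name = name

    def execute(self):
        self.fn()


calls = []
task = CallableTask(lambda: calls.append("ran"), name="append-once")
task.execute()
task.execute()
print(calls)  # ['ran', 'ran']
```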

4.2 ScheduledTask

Represents a task scheduled for future execution. It wraps a Task instance with metadata: a unique ID, the scheduling strategy, and the next and last execution times.

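```python
import uuid
from datetime import datetime
from typing import Optional


class ScheduledTask:
    def __init__(self, task: Task, strategy: 'SchedulingStrategy'):
        self.id = str(uuid.uuid4())
        self.task = task
        self.strategy = strategy
        self.next_execution_time: Optional[datetime] = None
        self.last_execution_time: Optional[datetime] = None
        self.update_next_execution_time()

    def update_next_execution_time(self):
        # Ask the strategy when to run next; None means "no more runs"
        self.next_execution_time = self.strategy.get_next_execution_time(self.last_execution_time)

    def update_last_execution_time(self):
        # Records the scheduled time of the run (not wall-clock time), so recurring runs do not drift
        self.last_execution_time = self.next_execution_time

    def __lt__(self, other: 'ScheduledTask') -> bool:
        # Earlier next_execution_time means higher priority in the queue
        return self.next_execution_time < other.next_execution_time

    def has_more_executions(self) -> bool:
        return self.next_execution_time is not None

    # Getters used by the scheduler and observers
    def get_id(self) -> str:
        return self.id

    def get_task(self) -> Task:
        return self.task

    def get_next_execution_time(self) -> Optional[datetime]:
        return self.next_execution_time

    def get_last_execution_time(self) -> Optional[datetime]:
        return self.last_execution_time
```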

The class is responsible for:

  • Computing the next execution time using the provided strategy.
  • Tracking when it was last executed.
  • Determining if it should be re-executed (for recurring tasks).
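The following sketch traces those responsibilities for both strategy types. It uses a trimmed copy of ScheduledTask (the wrapped Task is omitted) and a hypothetical simulate_run() helper that mirrors what the scheduler does after a task runs: record the last execution time, then ask the strategy for the next one.

```python
import uuid
from datetime import datetime, timedelta
from typing import Optional


class OneTimeSchedulingStrategy:
    def __init__(self, execution_time: datetime):
        self.execution_time = execution_time

    def get_next_execution_time(self, last: Optional[datetime]) -> Optional[datetime]:
        return self.execution_time if last is None else None


class RecurringSchedulingStrategy:
    def __init__(self, interval: timedelta):
        self.interval = interval

    def get_next_execution_time(self, last: Optional[datetime]) -> Optional[datetime]:
        return (datetime.now() if last is None else last) + self.interval


class ScheduledTask:
    """Trimmed copy of Section 4.2 (the wrapped Task is omitted)."""

    def __init__(self, strategy):
        self.id = str(uuid.uuid4())
        self.strategy = strategy
        self.last_execution_time = None
        self.next_execution_time = strategy.get_next_execution_time(None)

    def simulate_run(self):
        # Hypothetical helper: same bookkeeping the scheduler performs after a run
        self.last_execution_time = self.next_execution_time
        self.next_execution_time = self.strategy.get_next_execution_time(self.last_execution_time)

    def has_more_executions(self) -> bool:
        return self.next_execution_time is not None


once = ScheduledTask(OneTimeSchedulingStrategy(datetime(2025, 1, 1, 9, 0)))
once.simulate_run()
print(once.has_more_executions())  # False

every = ScheduledTask(RecurringSchedulingStrategy(timedelta(minutes=10)))
first = every.next_execution_time
every.simulate_run()
print(every.next_execution_time - first)  # 0:10:00
```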

4.3 SchedulingStrategy

Defines the strategy interface for calculating the next execution time of a task.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional


class SchedulingStrategy(ABC):
    @abstractmethod
    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        pass


class OneTimeSchedulingStrategy(SchedulingStrategy):
    def __init__(self, execution_time: datetime):
        self.execution_time = execution_time

    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        # Run once at execution_time; after that, no further runs
        return self.execution_time if last_execution_time is None else None


class RecurringSchedulingStrategy(SchedulingStrategy):
    def __init__(self, interval: timedelta):
        self.interval = interval

    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        # First run one interval from now, then one interval after each previous run
        base_time = datetime.now() if last_execution_time is None else last_execution_time
        return base_time + self.interval
```

Two implementations are included:

  • OneTimeSchedulingStrategy: Schedules a task once at a specific datetime.
  • RecurringSchedulingStrategy: Schedules a task at regular intervals given as a timedelta; the first run happens one interval after scheduling.

This abstraction allows the system to support additional scheduling logic (e.g., CRON) in the future without changing the core scheduling service.
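As an example of that extensibility, a hypothetical DailyAtTimeSchedulingStrategy (a CRON-like "every day at HH:MM" rule, not a full CRON parser) fits the same interface. The injectable clock parameter is an assumption added only to make the example deterministic.

```python
from abc import ABC, abstractmethod
from datetime import datetime, time, timedelta
from typing import Callable, Optional


class SchedulingStrategy(ABC):
    @abstractmethod
    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        ...


class DailyAtTimeSchedulingStrategy(SchedulingStrategy):
    """Hypothetical CRON-like strategy: run every day at a fixed wall-clock time."""

    def __init__(self, at: time, clock: Callable[[], datetime] = datetime.now):
        self.at = at
        self.clock = clock  # injectable for testing; defaults to datetime.now

    def get_next_execution_time(self, last_execution_time: Optional[datetime]) -> Optional[datetime]:
        if last_execution_time is not None:
            # Naive datetimes: daylight-saving shifts are ignored in this sketch
            return last_execution_time + timedelta(days=1)
        now = self.clock()
        candidate = datetime.combine(now.date(), self.at)
        # If today's slot has already passed, start tomorrow
        return candidate if candidate > now else candidate + timedelta(days=1)


fixed_now = datetime(2025, 3, 10, 14, 30)
strategy = DailyAtTimeSchedulingStrategy(time(9, 0), clock=lambda: fixed_now)
first = strategy.get_next_execution_time(None)
print(first)                                    # 2025-03-11 09:00:00
print(strategy.get_next_execution_time(first))  # 2025-03-12 09:00:00
```

TaskSchedulerService needs no changes to use it; it only ever calls get_next_execution_time().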

4.4 TaskExecutionObserver

Introduces an observer interface to track the lifecycle of task execution.

```python
import threading
from abc import ABC, abstractmethod
from datetime import datetime


class TaskExecutionObserver(ABC):
    @abstractmethod
    def on_task_started(self, task: ScheduledTask):
        pass

    @abstractmethod
    def on_task_completed(self, task: ScheduledTask):
        pass

    @abstractmethod
    def on_task_failed(self, task: ScheduledTask, exception: Exception):
        pass


class LoggingObserver(TaskExecutionObserver):
    # Each log line includes a millisecond timestamp and the worker thread's name
    def on_task_started(self, task: ScheduledTask):
        print(f"[LOG - {datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [{threading.current_thread().name}] Task {task.get_id()} started.")

    def on_task_completed(self, task: ScheduledTask):
        print(f"[LOG - {datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [{threading.current_thread().name}] Task {task.get_id()} completed successfully.")

    def on_task_failed(self, task: ScheduledTask, exception: Exception):
        print(f"[LOG - {datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [{threading.current_thread().name}] Task {task.get_id()} failed: {str(exception)}")
```

It has three callbacks:

  • on_task_started
  • on_task_completed
  • on_task_failed

LoggingObserver is a concrete implementation that logs task events to the console, but other observers (e.g., metrics, alerting) can be plugged in easily.
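For instance, a hypothetical MetricsObserver could count lifecycle events instead of printing them. Since callbacks fire from several worker threads, the counter is guarded by a lock. The calls at the bottom simulate what the scheduler would invoke, with plain strings standing in for ScheduledTask objects.

```python
import threading
from abc import ABC, abstractmethod
from collections import Counter


class TaskExecutionObserver(ABC):
    @abstractmethod
    def on_task_started(self, task): ...

    @abstractmethod
    def on_task_completed(self, task): ...

    @abstractmethod
    def on_task_failed(self, task, exception: Exception): ...


class MetricsObserver(TaskExecutionObserver):
    """Hypothetical observer that counts lifecycle events instead of logging them."""

    def __init__(self):
        self.counts = Counter()
        self._lock = threading.Lock()  # callbacks arrive from several worker threads

    def _record(self, event: str):
        with self._lock:
            self.counts[event] += 1

    def on_task_started(self, task):
        self._record("started")

    def on_task_completed(self, task):
        self._record("completed")

    def on_task_failed(self, task, exception: Exception):
        self._record("failed")


metrics = MetricsObserver()
# Simulated callbacks for two runs: one success, one failure (strings stand in for tasks)
metrics.on_task_started("task-1")
metrics.on_task_completed("task-1")
metrics.on_task_started("task-2")
metrics.on_task_failed("task-2", RuntimeError("disk full"))
print(dict(metrics.counts))  # {'started': 2, 'completed': 1, 'failed': 1}
```

It would be registered like any other observer, via scheduler.add_observer(MetricsObserver()).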

4.5 TaskSchedulerService

The core of the system: a thread-safe singleton class that schedules and manages tasks.

```python
import heapq
import threading
from datetime import datetime

# Task, ScheduledTask, SchedulingStrategy and TaskExecutionObserver come from Sections 4.1-4.4


class TaskSchedulerService:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: create the single instance at most once
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every TaskSchedulerService() call, so guard the one-time setup
        with TaskSchedulerService._lock:
            if self._initialized:
                return
            self.task_queue = []                    # min-heap of ScheduledTask, earliest first
            self.condition = threading.Condition()  # guards task_queue and wakes waiting workers
            self.observers = []
            self.workers = []
            self.running = True
            self._initialized = True

    @staticmethod
    def get_instance():
        return TaskSchedulerService()

    def initialize(self, worker_count: int):
        if worker_count <= 0:
            raise ValueError("Worker count must be >= 1")
        self.workers = []
        self.start_workers(worker_count)

    def schedule(self, task: Task, strategy: SchedulingStrategy) -> str:
        scheduled_task = ScheduledTask(task, strategy)
        self._enqueue(scheduled_task)
        return scheduled_task.get_id()

    def _enqueue(self, task: ScheduledTask):
        # Wake all workers: the new task may be due before the one they are waiting on
        with self.condition:
            heapq.heappush(self.task_queue, task)
            self.condition.notify_all()

    def start_workers(self, worker_count: int):
        for i in range(worker_count):
            worker = threading.Thread(target=self.run_worker, name=f"WorkerThread-{i}", daemon=True)
            self.workers.append(worker)
            worker.start()

    def _take_due_task(self):
        # Block until the earliest task is due; return None once the scheduler is shut down
        with self.condition:
            while self.running:
                if not self.task_queue:
                    self.condition.wait()
                    continue
                head = self.task_queue[0]
                wait_time = (head.get_next_execution_time() - datetime.now()).total_seconds()
                if wait_time <= 0:
                    return heapq.heappop(self.task_queue)
                # Sleep without burning CPU; woken early by _enqueue() or shutdown()
                self.condition.wait(timeout=wait_time)
            return None

    def run_worker(self):
        while True:
            task = self._take_due_task()
            if task is None:
                break
            try:
                self.execute(task)
            except Exception as e:
                # Keep the worker alive (e.g. if an observer raises) so the pool does not shrink
                print(f"Worker thread error: {e}")

        print(f"{threading.current_thread().name} stopped.")

    def execute(self, task: ScheduledTask):
        for observer in self.observers:
            observer.on_task_started(task)

        try:
            task.get_task().execute()
            for observer in self.observers:
                observer.on_task_completed(task)
        except Exception as e:
            for observer in self.observers:
                observer.on_task_failed(task, e)
            print(f"Task {task.get_id()} failed with error: {str(e)}")
        finally:
            # Record the run even on failure; otherwise the strategy would return the same
            # time again and a failing task would be re-run immediately in a tight loop
            task.update_last_execution_time()
            task.update_next_execution_time()

            if task.has_more_executions():
                self._enqueue(task)
            else:
                print(f"Task {task.get_id()} has no more executions and will not be rescheduled.")

    def shutdown(self):
        with self.condition:
            self.running = False
            self.condition.notify_all()  # wake idle workers so they can exit
        for worker in self.workers:
            worker.join(timeout=1)
        print("Scheduler shut down.")

    def add_observer(self, observer: TaskExecutionObserver):
        self.observers.append(observer)
```

It:

  • Starts worker threads to execute tasks concurrently.
  • Handles task execution and re-queues recurring tasks.
  • Notifies registered observers on lifecycle events.

4.6 TaskSchedulerDemo

Demonstrates how to use the task scheduler in a real scenario.

```python
import time
from datetime import datetime, timedelta

# Assumes the classes from Sections 4.1-4.5 are defined in or imported into this module


def main():
    # 1. Set up the facade and observers
    scheduler = TaskSchedulerService.get_instance()
    scheduler.add_observer(LoggingObserver())

    # 2. Initialize the scheduler with 10 worker threads
    scheduler.initialize(10)

    # 3. Define tasks and strategies
    # Scenario 1: One-time task, 1 second from now
    one_time_task = PrintMessageTask("This is a one-time task.")
    one_time_strategy = OneTimeSchedulingStrategy(datetime.now() + timedelta(seconds=1))

    # Scenario 2: Recurring task, every 2 seconds
    recurring_task = PrintMessageTask("This is a recurring task.")
    recurring_strategy = RecurringSchedulingStrategy(timedelta(seconds=2))

    # Scenario 3: A one-time (simulated) backup task, 3 seconds from now
    backup_task = DataBackupTask("/data/source", "/data/backup")
    backup_strategy = OneTimeSchedulingStrategy(datetime.now() + timedelta(seconds=3))

    # 4. Schedule the tasks using the facade
    print("Scheduling tasks...")
    scheduler.schedule(one_time_task, one_time_strategy)
    scheduler.schedule(recurring_task, recurring_strategy)
    scheduler.schedule(backup_task, backup_strategy)

    # 5. Let the demo run for a while
    print("Scheduler is running. Waiting for tasks to execute... (Demo will run for 6 seconds)")
    time.sleep(6)

    # 6. Shut down the scheduler
    scheduler.shutdown()


if __name__ == "__main__":
    main()
```
The demo:

  • Registers a logging observer.
  • Initializes the scheduler with 10 worker threads.
  • Schedules a one-time task (after 1 second), a recurring task (every 2 seconds), and a simulated backup task (after 3 seconds).

5. Run and Test


