Design Pub Sub System

Ashish Pratap Singh

medium

In this chapter, we will explore the low-level design of a simple in-memory pub-sub system.

Let's start by clarifying the requirements:

1. Clarifying Requirements

Before starting the design, it's important to ask thoughtful questions to uncover hidden assumptions and better define the scope of the system.

Here is an example of how a conversation between the candidate and the interviewer might unfold:

After gathering the details, we can summarize the key system requirements.

1.1 Functional Requirements

  • Support creation and management of multiple topics
  • Allow multiple publishers to publish messages to a topic
  • Allow multiple subscribers to subscribe to one or more topics
  • Deliver messages to all active subscribers of a topic in the order they were published
  • Ensure non-blocking, asynchronous message delivery
  • Follow a "fire-and-forget" delivery model: no retries or acknowledgments

1.2 Non-Functional Requirements

  • Modularity: The system should follow object-oriented principles with clear separation of responsibilities
  • Scalability: The system should efficiently support many concurrent publishers and subscribers
  • Extensibility: The design should be flexible enough to support future enhancements such as message persistence, retries, or delivery guarantees
  • Reliability: While exact delivery guarantees are not required, message ordering and dispatching must remain consistent and predictable within each topic

Now that the requirements are clear, let's identify the core entities/objects we will have in our system.

2. Identifying Core Entities

Core entities are the fundamental building blocks of our system. We identify them by analyzing the functional requirements and highlighting the key nouns and responsibilities that naturally map to object-oriented abstractions such as classes, interfaces, or enums.

Let’s walk through the functional requirements and extract the relevant entities:

1. Support creation and management of multiple topics.

This suggests the need for a Topic entity, which acts as a named communication channel. Each topic maintains a list of subscribers and is responsible for dispatching published messages to them in order.

2. Multiple publishers can publish messages to a topic.

This introduces the need for a Message entity to represent each published unit of communication.

For this design, we can treat the client or demo class as the publisher. There is no need for a separate Publisher class unless extended functionality is required.

A central PubSubService entity will serve as the orchestrator of the system. It will handle topic creation, manage subscriptions, accept published messages, and route them to the appropriate topic for distribution.

3. Multiple subscribers can subscribe to one or more topics.

This indicates the need for a Subscriber entity, which registers interest in specific topics and receives messages from them.

These core entities define the key abstractions of a Pub-Sub system and will guide the structure of our low-level design and class diagrams.

3. Class Design

This section details the classes, their relationships, and the design patterns used to structure the Pub-Sub system.

3.1 Class Definitions

The system is built around a few core classes and interfaces that manage topics, messages, and subscribers.

Data Classes

Message

A simple, immutable Data Transfer Object (DTO) that represents the data being transmitted.

It contains a string payload and a timestamp recording when the message was created. Its immutability makes it inherently thread-safe.

Core Classes

Subscriber (Interface)

Defines the contract for any class that wishes to receive messages.

It declares two methods: getId() to uniquely identify the subscriber and onMessage(Message message) which is the callback method invoked when a message is delivered.

AlertSubscriber & NewsSubscriber

These are concrete implementations of the Subscriber interface. They demonstrate how different types of subscribers can handle the same message in unique ways, promoting system flexibility.

Topic

Represents a distinct message channel.

It maintains a thread-safe set of Subscriber objects and is responsible for broadcasting messages to them. Message delivery is handled asynchronously using an ExecutorService to avoid blocking the publisher.

PubSubService

The central entry point and control hub for the entire system.

It manages the lifecycle of topics and provides a clean API for clients to publish messages and manage subscriptions. It is implemented as a Singleton to ensure a single point of control.

3.2 Class Relationships

The relationships between classes define the system's architecture, ensuring loose coupling and high cohesion.

Composition

A strong "has-a" relationship where one object owns another.

  • PubSubService has Topics: The PubSubService creates and manages a Map<String, Topic>. The lifecycle of a Topic is controlled entirely by the PubSubService.
  • Topic has Subscribers: A Topic manages a Set<Subscriber> for its channel. It controls which subscribers are associated with it.

Association

A weaker "has-a" relationship where objects are related but have independent lifecycles.

  • PubSubService has an ExecutorService: The service creates and owns a thread pool for message delivery.
  • Topic is associated with an ExecutorService: Each topic is given a reference to the shared ExecutorService from PubSubService to perform its broadcasting tasks.

Implementation

An "is-a" relationship based on an interface contract.

  • AlertSubscriber and NewsSubscriber are Subscribers. They both implement the Subscriber interface, providing concrete logic for the onMessage() method.

Dependency

A "uses-a" relationship where one class depends on another to perform its function.

  • The PubSubService, Topic, and Subscriber classes all depend on the Message class to publish, broadcast, and receive data, respectively.

3.3 Key Design Patterns

Several design patterns are employed to create a robust, scalable, and maintainable system.

Observer Pattern

This is the foundational pattern of the system.

  • Subject: The Topic class acts as the subject. It maintains a list of observers and notifies them of state changes (new messages).
  • Observer: The Subscriber interface acts as the observer. Concrete subscribers (NewsSubscriber, AlertSubscriber) register with a Topic to receive updates.
  • Mechanism: When PubSubService.publish() is called, the corresponding Topic's broadcast() method iterates through its Subscriber list and calls the onMessage() method on each one, decoupling the Topic from the concrete Subscriber implementations.
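
Stripped of threading, the observer mechanism described above can be sketched in a few lines. The names MiniTopic and Recorder are illustrative only and do not appear in the design. Delivery here is synchronous, to keep the focus on the subject/observer relationship.

```python
from abc import ABC, abstractmethod
from typing import List


class Observer(ABC):
    @abstractmethod
    def on_message(self, message: str) -> None:
        ...


class Recorder(Observer):
    """Hypothetical observer that just remembers what it was sent."""

    def __init__(self) -> None:
        self.received: List[str] = []

    def on_message(self, message: str) -> None:
        self.received.append(message)


class MiniTopic:
    """Hypothetical subject: it knows only the Observer interface, not concrete types."""

    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        self._observers.append(observer)

    def broadcast(self, message: str) -> None:
        # Notify every registered observer through the shared interface
        for observer in self._observers:
            observer.on_message(message)


topic = MiniTopic()
first, second = Recorder(), Recorder()
topic.attach(first)
topic.attach(second)
topic.broadcast("score update")
```

Because MiniTopic calls only on_message(), a new observer type can be added without touching the subject.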

Strategy Pattern

The Subscriber interface can be viewed as a strategy interface. It defines an algorithm (onMessage). Concrete implementations (AlertSubscriber, NewsSubscriber) provide different strategies for how a message should be processed. The Topic is configured with a set of these strategies (its subscribers) and applies them when a message is broadcast.

Facade Pattern

The PubSubService serves as a facade. It provides a simplified, high-level interface (createTopic, subscribe, publish) to the client, hiding the more complex underlying components and interactions, such as topic creation, subscriber registration, and asynchronous message delivery.

Singleton Pattern

The PubSubService is implemented as a singleton. This ensures there is only one instance managing all topics and the shared thread pool, providing a single, globally accessible point of control for the entire pub-sub mechanism.
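
As a rough illustration of the singleton idea in Python, the sketch below uses double-checked locking in __new__. The Registry class and its topics attribute are hypothetical stand-ins, used only to show that repeated construction yields the same object.

```python
import threading


class Registry:
    """Hypothetical singleton used only to illustrate the pattern."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:            # fast path: no lock once created
            with cls._lock:
                if cls._instance is None:    # re-check after acquiring the lock
                    cls._instance = super().__new__(cls)
                    cls._instance.topics = {}
        return cls._instance


a = Registry()
b = Registry()
a.topics["SPORTS"] = "topic placeholder"
```

Both names refer to one object, so state written through a is visible through b.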

3.4 Full Class Diagram

Figure: Pub-Sub system class diagram

4. Implementation

4.1 Message

This class is a simple, immutable Data Transfer Object (DTO) that represents the content being sent through the system.

from datetime import datetime


class Message:
    def __init__(self, payload: str):
        self.payload = payload
        # Creation time, captured once when the message is constructed
        self.timestamp = datetime.now()

    def get_payload(self) -> str:
        return self.payload

    def __str__(self) -> str:
        return f"Message{{payload='{self.payload}'}}"

Each Message includes:

  • A payload: the actual message text.
  • A timestamp: marks when the message was created.

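The payload and timestamp are assigned once in the constructor and never modified afterwards, so Message objects are treated as immutable. Python has no final keyword, so this is a convention rather than something the language enforces.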
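
If enforcement by the language is preferred over convention, a frozen dataclass is one option. The following is a minimal sketch rather than the article's implementation; the FrozenMessage name is hypothetical, chosen to avoid clashing with the Message class above.

```python
from dataclasses import dataclass, field, FrozenInstanceError
from datetime import datetime


@dataclass(frozen=True)
class FrozenMessage:
    """Hypothetical immutable variant of Message (not part of the original design)."""

    payload: str
    # default_factory runs once per instance, so each message gets its own creation time
    timestamp: datetime = field(default_factory=datetime.now)


msg = FrozenMessage("hello")
try:
    msg.payload = "changed"  # assigning to a field of a frozen dataclass raises
    mutated = True
except FrozenInstanceError:
    mutated = False
```

The trade-off is a slightly different construction style in exchange for an error on any accidental reassignment.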

4.2 Subscriber Interface and Implementations

The Subscriber interface defines the contract for any object that wishes to receive messages. This follows the Observer design pattern, allowing the system to be decoupled from the concrete classes that consume messages.

from abc import ABC, abstractmethod

# Message is the class defined in section 4.1


class Subscriber(ABC):
    @abstractmethod
    def get_id(self) -> str:
        pass

    @abstractmethod
    def on_message(self, message: Message):
        pass


class AlertSubscriber(Subscriber):
    def __init__(self, subscriber_id: str):
        self.id = subscriber_id

    def get_id(self) -> str:
        return self.id

    def on_message(self, message: Message):
        # Alerts are printed with extra emphasis
        print(f"!!! [ALERT - {self.id}] : '{message.get_payload()}' !!!")


class NewsSubscriber(Subscriber):
    def __init__(self, subscriber_id: str):
        self.id = subscriber_id

    def get_id(self) -> str:
        return self.id

    def on_message(self, message: Message):
        print(f"[Subscriber {self.id}] received message '{message.get_payload()}'")

The on_message() method is called when a message is delivered.

Two different subscriber types simulate real-world use cases:

  • AlertSubscriber: Reacts to critical alerts.
  • NewsSubscriber: Receives general news.

Each implementation customizes the display behavior of received messages.
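
Because Subscriber derives from ABC, Python checks the contract at instantiation time. The sketch below illustrates this with a hypothetical IncompleteSubscriber that forgets one method. A plain string stands in for Message so the example runs on its own.

```python
from abc import ABC, abstractmethod


class Subscriber(ABC):
    """Same shape as the interface above, with str standing in for Message."""

    @abstractmethod
    def get_id(self) -> str:
        ...

    @abstractmethod
    def on_message(self, message: str) -> None:
        ...


class IncompleteSubscriber(Subscriber):
    """Hypothetical subclass that forgets to implement on_message()."""

    def get_id(self) -> str:
        return "incomplete"


try:
    IncompleteSubscriber()
    instantiated = True
except TypeError:
    # ABC refuses to instantiate a class that still has abstract methods
    instantiated = False
```

A subscriber that does not honor the full contract therefore fails early, before it can be registered with a topic.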

4.3 Topic

A Topic represents a distinct channel for messages. It maintains its own list of subscribers and is responsible for broadcasting messages to them.

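import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Set

# Subscriber and Message are the classes defined in sections 4.1 and 4.2


class Topic:
    def __init__(self, name: str, delivery_executor: ThreadPoolExecutor):
        self.name = name
        self.delivery_executor = delivery_executor
        self.subscribers: Set[Subscriber] = set()
        # Guards the subscriber set, which several threads may touch at once
        self._lock = threading.Lock()

    def get_name(self) -> str:
        return self.name

    def add_subscriber(self, subscriber: Subscriber):
        with self._lock:
            self.subscribers.add(subscriber)

    def remove_subscriber(self, subscriber: Subscriber):
        with self._lock:
            self.subscribers.discard(subscriber)

    def broadcast(self, message: Message):
        # Snapshot under the lock so the set cannot change size mid-iteration
        with self._lock:
            recipients = list(self.subscribers)
        for subscriber in recipients:
            self.delivery_executor.submit(self._deliver_message, subscriber, message)

    def _deliver_message(self, subscriber: Subscriber, message: Message):
        try:
            subscriber.on_message(message)
        except Exception as e:
            # One failing subscriber must not affect delivery to the others
            print(f"Error delivering message to subscriber {subscriber.get_id()}: {str(e)}")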

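To summarize, a Topic is a channel to which messages are published and from which subscribers receive them.

  • subscribers: The set of subscribers registered on the topic. Publishers and subscribers may run on different threads, so access to it must be synchronized.
  • broadcast(): Asynchronously delivers a message to all subscribers using a thread pool.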
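
One caveat: a shared pool with several worker threads can run two deliveries for the same subscriber at the same time, so broadcast() alone does not guarantee arrival order. One possible way to keep per-subscriber ordering is to give each subscriber its own single-worker executor. The sketch below assumes that one thread per subscriber is an acceptable cost. OrderedTopic and its methods are hypothetical names, not part of the original design, and a plain callback stands in for Subscriber.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


class OrderedTopic:
    """Hypothetical topic that preserves publish order for each subscriber."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # One single-worker executor per subscriber: its tasks run one at a time, FIFO
        self._lanes: Dict[str, ThreadPoolExecutor] = {}
        self._callbacks: Dict[str, Callable[[str], None]] = {}

    def add_subscriber(self, sub_id: str, callback: Callable[[str], None]) -> None:
        with self._lock:
            self._callbacks[sub_id] = callback
            self._lanes[sub_id] = ThreadPoolExecutor(max_workers=1)

    def broadcast(self, message: str) -> None:
        # Submitting under the lock keeps every lane's queue in the same publish
        # order even with concurrent publishers; submit() only enqueues the task.
        with self._lock:
            for sub_id, callback in self._callbacks.items():
                self._lanes[sub_id].submit(callback, message)

    def shutdown(self) -> None:
        with self._lock:
            lanes = list(self._lanes.values())
        for lane in lanes:
            lane.shutdown(wait=True)  # blocks until queued deliveries finish


received: List[str] = []
topic = OrderedTopic()
topic.add_subscriber("reader", received.append)
for i in range(100):
    topic.broadcast(f"msg-{i}")
topic.shutdown()
```

Publishing stays non-blocking, while each subscriber sees messages in the order they were submitted.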

4.4 PubSubService (Singleton)

This is the main facade for the system. It manages the lifecycle of topics and provides a centralized API for clients to interact with the system.

import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict

# Topic, Subscriber and Message are the classes defined in the sections above


class PubSubService:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: only the first caller creates the instance
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every PubSubService() call, so initialize only once
        if self._initialized:
            return

        self.topic_registry: Dict[str, Topic] = {}
        # Shared worker pool for message deliveries. With no max_workers argument,
        # Python derives the pool size from the CPU count.
        self.delivery_executor = ThreadPoolExecutor()
        self._initialized = True

    @classmethod
    def get_instance(cls):
        return cls()

    def create_topic(self, topic_name: str):
        if topic_name not in self.topic_registry:
            self.topic_registry[topic_name] = Topic(topic_name, self.delivery_executor)
            print(f"Topic {topic_name} created")

    def subscribe(self, topic_name: str, subscriber: Subscriber):
        topic = self.topic_registry.get(topic_name)
        if topic is None:
            raise ValueError(f"Topic not found: {topic_name}")
        topic.add_subscriber(subscriber)
        print(f"Subscriber '{subscriber.get_id()}' subscribed to topic: {topic_name}")

    def unsubscribe(self, topic_name: str, subscriber: Subscriber):
        topic = self.topic_registry.get(topic_name)
        if topic is not None:
            topic.remove_subscriber(subscriber)
        print(f"Subscriber '{subscriber.get_id()}' unsubscribed from topic: {topic_name}")

    def publish(self, topic_name: str, message: Message):
        print(f"Publishing message to topic: {topic_name}")
        topic = self.topic_registry.get(topic_name)
        if topic is None:
            raise ValueError(f"Topic not found: {topic_name}")
        topic.broadcast(message)

    def shutdown(self):
        print("PubSubService shutting down...")
        # wait=True blocks until all queued deliveries have completed
        self.delivery_executor.shutdown(wait=True)
        print("PubSubService shutdown complete.")

  • Singleton Pattern: The service is implemented as a singleton to ensure a single, central point of control for the pub/sub functionality within the application.
  • Maintains a registry of all topics.
  • Uses a shared executor to handle asynchronous message delivery.

Methods:

  • create_topic(): Creates a new topic if it doesn't already exist.
  • subscribe() / unsubscribe(): Manage subscriber enrollment.
  • publish(): Triggers broadcast of a message to all subscribers of a topic.

4.5 Demo: PubSubDemo

The PubSubDemo class demonstrates how a client would use the PubSubService to create topics, manage subscriptions, and publish messages.

import time

# PubSubService, Message, NewsSubscriber and AlertSubscriber are defined in the sections above


class PubSubDemo:
    @staticmethod
    def main():
        pub_sub_service = PubSubService.get_instance()

        # --- Create Subscribers ---
        sports_fan1 = NewsSubscriber("SportsFan1")
        sports_fan2 = NewsSubscriber("SportsFan2")
        techie1 = NewsSubscriber("Techie1")
        all_news_reader = NewsSubscriber("AllNewsReader")
        system_admin = AlertSubscriber("SystemAdmin")

        # --- Create Topics and Subscriptions ---
        SPORTS_TOPIC = "SPORTS"
        TECH_TOPIC = "TECH"
        WEATHER_TOPIC = "WEATHER"

        pub_sub_service.create_topic(SPORTS_TOPIC)
        pub_sub_service.create_topic(TECH_TOPIC)
        pub_sub_service.create_topic(WEATHER_TOPIC)

        pub_sub_service.subscribe(SPORTS_TOPIC, sports_fan1)
        pub_sub_service.subscribe(SPORTS_TOPIC, sports_fan2)
        pub_sub_service.subscribe(SPORTS_TOPIC, all_news_reader)
        pub_sub_service.subscribe(SPORTS_TOPIC, system_admin)

        pub_sub_service.subscribe(TECH_TOPIC, techie1)
        pub_sub_service.subscribe(TECH_TOPIC, all_news_reader)

        print("\n--- Publishing Messages ---")

        # --- Publish to SPORTS topic ---
        pub_sub_service.publish(SPORTS_TOPIC, Message("Team A wins the championship!"))
        # Expected: SportsFan1, SportsFan2, AllNewsReader, SystemAdmin receive this.

        # --- Publish to TECH topic ---
        pub_sub_service.publish(TECH_TOPIC, Message("New AI model released."))
        # Expected: Techie1, AllNewsReader receive this.

        # --- Publish to WEATHER topic (no subscribers) ---
        pub_sub_service.publish(WEATHER_TOPIC, Message("Sunny with a high of 75°F."))
        # Expected: Message is dropped.

        # Allow some time for async messages to be processed
        time.sleep(0.5)

        print("\n--- Unsubscribing a user and re-publishing ---")

        # SportsFan2 gets tired of sports news
        pub_sub_service.unsubscribe(SPORTS_TOPIC, sports_fan2)

        # Publish another message to SPORTS
        pub_sub_service.publish(SPORTS_TOPIC, Message("Major player traded to Team B."))
        # Expected: SportsFan1, AllNewsReader, SystemAdmin receive this. SportsFan2 does NOT.

        # Give messages time to be delivered
        time.sleep(0.5)

        # --- Shutdown the service ---
        pub_sub_service.shutdown()


if __name__ == "__main__":
    PubSubDemo.main()
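
A note on the sleeps in the demo: the one before unsubscribing separates the two phases of output, but the final one before shutdown() is not needed for correctness. ThreadPoolExecutor.shutdown(wait=True) already blocks until every submitted task has finished. The standalone sketch below illustrates this with a hypothetical deliver() function standing in for subscriber.on_message().

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List

delivered: List[str] = []


def deliver(subscriber_id: str, payload: str) -> None:
    # Stand-in for subscriber.on_message(); list.append is atomic in CPython
    delivered.append(f"{subscriber_id}:{payload}")


executor = ThreadPoolExecutor(max_workers=4)
for subscriber_id in ("SportsFan1", "SportsFan2", "AllNewsReader"):
    executor.submit(deliver, subscriber_id, "Team A wins the championship!")

# wait=True blocks until every submitted task has finished, so no sleep is required
executor.shutdown(wait=True)
```

After shutdown() returns, all three deliveries have completed, although their relative order across subscribers is not fixed.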

Comments (1)


The Cap (17 days ago)

Why haven't we created a class for publishers? Only registered publishers should be allowed to publish messages, right?
