The Simplest Message Bus, Part 3 - Async Stuff

Last time we set up simple synchronous messaging with an Actor object. This time we want to have that Actor running in another thread.


I've had a brain drizzle. In ASP.NET apps there is the concept of middleware. Basically, middleware sits between an HTTP (or other) request and the endpoint that you've implemented. We could implement "middleware" for our message bus by adding "filter" Actors, where the bus deliberately routes messages through the filters before sending them on to their destination.
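Here is a rough sketch of what that filter idea might look like. It is not part of the series' code: FilteringBus, add_filter, strip_whitespace, and drop_empty are hypothetical names, messages are plain strings, and the filters are plain functions rather than full Actors to keep it short.

```python
from typing import Callable, List, Optional

# A filter takes a message and returns a (possibly changed) message,
# or None to drop it. Hypothetical type for this sketch only.
Filter = Callable[[str], Optional[str]]


class FilteringBus:
    """Runs each message through a chain of filters before delivery."""

    def __init__(self, deliver: Callable[[str], None]):
        self.deliver = deliver  # final destination for messages
        self.filters: List[Filter] = []

    def add_filter(self, f: Filter) -> None:
        self.filters.append(f)

    def send_message(self, message: str) -> None:
        for f in self.filters:
            result = f(message)
            if result is None:  # a filter dropped the message
                return
            message = result
        self.deliver(message)


def strip_whitespace(message: str) -> Optional[str]:
    return message.strip()


def drop_empty(message: str) -> Optional[str]:
    return message or None


def run_filter_demo() -> List[str]:
    delivered: List[str] = []
    bus = FilteringBus(delivered.append)
    bus.add_filter(strip_whitespace)
    bus.add_filter(drop_empty)
    bus.send_message("  hello  ")
    bus.send_message("   ")  # stripped to "", then dropped
    return delivered
```

Filters run in the order they were added, so ordering matters: drop_empty only catches the blank message because strip_whitespace ran first.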

Ok, back to our regularly scheduled program.


First, let's swap the callback thing for proper queues. These are thread-safe, and should be a decent option here. Let's reason about how data flows. We'll send a message from the main method to the bus. The message should be stored in the appropriate message queue. Finally, our actor, running in a different thread, should check the queue and do something with the message.
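Stripped of the bus entirely, that data flow can be sketched with just the standard library. The names run_queue_demo, inbox, and worker are made up for this example, and using None as a "stop" sentinel is an assumption of the sketch, not something the bus does.

```python
import queue
import threading


def run_queue_demo(messages):
    """Send messages from the calling thread to a worker thread via a Queue."""
    inbox = queue.Queue()
    received = []

    def worker():
        while True:
            item = inbox.get()  # blocks until a message is available
            if item is None:    # sentinel: stop the worker
                break
            received.append(item)

    thread = threading.Thread(target=worker)
    thread.start()
    for message in messages:
        inbox.put(message)      # safe to call from another thread
    inbox.put(None)
    thread.join()
    return received
```

The queue does all the locking, so neither side needs to know the other exists beyond sharing the inbox.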

As before, let's begin by looking at how the main function changes.

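import threading

def main():
    bus = MessageBus()

    # The actor doubles as its own subject: it is both the address and the recipient.
    actor = Actor(bus)
    bus.add_subject(actor)
    bus.subscribe(actor, actor)

    # Run the actor's loop on a background thread.
    job = threading.Thread(target=actor.run, args=[True])
    job.start()

    # The main thread reads input and posts each line to the bus.
    while True:
        bus.send_message(Message(subject=actor, message=input("> ")))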

As before, we init a bus and an actor. In this case, the Actor takes the bus as a parameter. We also add a subject on the bus for the actor instance. We also need to subscribe the actor to the actor subject. Basically we're using the actor object as a subject for demonstration purposes. Think of this as "I have a name, actor, and I get mail addressed to actor."

Then, we use Python's threading module to create a Thread whose target is the Actor's run method.

We start the job and then wait on the user for input.

Now let's take a look at our Actor implementation.

class Actor(IProducer, IConsumer):
    bus: MessageBus

    def __init__(self, bus: MessageBus):
        self.bus = bus

    def send_message(self, message: Message) -> None:
        pass

    def receive_message(self) -> Message:
        return self.bus.get_message(self, self)

    def run(self, signal) -> None:
        while signal:
            logging.debug(f"received message {self.receive_message()}")
            time.sleep(5)

We now have an __init__ method, the callback method is gone, and there is a new run method. The run method loops, waiting for messages to come into the queue. Since signal is just the value True passed in when the thread is created, the loop never exits on its own. There are a few method calls that happen as we drill from run -> receive_message -> get_message -> receive_message -> Queue.get. This is fine for this demonstration. In a future implementation, I'll add a boolean flag that can bubble up? Something. I may have to make the whole structure more shallow? I have no idea if that's worth it. I guess we'll find out at the end when I do some performance tests and see where the bottlenecks are.
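One possible shape for a stop signal that can actually change while the loop runs is threading.Event. This is only a sketch under my own assumptions: StoppableWorker and run_stop_demo are hypothetical names standing in for Actor and main, and the 0.1 second timeout is arbitrary.

```python
import queue
import threading


class StoppableWorker:
    """Hypothetical stand-in for Actor with a stop signal that can change."""

    def __init__(self):
        self.inbox = queue.Queue()
        self.stop_event = threading.Event()
        self.seen = []

    def run(self):
        while not self.stop_event.is_set():
            try:
                # Wake up periodically so the stop flag gets rechecked.
                message = self.inbox.get(timeout=0.1)
            except queue.Empty:
                continue
            self.seen.append(message)
            self.inbox.task_done()


def run_stop_demo():
    worker = StoppableWorker()
    thread = threading.Thread(target=worker.run)
    thread.start()
    worker.inbox.put("hello")
    worker.inbox.join()        # wait until the message has been handled
    worker.stop_event.set()    # ask the loop to exit
    thread.join(timeout=2)
    return worker.seen, thread.is_alive()
```

The timeout on get is what makes the flag useful: a plain blocking get would never return to check it while the queue is empty.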

The MessageBus has some minor changes.

class MessageBus:
    subjects: typing.Dict[object, Subject]

    def __init__(self):
        self.subjects = {}

    def send_message(self, message: Message):
        self.subjects[message.subject].post_message(message)

    def get_message(self, subject: object, subscriber: object) -> Message:
        return self.subjects[subject].receive_message(subscriber)

    def subscribe(self, subject: object, subscriber: object):
        self.add_subject(subject)
        self.subjects[subject].subscribe(subscriber)

    def add_subject(self, subject: object):
        if subject not in self.subjects:
            self.subjects[subject] = Subject(subject)

We've adapted the MessageBus to use queues (pronounced "keys"). Its structure remains basically the same.

We have a new object, called Subject that acts as a container for a "subject" and all its constituent subscribers.

class Subject:
    name: object
    # Each subscriber has their own queue.
    subscribers: typing.Dict[object, Queue]

    def __init__(self, name: object):
        self.name = name
        self.subscribers = {}

    def subscribe(self, subscriber: object):
        self.subscribers[subscriber] = Queue()

    def post_message(self, message: Message):
        for subscriber, queue in self.subscribers.items():
            queue.put(message)

    def receive_message(self, subscriber: object):
        return self.subscribers[subscriber].get()

The Subject houses queue management. Each subscriber has its own queue for any given subject. This is probably not good for performance as we scale into the millions of Actors. It'll do for now.
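To see what one queue per subscriber means in practice, here is a tiny standalone sketch of the fan-out. The subscriber names "alice" and "bob" and the function fan_out_demo are invented for the example, and a plain string stands in for a Message.

```python
from queue import Queue
from typing import Dict


def fan_out_demo() -> Dict[str, str]:
    # One queue per subscriber, as in Subject.subscribers.
    subscribers: Dict[str, Queue] = {"alice": Queue(), "bob": Queue()}

    # Posting puts the same message object on every subscriber's queue.
    for q in subscribers.values():
        q.put("hello")

    # Each subscriber drains only its own queue.
    return {name: q.get() for name, q in subscribers.items()}
```

Note that put stores a reference, not a copy, so every subscriber receives the same object. That is cheap, but it also means a subscriber that mutates a message changes it for everyone.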

Now, when we run the sample code, we see that the actor is running in the background.

$ poetry run python async_message_bus.py
> lolwat
>   2524 Thread-1 (run) received message Message: subject: <__main__.Actor object at 0x7f61815dae10>, message: lolwat
frankly delimit things
>   9833 Thread-1 (run) received message Message: subject: <__main__.Actor object at 0x7f61815dae10>, message: frankly delimit things
other stuff
>  14833 Thread-1 (run) received message Message: subject: <__main__.Actor object at 0x7f61815dae10>, message: other stuff

Because I have logging set to stdout, the input and output are mangled together. You can get a sense of what is happening, though. A message is sent, it finds its way into the actor's message queue, the actor (which blocks on the queue until something arrives) prints the message to the log, and then it sleeps for 5 seconds before checking the queue again.
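One way to keep the prompt readable would be to send the log to a file instead of stdout. A minimal sketch, assuming a dedicated logger: the name "actor_demo" and the function make_file_logger are hypothetical, and the format string is only my guess at something resembling the output above.

```python
import logging


def make_file_logger(path: str) -> logging.Logger:
    """Return a logger that writes DEBUG records to a file, not the console."""
    logger = logging.getLogger("actor_demo")  # hypothetical logger name
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep records away from root console handlers
    handler = logging.FileHandler(path)
    handler.setFormatter(
        logging.Formatter("%(relativeCreated)6d %(threadName)s %(message)s")
    )
    logger.addHandler(handler)
    return logger
```

With this in place, the actor would call logger.debug(...) and you could follow the file from a second terminal while typing in the first.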

I think I'd like to look at reducing the number of layers involved. There's some redundant naming in Subject, MessageBus, and Actor. I might be able to collapse Subject into the message bus if I'm careful. That would reduce the total method calls for getting/setting by a layer.
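As a rough idea of what that collapse could look like, here is a hypothetical FlatBus that keeps the subject -> subscriber -> queue mapping directly on the bus. It is a sketch, not the planned implementation, and its send_message takes the subject and message separately instead of a Message object to stay self-contained.

```python
from collections import defaultdict
from queue import Queue
from typing import Any, DefaultDict, Dict


class FlatBus:
    """Hypothetical bus that owns the per-subscriber queues itself."""

    def __init__(self):
        # subject -> subscriber -> that subscriber's queue
        self.queues: DefaultDict[Any, Dict[Any, Queue]] = defaultdict(dict)

    def subscribe(self, subject: Any, subscriber: Any) -> None:
        # setdefault keeps an existing queue if the subscriber re-subscribes.
        self.queues[subject].setdefault(subscriber, Queue())

    def send_message(self, subject: Any, message: Any) -> None:
        for q in self.queues[subject].values():
            q.put(message)

    def get_message(self, subject: Any, subscriber: Any) -> Any:
        return self.queues[subject][subscriber].get()
```

One behavioral difference from the Subject version: re-subscribing here keeps the existing queue, whereas Subject.subscribe replaces it with a fresh one and discards anything still waiting.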

You can find the code here.