Last time we set up simple synchronous messaging with an =Actor= object. This time we want to have that =Actor= running in another thread.
I've had a brain drizzle. In ASP.NET apps there is the concept of middleware. Basically middleware sits between an HTTP (or other) request and the endpoint that you've implemented. So we could implement "middleware" for our messaging bus by adding in "filter" =Actor=s where the bus deliberately routes messages through the filters before the messages are sent on to their destination.
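Just to sketch the shape of that idea before I shelve it (nothing below exists in the real code; the filter class and helper are hypothetical):

#+begin_src python
# Hypothetical sketch of a "filter" sitting between send and delivery.
# Neither of these exist in the actual bus yet.
class ShoutyFilter:
    def __call__(self, message: Message) -> Message:
        # e.g. normalize the payload before it reaches the subject
        message.message = message.message.upper()
        return message

def send_through_filters(bus: MessageBus, filters, message: Message) -> None:
    for f in filters:
        message = f(message)      # each filter gets a crack at the message
    bus.send_message(message)     # then it goes on to its destination
#+end_src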
Ok, back to our regularly scheduled program.
First, let's swap the callback out for proper queues. Python's =queue.Queue= is thread-safe, so it should be a decent option here. Let's reason about how data flows. We'll send a message from the main method to the bus. The message should be stored in the appropriate message queue. Finally, our actor, running in a different thread, should check the queue and do something with the message.
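As a quick sanity check on that claim (this snippet is just an illustration, not part of the bus), a =queue.Queue= does its own locking, so a producer thread and a consumer thread can share one directly:

#+begin_src python
import threading
from queue import Queue

q: Queue = Queue()

def consumer():
    # get() blocks until something shows up on the queue
    print(f"got: {q.get()}")

t = threading.Thread(target=consumer)
t.start()
q.put("hello from the main thread")  # no locks needed on our side
t.join()
#+end_src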
As before, let's begin by looking at how the main function changes.
#+begin_src python
def main():
    bus = MessageBus()
    actor = Actor(bus)

    bus.add_subject(actor)
    bus.subscribe(actor, actor)

    job = threading.Thread(target=actor.run, args=[True])
    job.start()

    while True:
        bus.send_message(Message(subject=actor, message=input("> ")))
#+end_src
As before, we init a bus and an actor. In this case, the =Actor= takes the bus as a parameter. We add a subject on the bus for the actor instance, and then subscribe the actor to that subject. Basically we're using the actor object as a subject for demonstration purposes. Think of this as "I have a name, =actor=, and I get mail addressed to =actor=."
Then we use Python's =threading= module to create a =Thread= whose target is the =Actor='s =run= method.
We start the job and then wait on the user for input.
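The =Message=, =IProducer=, and =IConsumer= types carry over from the last post. Roughly, they look something like this (a minimal sketch, enough to run the snippets here, not necessarily line-for-line the same as that post):

#+begin_src python
import dataclasses

class IProducer:
    def send_message(self, message: "Message") -> None: ...

class IConsumer:
    def receive_message(self) -> "Message": ...

@dataclasses.dataclass
class Message:
    subject: object
    message: str

    def __str__(self):
        # matches the shape of the log output further down
        return f"Message: subject: {self.subject}, message: {self.message}"
#+end_src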
Now let's take a look at our =Actor= implementation.
#+begin_src python
class Actor(IProducer, IConsumer):
    bus: MessageBus

    def __init__(self, bus: MessageBus):
        self.bus = bus

    def send_message(self, message: Message) -> None:
        pass

    def receive_message(self) -> Message:
        return self.bus.get_message(self, self)

    def run(self, signal) -> None:
        while signal:
            logging.debug(f"received message {self.receive_message()}")
            time.sleep(5)
#+end_src
We now have an =__init__= method, we've removed the =callback= method, and we've gained a =run= method. The =run= method loops, waiting on messages to come into the queue. There are a few method calls that happen as we drill down from =run -> receive_message -> get_message -> receive_message -> Queue.get=. This is fine for this demonstration. In a future implementation, I'll add a boolean flag that can bubble up? Something. I may have to make the whole structure more shallow? I have no idea if that's worth it. I guess we'll find out at the end when I do some performance tests and see where the bottlenecks are.
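For what it's worth, one possible shape for that flag is a =threading.Event= instead of the plain boolean argument. This is just a sketch of the idea, not something in the current code:

#+begin_src python
import logging
import threading

class StoppableActor(Actor):
    def __init__(self, bus: MessageBus):
        super().__init__(bus)
        self.stop_event = threading.Event()

    def run(self) -> None:
        while not self.stop_event.is_set():
            # note: receive_message() still blocks on Queue.get(), so the
            # loop only notices the event after one more message arrives
            # (or we'd need a get() with a timeout) -- part of why the
            # deep call chain is awkward.
            logging.debug(f"received message {self.receive_message()}")
#+end_src

Shutting it down would then be a matter of calling =stop_event.set()= and sending one last message so the blocking =get()= wakes up.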
The =MessageBus= has some minor changes.
#+begin_src python
class MessageBus:
    subjects: typing.Dict[object, Subject]

    def __init__(self):
        self.subjects = {}

    def send_message(self, message: Message):
        self.subjects[message.subject].post_message(message)

    def get_message(self, subject: object, subscriber: object) -> Message:
        return self.subjects[subject].receive_message(subscriber)

    def subscribe(self, subject: object, subscriber: object):
        self.add_subject(subject)
        self.subjects[subject].subscribe(subscriber)

    def add_subject(self, subject: object):
        if subject not in self.subjects:
            self.subjects[subject] = Subject(subject)
#+end_src
We've adapted the =MessageBus= to use queues (pronounced "keys"). Its structure remains basically the same.
We have a new object, called =Subject=, that acts as a container for a "subject" and all its constituent subscribers.
#+begin_src python
class Subject:
    name: object
    # Each subscriber has their own queue.
    subscribers: typing.Dict[object, Queue]

    def __init__(self, name: object):
        self.name = name
        self.subscribers = {}

    def subscribe(self, subscriber: object):
        self.subscribers[subscriber] = Queue()

    def post_message(self, message: Message):
        for subscriber, queue in self.subscribers.items():
            queue.put(message)

    def receive_message(self, subscriber: object):
        return self.subscribers[subscriber].get()
#+end_src
The =Subject= houses the queue management. Each subscriber has its own queue for any given subject. This is probably not good for performance as we scale into the millions of =Actor=s. It'll do for now.
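To make the fan-out concrete, here's a throwaway check (using bare strings as stand-ins for real subscriber objects and messages): one =post_message= puts the message on every subscriber's queue.

#+begin_src python
subject = Subject("news")
subject.subscribe("alice")
subject.subscribe("bob")

subject.post_message("hello")             # one post...
print(subject.receive_message("alice"))   # ...shows up in alice's queue
print(subject.receive_message("bob"))     # ...and in bob's queue too
#+end_src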
Now, when we run the sample code, we can see the actor running in the background.
#+begin_example
$ poetry run python async_message_bus.py
> lolwat
> 2524 Thread-1 (run) received message Message: subject: <__main__.Actor object at 0x7f61815dae10>, message: lolwat
frankly delimit things
> 9833 Thread-1 (run) received message Message: subject: <__main__.Actor object at 0x7f61815dae10>, message: frankly delimit things
other stuff
> 14833 Thread-1 (run) received message Message: subject: <__main__.Actor object at 0x7f61815dae10>, message: other stuff
#+end_example
Because I have logging set to =stdout=, the input and output are mangled together. You can get a sense of what is happening, though. A message is sent, it finds its way into the actor's message queue, and the actor's =run= loop pulls it off the queue, logs it, and then sleeps for 5 seconds before checking again.
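The numbers and thread names in that output come from the logging format; something along these lines produces it (the exact format string here is approximate):

#+begin_src python
import logging

logging.basicConfig(
    level=logging.DEBUG,
    # relativeCreated: milliseconds since logging started,
    # threadName: "Thread-1 (run)" for the actor thread
    format="%(relativeCreated)d %(threadName)s %(message)s",
)
#+end_src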
I think I'd like to look at reducing the number of layers here. There's some redundant naming across =Subject=, =MessageBus=, and =Actor=. I might be able to collapse =Subject= into the message bus if I'm careful. That would reduce the total method calls for getting/setting by a layer.
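A rough sketch of what that collapse might look like (untested, and not necessarily the direction I'll actually take): key each queue off a (subject, subscriber) pair inside the bus and drop =Subject= entirely.

#+begin_src python
class FlatMessageBus:
    def __init__(self):
        # (subject, subscriber) -> that subscriber's queue for that subject
        self.queues: typing.Dict[typing.Tuple[object, object], Queue] = {}
        # subject -> its subscribers, so send_message can fan out
        self.subscribers: typing.Dict[object, list] = {}

    def subscribe(self, subject: object, subscriber: object):
        self.subscribers.setdefault(subject, []).append(subscriber)
        self.queues[(subject, subscriber)] = Queue()

    def send_message(self, message: Message):
        for subscriber in self.subscribers.get(message.subject, []):
            self.queues[(message.subject, subscriber)].put(message)

    def get_message(self, subject: object, subscriber: object) -> Message:
        return self.queues[(subject, subscriber)].get()
#+end_src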
You can find the code here.