So we have the simplest message bus from here, but now I'd like to actually do something interesting with it. I'd like to be able to send information from one process to another. We'll call a process that can produce and consume messages from the bus an "Agent". I think people also call these "Actors"? Maybe I should just call them actors, too. :D
Let's start with the main
function. The way we call things has changed a little:
def main():
bus = MessageBus()
actor = Actor()
bus.add_subject(actor)
while True:
bus.send_message(actor, input("> "))
We spin up a MessageBus
and an Actor
object. It seems convenient to be able to message an object by its reference. I may change my mind on this, or maybe we'll have a separate subject list of just objects that are subscribe? That way we could message all subscribed actors?
Being a C# dev by day means I like me some interfaces. In python we achieve the "interface effect" – where we hit an error during compile time – using abstract classes. If an actor can consume and produce messages, we need an interface for each.
class IProducer(ABC):
@abstractmethod
def send_message(self, message: Message):
pass
class IConsumer(ABC):
@abstractmethod
def receive_message(self, message: str):
pass
@abstractmethod
def callback(self, message: str):
pass
I've added the callback
method for purely naming purposes. Within the MessageBus
we reference this method. Let's look at the altered MessageBus
:
class MessageBus:
subjects: typing.Dict[object, list[callable]]
def __init__(self):
self.subjects = {}
def send_message(self, subject: object, message: str):
print(f"Sending message {message} to {subject}")
for callback in self.subjects[subject]:
callback(message)
def subscribe(self, subject: str, callback: callable):
self.add_subject(subject)
self.subjects[subject].append(callback)
@multimethod
def add_subject(self, subject: str):
if subject not in self.subjects:
self.subjects[subject] = []
@multimethod
def add_subject(self, subject: IConsumer):
if subject not in self.subjects:
self.subjects[subject] = [subject.callback]
From the top down: we add a print statement to the "send_message" for debugging. Note that we're still calling the callback
synchronously. The subscribe
method has not changed. Then we have a pair of "multimethods". In C#, we call this pattern "multiple dispatch" or "function overload". These aren't precisely the same thing, read here if you're curious. One of the multimethods handles a string subject, and one handles an obect
. In this case, we expect the object to conform to the IConsumer
interface, specifically it should have a callback
method. Maybe I'll change this method name later? Not sure.
Our Actor
class is pretty straight forward:
class Actor(IProducer, IConsumer):
def send_message(self, message: Message):
pass
def receive_message(self, message: str):
print(f"{self} received message {message}")
def callback(self, message: str):
self.receive_message(message)
We're leaving send_message
blank for now. I'll use this in the near future when we take on async Actor
execution. Our receive_message
method simply prints out whatever message was received. And note that our callback
method is simply a passthrough at this point, which is why I'm considering dropping the method entirely.
Now if we run this we get a prompt. Check it out:
$ poetry run python agent_message_bus.py
> hello
Sending message hello to <__main__.Actor object at 0x7fab20af8d50>
<__main__.Actor object at 0x7fab20af8d50> received message hello
> goodbye
Sending message goodbye to <__main__.Actor object at 0x7fab20af8d50>
<__main__.Actor object at 0x7fab20af8d50> received message goodbye
> this is a message
Sending message this is a message to <__main__.Actor object at 0x7fab20af8d50>
<__main__.Actor object at 0x7fab20af8d50> received message this is a message
>
Next, I think I want to look at getting multiple agents running in tandem.