The Simplest Message Bus 2, Agents

· 565 words · 3 minute read

So we have the simplest message bus from here, but now I'd like to actually do something interesting with it. I'd like to be able to send information from one process to another. We'll call a process that can produce and consume messages from the bus an "Agent". I think people also call these "Actors"? Maybe I should just call them actors, too. :D

Let's start with the main function. The way we call things has changed a little:

  def main():
      bus = MessageBus()

      actor = Actor()

      bus.add_subject(actor)

      while True:
          bus.send_message(actor, input("> "))

We spin up a MessageBus and an Actor object. It seems convenient to be able to message an object by its reference. I may change my mind on this, or maybe we'll have a separate subject list of just objects that are subscribe? That way we could message all subscribed actors?

Being a C# dev by day means I like me some interfaces. In python we achieve the "interface effect" – where we hit an error during compile time – using abstract classes. If an actor can consume and produce messages, we need an interface for each.

  class IProducer(ABC):

      @abstractmethod
      def send_message(self, message: Message):
          pass


  class IConsumer(ABC):

      @abstractmethod
      def receive_message(self, message: str):
          pass

      @abstractmethod
      def callback(self, message: str):
          pass

I've added the callback method for purely naming purposes. Within the MessageBus we reference this method. Let's look at the altered MessageBus:

class MessageBus:
    subjects: typing.Dict[object, list[callable]]

    def __init__(self):
        self.subjects = {}

    def send_message(self, subject: object, message: str):
        print(f"Sending message {message} to {subject}")
        for callback in self.subjects[subject]:
            callback(message)

    def subscribe(self, subject: str, callback: callable):
        self.add_subject(subject)
        self.subjects[subject].append(callback)

    @multimethod
    def add_subject(self, subject: str):
        if subject not in self.subjects:
            self.subjects[subject] = []

    @multimethod
    def add_subject(self, subject: IConsumer):
        if subject not in self.subjects:
            self.subjects[subject] = [subject.callback]

From the top down: we add a print statement to the "send_message" for debugging. Note that we're still calling the callback synchronously. The subscribe method has not changed. Then we have a pair of "multimethods". In C#, we call this pattern "multiple dispatch" or "function overload". These aren't precisely the same thing, read here if you're curious. One of the multimethods handles a string subject, and one handles an obect. In this case, we expect the object to conform to the IConsumer interface, specifically it should have a callback method. Maybe I'll change this method name later? Not sure.

Our Actor class is pretty straight forward:

class Actor(IProducer, IConsumer):
    def send_message(self, message: Message):
        pass

    def receive_message(self, message: str):
        print(f"{self} received message {message}")

    def callback(self, message: str):
        self.receive_message(message)

We're leaving send_message blank for now. I'll use this in the near future when we take on async Actor execution. Our receive_message method simply prints out whatever message was received. And note that our callback method is simply a passthrough at this point, which is why I'm considering dropping the method entirely.

Now if we run this we get a prompt. Check it out:

$ poetry run python agent_message_bus.py
> hello
Sending message hello to <__main__.Actor object at 0x7fab20af8d50>
<__main__.Actor object at 0x7fab20af8d50> received message hello
> goodbye
Sending message goodbye to <__main__.Actor object at 0x7fab20af8d50>
<__main__.Actor object at 0x7fab20af8d50> received message goodbye
> this is a message
Sending message this is a message to <__main__.Actor object at 0x7fab20af8d50>
<__main__.Actor object at 0x7fab20af8d50> received message this is a message
>

Next, I think I want to look at getting multiple agents running in tandem.