I came up with this a while ago when I was playing with cooperative LLM agents. It is the simplest implementation of a message bus that I could think of. Let's start with the following barebones Python.
def main():
    print("Hello, world.")

if __name__ == "__main__":
    main()
Let's begin by looking at how we interact with our bus. In the main()
function, add the following:
bus = MessageBus()

def callback(message: str):
    print(f"Received message '{message}'")
Now we need to sort out the bus. Instead of queues or anything like that, I've opted for callbacks: you register a function against a subject, and the bus calls every function registered for that subject whenever a message is sent to it.
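import typing  # goes at the top of the file


class MessageBus:
    # Maps each subject name to the callbacks subscribed to it.
    subjects: typing.Dict[str, typing.List[typing.Callable[[str], None]]]

    def __init__(self):
        self.subjects = {}

    def send_message(self, subject: str, message: str):
        # Deliver synchronously to every subscriber. A subject with no
        # subscribers is skipped instead of raising KeyError.
        for callback in self.subjects.get(subject, []):
            callback(message)

    def subscribe(self, subject: str, callback: typing.Callable[[str], None]):
        # Create the subscriber list on first use, then append to it.
        self.subjects.setdefault(subject, []).append(callback)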
Now let's use our bus. Add the following to the end of the main function:
bus.subscribe("1", callback)
bus.send_message("1", "This is a message, 123")
Now run it and behold! We sent a message!
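For reference, here is roughly what the whole file looks like once the snippets above are assembled. Treat it as a sketch rather than the canonical version: the one liberty I've taken is that sending to a subject nobody has subscribed to is silently ignored instead of raising a KeyError.

```python
import typing

Callback = typing.Callable[[str], None]


class MessageBus:
    def __init__(self):
        # Maps each subject name to the callbacks subscribed to it.
        self.subjects: typing.Dict[str, typing.List[Callback]] = {}

    def send_message(self, subject: str, message: str):
        # Assumption: an unknown subject is ignored rather than an error.
        for callback in self.subjects.get(subject, []):
            callback(message)

    def subscribe(self, subject: str, callback: Callback):
        self.subjects.setdefault(subject, []).append(callback)


def main():
    bus = MessageBus()

    def callback(message: str):
        print(f"Received message '{message}'")

    bus.subscribe("1", callback)
    bus.send_message("1", "This is a message, 123")


if __name__ == "__main__":
    main()  # prints: Received message 'This is a message, 123'
```

Delivery is synchronous: send_message() does not return until every callback for that subject has run, in the order they subscribed.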
You can find the complete code here.
What's next? I'm thinking about threading; the concept of "agents" that implement producer and consumer interfaces; queues to handle data shared between threads (Python's queue.Queue comes to mind); a scheduler for agents that don't need to run continuously; consumers that push stuff into the outside world (like email, text, or push notifications); and producers that grab information from the outside world (maybe a headline scraper? or an LLM that summarizes the daily news from India?).
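To make the threading and queue idea a bit more concrete, here is one rough way it might look. Nothing in it is settled: the QueuedMessageBus name, the close() method, and the single worker thread are all assumptions made for illustration. The idea is that send_message() only drops the message onto a queue.Queue, and a background thread does the actual callback dispatch.

```python
import queue
import threading
import typing

Callback = typing.Callable[[str], None]


class QueuedMessageBus:
    """Hypothetical threaded variant: senders enqueue, one worker delivers."""

    def __init__(self):
        self.subjects: typing.Dict[str, typing.List[Callback]] = {}
        self._queue = queue.Queue()    # holds (subject, message) tuples
        self._lock = threading.Lock()  # guards self.subjects
        self._worker = threading.Thread(target=self._dispatch, daemon=True)
        self._worker.start()

    def subscribe(self, subject: str, callback: Callback):
        with self._lock:
            self.subjects.setdefault(subject, []).append(callback)

    def send_message(self, subject: str, message: str):
        # Returns immediately; delivery happens on the worker thread.
        self._queue.put((subject, message))

    def close(self):
        # None is a sentinel that tells the worker to stop once it has
        # delivered everything queued before it.
        self._queue.put(None)
        self._worker.join()

    def _dispatch(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            subject, message = item
            # Copy the list under the lock, then call outside it so a
            # callback can subscribe without deadlocking.
            with self._lock:
                callbacks = list(self.subjects.get(subject, []))
            for callback in callbacks:
                callback(message)


if __name__ == "__main__":
    bus = QueuedMessageBus()
    received = []
    bus.subscribe("news", received.append)
    bus.send_message("news", "headline 1")
    bus.close()      # waits for the worker to drain the queue
    print(received)  # prints: ['headline 1']
```

With a single worker, messages are still delivered in the order they were sent; the difference from the synchronous bus is that a slow callback no longer blocks the sender.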