Understanding the pub/sub system


#1

Hello everybody!

I am testing the pub/sub system included in Zato.

I have a service genesisng.services.availability.Confirm that creates a reservation. The service, hot-deployed and working fine, at the end of a successful execution, publishes a message to a topic named /genesisng/bookings/new:

# Publish a message to ``/genesisng/bookings/new`` topic name.
topic_name = '/genesisng/bookings/new'
data = 'id:%s' % booking['response'].id
priority = self.user_config.genesisng.availability.pubsub_priority
msg_id = self.pubsub.publish(topic_name, data=data, priority=priority)
self.logger.info('Added message with id %s to topic %s for booking with id %s' % (msg_id, topic_name, booking['response'].id))

The topic name /genesisng/bookings/new was created using the web admin menu option Pub/sub > Topics with default values.

This part is working fine and I can see the published messages via the web admin option Pub/sub > Topics.
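
Because the payload published above is a plain string of the form id:<booking id>, whichever service ends up consuming it has to parse the id back out. A minimal sketch, assuming the booking id is an integer; parse_booking_id is a hypothetical helper and not part of Zato:

```python
def parse_booking_id(data):
    """Extract the booking id from a payload formatted as 'id:<number>'.

    Assumption: booking ids are integers. Raises ValueError otherwise.
    """
    prefix, sep, value = data.partition(':')
    if prefix != 'id' or not sep:
        raise ValueError('Unexpected payload: %r' % (data,))
    return int(value)
```

The helper only mirrors the 'id:%s' format used by the publisher; a structured payload such as JSON would be parsed differently.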

Now I am trying to code a subscriber named genesisng.pubsub.sender.Sender as a Zato service (using Python). This service will consume messages from the topic /genesisng/bookings/new and compose and send an email to the guest via SMTP on the localhost.

Questions:

  • Do I need to create an endpoint for the publisher and another endpoint for the subscriber? From the documentation, given that both are Python services executed inside Zato, I understand that it is not necessary (it seems to be using the zato.pubsub.default.internal.endpoint default value).

  • How do I deploy my subscriber, i.e. how do I tell Zato that the service is a subscriber to the pub/sub system and has to be executed as such? I see that the web admin menu option Pub/sub > Subscriptions allows adding subscribers, but only through AMQP, REST and SOAP, and these require an endpoint. And, given that my subscriber service is written in Python as a Zato service, I understand it should not be necessary to create either an endpoint or a subscriber (that’s for external subscribers). Moreover, endpoints are of types AMQP, REST, SOAP and WebSockets, which is not my case.

This is my sender so far:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division
from __future__ import print_function, unicode_literals
from zato.server.service import Service


class Sender(Service):
    def handle(self):
        self.logger.info('Executing sender subscriber service')
        topic_name = '/genesisng/bookings/new'
        sub_key = self.pubsub.subscribe(topic_name, service=self)
        self.logger.info('Sender service subscribed to topic name %s and received subscription key %s',
                         topic_name, sub_key)

        # Get available messages
        messages = self.pubsub.get_messages(topic_name, sub_key)
        self.logger.info('Messages received from topic name %s are %s',
                         topic_name, messages)

        # Compose a new reservation email and send it to the guest
        self.logger.info('Sending email to guest')

I am afraid that I am a bit overwhelmed by all the pub/sub and broker documentation because it aims at a very complex system which I do not require, and most probably the answer is very simple and located somewhere in such documentation (I must have missed it).

Any help will be appreciated. I’ll keep re-reading it meanwhile.

Thanks.


#2

Hello @jsabater,

your understanding is correct and there is only one thing that I can add.

Currently, it is not possible for a service to be a subscriber. This will surely be added, and possibly quite soon too.

When it is, the syntax will be akin to:

class MyService(Service):
    subscribe_to = ['topic1', 'topic2', 'topic3']

Likely with more options for cases when topic names are not known yet during deployment, but the point being that it will not be required to use web-admin to subscribe services to topics.

In your case, the only way would be to have a REST endpoint and have Zato publish messages to itself via REST. This is doable, but I am providing it only as a way to accomplish what you need until the aforementioned work is done.


#3

Hi, @dsuch!

In this pub/sub REST scenario you are mentioning, would things be as follows?

  • A Zato service publishes to a topic name (just as described in the opening post of this thread).
  • A daemon, external to Zato, would have to be a subscriber to that Zato topic name using a REST endpoint.

Is this correct?

Nonetheless, I really don’t feel like creating my own daemons right now for this prototype, so I’ll have to put this on hold. Incidentally, when do you think a Zato service will be able to be a subscriber? 3.1 or beyond?

Thanks.


#4

I meant that you could create a new REST endpoint pointing to your own REST channel, without an external process.

As for when it will be ready - in time for 3.1 for sure but I hope it can be done earlier. I just do not have a timeline for it yet.


#5

Okay, so I would have to call the service via curl in a cron job or similar, and every time it is called, it would execute self.pubsub.get_messages(), process the messages and finish.

Ah, that is very good news! No worries, there are many other features I still have to try and add to the prototype. Cheers! :ok_hand:


#6

If I understand it correctly, you have service1 that publishes messages and service2 that should receive them. The latter will be able to do so in v. 3.1, but in 3.0 this is not possible.

What you can do is create a new REST channel for service2. Then, create a new REST pub/sub endpoint whose underlying outgoing REST connection will point to that channel.

Then, when service1 publishes a message, pub/sub will know that there is a REST subscriber for it so it will move service1’s message to that subscriber’s queue.

Next, a different part of pub/sub will deliver the message to that subscriber - as it happens, the recipient will be the very same Zato server with service2 accessible via localhost:17010 (or another address of your choice).

The end result is that service2 will get a message published by service1 and no external tools will have to be employed.

Once the other functionality is implemented, the one where services will be able to subscribe themselves and to become topic subscribers directly, the flow of information will be quite similar though there will be no overhead of HTTP and no need to define a pub/sub endpoint in web-admin either.


#7

Yes, the idea is that I have an application with a number of services (publishers) that, after doing whatever they have to, add a message to a queue so that another set of services (consumers) can do additional tasks afterwards, asynchronously, or whatever you want to call it.

For example, in the tourism world, after a client has placed a reservation, the system will have to, among other things, compose a confirmation email to be sent to the client. That task will be performed by a consumer after the main task has finished (saving the reservation and returning the result to the controller so that a confirmation page can be rendered to the client in his/her browser).

This has nothing to do with a third set of services which are executed regularly via cron jobs (e.g. Zato’s scheduler), for example to prepare summaries of tasks for people that start their shift the next morning.

Of course this is all business logic and maybe not so much a thing for a Service Bus such as Zato, therefore the discussion :slight_smile:

Back to the topic, I think that the ability to just have a simple service which is subscribed to one or more topic names and is activated/executed/woken up whenever a new message enters one of those queues will be a very nice addition. In my opinion, one queue per service has always been enough, but as long as you have a way to tell which topic name brought you back to life then it’s fine.

I think I understand your explanation of how to set up a similar scenario in version 3.0 by, metaphorically speaking, getting out of Zato via a REST endpoint and then coming back in via a REST channel. I think it will work, but the addition planned for version 3.1 will be, of course, a much nicer way of developing the system :+1:


#8

I understand the business logic and Zato can support it for sure.

Yes, this is what the functionality about to be implemented will allow you to do.

Also, you can already make use of pub/sub hooks - when you create a topic you can assign its hook service whose methods will be invoked at particular moments.

For example, one method is invoked right before a message is published to a topic, which lets you modify it or perhaps reject it altogether. Another is invoked right before a message is about to be delivered to a recipient, which lets you, for instance, transform it to another data format, so that you can have multiple subscribers each receiving their messages in the same or different formats.

But you are not limited to these scenarios - you can as well invoke other services or carry out any possible operations in pub/sub hooks.
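
The hook idea can be illustrated without Zato at all. The sketch below is a plain-Python model and not Zato's hook API: Topic, before_publish and before_delivery are hypothetical names chosen for illustration, and subscribers are modelled as simple callables.

```python
class Topic(object):
    """Toy topic that calls optional hooks around publication and delivery.

    This is a conceptual model only; it does not use or mirror Zato's classes.
    """

    def __init__(self, name, before_publish=None, before_delivery=None):
        self.name = name
        self.before_publish = before_publish
        self.before_delivery = before_delivery
        self.subscribers = []  # callables that receive delivered messages

    def publish(self, data):
        # The publish hook may modify the message, or reject it by returning None
        if self.before_publish is not None:
            data = self.before_publish(data)
            if data is None:
                return False

        for subscriber in self.subscribers:
            # The delivery hook may transform the message for each recipient
            out = data
            if self.before_delivery is not None:
                out = self.before_delivery(subscriber, data)
            subscriber(out)
        return True
```

In this model a publish hook that returns None rejects the message, and a delivery hook can return a different representation per subscriber, which corresponds to the two scenarios described above.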

Each subscriber gets messages from its own queue of messages pertaining to a particular topic.

That is, let’s say there is a topic named /genesisng/bookings/new. When a message is published to that topic it will be first saved to the topic’s underlying storage. Next, each subscriber will receive it in their own queue for that topic.

That is, one subscriber has one queue for each of the topics it is subscribed to.
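
The storage layout described here can be modelled in a few lines of plain Python. This is only a sketch of the idea, assuming an in-memory model; PubSubModel and its methods are hypothetical names and do not correspond to Zato's internals.

```python
from collections import defaultdict, deque


class PubSubModel(object):
    """Toy model: one storage per topic, one queue per (subscriber, topic) pair."""

    def __init__(self):
        # Topic name -> every message ever published to that topic
        self.topic_storage = defaultdict(list)
        # (subscriber name, topic name) -> that subscriber's own queue
        self.queues = {}

    def subscribe(self, subscriber, topic_name):
        self.queues.setdefault((subscriber, topic_name), deque())

    def publish(self, topic_name, data):
        # First, save the message to the topic's underlying storage
        self.topic_storage[topic_name].append(data)
        # Next, copy it to the queue of each subscriber of that topic
        for (_, topic), queue in self.queues.items():
            if topic == topic_name:
                queue.append(data)

    def get_messages(self, subscriber, topic_name):
        # Drain and return only this subscriber's queue for this topic
        queue = self.queues[(subscriber, topic_name)]
        messages = list(queue)
        queue.clear()
        return messages
```

Reading from one subscriber's queue leaves the other subscribers' queues untouched, which is why each subscriber can consume the same message independently.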


#9

Brilliant approach. Thanks for the explanation.

Regarding the hooks, I’ve never had the need for a system such as the one you describe, but I see it makes a lot of sense in an enterprise service bus scenario, so congratulations on the good work :ok_hand:

P.S. I just noticed I did not finish the last sentence in my previous post, so I edited it :smiley: