We are currently exploring AMQP queues in Zato 3 for our application, which uses Odoo for the backend. Right now, with Zato 2.0.7, the queue is emptied very quickly and all the items are pushed into consumer memory, so we are forced to process them in batches (so that the application won’t crash under the volume of data), and we also lose data if the server restarts. We found that RabbitMQ lets you set a prefetch count to limit how much data is pushed into consumer memory, which prevents the queue from being emptied all at once. We are now looking into this with Zato 3.
After some exploration, I have succeeded in setting the prefetch count on the channel from within the Zato dequeue service (I didn’t find any UI configuration option for either the channel or the queue), as below.
self.request.amqp.msg.channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
As you can see, I am calling basic_qos on the kombu channel object.
However, this call runs for every message processed, which doesn’t make sense. So I tweaked the Zato code to set the prefetch count at the consumer level instead, in this file.
def _get_consumer(self, _no_ack=no_ack, _gevent_sleep=sleep):
    """ Creates a new connection and consumer to an AMQP broker.
    """
    <some code>
    consumer.qos(prefetch_size=0, prefetch_count=20, apply_global=False)
    consumer.consume()
This works fine, but patching the source code is obviously not the way to go, and it gives no flexibility to configure different values for different queues or consumers. Is there any way to set the prefetch count for a channel, or could this be made configurable at the channel level from the UI (in the channel definition), so that all consumers fetching data from a particular queue adhere to that prefetch value?
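To make the request concrete, here is a minimal sketch of the behaviour I am after. It is not Zato code: the PREFETCH_BY_QUEUE mapping, the queue names, the apply_prefetch helper and the FakeConsumer stand-in are all hypothetical and only there for illustration. The only real call it relies on is the same consumer.qos(...) used in the patch above, invoked once per consumer before consume() rather than once per message.

```python
# Hypothetical per-queue settings; ideally these would come from the
# channel definition instead of being hard-coded like this.
PREFETCH_BY_QUEUE = {"orders": 20, "invoices": 1}
DEFAULT_PREFETCH = 1


def apply_prefetch(consumer, queue_name,
                   settings=PREFETCH_BY_QUEUE, default=DEFAULT_PREFETCH):
    """Set the prefetch count on a kombu-style consumer, once per consumer."""
    count = settings.get(queue_name, default)
    # Same call as in the patched _get_consumer, with the value looked up
    # per queue instead of being fixed at 20.
    consumer.qos(prefetch_size=0, prefetch_count=count, apply_global=False)
    return count


class FakeConsumer:
    """Stand-in for a kombu consumer so the sketch runs without a broker."""

    def __init__(self):
        self.qos_calls = []

    def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
        # Record the arguments instead of talking to RabbitMQ.
        self.qos_calls.append((prefetch_size, prefetch_count, apply_global))


consumer = FakeConsumer()
apply_prefetch(consumer, "orders")
print(consumer.qos_calls)  # [(0, 20, False)]
```

In the real consumer this would sit right before consumer.consume(), so the value is applied once when the consumer is created, and a queue with no explicit setting falls back to the default.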