(Migrated) Adding AMQP Prefetch Count

(This message has been automatically imported from the retired mailing list)

Hello,

I need a way to limit the number of unacknowledged messages from a queue. I
have some long-running services that are called for each message received.
The problem is that I can only have a limited number of instances running
at a time, so if the service receives 1k requests, it invokes the downstream
service that many times, as fast as it can, and the service starts refusing
connections. I could dead-letter the messages and repeat, but this is not
practical. What I need is a way to tell the consumer to process only a small
number of requests at a time and, when the service finishes, get another
batch, until the queue is drained.

This can be achieved using the consumer prefetch
(http://www.rabbitmq.com/consumer-prefetch.html), so I think the
implementation should be kind of simple.

The pika method required to set the prefetch count is
http://pika.readthedocs.org/en/latest/_modules/pika/adapters/blocking_connection.html#BlockingChannel.basic_qos,
and I think the call should go in
https://github.com/zatosource/zato/blob/master/code/zato-server/src/zato/server/connection/amqp/channel.py
around line 61. We will also need to add a field so the count can be set for
each AMQP channel; if it is set to 0, everything should work as it does
right now.
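For illustration, the change might look something like the sketch below. The
`channel.basic_qos(prefetch_count=...)` call is the real pika method; the
helper function, its name and exactly where it would be wired in are only my
assumptions, not actual Zato code:

```python
# Sketch of the proposed change, not the actual Zato code: a hypothetical
# hook that applies a per-channel prefetch count right after the channel
# is opened. basic_qos(prefetch_count=...) is the real pika call; the
# surrounding function and its arguments are illustrative only.

def apply_prefetch(channel, prefetch_count):
    """Limit the number of unacknowledged messages on this channel.

    With prefetch_count=0 the broker imposes no limit, which matches the
    current behaviour, so 0 is a safe default for the new channel field.
    """
    if prefetch_count > 0:
        channel.basic_qos(prefetch_count=prefetch_count)
```

A per-channel field defaulting to 0 would then keep existing channels
behaving exactly as they do today.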

I'm running some tests to check that this doesn't break anything, but I
don't know if there is anything in particular I should watch out for that
this change may break.

After checking the Zato code, the pika change I described is not going to
work, because Zato handles the AMQP channel asynchronously, so even if I set
the prefetch it will consume all the messages anyway.

I’ve been searching the documentation for a way to limit the number of
times a service runs at any given time, and a way to keep a queue of
requests, but I can’t find how to achieve this.

Any pointers would be really appreciated.
On Sep 9, 2014 3:17 PM, “Ivan Villareal” ivaano@gmail.com wrote:


On 10/09/14 06:39, Ivan Villareal wrote:

Hi Ivan,

> After checking the Zato code, the pika change I described is not going to
> work, because Zato handles the AMQP channel asynchronously, so even if I
> set the prefetch it will consume all the messages anyway.

Right, it handles them asynchronously and I can see how that can get in
the way in your scenario.

> I’ve been searching the documentation for a way to limit the number of
> times a service runs at any given time, and a way to keep a queue of
> requests, but I can’t find how to achieve this.

This should be fairly easy to implement in the git master version using
self.lock - a Redis-based distributed lock that services can hold across
all the servers.

Here is a thread regarding unique tasks employing self.lock:

https://mailman-mail5.webfaction.com/pipermail/zato-discuss/2013-July/000057.html

Your use case is quite similar:

  • Fetch all the messages from AMQP and store them in Redis or another
    data store

  • Periodically run a service which uses self.lock

  • Within self.lock block check in Redis under a dedicated key if you
    have hit the limit of concurrent invocations

  • If yes, return

  • If not, take the last message from Redis/somewhere else and process it

Now the trick is that within the self.lock block you can also use the
INCR/DECR Redis commands to maintain a counter of invocations - this lets
you set how many instances of a service should run in parallel.

Also, self.lock accepts the name of a lock - this defaults to the name of
the service using it but can be set to, say, self.lock(queue_name)
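Putting the steps together, a rough sketch of what the periodically-run
service could do is below. This is not actual Zato code: the lock is
whatever self.lock (or self.lock(queue_name)) gives you, kvdb stands in for
Redis, and every name here - MAX_CONCURRENT, the 'myapp:running' key,
maybe_process_one - is invented purely for illustration.

```python
# Illustrative only - lock is any context manager (e.g. what self.lock
# yields), kvdb is a dict-like stand-in for Redis, queue is a list of
# messages already fetched from AMQP, and process is the long-running call.

MAX_CONCURRENT = 3  # hypothetical limit of parallel invocations

def maybe_process_one(lock, kvdb, queue, process):
    with lock:
        # Under the lock, check whether the concurrency limit has been hit
        running = kvdb.get('myapp:running', 0)  # like Redis GET
        if running >= MAX_CONCURRENT or not queue:
            return False
        kvdb['myapp:running'] = running + 1     # like Redis INCR
        msg = queue.pop()  # take the last stored message
    try:
        process(msg)       # invoke the long-running service
    finally:
        with lock:
            kvdb['myapp:running'] -= 1          # like Redis DECR
    return True
```

With real Redis you would use INCR/DECR so the counter updates are atomic
across all servers; the dict here only mimics that within a single process.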

On 10/09/14 17:01, Ivan Villareal wrote:

> Thank you for your response. The problem is that I'm using 1.1. Yesterday
> I did a build of master, but the staging and prod servers are using 1.1,
> so I need to test a little more before putting the master build on the
> cloud servers. I'll give it a try on my local instance to see how it looks.

Hello Ivan,

It is easy to achieve this on 1.1 as well - please have a look at how
self.lock is actually implemented; it is a mere 3 lines of code:

https://github.com/zatosource/zato/blob/master/code/zato-server/src/zato/server/service/init.py#L488

Essentially, it is a thin convenience wrapper around retools.lock.Lock,
so in 1.1 you can simply import that class and obtain the locks manually
with ease.
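For example, the manual version could look roughly like this. In real code
the lock class would be retools.lock.Lock; here it is passed in as a
parameter so the pattern stands out, and its exact constructor arguments (I
assume a key plus a redis connection) should be verified against retools
itself before use:

```python
# Sketch of using a distributed lock on Zato 1.1, where self.lock is not
# available. In real code lock_cls would be retools.lock.Lock (imported
# via `from retools.lock import Lock`); its keyword arguments below are
# assumptions - check retools' own source. All other names are invented.

def run_with_lock(lock_cls, redis_conn, queue_name, fn):
    # One lock per queue, mirroring self.lock(queue_name) on git master
    with lock_cls('lock:%s' % queue_name, redis=redis_conn):
        return fn()
```

The per-queue key mirrors passing a lock name to self.lock on master, so
moving to a newer release later should only mean deleting this wrapper.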