(Migrated) ZATO and AMQP

(This message has been automatically imported from the retired mailing list)

Hi,

I am trying to consume messages from a RabbitMQ queue in a Zato service.
I can send messages into the queue but they are never picked up in Zato.
According to RabbitMQ they have been sent to the queue to which the Zato
channel is connected but not ack’ed (see screenshots). I can also see a
large amount of data sitting in the tcp buffers on the Zato->RabbitMQ
connection:

Proto Recv-Q Send-Q Local Address Foreign Address
State PID/Program name
tcp 0 0 0.0.0.0:15672 0.0.0.0:*
LISTEN 17140/beam.smp
tcp 0 0 0.0.0.0:55672 0.0.0.0:*
LISTEN 17140/beam.smp
tcp 0 713 10.8.0.1:15672 10.8.0.6:61086
ESTABLISHED 17140/beam.smp
tcp 813305 0 127.0.0.1:38917 127.0.0.1:5672
ESTABLISHED 31428/channel.py
tcp6 0 0 :::5672 :::*
LISTEN 17140/beam.smp
tcp6 0 386880 127.0.0.1:5672 127.0.0.1:38917
ESTABLISHED 17140/beam.smp

I have tested my setup with a standalone python script and this is able
to consume the messages.

I am new to both RabbitMQ and Zato so am now stuck after a day of
struggling - any suggestions as to what I could be doing wrong?

OS:

root@wsdb:~/dev/zato# uname -a
Linux wsdb 3.13.0-27-generic #50-Ubuntu SMP Thu May 15 18:06:16 UTC 2014
x86_64 x86_64 x86_64 GNU/Linux

Zato version:

zato@wsdb:~/esb$ zato --version
Zato 2.0.3.rev-281a9d29

RabbitMQ version:

RabbitMQ 3.2.4

Zato AMQP Definition:

 	Name	Host	Port	Virtual host	Username	Max frame size	Heartbeat 

interval
RABBITMQ 127.0.0.1 5672 / guest 131072 0 Change password Edit Delete

Zato AMQP Channel:

 	Name	Active	Definition	Queue	Consumer tag prefix	Service	 	
	TEST OUT	Yes	RABBITMQ	device.gps	ZATO	amqp-test.gps-service	Edit	Delete

Zato Server logs:

2015-06-01 18:53:24,705 - INFO - 31267:MainThread - zato.common.repo:22

  • Location [/opt/zato/esb/server1/config/repo] is not a Bazaar branch.
    Will turn it into one.
    2015-06-01 18:53:24,919 - INFO - 31267:Dummy-1 - gunicorn.main:22 -
    Starting gunicorn 18.0
    2015-06-01 18:53:24,923 - INFO - 31267:Dummy-1 - gunicorn.main:22 -
    Listening at: http://127.0.0.1:17010 (31267)
    2015-06-01 18:53:24,923 - INFO - 31267:Dummy-1 - gunicorn.main:22 -
    Using worker: gevent
    2015-06-01 18:53:24,929 - INFO - 31292:Dummy-1 - gunicorn.main:22 -
    Booting worker with pid: 31292
    2015-06-01 18:53:24,951 - INFO - 31293:Dummy-1 - gunicorn.main:22 -
    Booting worker with pid: 31293
    2015-06-01 18:53:53,491 - INFO - 31292:Dummy-1 - zato.server.odb:22 -
    Server id:[1], name:[server1] is now a connector server for cluster
    id:[1], name:[quickstart-286359]
    2015-06-01 18:53:53,494 - INFO - 31292:Dummy-1 -
    zato.server.base.parallel:22 - Initializing connectors
    2015-06-01 18:53:53,508 - INFO - 31292:Dummy-1 -
    zato.server.base.parallel:22 - No AMQP channels to start
    2015-06-01 18:53:53,514 - INFO - 31292:Dummy-1 -
    zato.server.base.parallel:22 - No AMQP outgoing connections to start
    2015-06-01 18:53:53,522 - INFO - 31292:Dummy-1 -
    zato.server.base.parallel:22 - No JMS WebSphere MQ channels to start
    2015-06-01 18:53:53,548 - INFO - 31292:Dummy-1 -
    zato.server.base.parallel:22 - No JMS WebSphere MQ outgoing connections
    to start
    2015-06-01 18:53:53,564 - INFO - 31292:Dummy-1 -
    zato.server.base.parallel:22 - No Zero MQ channels to start
    2015-06-01 18:53:53,583 - INFO - 31292:Dummy-3 - zato.broker.client:22 -
    Starting broker client, host:[localhost], port:[6379],
    name:[parallel-K06NS6VC0XDKE6S6Q3P6A0MMVF63],
    topics:[[’/zato/to-parallel/all’, ‘/zato/to-parallel/any’,
    ’/zato/to-singleton’]]
    2015-06-01 18:53:56,134 - INFO - 31292:Dummy-8 -
    zato.helpers.input-logger:784 - {u’impl_name’:
    u’zato.server.service.internal.helpers.InputLogger’, u’name’:
    u’zato.helpers.input-logger’, u’cid’: u’K061G2B4YRV0TV2CC0ATBAQXYQZJ’,
    u’invocation_time’: datetime.datetime(2015, 6, 1, 18, 53, 56, 132219),
    u’job_type’: None, u’data_format’: None, u’slow_threshold’: 99999,
    u’request.payload’: u’Sample payload for a startup service (first
    worker)’, u’wsgi_environ’: {u’zato.request_ctx.async_msg’:
    Bunch(action=u’101802’, channel=u’startup-service’,
    cid=u’K061G2B4YRV0TV2CC0ATBAQXYQZJ’, msg_type=u’0001’, payload=u’Sample
    payload for a startup service (first worker)’,
    service=u’zato.helpers.input-logger’), u’zato.request_ctx.fanout_cid’:
    None, u’zato.request_ctx.in_reply_to’: None,
    u’zato.request_ctx.parallel_exec_cid’: None}, u’environ’: {}, u’usage’:
    25, u’channel’: u’startup-service’}
    2015-06-01 18:53:59,671 - INFO - 31292:Dummy-12 -
    zato.helpers.input-logger:784 - {u’impl_name’:
    u’zato.server.service.internal.helpers.InputLogger’, u’name’:
    u’zato.helpers.input-logger’, u’cid’: u’K05XZXMR7Z7EV3M8TFN9HY3HBAA2’,
    u’invocation_time’: datetime.datetime(2015, 6, 1, 18, 53, 59, 668867),
    u’job_type’: None, u’data_format’: None, u’slow_threshold’: 99999,
    u’request.payload’: u’Sample payload for a startup service (first
    worker)’, u’wsgi_environ’: {u’zato.request_ctx.async_msg’:
    Bunch(action=u’101802’, channel=u’startup-service’,
    cid=u’K05XZXMR7Z7EV3M8TFN9HY3HBAA2’, msg_type=u’0001’, payload=u’Sample
    payload for a startup service (first worker)’,
    service=u’zato.helpers.input-logger’), u’zato.request_ctx.fanout_cid’:
    None, u’zato.request_ctx.in_reply_to’: None,
    u’zato.request_ctx.parallel_exec_cid’: None}, u’environ’: {}, u’usage’:
    26, u’channel’: u’startup-service’}
    2015-06-01 18:54:08,556 - INFO - 31293:Dummy-2 - zato.broker.client:22 -
    Starting broker client, host:[localhost], port:[6379],
    name:[parallel-K079C8Y6MA9JMXXAQ4VDG1TV6WDJ],
    topics:[[’/zato/to-parallel/all’, ‘/zato/to-parallel/any’]]
    2015-06-01 18:54:08,577 - INFO - 31292:Dummy-15 -
    zato.helpers.input-logger:784 - {u’impl_name’:
    u’zato.server.service.internal.helpers.InputLogger’, u’name’:
    u’zato.helpers.input-logger’, u’cid’: u’K04NQKNV5BSZ43RSX5GEZQR0N558’,
    u’invocation_time’: datetime.datetime(2015, 6, 1, 18, 54, 8, 572533),
    u’job_type’: None, u’data_format’: None, u’slow_threshold’: 99999,
    u’request.payload’: u’Sample payload for a startup service (any
    worker)’, u’wsgi_environ’: {u’zato.request_ctx.async_msg’:
    Bunch(action=u’101802’, channel=u’startup-service’,
    cid=u’K04NQKNV5BSZ43RSX5GEZQR0N558’, msg_type=u’0001’, payload=u’Sample
    payload for a startup service (any worker)’,
    service=u’zato.helpers.input-logger’), u’zato.request_ctx.fanout_cid’:
    None, u’zato.request_ctx.in_reply_to’: None,
    u’zato.request_ctx.parallel_exec_cid’: None}, u’environ’: {}, u’usage’:
    27, u’channel’: u’startup-service’}
    2015-06-01 18:54:10,381 - INFO - 31292:Dummy-16 -
    zato.helpers.input-logger:784 - {u’impl_name’:
    u’zato.server.service.internal.helpers.InputLogger’, u’name’:
    u’zato.helpers.input-logger’, u’cid’: u’K0694VZKTTND05QJ6CRXBQTMMY7W’,
    u’invocation_time’: datetime.datetime(2015, 6, 1, 18, 54, 10, 379125),
    u’job_type’: None, u’data_format’: None, u’slow_threshold’: 99999,
    u’request.payload’: u’Sample payload for a startup service (any
    worker)’, u’wsgi_environ’: {u’zato.request_ctx.async_msg’:
    Bunch(action=u’101802’, channel=u’startup-service’,
    cid=u’K0694VZKTTND05QJ6CRXBQTMMY7W’, msg_type=u’0001’, payload=u’Sample
    payload for a startup service (any worker)’,
    service=u’zato.helpers.input-logger’), u’zato.request_ctx.fanout_cid’:
    None, u’zato.request_ctx.in_reply_to’: None,
    u’zato.request_ctx.parallel_exec_cid’: None}, u’environ’: {}, u’usage’:
    28, u’channel’: u’startup-service’}
    2015-06-01 18:55:31,747 - INFO - 31293:Dummy-13 -
    zato.hot-deploy.create:22 - Creating tar archive
    2015-06-01 18:55:31,748 - INFO - 31293:Dummy-13 -
    zato.hot-deploy.create:22 - Creating tar archive
    2015-06-01 18:55:31,821 - INFO - 31293:Dummy-13 -
    zato.hot-deploy.create:22 - Uploaded package id:[1],
    payload_name:[amqp_test.py]
    2015-06-01 18:55:32,813 - INFO - 31292:Dummy-35 -
    zato.hot-deploy.create:22 - Uploaded package id:[1],
    payload_name:[amqp_test.py]
    2015-06-01 18:56:24,181 - INFO -
    31428:amqp-consuming-connector-K07YS9R5GB1EGVM1K7PS5ZMYZC5R -
    zato.broker.thread_client:22 - Starting broker client, host:[localhost],
    port:[6379],
    name:[amqp-consuming-connector-K07YS9R5GB1EGVM1K7PS5ZMYZC5R],
    topics:[[’/zato/connector/amqp/all’, ‘/zato/connector/amqp/consuming/all’]]

Zato Connection log:

2015-06-01 18:56:24,672 - INFO - 31428:Thread-4 - zato_connector:22 -
Started an AMQP consumer for [127.0.0.1:5672/ (TEST OUT)], queue
[device.gps], tag
[ZATO:127.0.0.1:localhost:31428:16255109833582329472000000000000000000000]

On 01/06/15 21:12, MJS wrote:

I am new to both RabbitMQ and Zato so am now stuck after a day of
struggling - any suggestions as to what I could be doing wrong?

Hi Michael,

I cannot reproduce it on the same version of Zato and RabbitMQ on Ubuntu

$ uname -a
Linux box 3.16.0-38-generic #52~14.04.1-Ubuntu SMP Fri May 8 09:43:57
UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

After sending a test message from Rabbit’s management GUI at
http://localhost:15672 to the zato.helpers.input-logger service I can
see the following log output.

Can you please send in a self-contained program which publishes messages
that Zato cannot pick up?

2015-06-01 21:31:39,492 - INFO - 20359:Dummy-16 -
zato.helpers.input-logger:784 - {u’impl_name’:
u’zato.server.service.internal.helpers.InputLogger’, u’name’:
u’zato.helpers.input-logger’, u’cid’: u’K07ZE9T23MWY7R9TQ63DV58W8EFG’,
u’invocation_time’: datetime.datetime(2015, 6, 1, 19, 31, 39, 492217),
u’job_type’: None, u’data_format’: u’’, u’slow_threshold’: 99999,
u’request.payload’: u’Testing AMQP’, u’wsgi_environ’:
{u’zato.request_ctx.async_msg’: Bunch(action=u’101003’,
cid=u’K07ZE9T23MWY7R9TQ63DV58W8EFG’, data_format=u’’, msg_type=u’0001’,
payload=u’Testing AMQP’, service=u’zato.helpers.input-logger’),
u’zato.request_ctx.fanout_cid’: None, u’zato.request_ctx.in_reply_to’:
None, u’zato.request_ctx.parallel_exec_cid’: None}, u’environ’: {},
u’usage’: 106, u’channel’: u’amqp’}

Hi,

I have configured a scheduled job on my ZATO instance to run once every
15 minutes.

The job is a cron-style job with a cron definition of ‘0/15 * * * * *’.

The first few times the job works as expected executing a single
instance of the defined service. After a short while the scheduler
starts executing the scheduled service twice.

The following output from my scheduler.log shows this:

2015-08-17 19:30:00,002 - DEBUG - 17268:Dummy-2410 - zato_scheduler:22 -
Executing wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 2, u'cid': u'K0702QVFXFGVCVYW48ABCQQFZAWS', u'start_time': '2015-08-17T19:15:00', u'cron_definition': None, u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': u''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11L}
2015-08-17 19:30:00,003 - DEBUG - 17268:Dummy-2411 - zato_scheduler:22 -
Executing wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 18, u'cid': u'K06ZKWDB5NPB8KPMMFF3R0AQZTJV', u'start_time': '2015-08-17T16:45:00', u'cron_definition': u'*/15 * * * *', u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': ''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11}
2015-08-17 19:30:00,005 - INFO - 17268:Dummy-2410 - zato_scheduler:22 -
Job executed wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 2, u'cid': u'K0702QVFXFGVCVYW48ABCQQFZAWS', u'start_time': '2015-08-17T19:15:00', u'cron_definition': None, u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': u''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11L}
2015-08-17 19:30:00,006 - INFO - 17268:Dummy-2411 - zato_scheduler:22 -
Job executed wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 18, u'cid': u'K06ZKWDB5NPB8KPMMFF3R0AQZTJV', u'start_time': '2015-08-17T16:45:00', u'cron_definition': u'*/15 * * * *', u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': ''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11}

Does anyone have any suggestions on what could be causing this?

Regards

What version are you using ?

It happens on 2.0.3, but was supposedly fixed on 2.0.5 (I haven’t had a
chance to upgrade yet ;-(

T

On 18 August 2015 at 03:35, Michael Sweeney michael@sweeney.co.za wrote:

Hi,

I have configured a scheduled job on my ZATO instance to run once every 15
minutes.

The job is a cron-style job with a cron definition of ‘0/15 * * * * *’.

The first few times the job works as expected executing a single instance
of the defined service. After a short while the scheduler starts executing
the scheduled service twice.

The following output from my scheduler.log shows this:

2015-08-17 19:30:00,002 - DEBUG - 17268:Dummy-2410 - zato_scheduler:22 -
Executing wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 2, u'cid': u'K0702QVFXFGVCVYW48ABCQQFZAWS', u'start_time': '2015-08-17T19:15:00', u'cron_definition': None, u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': u''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11L}
2015-08-17 19:30:00,003 - DEBUG - 17268:Dummy-2411 - zato_scheduler:22 -
Executing wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 18, u'cid': u'K06ZKWDB5NPB8KPMMFF3R0AQZTJV', u'start_time': '2015-08-17T16:45:00', u'cron_definition': u'*/15 * * * *', u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': ''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11}
2015-08-17 19:30:00,005 - INFO - 17268:Dummy-2410 - zato_scheduler:22 -
Job executed wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 2, u'cid': u'K0702QVFXFGVCVYW48ABCQQFZAWS', u'start_time': '2015-08-17T19:15:00', u'cron_definition': None, u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': u''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11L}
2015-08-17 19:30:00,006 - INFO - 17268:Dummy-2411 - zato_scheduler:22 -
Job executed wsme.ssm.poll-channels, {u'name': u'wsme.ssm.poll-channels', u'current_run': 18, u'cid': u'K06ZKWDB5NPB8KPMMFF3R0AQZTJV', u'start_time': '2015-08-17T16:45:00', u'cron_definition': u'*/15 * * * *', u'max_repeats': None, u'cb_kwargs': {u'service': u'ssm-batch-channel.request-scan', u'extra': ''}, u'max_repeats_reached': False, u'type': u'cron_style', u'id': 11}

Does anyone have any suggestions on what could be causing this?

Regards