(Migrated) AMQP Connection problem

(This message has been automatically imported from the retired mailing list)

Something is wrong with AMQP Connections,

I wrote a simple amqp service to publish a message it works fine for a =
few seconds sometimes minutes.

class ScreenshotPublisher(Service):
class SimpleIO:
input_required =3D (‘url’, ‘engine’,)
output_required =3D (‘success’,)
output_optional =3D (‘options’,)

def handle(self):
    self.logger.info("Request received")

    # # Request parameters
    msg =3D 'Hello AMQP broker!'
    out_name =3D 'ScreenshotBatch'
    exchange =3D 'screenshots.service.batch.requests'
    routing_key =3D 'process'
    properties =3D {'app_id': 'ESB'}
    headers =3D {'X-Foo': 'bar'}
   =20
    # Send a message to the broker
    self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
        properties, headers)
    self.logger.info("Message sent")
   =20
    self.response.payload.success =3D True

but then the service stops working with the following error :

2015-04-19 23:29:29,749 - ERROR - 8125:Thread-3 - zato.server.base:22 - =
Could not handle broker msg:[Bunch(action=3Du’100803’, args=3D[], =
body=3Du’{“url”: “google.com”, “engine”: “beggra”}’, =
exchange=3Du’screenshots.service.batch.requests’, headers=3D{}, =
kwargs=3D{}, msg_type=3Du’0003’, out_name=3Du’ScreenshotBatch’, =
properties=3D{u’user_id’: u’ivan’}, routing_key=3Du’process’)], =
e:[Traceback (most recent call last):
File “/opt/zato/2.0.3/zato-server/src/zato/server/base/init.py”, =
line 47, in on_broker_msg
getattr(self, handler)(msg)
File =
"/opt/zato/2.0.3/zato-server/src/zato/server/connection/amqp/outgoing.py",=
line 220, in on_broker_msg_OUTGOING_AMQP_PUBLISH
producer.publish(msg.body, routing_key=3Dmsg.routing_key, =
exchange=3Dmsg.exchange, headers=3Dheaders, **properties)
File “/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py”, =
line 164, in publish
routing_key, mandatory, immediate, exchange, declare)
File “/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py”, =
line 180, in _publish
mandatory=3Dmandatory, immediate=3Dimmediate,
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/channel.py”, =
line 2099, in basic_publish
self._send_method((60, 40), args, msg)
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/abstract_channel.py", =
line 58, in _send_method
self.channel_id, method_sig, args, content,
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/method_framing.py", =
line 219, in write_method
write_frame(1, channel, payload)
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/transport.py”, =
line 157, in write_frame
pack(’>BHI%dsB’ % size, frame_type, channel, size, payload, 0xce),
File “/usr/lib/python2.7/socket.py”, line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
]

Or this one

2015-04-19 23:28:54,123 - ERROR - 8125:Thread-3 - zato.server.base:22 - =
Could not handle broker msg:[Bunch(action=3Du’100803’, args=3D[], =
body=3Du’{“url”: “google.com”, “engine”: “beggra”}’, =
exchange=3Du’screenshots.service.batch.requests’, headers=3D{}, =
kwargs=3D{}, msg_type=3Du’0003’, out_name=3Du’ScreenshotBatch’, =
properties=3D{u’user_id’: u’ivan’}, routing_key=3Du’process’)], =
e:[Traceback (most recent call last):
File “/opt/zato/2.0.3/zato-server/src/zato/server/base/init.py”, =
line 47, in on_broker_msg
getattr(self, handler)(msg)
File =
"/opt/zato/2.0.3/zato-server/src/zato/server/connection/amqp/outgoing.py",=
line 220, in on_broker_msg_OUTGOING_AMQP_PUBLISH
producer.publish(msg.body, routing_key=3Dmsg.routing_key, =
exchange=3Dmsg.exchange, headers=3Dheaders, **properties)
File “/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py”, =
line 164, in publish
routing_key, mandatory, immediate, exchange, declare)
File “/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py”, =
line 180, in _publish
mandatory=3Dmandatory, immediate=3Dimmediate,
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/channel.py”, =
line 2099, in basic_publish
self._send_method((60, 40), args, msg)
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/abstract_channel.py", =
line 58, in _send_method
self.channel_id, method_sig, args, content,
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/method_framing.py", =
line 219, in write_method
write_frame(1, channel, payload)
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/transport.py”, =
line 157, in write_frame
pack(’>BHI%dsB’ % size, frame_type, channel, size, payload, 0xce),
File “/usr/lib/python2.7/socket.py”, line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer
]

I can see that zato is connected to rabbit, and here is the rabbit log.

working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:08 =3D=3D=3D
accepting AMQP connection <0.2485.0> (192.168.9.7:42980 -> =
192.168.9.53:5672)
working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:08 =3D=3D=3D
closing AMQP connection <0.2289.0> (192.168.9.7:42645 -> =
192.168.9.53:5672)
working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:08 =3D=3D=3D
closing AMQP connection <0.2298.0> (192.168.9.7:42648 -> =
192.168.9.53:5672)
working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:15 =3D=3D=3D
accepting AMQP connection <0.2530.0> (192.168.9.7:42992 -> =
192.168.9.53:5672)
zato error
=3DERROR REPORT=3D=3D=3D=3D 19-Apr-2015::23:24:15 =3D=3D=3D
closing AMQP connection <0.2530.0> (192.168.9.7:42992 -> =
192.168.9.53:5672):
{heartbeat_timeout,running}

I=E2=80=99ve changed the connection heartbeat to 10, 60 600 seconds but =
the error still happens, the only way I can make the service to publish =
messages again is to Edit the Connection Definition, without making any =
changes

This seems to be a zato problem, because I wrote a standalone producer =
with kombu and is running on the same server zato is and it keeps =
pushing messages after zato fails. I=E2=80=99ve tested this with zato =
2.0.3

This problem goes away if I set the heartbeat to 0 in the connection =
definition, any other value causes this behavior and it won=E2=80=99t =
reconnect automatically, the only way to make it work again, is by =
forcing zato to reconnect by editing the connection definition.

This problem happened with zato 2.0.3 and RabbitMQ 3.5.0, Erlang R15B01

Is there a way to know when a message was not published? The tracebacks =
are running on a different thread from my service so my service always =
return a success.

On Apr 19, 2015, at 11:58 PM, Ivan Villareal ivaano@gmail.com wrote:
=20
Something is wrong with AMQP Connections,
=20
I wrote a simple amqp service to publish a message it works fine for a =
few seconds sometimes minutes.
=20
class ScreenshotPublisher(Service):
class SimpleIO:
input_required =3D (‘url’, ‘engine’,)
output_required =3D (‘success’,)
output_optional =3D (‘options’,)
=20
def handle(self):
self.logger.info(“Request received”)
=20
# # Request parameters
msg =3D 'Hello AMQP broker!'
out_name =3D 'ScreenshotBatch’
exchange =3D 'screenshots.service.batch.requests’
routing_key =3D 'process’
properties =3D {‘app_id’: ‘ESB’}
headers =3D {‘X-Foo’: ‘bar’}
=20
# Send a message to the broker
self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
properties, headers)
self.logger.info(“Message sent”)
=20
self.response.payload.success =3D True
=20
=20
but then the service stops working with the following error :
=20
2015-04-19 23:29:29,749 - ERROR - 8125:Thread-3 - zato.server.base:22 =

  • Could not handle broker msg:[Bunch(action=3Du’100803’, args=3D[], =
    body=3Du’{“url”: “google.com http://google.com/”, “engine”: =
    “beggra”}’, exchange=3Du’screenshots.service.batch.requests’, =
    headers=3D{}, kwargs=3D{}, msg_type=3Du’0003’, =
    out_name=3Du’ScreenshotBatch’, properties=3D{u’user_id’: u’ivan’}, =
    routing_key=3Du’process’)], e:[Traceback (most recent call last):

File “/opt/zato/2.0.3/zato-server/src/zato/server/base/init.py”, =
line 47, in on_broker_msg
getattr(self, handler)(msg)
File =
"/opt/zato/2.0.3/zato-server/src/zato/server/connection/amqp/outgoing.py",=
line 220, in on_broker_msg_OUTGOING_AMQP_PUBLISH
producer.publish(msg.body, routing_key=3Dmsg.routing_key, =
exchange=3Dmsg.exchange, headers=3Dheaders, **properties)
File =
"/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line =
164, in publish
routing_key, mandatory, immediate, exchange, declare)
File =
"/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line =
180, in _publish
mandatory=3Dmandatory, immediate=3Dimmediate,
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/channel.py”, =
line 2099, in basic_publish
self._send_method((60, 40), args, msg)
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/abstract_channel.py", =
line 58, in _send_method
self.channel_id, method_sig, args, content,
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/method_framing.py", =
line 219, in write_method
write_frame(1, channel, payload)
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/transport.py”, =
line 157, in write_frame
pack(’>BHI%dsB’ % size, frame_type, channel, size, payload, 0xce),
File “/usr/lib/python2.7/socket.py”, line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
]
=20
=20
Or this one
=20
=20
2015-04-19 23:28:54,123 - ERROR - 8125:Thread-3 - zato.server.base:22 =

  • Could not handle broker msg:[Bunch(action=3Du’100803’, args=3D[], =
    body=3Du’{“url”: “google.com http://google.com/”, “engine”: =
    “beggra”}’, exchange=3Du’screenshots.service.batch.requests’, =
    headers=3D{}, kwargs=3D{}, msg_type=3Du’0003’, =
    out_name=3Du’ScreenshotBatch’, properties=3D{u’user_id’: u’ivan’}, =
    routing_key=3Du’process’)], e:[Traceback (most recent call last):

File “/opt/zato/2.0.3/zato-server/src/zato/server/base/init.py”, =
line 47, in on_broker_msg
getattr(self, handler)(msg)
File =
"/opt/zato/2.0.3/zato-server/src/zato/server/connection/amqp/outgoing.py",=
line 220, in on_broker_msg_OUTGOING_AMQP_PUBLISH
producer.publish(msg.body, routing_key=3Dmsg.routing_key, =
exchange=3Dmsg.exchange, headers=3Dheaders, **properties)
File =
"/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line =
164, in publish
routing_key, mandatory, immediate, exchange, declare)
File =
"/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line =
180, in _publish
mandatory=3Dmandatory, immediate=3Dimmediate,
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/channel.py”, =
line 2099, in basic_publish
self._send_method((60, 40), args, msg)
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/abstract_channel.py", =
line 58, in _send_method
self.channel_id, method_sig, args, content,
File =
"/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/method_framing.py", =
line 219, in write_method
write_frame(1, channel, payload)
File “/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/transport.py”, =
line 157, in write_frame
pack(’>BHI%dsB’ % size, frame_type, channel, size, payload, 0xce),
File “/usr/lib/python2.7/socket.py”, line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer
]
=20
I can see that zato is connected to rabbit, and here is the rabbit =
log.
=20
=20
working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:08 =3D=3D=3D
accepting AMQP connection <0.2485.0> (192.168.9.7:42980 -> =
192.168.9.53:5672)
working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:08 =3D=3D=3D
closing AMQP connection <0.2289.0> (192.168.9.7:42645 -> =
192.168.9.53:5672)
working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:08 =3D=3D=3D
closing AMQP connection <0.2298.0> (192.168.9.7:42648 -> =
192.168.9.53:5672)
working
=3DINFO REPORT=3D=3D=3D=3D 19-Apr-2015::23:21:15 =3D=3D=3D
accepting AMQP connection <0.2530.0> (192.168.9.7:42992 -> =
192.168.9.53:5672)
zato error
=3DERROR REPORT=3D=3D=3D=3D 19-Apr-2015::23:24:15 =3D=3D=3D
closing AMQP connection <0.2530.0> (192.168.9.7:42992 -> =
192.168.9.53:5672):
{heartbeat_timeout,running}
=20
=20
I=E2=80=99ve changed the connection heartbeat to 10, 60 600 seconds =
but the error still happens, the only way I can make the service to =
publish messages again is to Edit the Connection Definition, without =
making any changes
=20
=20
This seems to be a zato problem, because I wrote a standalone producer =
with kombu and is running on the same server zato is and it keeps =
pushing messages after zato fails. I=E2=80=99ve tested this with zato =
2.0.3
=20

On 20/04/15 20:09, Ivan Villareal wrote:

Is there a way to know when a message was not published? The tracebacks
are running on a different thread from my service so my service always
return a success.

Hi Ivan,

right now it’s not possible - a work towards that direction started in
GH master and in 2.1 AMQP/WMQ messages will be published in the same
thread/greenlet so you will be able to catch any exception right away.

2.0 added it for ZeroMQ but not for AMQP and WMQ yet.

the only way I can make the service to publish messages again is to
Edit the Connection Definition, without making any changes

Can you please send relevant snippets from admin.log at the time you
issue both Create and Edit? This is the log file that keep I/O of
admin-related internal services invoked from web-admin.

thanks,