(This message has been automatically imported from the retired mailing list)
Something is wrong with AMQP connections.
I wrote a simple AMQP service to publish a message; it works fine for a few seconds, sometimes minutes.
class ScreenshotPublisher(Service):
    """Publish a fixed test message to an AMQP broker.

    The service declares ``url`` and ``engine`` as required input, but
    ``handle`` does not read them; it always sends the same hard-coded
    message through the ``ScreenshotBatch`` outgoing AMQP connection and
    then reports ``success`` in the response.
    """

    class SimpleIO:
        # Declared request/response elements for this service.
        input_required = ('url', 'engine',)
        output_required = ('success',)
        output_optional = ('options',)

    def handle(self):
        """Send one message to the broker and flag the response as successful."""
        self.logger.info("Request received")

        # Request parameters
        msg = 'Hello AMQP broker!'
        out_name = 'ScreenshotBatch'
        exchange = 'screenshots.service.batch.requests'
        routing_key = 'process'
        properties = {'app_id': 'ESB'}
        headers = {'X-Foo': 'bar'}

        # Send a message to the broker
        self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
                                properties, headers)
        self.logger.info("Message sent")

        self.response.payload.success = True
But then the service stops working with the following error:
2015-04-19 23:29:29,749 - ERROR - 8125:Thread-3 - zato.server.base:22 - Could not handle broker msg:[Bunch(action=u'100803', args=[], body=u'{"url": "google.com", "engine": "beggra"}', exchange=u'screenshots.service.batch.requests', headers={}, kwargs={}, msg_type=u'0003', out_name=u'ScreenshotBatch', properties={u'user_id': u'ivan'}, routing_key=u'process')], e:[Traceback (most recent call last):
  File "/opt/zato/2.0.3/zato-server/src/zato/server/base/__init__.py", line 47, in on_broker_msg
    getattr(self, handler)(msg)
  File "/opt/zato/2.0.3/zato-server/src/zato/server/connection/amqp/outgoing.py", line 220, in on_broker_msg_OUTGOING_AMQP_PUBLISH
    producer.publish(msg.body, routing_key=msg.routing_key, exchange=msg.exchange, headers=headers, **properties)
  File "/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line 164, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line 180, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/channel.py", line 2099, in basic_publish
    self._send_method((60, 40), args, msg)
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/abstract_channel.py", line 58, in _send_method
    self.channel_id, method_sig, args, content,
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/method_framing.py", line 219, in write_method
    write_frame(1, channel, payload)
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/transport.py", line 157, in write_frame
    pack('>BHI%dsB' % size, frame_type, channel, size, payload, 0xce),
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
]
Or this one:
2015-04-19 23:28:54,123 - ERROR - 8125:Thread-3 - zato.server.base:22 - Could not handle broker msg:[Bunch(action=u'100803', args=[], body=u'{"url": "google.com", "engine": "beggra"}', exchange=u'screenshots.service.batch.requests', headers={}, kwargs={}, msg_type=u'0003', out_name=u'ScreenshotBatch', properties={u'user_id': u'ivan'}, routing_key=u'process')], e:[Traceback (most recent call last):
  File "/opt/zato/2.0.3/zato-server/src/zato/server/base/__init__.py", line 47, in on_broker_msg
    getattr(self, handler)(msg)
  File "/opt/zato/2.0.3/zato-server/src/zato/server/connection/amqp/outgoing.py", line 220, in on_broker_msg_OUTGOING_AMQP_PUBLISH
    producer.publish(msg.body, routing_key=msg.routing_key, exchange=msg.exchange, headers=headers, **properties)
  File "/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line 164, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/opt/zato/2.0.3/eggs/kombu-2.5.10-py2.7.egg/kombu/messaging.py", line 180, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/channel.py", line 2099, in basic_publish
    self._send_method((60, 40), args, msg)
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/abstract_channel.py", line 58, in _send_method
    self.channel_id, method_sig, args, content,
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/method_framing.py", line 219, in write_method
    write_frame(1, channel, payload)
  File "/opt/zato/2.0.3/eggs/amqp-1.0.11-py2.7.egg/amqp/transport.py", line 157, in write_frame
    pack('>BHI%dsB' % size, frame_type, channel, size, payload, 0xce),
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer
]
I can see that Zato is connected to RabbitMQ; here is the RabbitMQ log:
working
=INFO REPORT==== 19-Apr-2015::23:21:08 ===
accepting AMQP connection <0.2485.0> (192.168.9.7:42980 -> 192.168.9.53:5672)
working
=INFO REPORT==== 19-Apr-2015::23:21:08 ===
closing AMQP connection <0.2289.0> (192.168.9.7:42645 -> 192.168.9.53:5672)
working
=INFO REPORT==== 19-Apr-2015::23:21:08 ===
closing AMQP connection <0.2298.0> (192.168.9.7:42648 -> 192.168.9.53:5672)
working
=INFO REPORT==== 19-Apr-2015::23:21:15 ===
accepting AMQP connection <0.2530.0> (192.168.9.7:42992 -> 192.168.9.53:5672)
zato error
=ERROR REPORT==== 19-Apr-2015::23:24:15 ===
closing AMQP connection <0.2530.0> (192.168.9.7:42992 -> 192.168.9.53:5672):
{heartbeat_timeout,running}
I've changed the connection heartbeat to 10, 60 and 600 seconds, but the error still happens. The only way I can make the service publish messages again is to edit the connection definition, without making any changes.
This seems to be a Zato problem, because I wrote a standalone producer with kombu that runs on the same server as Zato, and it keeps pushing messages after Zato fails. I've tested this with Zato 2.0.3.