Deactivating an AMQP channel is not closing the connections


#1

I am using Zato 3.0 with 1 Server and 4 Gunicon workers and I am using RabbitMQ as my AMQP application.

when I configure an AMQP channel with prefetch count 1 and poll size as 1, 4 connections & 4 channels are getting created in the AMQP. I think this is because of the 4 workers and each worker will create a connection and channel.

What I have observed is that Deactivating an AMQP channel is not closing the connections and channels as it used to happen in Zato 2.0. :slight_smile:

Can you let me know what could possibly be the issue.


#2

Can you clarify what you mean by ‘deactivating an AMQP channel’?

Also, exactly how do you confirm that there are still TCP connections to RabbitMQ?


#3

Hi,

By deactivating an AMQP channel I mean to set the Active flag to false in the AMQP channel entry.
image

And I am verifying the active connections in the RabbitMQ application as shown below:
image

When I adjust the pool size against a channel the number of entries shown in the Connection/Channels tab are being adjusted as per the count. But When I uncheck the Active flag against an AMQP channel then I could see that the Zato log says all the AMQP channels are stopped. But the same is not getting reflected in the RabbitMQ.

Connections/Channels tab in RabbitMQ still shows the active connections.

Also, we had one more observation, if we set the Pool size of the AMQP channel to 0 then all the conections are being terminated. But later we are not able to use the same channel record again.

Please let me know if you need further details on this issue.


#4

I don’t understand what you mean by ‘not being able to use the same channel record again’?


#5

If I make the pool size in AMQP channel to 0 and then if I change the pool size to be other than 0.
Then I am seeing the following error in the zato log.

2018-08-28 13:35:54,734 - ERROR - 534:DummyThread-67 - zato.broker:57 - Could not handle broker msg:Bunch(ack_mode='ack', action='101001', cluster_id=1, consumer_tag_prefix='test_amqp_channel', data_format='json', def_id=1, def_name='AMQP_definition', id=2, is_active=True, msg_type='0002', name='Test_AMQP_Channel', old_name='Test_AMQP_Channel', pool_size=2, prefetch_count=0, queue='TEST_AMQP_Queue', service='test-amqp.dequeue', service_name='test-amqp.dequeue'), e:`Traceback (most recent call last):
File “/opt/zato/3.0/code/zato-broker/src/zato/broker/init.py”, line 52, in on_broker_msg
getattr(self, handler)(msg)
File “/opt/zato/3.0/code/zato-server/src/zato/server/base/worker/amqp_.py”, line 102, in on_broker_msg_CHANNEL_AMQP_EDIT
self.amqp_api.edit_channel(msg.def_name, msg)
File “/opt/zato/3.0/code/zato-server/src/zato/server/connection/connector/init.py”, line 362, in edit_channel
self.connectors[name].edit_channel(config)
File “/opt/zato/3.0/code/zato-server/src/zato/server/connection/amqp_.py”, line 446, in edit_channel
self.delete_channel(config)
File "/opt/zato/3.0/code/zato-server/src/zato/server/connection/amqp
.py", line 460, in _delete_channel
consumers = self._consumers[config.name]
KeyError: ‘Test_AMQP_Channel’


#6

All of this is dealt with - feel free to install the latest updates.


#7

Thanks for making the necessary changes. I will perform the update and let you know the results.

Meanwhile I have an observation and not sure whether this is related to this topic or not. Sometimes I will see the below error message while starting the Zato server and it will show worker failed to boot:

2018-08-29 05:32:37,086 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:38,097 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:39,122 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:40,144 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:41,165 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:42,193 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:43,203 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:44,215 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:45,249 - INFO - 2566:MainThread - zato.common.util.posix_ipc_:141 - Waiting for parent/key /pubsub/pid current (timeout: 10s)
2018-08-29 05:32:46,165 - WARNING - 2566:MainThread - zato.common.util.posix_ipc_:149 - Could not get parent/key /pubsub/pid current after 10s
2018-08-29 05:32:46,166 - ERROR - 2566:MainThread - gunicorn.main:182 - Exception in worker process:
Traceback (most recent call last):
File “/opt/zato/3.0/code/local/lib/python2.7/site-packages/gunicorn/arbiter.py”, line 495, in spawn_worker
self.cfg.post_fork(self, worker)
File “/opt/zato/3.0/code/zato-server/src/zato/server/base/parallel/init.py”, line 784, in post_fork
ParallelServer.start_server(worker.app.zato_wsgi_app, arbiter.zato_deployment_key)
File “/opt/zato/3.0/code/zato-server/src/zato/server/base/parallel/init.py”, line 469, in start_server
self.worker_store.set_broker_client(self.broker_client)
File “/opt/zato/3.0/code/zato-server/src/zato/server/base/worker/init.py”, line 322, in set_broker_client
self.after_broker_client_set()
File “/opt/zato/3.0/code/zato-server/src/zato/server/base/worker/init.py”, line 316, in after_broker_client_set
self.init_pubsub()
File “/opt/zato/3.0/code/zato-server/src/zato/server/base/worker/init.py”, line 791, in init_pubsub
self.pubsub.subscribe(config)
File “/opt/zato/3.0/code/zato-server/src/zato/server/pubsub/init.py”, line 1030, in subscribe
config.server_pid = self.server.server_startup_ipc.get_pubsub_pid()
File "/opt/zato/3.0/code/zato-common/src/zato/common/util/posix_ipc
.py", line 170, in get_pubsub_pid
return self.get_key(self.pubsub_pid, ‘current’, timeout)
File "/opt/zato/3.0/code/zato-common/src/zato/common/util/posix_ipc
.py", line 150, in get_key
raise KeyError(msg)
KeyError: u’Could not get parent/key /pubsub/pid current after 10s’

And this will get resolved after a couple of zato restarts. Can you please let me know what could be the issue.

From the log message, I assumed that the issue might be with connecting to either Redis or RabbitMQ, but both the applications are running when I verified.

Thanks in Advance.


#8

Hi Dariusz,

Thanks for you help. I see that the issues in AMQP channels are resolved with the latest sources.