Working with the Zato cache and SQLAlchemy

Hey there!

I’ve been reading about the caching module in Zato (forum, blog and docs) and I have some questions. I have the following REST channels:

GET /genesisng/logins/{id}/get
POST /genesisng/logins/validate
POST /genesisng/logins/create
GET /genesisng/logins/{id}/delete
POST /genesisng/logins/{id}/update
GET /genesisng/logins/list (accepts query string parameters)

GET /genesisng/guests/{id}/get
POST /genesisng/guests/create
GET /genesisng/guests/{id}/delete
POST /genesisng/guests/{id}/update
GET /genesisng/guests/list (accepts query string parameters)
GET /genesisng/guests/{id}/bookings
GET /genesisng/guests/{id}/restore

// And so on

For now I have created two caches, logins and guests, with default values. In the case of logins, I understand I have two options:

  1. Go to the /genesisng/logins/{id}/get channel and select the logins cache.
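  2. In the Get service, add the record to the cache manually:
from contextlib import closing
from httplib import OK, NOT_FOUND  # Python 2.7, as used by Zato 3.0

from zato.server.service import Integer, Service

# Login is the SQLAlchemy model class, imported from the project's own module


class Get(Service):
    """Service class to get a login by id.

    Channel /genesisng/logins/{id}/get.
    """

    class SimpleIO:
        # The trailing comma makes this a one-element tuple
        input_required = (Integer('id'),)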
        # Passwords never travel back to the client side
        output_optional = ('id', 'username', 'name', 'surname', 'email',
                           'is_admin')
        skip_empty_keys = True

    def handle(self):
        conn = self.user_config.genesisng.database.connection
        id_ = self.request.input.id

        with closing(self.outgoing.sql.get(conn).session()) as session:
            result = session.query(Login).filter(Login.id == id_).one_or_none()

            if result:
                # Save the record in the cache
                cache = self.cache.get_cache('builtin', 'logins')
                cache.set('id-%s' % id_, result)
                self.response.status_code = OK
                self.response.payload = result
            else:
                self.response.status_code = NOT_FOUND

Now some questions arise:

  1. If I use the web admin to select the logins cache in the get channel, I take it I’d also have to select it in the create, update and delete channels. But how will Zato know when a call to the Update service means invalidating or refreshing the cached copy of that record?

  2. If I handle it manually, I take it that, before instructing SQLAlchemy to issue the SELECT statement, I have to check whether there is a copy in the cache, and that I have to invalidate or update that copy whenever I update or delete a record, all of this through the cache API.
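
The manual approach in point 2 is usually called cache-aside. A minimal sketch of it follows, with no Zato or SQLAlchemy involved: `DictCache` is a hypothetical in-memory stand-in for the cache (its `delete` method is an assumption, so check the Zato cache API for the real call), and the `DB` dict stands in for the login table.

```python
# Sketch only: DictCache stands in for a Zato cache, DB for the login table.
class DictCache(object):
    """Hypothetical in-memory cache exposing get/set/delete."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)


DB = {1: {'id': 1, 'username': 'jsabater'}}
cache = DictCache()
db_reads = []  # ids that actually reached the "database"


def cache_key(id_):
    # Same key format as in the service above
    return 'id-%s' % id_


def get_login(id_):
    key = cache_key(id_)
    cached = cache.get(key)
    if cached is not None:
        return cached            # cache hit, no SELECT needed
    db_reads.append(id_)         # cache miss, go to the database
    row = DB.get(id_)
    if row is not None:
        cache.set(key, dict(row))
    return row


def update_login(id_, **changes):
    DB[id_].update(changes)
    cache.delete(cache_key(id_))  # invalidate; the next read repopulates


def delete_login(id_):
    DB.pop(id_, None)
    cache.delete(cache_key(id_))
```

Invalidating on write, rather than rewriting the cached value, keeps Update and Delete simple: the next Get pays for one SELECT and stores a fresh copy.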

Thanks.

So I’ve taken the second approach, that is, using the API to manage the cache manually. I have created the logins cache with default values. When trying to store the result of a single-item SELECT on the database I get a TypeError because the SQLAlchemy result is not JSON serializable.

This is the model class:

class Login(Base):
    __tablename__ = 'login'

    id = Column(Integer, primary_key=True)
    username = Column(String(20), index=True, unique=True, nullable=False)
    password = Column(String(255), nullable=False)
    name = Column(String(50), index=True)
    surname = Column(String(50), index=True)
    email = Column(String(255), index=True, unique=True)
    is_admin = Column(Boolean, default=False)

    def __repr__(self):
        return "<Login(id='%s', username='%s', name='%s', surname='%s', email='%s')>" % (
            self.id, self.username, self.name, self.surname, self.email)

And this is the service being executed to get a login:

class Get(Service):
    """Service class to get a login by id.

    Channel /genesisng/logins/{id}/get.
    """

    class SimpleIO:
        # The trailing comma makes this a one-element tuple
        input_required = (Integer('id'),)
        # Passwords never travel back to the client side
        output_optional = ('id', 'username', 'name', 'surname', 'email',
                           'is_admin')
        skip_empty_keys = True

    def handle(self):
        conn = self.user_config.genesisng.database.connection
        id_ = self.request.input.id

        # Check whether a copy exists in the cache
        cache_key = 'id-%s' % id_
        cache = self.cache.get_cache('builtin', 'logins')
        result = cache.get(cache_key)
        if result:
            self.response.status_code = OK
            self.response.payload = result
            return

        with closing(self.outgoing.sql.get(conn).session()) as session:
            result = session.query(Login).filter(Login.id == id_).one_or_none()

            if result:
                # Save the record in the cache
                cache.set(cache_key, result)
                self.response.status_code = OK
                self.response.payload = result
            else:
                self.response.status_code = NOT_FOUND

I’m using curl to test the service:

$ curl -v -g "http://127.0.0.1:11223/genesisng/logins/1/get"; echo ""
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 11223 (#0)
> GET /genesisng/logins/1/get HTTP/1.1
> Host: 127.0.0.1:11223
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Zato
< Date: Tue, 06 Nov 2018 12:14:16 GMT
< Connection: close
< Transfer-Encoding: chunked
< Content-Type: application/json
< X-Zato-CID: 9fdd6abef941eec212af8271
< 
* Closing connection 0
{"response": {"username": "jsabater", "surname": "Sabater", "name": "Jaume", "id": 1, "is_admin": true, "email": "jsabater@gmail.com"}}

This is the trace of the exception:

2018-11-06 12:14:16,860 DEBG 'zato-server2' stdout output:
2018-11-06 12:14:16,859 - WARNING - 140:DummyThread-15 - zato.server.connection.cache:727 - Could not run `SET` after_state_changed in cache `logins`, data:`{u'msg_type': '0002', 'expires_at': 0.0, u'expiry': 0.0, u'source_worker_id': u'1.2.140.8f44618863f394688dc495f7', u'value': <Login(id='1', username='jsabater', name='Jaume', surname='Sabater', email='jsabater@gmail.com')>, u'key': u'id-1', u'action': '106420', u'cache_name': u'logins'}`, e:`Traceback (most recent call last):
  File "/opt/zato/3.0/code/zato-server/src/zato/server/connection/cache.py", line 724, in after_state_changed
    self.server.broker_client.publish(data)
  File "/opt/zato/3.0/code/zato-broker/src/zato/broker/client.py", line 164, in publish
    self.pub_client.publish(topic, dumps(msg))
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/anyjson/__init__.py", line 141, in dumps
    return implementation.dumps(value, *args, **kwargs)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/anyjson/__init__.py", line 87, in dumps
    return self._encode(data, *args, **kwargs)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/__init__.py", line 354, in dumps
    return _default_encoder.encode(obj)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/encoder.py", line 262, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/encoder.py", line 340, in iterencode
    return _iterencode(o, 0)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/encoder.py", line 239, in default
    raise TypeError(repr(o) + " is not JSON serializable")
TypeError: <Login(id='1', username='jsabater', name='Jaume', surname='Sabater', email='jsabater@gmail.com')> is not JSON serializable

I take it that the result SQLAlchemy returns cannot be stored in the cache directly (judging by the traceback, it is a Login model instance that the JSON encoder does not know how to serialize), but I have not been able to find an example of this in the docs.
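
The traceback fails inside `dumps`, which suggests that whatever goes into the cache has to be JSON serializable. The sketch below reproduces that with the standard `json` module only. The `Login` class here is a hypothetical plain-Python stand-in for the mapped class, and `to_cacheable` is an illustrative helper, not part of Zato or SQLAlchemy.

```python
import json


class Login(object):
    """Hypothetical plain-Python stand-in for the mapped Login class."""
    columns = ('id', 'username', 'name', 'surname', 'email', 'is_admin')

    def __init__(self, **kwargs):
        for name in self.columns:
            setattr(self, name, kwargs.get(name))


def to_cacheable(login):
    # Copy the attributes into a plain dict, which json can encode
    return dict((name, getattr(login, name)) for name in Login.columns)


login = Login(id=1, username='jsabater', is_admin=True)

# Encoding the object itself fails, as in the traceback above
try:
    json.dumps(login)
    serializable = True
except TypeError:
    serializable = False

# Encoding the dict built from it works
payload = json.dumps(to_cacheable(login), sort_keys=True)
```

The same idea applies to a real model: convert it to a dict of basic types (strings, numbers, booleans, None) before calling `cache.set`.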

Using dictalchemy seems to help with the issue. This is my Get service right now (working but unpolished):

class Get(Service):
    """Service class to get a login by id.

    Channel /genesisng/logins/{id}/get.
    """

    class SimpleIO:
        # The trailing comma makes this a one-element tuple
        input_required = (Integer('id'),)
        # Passwords never travel back to the client side
        output_optional = ('id', 'username', 'name', 'surname', 'email',
                           'is_admin')
        skip_empty_keys = True

    def handle(self):
        conn = self.user_config.genesisng.database.connection
        id_ = self.request.input.id

        # Check whether a copy exists in the cache
        cache_key = 'id-%s' % id_
        cache = self.cache.get_cache('builtin', 'logins')
        login = Login()
        cache_data = cache.get(cache_key)
        result = login.fromdict(cache_data, allow_pk=True) if cache_data else None
        if result:
            self.response.status_code = OK
            self.response.headers['Cache-Control'] = 'public'
            self.response.payload = result
            return

        with closing(self.outgoing.sql.get(conn).session()) as session:
            result = session.query(Login).filter(Login.id == id_).one_or_none()

            if result:
                # Save the record in the cache
                cache.set(cache_key, result.asdict())
                self.response.status_code = OK
                self.response.payload = result
            else:
                self.response.status_code = NOT_FOUND

This is the output of curl now:

$ curl -v -g "http://127.0.0.1:11223/genesisng/logins/1/get"; echo ""
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 11223 (#0)
> GET /genesisng/logins/1/get HTTP/1.1
> Host: 127.0.0.1:11223
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Zato
< Date: Tue, 06 Nov 2018 14:23:40 GMT
< Connection: close
< Transfer-Encoding: chunked
< Content-Type: application/json
< X-Zato-CID: 74b8ff3ff0a558a5394884c1
< Cache-Control: public
< 
* Closing connection 0
{"response": {"username": "jsabater", "surname": "Sabater", "name": "Jaume", "id": 1, "is_admin": true, "email": "jsabater@gmail.com"}}
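
One thing to watch in the service above: `asdict()` is called with no exclusions, so, assuming it returns every mapped column, the password hash would end up in the cache as well. A minimal sketch of whitelisting fields before caching follows. `CACHEABLE_FIELDS`, `strip_for_cache` and the sample row are hypothetical names and data used only for illustration.

```python
# Fields allowed into the cache; mirrors output_optional in the service.
CACHEABLE_FIELDS = ('id', 'username', 'name', 'surname', 'email', 'is_admin')


def strip_for_cache(row, allowed=CACHEABLE_FIELDS):
    """Return a copy of row holding only the whitelisted keys."""
    return dict((k, v) for k, v in row.items() if k in allowed)


# Hypothetical full row, shaped like what a model-to-dict helper might return
full_row = {
    'id': 1,
    'username': 'jsabater',
    'password': 'not-a-real-hash',
    'name': 'Jaume',
    'surname': 'Sabater',
    'email': 'jsabater@gmail.com',
    'is_admin': True,
}

cached = strip_for_cache(full_row)
```

In the service this would amount to caching `strip_for_cache(result.asdict())` instead of the full dict, so a cache hit can never leak a column that a database hit would not return.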

Is this the way to go?