Working with the Zato cache and SQLAlchemy


#1

Hey there!

I’ve been reading about the caching module in Zato (forum, blog and docs) and I have some questions. I have the following REST channels:

GET /genesisng/logins/{id}/get
POST /genesisng/logins/validate
POST /genesisng/logins/create
GET /genesisng/logins/{id}/delete
POST /genesisng/logins/{id}/update
GET /genesisng/logins/list (accepts query string parameters)

GET /genesisng/guests/{id}/get
POST /genesisng/guests/create
GET /genesisng/guests/{id}/delete
POST /genesisng/guests/{id}/update
GET /genesisng/guests/list (accepts query string parameters)
GET /genesisng/guests/{id}/bookings
GET /genesisng/guests/{id}/restore

// And so on

For now I have created two caches, logins and guests, with default values. In the case of logins, I understand I have two options:

  1. Go to the /genesisng/logins/{id}/get channel and select the logins cache.
  2. Go to the Get service and add the record to the cache manually:
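from contextlib import closing
from httplib import NOT_FOUND, OK  # Python 2.7; assumed source of the status codes

from zato.server.service import Integer, Service

# Login is the SQLAlchemy model; import it from your own schema module.


class Get(Service):
    """Service class to get a login by id.

    Channel /genesisng/logins/{id}/get.
    """

    class SimpleIO:
        # Trailing comma so that this is a real one-element tuple
        input_required = (Integer('id'),)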
        # Passwords never travel back to the client side
        output_optional = ('id', 'username', 'name', 'surname', 'email',
                           'is_admin')
        skip_empty_keys = True

    def handle(self):
        conn = self.user_config.genesisng.database.connection
        id_ = self.request.input.id

        with closing(self.outgoing.sql.get(conn).session()) as session:
            result = session.query(Login).filter(Login.id == id_).one_or_none()

            if result:
                # Save the record in the cache
                cache = self.cache.get_cache('builtin', 'logins')
                cache.set('id-%s' % id_, result)
                self.response.status_code = OK
                self.response.payload = result
            else:
                self.response.status_code = NOT_FOUND

Now some questions arise:

  1. If I use the web admin to select the logins cache in the get channel, I take it I’d also have to select it in the create, update and delete channels. But how will Zato know when a call to the Update service means cancelling/refreshing/updating the cache for that record?

  2. If I handle it manually, I take it that I have to check whether there is a copy in the cache before instructing SQLAlchemy to issue the SELECT statement, and that I have to invalidate or update that copy whenever I update or delete a record, all of this using the commands of the cache API.
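To make the manual option in point 2 concrete, here is a minimal sketch of the flow: look in the cache first, fall back to the database on a miss, and drop the cached entry whenever the record is updated or deleted. It does not use Zato or SQLAlchemy. FakeCache, DATABASE, db_reads and the three helper functions are hypothetical stand-ins, and the get/set/delete method names are only assumed to mirror the cache object returned by self.cache.get_cache.

```python
class FakeCache(object):
    """In-memory stand-in exposing get/set/delete, for illustration only."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)


# Hypothetical stand-in for the login table, keyed by id.
DATABASE = {1: {'id': 1, 'username': 'jsabater', 'name': 'Jaume'}}
db_reads = []  # records every simulated SELECT so cache hits can be observed


def cache_key(id_):
    return 'id-%s' % id_


def get_login(cache, id_):
    """Return the cached copy if present, otherwise read and cache it."""
    cached = cache.get(cache_key(id_))
    if cached is not None:
        return cached
    db_reads.append(id_)
    row = DATABASE.get(id_)
    if row is not None:
        cache.set(cache_key(id_), dict(row))
    return row


def update_login(cache, id_, **changes):
    """Write to the database first, then drop the stale cache entry."""
    DATABASE[id_].update(changes)
    cache.delete(cache_key(id_))


def delete_login(cache, id_):
    """Remove the row and its cache entry."""
    DATABASE.pop(id_, None)
    cache.delete(cache_key(id_))


cache = FakeCache()
get_login(cache, 1)                   # miss: reads the database
get_login(cache, 1)                   # hit: served from the cache
update_login(cache, 1, name='Jaume S.')
print(get_login(cache, 1)['name'])    # miss again, so the new value is read
```

Deleting the entry on update, rather than overwriting it, keeps the write path simple: the next Get repopulates the cache from the database, so there is only one place where cached values are built.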

Thanks.


#2

So I’ve taken the second approach, that is, use the API to manage the cache manually. I have created the logins cache with default values. When trying to store the result from a single-item SELECT on the database I get a TypeError because the SQLAlchemy result is not JSON serializable.

This is the model class:

class Login(Base):
    __tablename__ = 'login'

    id = Column(Integer, primary_key=True)
    username = Column(String(20), index=True, unique=True, nullable=False)
    password = Column(String(255), nullable=False)
    name = Column(String(50), index=True)
    surname = Column(String(50), index=True)
    email = Column(String(255), index=True, unique=True)
    is_admin = Column(Boolean, default=False)

    def __repr__(self):
        return "<Login(id='%s', username='%s', name='%s', surname='%s', email='%s')>" % (
            self.id, self.username, self.name, self.surname, self.email)

And this is the service being executed to get a login:

class Get(Service):
    """Service class to get a login by id.

    Channel /genesisng/logins/{id}/get.
    """

    class SimpleIO:
        # Trailing comma so that this is a real one-element tuple
        input_required = (Integer('id'),)
        # Passwords never travel back to the client side
        output_optional = ('id', 'username', 'name', 'surname', 'email',
                           'is_admin')
        skip_empty_keys = True

    def handle(self):
        conn = self.user_config.genesisng.database.connection
        id_ = self.request.input.id

        # Check whether a copy exists in the cache
        cache_key = 'id-%s' % id_
        cache = self.cache.get_cache('builtin', 'logins')
        result = cache.get(cache_key)
        if result:
            self.response.status_code = OK
            self.response.payload = result
            return

        with closing(self.outgoing.sql.get(conn).session()) as session:
            result = session.query(Login).filter(Login.id == id_).one_or_none()

            if result:
                # Save the record in the cache
                cache.set(cache_key, result)
                self.response.status_code = OK
                self.response.payload = result
            else:
                self.response.status_code = NOT_FOUND

I’m using curl to test the service:

$ curl -v -g "http://127.0.0.1:11223/genesisng/logins/1/get"; echo ""
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 11223 (#0)
> GET /genesisng/logins/1/get HTTP/1.1
> Host: 127.0.0.1:11223
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Zato
< Date: Tue, 06 Nov 2018 12:14:16 GMT
< Connection: close
< Transfer-Encoding: chunked
< Content-Type: application/json
< X-Zato-CID: 9fdd6abef941eec212af8271
< 
* Closing connection 0
{"response": {"username": "jsabater", "surname": "Sabater", "name": "Jaume", "id": 1, "is_admin": true, "email": "jsabater@gmail.com"}}

This is the trace of the exception:

2018-11-06 12:14:16,860 DEBG 'zato-server2' stdout output:
2018-11-06 12:14:16,859 - WARNING - 140:DummyThread-15 - zato.server.connection.cache:727 - Could not run `SET` after_state_changed in cache `logins`, data:`{u'msg_type': '0002', 'expires_at': 0.0, u'expiry': 0.0, u'source_worker_id': u'1.2.140.8f44618863f394688dc495f7', u'value': <Login(id='1', username='jsabater', name='Jaume', surname='Sabater', email='jsabater@gmail.com')>, u'key': u'id-1', u'action': '106420', u'cache_name': u'logins'}`, e:`Traceback (most recent call last):
  File "/opt/zato/3.0/code/zato-server/src/zato/server/connection/cache.py", line 724, in after_state_changed
    self.server.broker_client.publish(data)
  File "/opt/zato/3.0/code/zato-broker/src/zato/broker/client.py", line 164, in publish
    self.pub_client.publish(topic, dumps(msg))
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/anyjson/__init__.py", line 141, in dumps
    return implementation.dumps(value, *args, **kwargs)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/anyjson/__init__.py", line 87, in dumps
    return self._encode(data, *args, **kwargs)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/__init__.py", line 354, in dumps
    return _default_encoder.encode(obj)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/encoder.py", line 262, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/encoder.py", line 340, in iterencode
    return _iterencode(o, 0)
  File "/opt/zato/3.0/code/local/lib/python2.7/site-packages/simplejson/encoder.py", line 239, in default
    raise TypeError(repr(o) + " is not JSON serializable")
TypeError: <Login(id='1', username='jsabater', name='Jaume', surname='Sabater', email='jsabater@gmail.com')> is not JSON serializable

I take it that the result SQLAlchemy returns cannot be stored in the cache directly (I recall it’s a NamedTuple, or something), but I have not been able to find any example of this in the docs.
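Judging by the traceback, the failure happens when the cache publishes the SET to the broker and runs the message through a JSON encoder, so the cached value has to be made of plain JSON types. The sketch below reproduces that with the standard json module only. The Login class here is a plain-Python stand-in for the model (not SQLAlchemy), and to_dict is a hypothetical helper.

```python
import json


class Login(object):
    """Plain-Python stand-in for the SQLAlchemy model, for illustration only."""

    columns = ('id', 'username', 'name', 'surname', 'email', 'is_admin')

    def __init__(self, **kwargs):
        for column in self.columns:
            setattr(self, column, kwargs.get(column))


def to_dict(login):
    """Copy the column values into a plain dict that a JSON encoder accepts."""
    return dict((column, getattr(login, column)) for column in login.columns)


login = Login(id=1, username='jsabater', name='Jaume', is_admin=True)

try:
    # Same shape as the message in the log: the value is the object itself.
    json.dumps({'key': 'id-1', 'value': login})
except TypeError as e:
    print('Cannot serialize the object itself: %s' % e)

# The same message with a dict of plain values serializes without error.
print(json.dumps({'key': 'id-1', 'value': to_dict(login)}, sort_keys=True))
```

With a real declarative model, the column names could be read from Login.__table__.columns instead of a hand-written tuple.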


#3

Using dictalchemy seems to help with the issue. This is my Get service right now (working but unpolished):

class Get(Service):
    """Service class to get a login by id.

    Channel /genesisng/logins/{id}/get.
    """

    class SimpleIO:
        # Trailing comma so that this is a real one-element tuple
        input_required = (Integer('id'),)
        # Passwords never travel back to the client side
        output_optional = ('id', 'username', 'name', 'surname', 'email',
                           'is_admin')
        skip_empty_keys = True

    def handle(self):
        conn = self.user_config.genesisng.database.connection
        id_ = self.request.input.id

        # Check whether a copy exists in the cache
        cache_key = 'id-%s' % id_
        cache = self.cache.get_cache('builtin', 'logins')
        login = Login()
        cache_data = cache.get(cache_key)
        result = login.fromdict(cache_data, allow_pk=True) if cache_data else None
        if result:
            self.response.status_code = OK
            self.response.headers['Cache-Control'] = 'public'
            self.response.payload = result
            return

        with closing(self.outgoing.sql.get(conn).session()) as session:
            result = session.query(Login).filter(Login.id == id_).one_or_none()

            if result:
                # Save the record in the cache
                cache.set(cache_key, result.asdict())
                self.response.status_code = OK
                self.response.payload = result
            else:
                self.response.status_code = NOT_FOUND

This is the execution of curl right now:

$ curl -v -g "http://127.0.0.1:11223/genesisng/logins/1/get"; echo ""
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 11223 (#0)
> GET /genesisng/logins/1/get HTTP/1.1
> Host: 127.0.0.1:11223
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Zato
< Date: Tue, 06 Nov 2018 14:23:40 GMT
< Connection: close
< Transfer-Encoding: chunked
< Content-Type: application/json
< X-Zato-CID: 74b8ff3ff0a558a5394884c1
< Cache-Control: public
< 
* Closing connection 0
{"response": {"username": "jsabater", "surname": "Sabater", "name": "Jaume", "id": 1, "is_admin": true, "email": "jsabater@gmail.com"}}

Is this the way to go?
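One detail worth checking in the service above: result.asdict() presumably includes every column, so the password would end up in the cache even though SimpleIO keeps it out of the response. A minimal sketch of filtering the dict before caching follows. PUBLIC_FIELDS and public_copy are hypothetical names, and the row dict (including its password value) is made-up sample data standing in for the output of asdict().

```python
# Field names taken from output_optional in the Get service above.
PUBLIC_FIELDS = ('id', 'username', 'name', 'surname', 'email', 'is_admin')


def public_copy(record):
    """Keep only the fields that are allowed to leave the service."""
    return dict((k, v) for k, v in record.items() if k in PUBLIC_FIELDS)


# Hypothetical result of result.asdict(); the password value is made up.
row = {'id': 1, 'username': 'jsabater', 'password': 'not-a-real-hash',
       'name': 'Jaume', 'surname': 'Sabater',
       'email': 'jsabater@gmail.com', 'is_admin': True}

cached = public_copy(row)
print('password' in cached)   # False
print(cached['username'])     # jsabater
```

In the service this would mean calling cache.set(cache_key, public_copy(result.asdict())). dictalchemy's asdict() also appears to accept an exclude argument, which may be a shorter way to get the same effect; its documentation is the place to confirm that.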