Recipes

Asynchronous Data Updates with ORM Events
This recipe presents one technique of optimistically pushing new data into the cache when an update is sent to a database.
Using SQLAlchemy for database querying, suppose a simple cache-decorated function returns the results of a database query:
@region.cache_on_arguments()
def get_some_data(argument):
    # query database to get data
    data = Session().query(DBClass).filter(DBClass.argument == argument).all()
    return data
We would like this particular function to be re-queried when the data
has changed. We could call get_some_data.invalidate(argument, hard=False)
at the point at which the data changes; however, this only invalidates
the old value. A new value is not generated until the next call, which
also means that at least one client has to block while the new value is
generated. We could instead call get_some_data.refresh(argument), which
would perform the data refresh at that moment, but then the writer is
delayed by the re-query.
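For contrast, a sketch of these two in-band options at the point of a
write (update_the_database is a hypothetical helper):

def data_changed(argument):
    update_the_database(argument)  # hypothetical write helper

    # option one: mark the cached value stale; the next reader
    # blocks while the new value is generated
    get_some_data.invalidate(argument, hard=False)

    # option two (instead): regenerate immediately; the writer
    # pays for the re-query
    # get_some_data.refresh(argument)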
A third variant is to offload the work of refreshing this query
into a background thread or process. This can be achieved using
a system such as CacheRegion.async_creation_runner.
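A minimal sketch of such a runner: a thread generates the new value
while readers continue to receive the stale one. The backend
configuration shown here is illustrative only:

import threading

from dogpile.cache import make_region

def async_creation_runner(cache, somekey, creator, mutex):
    """Called by dogpile when a stale value can still be served:
    generate the new value in a thread, then release the lock."""
    def runner():
        try:
            value = creator()
            cache.set(somekey, value)
        finally:
            mutex.release()

    thread = threading.Thread(target=runner)
    thread.start()

region = make_region(
    async_creation_runner=async_creation_runner
).configure(
    'dogpile.cache.memcached',   # illustrative backend choice
    expiration_time=3600,
    arguments={'url': '127.0.0.1:11211'},
)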
However, an expedient approach for smaller use cases is to link cache refresh
operations to the ORM session’s commit, as below:
from threading import Thread

from sqlalchemy import event
from sqlalchemy.orm import Session

def cache_refresh(session, refresher, *args, **kwargs):
    """
    Refresh the function's cached data in a new thread. Refreshing begins
    only after the session has been committed, so all database data is
    available.
    """
    assert isinstance(session, Session), \
        "Need a session, not a sessionmaker or scoped_session"

    @event.listens_for(session, "after_commit")
    def do_refresh(session):
        t = Thread(target=refresher, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
Within a sequence of data persistence, cache_refresh can be called
given a particular SQLAlchemy Session and a callable to do the work:
def add_new_data(session, argument):
    # add some data
    session.add(something_new(argument))

    # add a hook to refresh after the Session is committed.
    cache_refresh(session, get_some_data.refresh, argument)
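As a usage sketch, assuming Session here is a sessionmaker or similar
factory configured elsewhere, it is the commit that fires the refresh
in its background thread:

session = Session()
add_new_data(session, "some_argument")

# the "after_commit" hook fires here, starting the background
# refresh thread
session.commit()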
Note that the event to refresh the data is associated with the Session
being used for persistence; however, the actual refresh operation is
called with a different Session, typically one that is local to the
refresh operation, either through a thread-local registry or via direct
instantiation.
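For instance, a minimal sketch of the direct-instantiation approach,
assuming SQLAlchemy 1.4 or later and an engine configured elsewhere:

from sqlalchemy.orm import Session as SASession

@region.cache_on_arguments()
def get_some_data(argument):
    # a Session constructed locally, so the refresh thread never
    # shares the Session that was committed (`engine` is assumed
    # to be configured elsewhere)
    with SASession(engine) as session:
        return session.query(DBClass).filter(
            DBClass.argument == argument
        ).all()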
Prefixing all keys in Redis
If you use a Redis instance as a backend that contains other keys besides the ones set by dogpile.cache, it is a good idea to uniquely prefix all dogpile.cache keys to avoid potential collisions with keys set by your own code. This can easily be done using a key mangler function:
from dogpile.cache import make_region
region = make_region(
    key_mangler=lambda key: "myapp:dogpile:" + key
)
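A variation on this sketch combines the prefix with dogpile's
sha1_mangle_key, so keys stay namespaced while arbitrarily long
generated keys are shortened to a fixed-size digest:

from dogpile.cache import make_region
from dogpile.cache.util import sha1_mangle_key

# prefix plus SHA1 mangling: the namespace survives as plain text,
# the rest of the key becomes a fixed-length hex digest
region = make_region(
    key_mangler=lambda key: "myapp:dogpile:" + sha1_mangle_key(key)
)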
Encoding/Decoding data into another format

Since dogpile is managing cached data, you may be concerned with the size of your payloads. One way to help minimize payloads is to use a ProxyBackend to recode data on the fly, or otherwise transform data as it enters or leaves persistent storage.
In the example below, we define two classes to implement msgpack
encoding. Msgpack (http://msgpack.org/) is a serialization format that
works exceptionally well with json-like data and can serialize nested
dicts into a much smaller payload than Python's own pickle.
_EncodedProxy is our base class for building data encoders, and inherits
from dogpile's own ProxyBackend. You could just use one class. This
class passes four of the main key/value functions into a configurable
decoder and encoder. The MsgpackProxy class simply inherits from
_EncodedProxy and implements the necessary value_decode and value_encode
functions.
Encoded ProxyBackend Example:
from dogpile.cache.api import CachedValue, NO_VALUE
from dogpile.cache.proxy import ProxyBackend
import msgpack

class _EncodedProxy(ProxyBackend):
    """base class for building value-mangling proxies"""

    def value_decode(self, value):
        raise NotImplementedError("override me")

    def value_encode(self, value):
        raise NotImplementedError("override me")

    def set(self, k, v):
        v = self.value_encode(v)
        self.proxied.set(k, v)

    def get(self, key):
        v = self.proxied.get(key)
        return self.value_decode(v)

    def set_multi(self, mapping):
        """encode to a new dict to preserve unencoded values in-place
        when called by `get_or_create_multi`
        """
        mapping_set = {}
        for k, v in mapping.items():
            mapping_set[k] = self.value_encode(v)
        return self.proxied.set_multi(mapping_set)

    def get_multi(self, keys):
        results = self.proxied.get_multi(keys)
        return [self.value_decode(record) for record in results]

class MsgpackProxy(_EncodedProxy):
    """custom decode/encode for value mangling"""

    def value_decode(self, v):
        if not v or v is NO_VALUE:
            return NO_VALUE
        # you probably want to specify a custom decoder via `object_hook`
        v = msgpack.unpackb(v, raw=False)
        return CachedValue(*v)

    def value_encode(self, v):
        # you probably want to specify a custom encoder via `default`
        return msgpack.packb(v, use_bin_type=True)
# extend our region configuration from above with a 'wrap'
region = make_region().configure(
    'dogpile.cache.pylibmc',
    expiration_time=3600,
    arguments={
        'url': ["127.0.0.1"],
    },
    wrap=[MsgpackProxy],
)
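A round-trip usage sketch under the configuration above (the key and
payload are made up):

# store a json-like payload under a key
region.set("user:42", {"name": "alice", "scores": [1, 2, 3]})

# retrieve it; the proxy chain decodes the stored bytes before the
# region applies its expiration logic
value = region.get("user:42")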