I have searched around but I was not able to find a complete implementation of the memoize pattern in a multithreaded context. It's my first time playing with thread synchronization. The code seems to work, but I am not able to test it extensively, in particular to find out whether there are race conditions. Do you think there is a better implementation, maybe using a more advanced synchronization mechanism?
from threading import Thread, Lock, current_thread
import thread

def memoize(obj):
    import functools
    cache = obj.cache = {}
    lock_cache = obj.lock_cache = Lock()

    @functools.wraps(obj)
    def memoizer(*args, **kwargs):
        key = str(args) + str(kwargs)
        cached_result = None
        item_lock = None
        with lock_cache:
            # Do the minimal needed things here: just look inside the cache
            # and return if present; otherwise create and acquire a lock
            # specific to this particular computation. Postpone the actual
            # computation, and do not block other computations.
            cached_result = cache.get(key, None)
            if cached_result is not None:
                if type(cached_result) is not thread.LockType:
                    return cached_result
            else:
                item_lock = Lock()
                item_lock.acquire()
                cache[key] = item_lock
        if item_lock is not None:
            # If we are here it means we have created the lock: do the
            # computation, store the result, and release.
            result = obj(*args, **kwargs)
            with lock_cache:
                cache[key] = result
            item_lock.release()
            return result
        if type(cached_result) is thread.LockType:
            # If we are here it means another thread has created the lock
            # and is doing the computation: just wait for it to finish.
            with cached_result:
                return cache[key]
    return memoizer
Here is a little test:
import time

@memoize
def f(x):
    time.sleep(2)
    return x * 10

lock_write = Lock()

def worker(x):
    result = f(x)
    with lock_write:
        print current_thread(), x, result

threads = []
start = time.time()
for i in (0, 1, 1, 2, 0, 0, 0, 1, 3):
    t = Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
time.sleep(1)
for i in (0, 1, 1, 2, 0, 0, 0, 1, 3):
    t = Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print time.time() - start
I get the correct results and it runs in 2 seconds, as expected: distinct arguments are computed concurrently, and the second batch of threads hits the cache.
3 Answers
1. Review
- There's no docstring. What does memoize do, and how do I use it?
- The code uses str(args) + str(kwargs) as the cache key. But this means that, for example, the number 1 and the string "1" will collide.
- The code is not portable to Python 3, where there's no thread module.
- It's not clear to me why functools is only imported inside memoize, but threading is imported at top level.
- It's not clear to me why the code assigns to obj.cache and obj.lock_cache. What if obj already has a cache property?
- The locking logic is hard to follow and needs comments. As far as I can see, lock_cache protects the cache dictionary, and item_lock protects the cache slot that is currently being computed. It's necessary to have two locks because the call to obj might be recursive and so need to re-enter memoizer (see the sketch after this list).
- The code uses three states for each key: (i) not present in the cache; (ii) currently being computed; (iii) present in the cache. The way it distinguishes between (ii) and (iii) is by checking to see if the type is thread.LockType. But this means that you can't use memoize on a function that might return a lock. It would be better to have a more robust mechanism for distinguishing these cases.
- The naming is inconsistent: lock_cache versus item_lock.
- key is looked up in cache multiple times on some code paths.
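To make the recursion point concrete, here is a minimal sketch (my illustration; naive_memoize is a hypothetical name, not code from the question) of what goes wrong if a single non-reentrant lock is held across the whole call:

from functools import wraps
from threading import Lock

def naive_memoize(f):
    cache = {}
    lock = Lock()  # threading.Lock is not reentrant

    @wraps(f)
    def memoizer(*args):
        with lock:  # held for the entire computation
            if args not in cache:
                cache[args] = f(*args)  # a recursive f re-enters memoizer
            return cache[args]
    return memoizer

@naive_memoize
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)

fib(5)  # deadlocks: the recursive call blocks forever trying to
        # re-acquire the lock the outer call already holds

This is why the question's code releases lock_cache before calling obj, and why the revised code below does the same with cache_lock.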
2. Revised code
from functools import wraps
from threading import Lock

class CacheEntry: pass

def memoized(f):
    """Decorator that caches a function's result each time it is called.
    If called later with the same arguments, the cached value is
    returned, and not re-evaluated. Access to the cache is
    thread-safe.
    """
    cache = {}          # Map from key to CacheEntry
    cache_lock = Lock() # Guards cache

    @wraps(f)
    def memoizer(*args, **kwargs):
        key = args, tuple(kwargs.items())
        result_lock = None
        with cache_lock:
            try:
                entry = cache[key]
            except KeyError:
                entry = cache[key] = CacheEntry()
                result_lock = entry.lock = Lock() # Guards entry.result
                result_lock.acquire()
        if result_lock:
            # Compute the new entry without holding cache_lock, to avoid
            # deadlock if the call to f is recursive (in which case
            # memoizer is re-entered).
            result = entry.result = f(*args, **kwargs)
            result_lock.release()
            return result
        else:
            # Wait on entry.lock without holding cache_lock, to avoid
            # blocking other threads that need to use the cache.
            with entry.lock:
                return entry.result
    return memoizer
Notes:
- This is untested.
- This doesn't delete result_lock after the result is computed. This simplifies the logic (for example, there's no need to take cache_lock again and no need for a type check) at the cost of 40 bytes to keep the Lock object around.
- Thank you. Maybe there is a problem. Suppose I call f(1); f(1); f(2). The last call is blocked by cache_lock, acquired by the second call, which is in turn blocked by entry.lock, acquired by the first call. But f(1) and f(2) should run in parallel. (Ruggero Turra, May 24, 2015 at 23:05)
- I have updated the test code: with my solution I get 2 seconds, with yours 4. (Ruggero Turra, May 24, 2015 at 23:27)
- This shows why you need comments in multi-threaded code! (Gareth Rees, May 24, 2015 at 23:29)
- Thank you, your last version works. I have commented my code. (Ruggero Turra, May 24, 2015 at 23:39)
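Since the revised code is marked untested, and the comments hinge on whether distinct keys really do compute in parallel, a minimal timing sketch adapted from the question's test might look like this (my adaptation, assuming Python 3; not part of the original answer):

import time
from threading import Thread

@memoized
def slow(x):
    time.sleep(2)
    return x * 10

start = time.time()
threads = [Thread(target=slow, args=(i,)) for i in (0, 1, 1, 2, 0)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(time.time() - start)  # expect about 2 seconds: distinct keys compute
                            # concurrently; duplicate keys wait on entry.lock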
For posterity:

- Multiple fixes have been applied over the following years to the functools.lru_cache built-in memoize function (one example).
- CPython's test_functools.py includes tests that cover many multithreaded cases.
So it seems that you can now simply use this syntax to create a thread-safe memoizer:
import functools
import time

@functools.cache
def f(x):
    time.sleep(2)
    return x * 10
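Continuing from that definition, the question's worker test carries over almost unchanged (a sketch assuming Python 3.9+, where functools.cache was added; on older versions functools.lru_cache(maxsize=None) is equivalent):

from threading import Thread, Lock, current_thread

lock_write = Lock()

def worker(x):
    result = f(x)
    with lock_write:
        print(current_thread(), x, result)

threads = [Thread(target=worker, args=(i,)) for i in (0, 1, 1, 2, 0, 0, 0, 1, 3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

One caveat from the functools documentation: lru_cache keeps its data structure coherent under concurrent updates, but it does not make other callers wait for an in-flight computation, so the wrapped function may be called more than once if a second thread asks for the same arguments before the first call has been cached.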
Dictionaries don't guarantee a consistent ordering of .items(), so tuple(kwargs.items()) might return differently-ordered tuples for the same dict, potentially causing duplication in your cache. frozenset(kwargs.items()) will work, but it assumes the kwarg values are hashable. Then again, it looks like you already assume that your args are hashable, so this is probably fine here.
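A quick demonstration of the ordering problem (a sketch with hypothetical helper names):

def tuple_key(*args, **kwargs):
    return args, tuple(kwargs.items())

def frozenset_key(*args, **kwargs):
    return args, frozenset(kwargs.items())

# The same logical call produces two different tuple-based keys...
print(tuple_key(a=1, b=2) == tuple_key(b=2, a=1))          # False
# ...but a single frozenset-based key.
print(frozenset_key(a=1, b=2) == frozenset_key(b=2, a=1))  # True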
Turns out a general-purpose argument-based cache key for potentially non-hashable arguments is pretty hard to come by, so if you really want a general-purpose memoizer, you might consider adding an optional key function parameter to your decorator (Python 3 syntax):
from functools import partial, wraps

def freeze(*args, **kwargs):
    return args, frozenset(kwargs.items())

def memoized(f=None, *, key_func=freeze):
    # Allow this parameterized decorator to work without being called
    if f is None:
        return partial(memoized, key_func=key_func)
    ...

    @wraps(f)
    def memoizer(*args, **kwargs):
        key = key_func(*args, **kwargs)
        ...

    return memoizer
Then you can use it with or without that parameter:
@memoized
def f():
    pass

@memoized(key_func=magic_serializer)
def g():
    pass

# Parentheses are legal but unnecessary here
@memoized()
def h():
    pass
(Or you just make sure everything is immutable and hashable and all your problems will disappear :)
I've got a recursive version of freeze that handles most common Python data structures lying around somewhere; I'll see if I can dig it up later. (Ping me if you're curious and it looks like I've forgotten.)
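In the meantime, a sketch of what such a recursive freeze might look like (my own illustration, not the version mentioned above):

def deep_freeze(obj):
    # Recursively convert common mutable containers into hashable
    # equivalents so the result can be used as a cache key.
    if isinstance(obj, dict):
        return frozenset((k, deep_freeze(v)) for k, v in obj.items())
    if isinstance(obj, (list, tuple)):
        return tuple(deep_freeze(x) for x in obj)
    if isinstance(obj, (set, frozenset)):
        return frozenset(deep_freeze(x) for x in obj)
    return obj  # assumes anything else is already hashable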