I have written a function that randomly picks one element from a set, with a different probability for each element. It takes a dictionary as its argument: the keys are the choices and can be of any type; the values are the weights and must be non-zero integers. The function randomly selects a choice based on the weights and returns it.
I have tried a bunch of different methods and used the most efficient one I found in this implementation. I intend to call the function with the same arguments repeatedly, and the dictionaries aren't supposed to change during execution, so I thought memoization would be a good idea. I memoized it using a dict, and the performance improvement is huge, as expected.
Code
import random
from bisect import bisect
from itertools import accumulate
from typing import Any

cache = dict()

def weighted_choice(dic: dict) -> Any:
    mark = id(dic)
    if mark not in cache:
        if not isinstance(dic, dict):
            raise TypeError('The argument of the function should be a dictionary')
        choices, weights = zip(*dic.items())
        if set(map(type, weights)) != {int}:
            raise TypeError('The values of the argument must be integers')
        if 0 in weights:
            raise ValueError('The values of the argument shouldn\'t contain 0')
        accreted = list(accumulate(weights))
        cache[mark] = (choices, accreted)
    else:
        choices, accreted = cache[mark]
    chosen = random.random() * accreted[-1]
    return choices[bisect(accreted, chosen)]
Example
LETTER_FREQUENCY = {
    "a": 80183,
    "b": 17558,
    "c": 42724,
    "d": 28914,
    "e": 100093,
    "f": 11308,
    "g": 20298,
    "h": 24064,
    "i": 80469,
    "j": 1440,
    "k": 6103,
    "l": 52184,
    "m": 28847,
    "n": 63640,
    "o": 64730,
    "p": 29325,
    "q": 1741,
    "r": 66553,
    "s": 54738,
    "t": 63928,
    "u": 34981,
    "v": 8877,
    "w": 6020,
    "x": 2975,
    "y": 19245,
    "z": 2997
}
weighted_choice(LETTER_FREQUENCY)  # returns a letter; the bigger the weight, the more likely that letter is returned
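As a sanity check (not part of the original post), a quick empirical test with a small two-element dictionary suggests the sampled frequencies do track the weights. This uses the non-memoized version of the function, which is enough for the check:

```python
import random
from bisect import bisect
from collections import Counter
from itertools import accumulate

def weighted_choice(dic):
    # non-memoized version, sufficient for a sanity check
    choices, weights = zip(*dic.items())
    accreted = list(accumulate(weights))
    chosen = random.random() * accreted[-1]
    return choices[bisect(accreted, chosen)]

weights = {"a": 1, "b": 9}
counts = Counter(weighted_choice(weights) for _ in range(100_000))
# "b" should be drawn roughly nine times as often as "a"
print(counts)
```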
Performance
# non-memoized
In [57]: def weighted_choice(dic: dict) -> Any:
    ...:     choices, weights = zip(*dic.items())
    ...:     accreted = list(accumulate(weights))
    ...:     chosen = random.random() * accreted[-1]
    ...:     return choices[bisect(accreted, chosen)]

In [58]: %timeit weighted_choice(LETTER_FREQUENCY)
4.03 μs ± 293 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
# memoized
In [59]: cache = dict()
    ...:
    ...: def weighted_choice(dic: dict) -> Any:
    ...:     mark = id(dic)
    ...:     if mark not in cache:
    ...:         if not isinstance(dic, dict):
    ...:             raise TypeError('The argument of the function should be a dictionary')
    ...:         choices, weights = zip(*dic.items())
    ...:         if set(map(type, weights)) != {int}:
    ...:             raise TypeError('The values of the argument must be integers')
    ...:         if 0 in weights:
    ...:             raise ValueError('The values of the argument shouldn\'t contain 0')
    ...:         accreted = list(accumulate(weights))
    ...:         cache[mark] = (choices, accreted)
    ...:     else:
    ...:         choices, accreted = cache[mark]
    ...:     chosen = random.random() * accreted[-1]
    ...:     return choices[bisect(accreted, chosen)]

In [60]: %timeit weighted_choice(LETTER_FREQUENCY)
702 ns ± 25.5 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
I want to know how to properly implement the memoization. In this function there are two values that need to be remembered for each input (assuming the arguments never change, which in practice is true): choices and accreted (the accumulated weights). They are unique to each input, fully determined by it, and won't change.
However, the function is non-deterministic: it should return different outputs for the same input. There is exactly one non-deterministic variable, chosen, and the output is determined by it.
The only memoization technique I know of is the functools.lru_cache decorator, and functions wrapped by it return the same output for the same input, which is obviously not suitable here, unless I write a helper function that does the input processing, which as you can see makes up the bulk of the function. I think that is excessive, and Python function calls are expensive.
So what is the proper way to memoize the function?
1 Answer
I'd prefer a separation between "distribution objects" and sampling.
class WeightedDiscrete:
    def __init__(self, weighted_items: dict):
        if not isinstance(weighted_items, dict):
            raise TypeError('The argument of the function should be a dictionary')
        items, weights = weighted_items.keys(), weighted_items.values()
        if any(not isinstance(w, int) for w in weights):
            raise TypeError('The values of the argument must be integers')
        if any(w <= 0 for w in weights):
            raise ValueError('The values of the argument must be positive')
        self.items = list(items)
        self.accreted = list(accumulate(weights))

    def choice(self):
        chosen = random.random() * self.accreted[-1]
        return self.items[bisect(self.accreted, chosen)]
Given that, you can manage caching the WeightedDiscrete object for different weighted_items separately, and the cache design becomes an orthogonal problem.
lru_cache unfortunately still won't work, since dicts are unhashable, but for this specific use a custom id-based cache might be appropriate:
class IdCache:
    def __init__(self):
        self.cache = dict()

    def __setitem__(self, key, value):
        self.cache[id(key)] = value

    def __getitem__(self, key):
        return self.cache[id(key)]

    def __contains__(self, key):
        return id(key) in self.cache

cache = IdCache()

def get_distribution(weighted_items):
    if weighted_items not in cache:
        w = WeightedDiscrete(weighted_items)
        cache[weighted_items] = w
    return cache[weighted_items]
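Usage would then look something like the following (a condensed, self-contained sketch: WeightedDiscrete is abbreviated without the validation, and the id-based cache is inlined as a plain dict, but the behavior is the same):

```python
import random
from bisect import bisect
from itertools import accumulate

class WeightedDiscrete:
    # condensed copy of the class above, validation omitted
    def __init__(self, weighted_items: dict):
        self.items = list(weighted_items)
        self.accreted = list(accumulate(weighted_items.values()))

    def choice(self):
        chosen = random.random() * self.accreted[-1]
        return self.items[bisect(self.accreted, chosen)]

cache = {}

def get_distribution(weighted_items):
    # id-based caching, same idea as IdCache with a plain dict
    key = id(weighted_items)
    if key not in cache:
        cache[key] = WeightedDiscrete(weighted_items)
    return cache[key]

freq = {"a": 3, "b": 1}
dist = get_distribution(freq)
print(get_distribution(freq) is dist)  # True: same dict object hits the cache
print(dist.choice() in freq)           # True: always returns one of the keys
```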
If you have a lot of different distributions, the WeightedDiscrete class could perhaps be further optimized by inheriting from a named tuple to reduce memory use.
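One way to read that suggestion (my interpretation; the answer doesn't spell it out) is to define the class via typing.NamedTuple, so instances are plain tuples without a per-instance __dict__, with the validation moved into a constructor classmethod:

```python
import random
from bisect import bisect
from itertools import accumulate
from typing import NamedTuple

class WeightedDiscrete(NamedTuple):
    items: tuple
    accreted: tuple  # accumulated weights

    @classmethod
    def from_dict(cls, weighted_items: dict):
        # validation as in the original class could go here
        return cls(tuple(weighted_items),
                   tuple(accumulate(weighted_items.values())))

    def choice(self):
        chosen = random.random() * self.accreted[-1]
        return self.items[bisect(self.accreted, chosen)]

dist = WeightedDiscrete.from_dict({"a": 1, "b": 2})
```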