In the code below, I implemented the SpaceSaving frequency estimation algorithm described in this paper. Given a parameter eps
, the algorithm finds all elements of a data stream of length n
that occur more than n/eps
times (with high probability). Here's a screenshot from the paper with the pseudocode:
[Screenshot: SpaceSaving algorithm pseudocode]
I would appreciate any feedback on my implementation: style, performance, etc.
```python
import math, heapq


class SpaceSavingCounter:
    def __init__(self, eps):
        self.k = math.ceil(1 / eps)
        self.n = 0
        self.counts = dict()
        self.queue = []

    def inc(self, x):
        # increment total elements seen
        self.n += 1
        # x is being watched
        if x in self.counts:
            self.counts[x] += 1
        # x is not being watched
        else:
            # make room for x
            if self.n > self.k:
                while True:
                    count, tstamp, key = self.pop()
                    assert self.counts[key] >= count
                    if self.counts[key] == count:
                        del self.counts[key]
                        break
                    else:
                        self.push(self.counts[key], tstamp, key)
            else:
                count = 0
            # watch x
            self.counts[x] = count + 1
            self.push(count, self.n, x)

    def push(self, count, tstamp, key):
        heapq.heappush(self.queue, (count, tstamp, key))

    def pop(self):
        return heapq.heappop(self.queue)
```
```python
def test_SpaceSavingCounter():
    seq = [1, 5, 3, 4, 2, 7, 7, 1, 3, 1, 3, 1, 3, 1, 3]
    counter = SpaceSavingCounter(1 / 1.9)
    for x in seq:
        counter.inc(x)
    assert counter.counts.keys() == {1, 3}
```
2 Answers
Thanks for putting this code online! Two updates for people who may want to use it:

Correctness: the condition `if self.n > self.k:` is probably mistaken: for instance, when the sequence starts with k copies of the same element, we are later stuck with a data structure that contains a single counter. Instead, we can use `self.k` to represent the number of remaining counters with value 0: specifically, replace this line with `if self.k == 0:` and, just after the `else:`, insert `self.k = self.k - 1`.

Small optimization: just above `def push`, replace the last line of `inc` with `self.push(self.counts[x], self.n, x)`, i.e. use the latest count of `x`, since we just updated it.
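Putting both suggestions together, one way the asker's class could look is the following sketch (names unchanged; `self.k` now counts the remaining free slots):

```python
import heapq
import math


class SpaceSavingCounter:
    def __init__(self, eps):
        self.k = math.ceil(1 / eps)  # number of counters still free
        self.n = 0
        self.counts = dict()
        self.queue = []

    def inc(self, x):
        self.n += 1
        if x in self.counts:
            self.counts[x] += 1
        elif self.k == 0:
            # all counters taken: evict the element with the smallest
            # estimated count and inherit its count
            while True:
                count, tstamp, key = self.pop()
                if self.counts[key] == count:
                    del self.counts[key]
                    break
                # stale heap entry: re-push with the current count
                self.push(self.counts[key], tstamp, key)
            self.counts[x] = count + 1
            self.push(self.counts[x], self.n, x)
        else:
            # a counter is still free: claim it
            self.k -= 1
            self.counts[x] = 1
            self.push(self.counts[x], self.n, x)

    def push(self, count, tstamp, key):
        heapq.heappush(self.queue, (count, tstamp, key))

    def pop(self):
        return heapq.heappop(self.queue)
```

With this version, a stream that starts with k identical elements only consumes one counter, leaving the rest available.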
Structure seems pretty good; I'd mainly make the following changes:

- Use underscores and more descriptive names for internal variables and functions. `push` and `pop` look like they would have stack-like behaviour, so I turned them into internal methods with more appropriate names.
- `inc` would better be named and work like `update` from `Counter`, so I changed that and put the per-element logic into the method `_update_element`.
- I refactored the large `if` by flattening it out and splitting off the replacement logic.

Also, I made the constructor use the capacity instead of the epsilon, but that's just a matter of preference (a solution supporting both, via keyword arguments, would be in order).
```python
import heapq
from collections import Counter


class SpaceSavingCounter:
    """
    Efficient `Counter`-like structure for approximating the top `m` elements
    of a stream, in O(m) space
    (https://www.cse.ust.hk/~raywong/comp5331/References/EfficientComputationOfFrequentAndTop-kElementsInDataStreams.pdf).

    Specifically, the resulting counter will contain the correct counts for
    the top k elements with k ≈ m. The interface is the same as
    `collections.Counter`.
    """

    def __init__(self, m):
        self._m = m
        self._elements_seen = 0
        self._counts = Counter()  # contains the counts for all elements
        self._queue = []  # contains the estimated hits for the counted elements

    def update(self, iterable):
        for x in iterable:
            self._update_element(x)

    def _update_element(self, x):
        self._elements_seen += 1
        if x in self._counts:
            self._counts[x] += 1
        elif len(self._counts) < self._m:
            self._counts[x] = 1
            self._heappush(1, self._elements_seen, x)
        else:
            self._replace_least_element(x)

    def _replace_least_element(self, e):
        while True:
            count, tstamp, key = self._heappop()
            assert self._counts[key] >= count
            if self._counts[key] == count:
                break
            else:
                self._heappush(self._counts[key], tstamp, key)

        del self._counts[key]
        self._counts[e] = count + 1
        self._heappush(count, self._elements_seen, e)

    def _heappush(self, count, tstamp, key):
        heapq.heappush(self._queue, (count, tstamp, key))

    def _heappop(self):
        return heapq.heappop(self._queue)
```
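On the capacity-versus-epsilon preference mentioned above, a constructor accepting either, via keyword arguments, might look like this (a sketch, not part of the refactored code; only the `__init__` differs):

```python
import math


class SpaceSavingCounter:
    def __init__(self, m=None, *, eps=None):
        # Accept exactly one of: capacity `m`, or error bound `eps`.
        if (m is None) == (eps is None):
            raise ValueError("pass exactly one of `m` or `eps`")
        self._m = m if m is not None else math.ceil(1 / eps)
```

Keeping `eps` keyword-only avoids any ambiguity about which parameter a positional argument refers to.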
The other thing is that I would expect the interface to be the same as `Counter`. I have turned your `counts` into `_counts: Counter` anyway, so let's add some methods that just delegate the work:
```python
    def most_common(self, n=None):
        return self._counts.most_common(n)

    def elements(self):
        return self._counts.elements()

    def __len__(self):
        return len(self._counts)

    def __getitem__(self, key):
        return self._counts[key]

    def __iter__(self):
        return iter(self._counts)

    def __contains__(self, item):
        return item in self._counts

    def __reversed__(self):
        return reversed(self._counts)

    def items(self):
        return self._counts.items()

    def keys(self):
        return self._counts.keys()

    def values(self):
        return self._counts.values()
```
I left out anything that would modify the values; that seems not necessary for this use case (or is left as an exercise).
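Since these methods forward unchanged to the inner counter, callers get exactly the plain `collections.Counter` behaviour; for reference:

```python
from collections import Counter

# The wrapper delegates to an inner Counter, so this is the
# behaviour callers of most_common / __getitem__ / __len__ see:
c = Counter("abracadabra")
print(c.most_common(2))  # [('a', 5), ('b', 2)]
print(c["a"], "a" in c, len(c))  # 5 True 5
```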
Finally, a couple more test cases:
```python
def test_SpaceSavingCounter():
    ssc = SpaceSavingCounter(2)
    ssc.update([1, 5, 3, 4, 2, 7, 7, 1, 3, 1, 3, 1, 3, 1, 3])
    assert ssc.keys() == {1, 3}

    ssc = SpaceSavingCounter(2)
    ssc.update([1, 1, 1, 1, 3, 3, 3, 3, 2, 2, 2, 2, 2])
    assert ssc.keys() == {3, 2}

    ssc = SpaceSavingCounter(1)
    ssc.update([1, 1, 1, 1, 3, 3, 3, 3, 2, 2, 2, 2, 2])
    assert ssc.keys() == {2}

    ssc = SpaceSavingCounter(3)
    ssc.update([1, 1, 1, 1, 3, 3, 3, 3, 2, 2, 2, 2, 2])
    assert ssc.keys() == {1, 2, 3}

    ssc = SpaceSavingCounter(2)
    ssc.update([])
    assert ssc.keys() == set()
```