3
\$\begingroup\$

In the code below, I implemented the SpaceSaving frequency estimation algorithm described in this paper. Given a parameter eps, the algorithm finds all elements of a data stream of length n that occur more than n/eps times (with high probability). Here's a screenshot from the paper with the pseudocode:

SpaceSaving algorithm pseudocode

I would appreciate any feedback on my implementation: style, performance, etc.

import math, heapq
class SpaceSavingCounter:
 def __init__(self, eps):
 self.k = math.ceil(1/eps)
 self.n = 0
 self.counts = dict()
 self.queue = []
 def inc(self, x):
 # increment total elements seen
 self.n += 1
 # x is being watched
 if x in self.counts:
 self.counts[x] += 1
 # x is not being watched
 else:
 # make room for x
 if self.n > self.k:
 while True:
 count, tstamp, key = self.pop()
 assert self.counts[key] >= count
 if self.counts[key] == count:
 del self.counts[key]
 break
 else:
 self.push(self.counts[key], tstamp, key)
 else:
 count = 0
 # watch x
 self.counts[x] = count + 1
 self.push(count, self.n, x)
 def push(self, count, tstamp, key):
 heapq.heappush(
 self.queue,
 (count, tstamp, key)
 )
 def pop(self):
 return heapq.heappop(self.queue)
def test_SpaceSavingCounter():
 seq = [1,5,3,4,2,7,7,1,3,1,3,1,3,1,3]
 counter = SpaceSavingCounter(1 / 1.9)
 for x in seq:
 counter.inc(x)
 assert counter.counts.keys() == {1,3}
asked Oct 7, 2018 at 3:44
\$\endgroup\$
1
  • 1
    \$\begingroup\$ It's fine if you're just implementing this for fun/school/practice, but if you're in Python 3+ (based on your tag), why not just use functools.lru_cache? It's not identical (and maybe that's the reason), but in many cases it serves the same purpose, and it's built-in (aka, fast and reliable) \$\endgroup\$ Commented Oct 8, 2018 at 15:17

2 Answers 2

3
\$\begingroup\$

Thanks for putting this code online! Two updates for people who may want to use it:

  1. Correctness: The "if self.n> self.k:" is probably mistaken: for instance, when the sequence starts with k times the same element, we are later stuck with a data structure that contains a single counter. Instead, we can use self.k as representing the number of remaining counters with value 0: specifically, replace this line by "if self.k==0:" and just after the "else:" insert "self.k=self.k-1".

  2. Small optimization: above "def push", replace by "self.push(self.counts[x], self.n, x)", i.e. use the latest value of x since we just updated it

answered Aug 12, 2019 at 5:15
\$\endgroup\$
2
\$\begingroup\$

Structure seems pretty good, I'd mainly make the follwing changes:

  • Use underscores and more desciptive names for internal variables and functions
  • push and pop look like the would have stack-like behaviour, so I turned them into internal methods with more appropriate names.
  • inc would better be named and work like update from Counter, so I changed that and put the logic into the method _update_element for each single element.
  • I refactored the large if by flattening it out and splitting off the replacement logic.

Also, I made the constructor use the capacity instead of the epsilon, but that's just a matter of preference (a solution supporting both, via keyword arguments, would be in order).

class SpaceSavingCounter:
 """
 Efficient `Counter`-like structure for approximating the top `m` elements of a stream, in O(m)
 space (https://www.cse.ust.hk/~raywong/comp5331/References/EfficientComputationOfFrequentAndTop-kElementsInDataStreams.pdf).
 Specifically, the resulting counter will contain the correct counts for the top k elements with
 k ≈ m. The interface is the same as `collections.Counter`.
 """
 def __init__(self, m):
 self._m = m
 self._elements_seen = 0
 self._counts = Counter() # contains the counts for all elements
 self._queue = [] # contains the estimated hits for the counted elements
 def _update_element(self, x):
 self._elements_seen += 1
 if x in self._counts:
 self._counts[x] += 1
 elif len(self._counts) < self._m:
 self._counts[x] = 1
 self._heappush(1, self._elements_seen, x)
 else:
 self._replace_least_element(x)
 def _replace_least_element(self, e):
 while True:
 count, tstamp, key = self._heappop()
 assert self._counts[key] >= count
 if self._counts[key] == count:
 break
 else:
 self._heappush(self._counts[key], tstamp, key)
 del self._counts[key]
 self._counts[e] = count + 1
 self._heappush(count, self._elements_seen, e)
 def _heappush(self, count, tstamp, key):
 heapq.heappush(self._queue, (count, tstamp, key))
 def _heappop(self):
 return heapq.heappop(self._queue)

The other thing is that I would expect the interface to be the same as Counter. I have turned your counts into _counts: Counter anyway, so lets add some methods that just delegate work:

 def most_common(self, n=None):
 return self._counts.most_common(n)
 def elements(self):
 return self._counts.elements()
 def __len__(self):
 return len(self._counts)
 def __getitem__(self, key):
 return self._counts[key]
 def __iter__(self):
 return iter(self._counts)
 def __contains__(self, item):
 return item in self._counts
 def __reversed__(self):
 return reversed(self._counts)
 def items(self):
 return self._counts.items()
 def keys(self):
 return self._counts.keys()
 def values(self):
 return self._counts.values()

I left out anything that would modify the values; that seems not necessary for this use case (or is left as an exercise).

Finally, a couple more test cases:

def test_SpaceSavingCounter():
 ssc = SpaceSavingCounter(2)
 ssc.update([1, 5, 3, 4, 2, 7, 7, 1, 3, 1, 3, 1, 3, 1, 3])
 assert ssc.keys() == {1, 3}
 ssc = SpaceSavingCounter(2)
 ssc.update([1, 1, 1, 1, 3, 3, 3, 3, 2, 2, 2, 2, 2])
 assert ssc.keys() == {3, 2}
 ssc = SpaceSavingCounter(1)
 ssc.update([1, 1, 1, 1, 3, 3, 3, 3, 2, 2, 2, 2, 2])
 assert ssc.keys() == {2}
 ssc = SpaceSavingCounter(3)
 ssc.update([1, 1, 1, 1, 3, 3, 3, 3, 2, 2, 2, 2, 2])
 assert ssc.keys() == {1, 2, 3}
 ssc = SpaceSavingCounter(2)
 ssc.update([])
 assert ssc.keys() == set()
```
answered Apr 27, 2021 at 9:23
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.