I've learned. I'll share.
August 12, 2009
More ways to do Reactive Programming in Python
In my last post, I covered a little bit of Rx and how you could a have invented it. But you might invent a different way of doing the same thing. And since most languages don't have anything like LINQ, you might be interested in ways to do things in your programming language that don't require monads.
Let's explore some other ways to do Reactive Programming (Rx).
What is Rx?
Just to remind you what we're trying to accomplish, Rx builds event handlers. The LINQ version of Rx works by making an event look like a query (or stream).It makes sense if you think about it. An event is a stream, of event occurences. A list or enumerator or iterator is also a stream, of values. So if you squint hard enough, you see that events and enumerators are sort of the same thing. In fact, lists, streams, enumerators, events, channels, pipes, sockets, file handles, actors sending you messages are all pretty much the same thing: they are all producers of values.
Consumers and Receivers
Now what do you do with producers of values? You consume them, of course! Usually with something that looks like this (in python):sum = 0 for val in vals: sum += val print sumWhat we've created here is a consumer of vals. We can write it this way, as ordinary code, because vals is very flexible: it's anything that's iterable/enumerable. But what if instead of forcing the producer to be flexible, we forced the consumer to be flexible? What if we could write the consumer like this:
total = 0 while True: total += receive print totalHmm... it sort of looks like the opposite of an iterator/generator/enumerator block. A mathematician might say something about "duals" at this point, but I'm not mathematician, so let's just go ahead and try and implement it. In fact, we'll use python generators to do just that. We'll call this a "receiver" and we'll spell "receive" as "yield receive":
class Event:
def __init__(self):
self.handlers = []
def handle(self, handler):
self.handlers.append(handler)
def fire(self, val = None):
for handler in self.handlers:
handler(val)
receive = object()
def receiver(gen_rcvr):
def gen_and_start_rcvr(*args, **kargs):
rcvr = gen_rcvr(*args, **kargs)
rcvr.send(None)
return rcvr
return gen_and_start_rcvr
@receiver
def sum_r(title):
total = 0
while True:
total += yield receive
print "%s: %d" % (title, total)
@receiver
def count_r(title):
count = 0
while True:
yield receive
count += 1
print "%s: %d" % (title, count)
num_key = Event()
sum_nums = sum_r("total nums")
num_key.handle(sum_nums.send)
num_key.fire(1) #prints "total nums: 1"
num_key.fire(2) #prints "total nums: 3"
num_key.fire(3) #prints "total nums: 6"
It actually works! And because our consumer is very flexible, any producer, like an event, can use it. In fact, it's just a fancy event callback, just like everyrthing else in Rx land.
Remitters
But if we take this one step further and make a receiver wrap an event, we can make a receiver that's also a producer. We'll call it a "remitter", which is sort of like a receiver and an emitter.
class Remitter:
def __init__(self, receiver_from_event_out):
self.receiverFromEventOut = receiver_from_event_out
def __rrshift__(self, event_in):
event_out = Event()
rcvr = self.receiverFromEventOut(event_out)
event_in.handle(rcvr.send)
return event_out
def remitter(gen_rcvr):
def gen_remitter(*args, **kargs):
def receiver_from_event_out(event_out):
rcvr = gen_rcvr(event_out, *args, **kargs)
rcvr.send(None)
return rcvr
return Remitter(receiver_from_event_out)
return gen_remitter
@remitter
def double_detect_r(double_click_event, threshold):
last_click_time = 0
while True:
(yield)
current_click_time = time.time()
if (current_click_time - last_click_time) < threshold: double_click_event.fire() last_click_time = current_click_time @remitter def print_r(_, message): while True: val = (yield) print message mouse_click = Event() mouse_click>> print_r("left")
mouse_click>> double_detect_r(.01)>> print_r("double left")
mouse_click.fire() #prints "left"
time.sleep(.02)
mouse_click.fire() #prints "left"
mouse_click.fire() #prints "left" and "double left"
Great. But it is kind of annoying passing in the event like that. What if we had the remitter yield values out and yield values in?
Remitters that yield out and in
We could do that using little state machines built from python generators. "yield receive" will mean receive and "yield" of anything else will mean "emit".
from collections import defaultdict
class Remitter:
def __init__(self, ritr):
self.ritr = ritr
self.eventOut = Event()
def send(self, val_in):
ritr = self.ritr
event_out = self.eventOut
while True:
val_out = ritr.send(val_in)
if val_out is receive:
break
else:
event_out.fire(val_out)
def handle(self, handler):
self.eventOut.handle(handler)
def handlein(self, *events):
for event in events:
event.handle(self.send)
def __rrshift__(self, event_in):
try:
self.handlein(*event_in)
except:
self.handlein(event_in)
return self
def remitter(gen_rcvr):
def gen_remitter(*args, **kargs):
ritr = gen_rcvr(*args, **kargs)
ritr.send(None)
return Remitter(ritr)
return gen_remitter
@remitter
def double_detect_r(threshold):
last_click_time = 0
while True:
yield receive
current_click_time = time.time()
if (current_click_time - last_click_time) < threshold: yield last_click_time = current_click_time @remitter def map_r(f, *args, **kargs): while True: val = yield receive yield f(val, *args, **kargs) @remitter def print_r(format): while True: val = yield receive print message % val def label_r(label): return map_r(lambda val : (label, val)) @remitter def label_count_r(): count_by_label = defaultdict(int) while True: (label, val) = yield receive count_by_label[label] += 1 yield count_by_label.copy() def fix_click_counts(count_by_label, single_label, double_label): count_by_label[single_label] -= (count_by_label[double_label] * 2) #every double click "cancels" a single click return count_by_label.copy() def print_label_counts(count_by_label, *labels): print ", ".join("%d %s" % (count, label) for (label, count) in count_by_label.iteritems()) mouse_clicks = Event() ([mouse_clicks>> label_r("single"),
mouse_clicks>> double_detect_r(.01)>> label_r("double")]
>> label_count_r()>> map_r(fix_click_counts, "single", "double")>> map_r(print_label_counts))
#prints
#0 double, 1 single
#0 double, 2 single
#0 double, 3 single
#1 double, 1 single
mouse_clicks.fire()
time.sleep(.02)
mouse_clicks.fire()
mouse_clicks.fire()
Sweet. That looks pretty nice. But, it relies on the fact that Python allows you to yield values in to a generator. What if we have a programming language that only allows yielding values out (like any enumerator)?
Remitters that yield in by yielding out
We'll introduce a simple hack to work around that. We'll yield out a mutable "receive" value that "receives" in the value for us.
class Receive:
def __init__(self, val = None):
self.d = val
class Remitter:
def __init__(self, receive, ritr):
self.receive = receive
self.ritr = ritr
self.eventOut = Event()
def send(self, val_in):
self.receive.d = val_in
ritr = self.ritr
event_out = self.eventOut
while True:
val_out = ritr.next()
if isinstance(val_out, Receive):
break
else:
event_out.fire(val_out)
def handle(self, handler):
self.eventOut.handle(handler)
def handlein(self, *events):
for event in events:
event.handle(self.send)
def __rrshift__(self, event_in):
try:
self.handlein(*event_in)
except:
self.handlein(event_in)
return self
def remitter(gen_rcvr):
def gen_remitter(*args, **kargs):
receive = Receive()
ritr = gen_rcvr(receive, *args, **kargs)
ritr.send(None)
return Remitter(receive, ritr)
return gen_remitter
@remitter
def double_detect_r(receive, threshold):
last_click_time = 0
while True:
yield receive
current_click_time = time.time()
gap = current_click_time - last_click_time
if gap < threshold: yield gap last_click_time = current_click_time @remitter def average_r(receive): total = 0.0 count = 0 while True: yield receive total += receive.d count += 1 yield total/count @remitter def print_r(receive, format): while True: yield receive print format % (receive.d) mouse_clicks = Event() mouse_clicks>> double_detect_r(.05)>> average_r()>> print_r("double click; average gap is %s seconds")
mouse_clicks.fire()
time.sleep(.1)
mouse_clicks.fire()
time.sleep(.01)
mouse_clicks.fire() #prints #double click; average gap is 0.01... seconds
time.sleep(.02)
mouse_clicks.fire() #double click; average gap is 0.015... seconds
It works! And it should work in any language with iterator blocks. You could even use this C# instead of using LINQ Rx, but then you'll have to type "yield return receive" :(.