I've learned. I'll share.


August 14, 2009

The Code for Reactive Programming in Python

I packaged some Python code that you can run for each of the steps I've shown in my articles about Reactive Programming, how you could have invented it and more ways to do it. I apologize that it's not in a more usable form. Go ahead and copy and paste to wherever you like. If you put it online somewhere more convenient, just put a link in the comments. I put the control of which example runs at the very end. Just uncomment the appropriate line for the example you want to run. And, here it is:

import time
## simplified event stuff
class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
        return self #so += will work
    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)
def echo(val):
    print(val)
    return val
def simple_click_event_example():
    click = Event()
    click.handle(echo)
    click.fire("left") #prints "left"
def click_event_manipulation_example():
    def left_double_click_from_click(click, threshold):
        dlclick = Event()
        last_lclick_time = [0] #closure hack
        def click_handler(click_value):
            if click_value == "left":
                lclick_time = time.time()
                if (lclick_time - last_lclick_time[0]) < threshold:
                    dlclick.fire("double left")
                last_lclick_time[0] = lclick_time
        click.handle(click_handler)
        return dlclick
    click = Event()
    dlclick = left_double_click_from_click(click, .01)
    dlclick.handle(echo)
    click.fire("left")
    time.sleep(.02)
    click.fire("left")
    click.fire("right")
    click.fire("left") #prints "double left"
class EventFireRecord:
    def __init__(self, value, time):
        self.value = value
        self.time = time
def click_event_maniuplation_refactored_example():
    def doubleize_event(evt, threshold, combine):
        double_evt = Event()
        last_fire = EventFireRecord(None, 0)
        def evt_handler(value):
            fire_time = time.time()
            if (fire_time - last_fire.time) < threshold:
                double_evt.fire(combine(last_fire.value, value))
            last_fire.__init__(value, fire_time)
        evt.handle(evt_handler)
        return double_evt
    def filter_event(evt, predicate):
        filtered_evt = Event()
        def evt_handler(value):
            if predicate(value):
                filtered_evt.fire(value)
        evt.handle(evt_handler)
        return filtered_evt
    click = Event()
    lclick = filter_event(click, lambda value : value == "left")
    dlclick = doubleize_event(lclick, .01, lambda click1, click2 : "double left")
    dlclick.handle(echo)
    click.fire("left")
    time.sleep(.02)
    click.fire("left")
    click.fire("right")
    click.fire("left") #prints "double left"
def click_event_handle_with_result_example():
    def handle_with_result(evt, handler_with_result):
        evt_out = Event()
        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                evt_out.fire(result)
        evt.handle(handler)
        return evt_out
    def doubleize_r(evt, threshold):
        last_fire = EventFireRecord(None, 0)
        def handler(value):
            fire_time = time.time()
            try:
                if (fire_time - last_fire.time) < threshold:
                    return (last_fire.value, value)
            finally:
                last_fire.__init__(value, fire_time)
        return handle_with_result(evt, handler)
    def filter_r(evt, predicate):
        def handler(value):
            if predicate(value):
                return value
        return handle_with_result(evt, handler)
    clicks = Event()
    dlclicks = doubleize_r(filter_r(clicks, lambda value : value == "left"), .01)
    dlclicks.handle(echo)
    clicks.fire("left")
    time.sleep(.02)
    clicks.fire("left")
    clicks.fire("right")
    clicks.fire("left") #prints ("left", "left")
def click_event_choosing_by_returning_event_example():
    def handle_with_result(evt, handler_with_result):
        evt_out = Event()
        def handler(value):
            result = handler_with_result(value)
            if result is None:
                pass #ignore
            elif isinstance(result, Event):
                result.handle(evt_out.fire)
            elif isinstance(result, list):
                for value_out in result:
                    evt_out.fire(value_out)
            else:
                evt_out.fire(result)
        evt.handle(handler)
        return evt_out
    def filter_r(evt, predicate):
        def handler(value):
            if predicate(value):
                return value
        return handle_with_result(evt, handler)
    def value_filter_r(evt, value):
        return filter_r(evt, lambda val : val == value)
    def click_choose_r(keys, clicks):
        def key_handler(char):
            #TODO: unsubscribe from event after either "l" or "r"
            if char == "l":
                return value_filter_r(clicks, "left")
            elif char == "r":
                return value_filter_r(clicks, "right")
            elif char == "f":
                return ["fake", "fake"]
        return handle_with_result(keys, key_handler)
    keys = Event()
    clicks = Event()
    choosen_clicks = click_choose_r(keys, clicks)
def click_event_looks_like_streams_example():
    class Event:
        def __init__(self):
            self.handlers = []
        def handle(self, handler):
            self.handlers.append(handler)
            return self #so += will work
        def fire(self, val = None):
            for handler in self.handlers:
                handler(val)
        def bind(evt, handler_with_result):
            evt_out = Event()
            def handler(value):
                result = handler_with_result(value)
                if result is not None:
                    Event.unit(result).handle(evt_out.fire)
            evt.handle(handler)
            return evt_out
        @classmethod
        def unit(cls, val):
            if isinstance(val, cls):
                return val
            elif isinstance(val, list):
                return MockEvent(*val)
            else:
                return MockEvent(val)
        __rshift__ = bind
    class MockEvent:
        def __init__(self, *vals):
            self.vals = vals
        def handle(self, handler):
            for val in self.vals:
                handler(val)
    def doublize_r(threshold, combine):
        last_fire = EventFireRecord(None, 0)
        def handler(value):
            fire_time = time.time()
            try:
                if (fire_time - last_fire.time) < threshold:
                    return combine(last_fire.value, value)
            finally:
                last_fire.__init__(value, fire_time)
        return handler
    def filter_r(predicate):
        def handler(value):
            if predicate(value):
                return value
        return handler
    def value_filter_r(value):
        return filter_r(lambda val : val == value)
    def click_choose_r(**clicks_by_char):
        def key_handler(char):
            return clicks_by_char.get(char)
        return key_handler
    clicks = Event()
    keys = Event()
    dlclicks = clicks >> value_filter_r("left") >> doublize_r(.01, lambda l1, l2: "double left")
    keys >> click_choose_r(d = dlclicks, f = ["fake", "fake"]) >> echo
    clicks.fire("left")
    clicks.fire("left")
    keys.fire("f") #prints "fake" and then "fake" again
    keys.fire("d")
    clicks.fire("right")
    clicks.fire("right")
    time.sleep(.02)
    clicks.fire("left")
    clicks.fire("left") #prints "double left"
## basic consumer (receiver) using generators
receive = object()
def receiver_example():
    def receiver(gen_rcvr):
        def gen_and_start_rcvr(*args, **kargs):
            rcvr = gen_rcvr(*args, **kargs)
            rcvr.send(None)
            return rcvr
        return gen_and_start_rcvr
    @receiver
    def sum_r(title):
        total = 0
        while True:
            total += yield receive
            print("%s: %d" % (title, total))
    @receiver
    def count_r(title):
        count = 0
        while True:
            yield receive
            count += 1
            print("%s: %d" % (title, count))
    num_key = Event()
    sum_nums = sum_r("total nums")
    num_key.handle(sum_nums.send)
    num_key.fire(1) #prints "total nums: 1"
    num_key.fire(2) #prints "total nums: 3"
    num_key.fire(3) #prints "total nums: 6"
## make retiterators that can also output values via an event fire
def remitter_example():
    class Remitter:
        def __init__(self, receiver_from_event_out):
            self.receiverFromEventOut = receiver_from_event_out
        def __rrshift__(self, event_in):
            event_out = Event()
            rcvr = self.receiverFromEventOut(event_out)
            event_in.handle(rcvr.send)
            return event_out
    def remitter(gen_rcvr):
        def gen_remitter(*args, **kargs):
            def receiver_from_event_out(event_out):
                rcvr = gen_rcvr(event_out, *args, **kargs)
                rcvr.send(None)
                return rcvr
            return Remitter(receiver_from_event_out)
        return gen_remitter
    @remitter
    def double_detect_r(double_click_event, threshold):
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            if (current_click_time - last_click_time) < threshold:
                double_click_event.fire()
            last_click_time = current_click_time
    @remitter
    def print_r(_, message):
        while True:
            val = yield receive
            print(message)
    mouse_click = Event()
    mouse_click >> print_r("left")
    mouse_click >> double_detect_r(.01) >> print_r("double left")
    mouse_click.fire() #prints "left"
    time.sleep(.02)
    mouse_click.fire() #prints "left"
    mouse_click.fire() #prints "left" and "double left"
## make retiterators out of generators that can send and receive
def remitter_example_yield_out():
    from collections import defaultdict
    class Remitter:
        def __init__(self, ritr):
            self.ritr = ritr
            self.eventOut = Event()
        def send(self, val_in):
            ritr = self.ritr
            event_out = self.eventOut
            while True:
                val_out = ritr.send(val_in)
                if val_out is receive:
                    break
                else:
                    event_out.fire(val_out)
        def handle(self, handler):
            self.eventOut.handle(handler)
        def handlein(self, *events):
            for event in events:
                event.handle(self.send)
        def __rrshift__(self, event_in):
            try:
                self.handlein(*event_in)
            except:
                self.handlein(event_in)
            return self
        def __rshift__(self, other):
            #Python 3 skips __rrshift__ when both operands are Remitters, so forward explicitly
            return other.__rrshift__(self)
    def remitter(gen_rcvr):
        def gen_remitter(*args, **kargs):
            ritr = gen_rcvr(*args, **kargs)
            ritr.send(None)
            return Remitter(ritr)
        return gen_remitter
    @remitter
    def double_detect_r(threshold):
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            if (current_click_time - last_click_time) < threshold:
                yield
            last_click_time = current_click_time
    @remitter
    def map_r(f, *args, **kargs):
        while True:
            val = yield receive
            yield f(val, *args, **kargs)
    @remitter
    def print_r(format):
        while True:
            val = yield receive
            print(format % val)
    def label_r(label):
        return map_r(lambda val : (label, val))
    @remitter
    def label_count_r():
        count_by_label = defaultdict(int)
        while True:
            (label, val) = yield receive
            count_by_label[label] += 1
            yield count_by_label.copy()
    def fix_click_counts(count_by_label, single_label, double_label):
        count_by_label[single_label] -= (count_by_label[double_label] * 2) #every double left "cancels" a single click
        return count_by_label.copy()
    def print_label_counts(count_by_label, *labels):
        #sorted so the output order does not depend on dict ordering
        print(", ".join("%d %s" % (count, label) for (label, count) in sorted(count_by_label.items())))
    mouse_clicks = Event()
    ([mouse_clicks >> label_r("single"),
      mouse_clicks >> double_detect_r(.01) >> label_r("double")]
     >> label_count_r() >> map_r(fix_click_counts, "single", "double") >> map_r(print_label_counts))
    #prints
    #0 double, 1 single
    #0 double, 2 single
    #0 double, 3 single
    #1 double, 1 single
    mouse_clicks.fire()
    time.sleep(.02)
    mouse_clicks.fire()
    mouse_clicks.fire()
def remitter_without_yield_in_hack_example():
    class Receive:
        def __init__(self, val = None):
            self.d = val
    class Remitter:
        def __init__(self, receive, ritr):
            self.receive = receive
            self.ritr = ritr
            self.eventOut = Event()
        def send(self, val_in):
            self.receive.d = val_in
            ritr = self.ritr
            event_out = self.eventOut
            while True:
                val_out = next(ritr)
                if isinstance(val_out, Receive):
                    break
                else:
                    event_out.fire(val_out)
        def handle(self, handler):
            self.eventOut.handle(handler)
        def handlein(self, *events):
            for event in events:
                event.handle(self.send)
        def __rrshift__(self, event_in):
            try:
                self.handlein(*event_in)
            except:
                self.handlein(event_in)
            return self
        def __rshift__(self, other):
            #Python 3 skips __rrshift__ when both operands are Remitters, so forward explicitly
            return other.__rrshift__(self)
    def remitter(gen_rcvr):
        def gen_remitter(*args, **kargs):
            receive = Receive()
            ritr = gen_rcvr(receive, *args, **kargs)
            ritr.send(None)
            return Remitter(receive, ritr)
        return gen_remitter
    @remitter
    def double_detect_r(receive, threshold):
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            gap = current_click_time - last_click_time
            if gap < threshold:
                yield gap
            last_click_time = current_click_time
    @remitter
    def average_r(receive):
        total = 0.0
        count = 0
        while True:
            yield receive
            total += receive.d
            count += 1
            yield total/count
    @remitter
    def print_r(receive, format):
        while True:
            yield receive
            print(format % (receive.d))
    mouse_clicks = Event()
    mouse_clicks >> double_detect_r(.05) >> average_r() >> print_r("double left; average gap is %s seconds")
    mouse_clicks.fire()
    time.sleep(.1)
    mouse_clicks.fire()
    time.sleep(.01)
    mouse_clicks.fire() #prints "double left; average gap is 0.01... seconds"
    time.sleep(.02)
    mouse_clicks.fire() #prints "double left; average gap is 0.015... seconds"
if __name__ == "__main__":
    #simple_click_event_example()
    #click_event_manipulation_example()
    #click_event_maniuplation_refactored_example()
    #click_event_handle_with_result_example()
    #click_event_choosing_by_returning_event_example()
    #click_event_looks_like_streams_example()
    #remitter_example()
    #remitter_example_yield_out()
    remitter_without_yield_in_hack_example()

August 12, 2009

More ways to do Reactive Programming in Python

In my last post, I covered a little bit of Rx and how you could have invented it. But you might invent a different way of doing the same thing. And since most languages don't have anything like LINQ, you might be interested in ways to do things in your programming language that don't require monads.

Let's explore some other ways to do Reactive Programming (Rx).

What is Rx?

Just to remind you what we're trying to accomplish, Rx builds event handlers. The LINQ version of Rx works by making an event look like a query (or stream).

It makes sense if you think about it. An event is a stream of event occurrences. A list or enumerator or iterator is also a stream, of values. So if you squint hard enough, you see that events and enumerators are sort of the same thing. In fact, lists, streams, enumerators, events, channels, pipes, sockets, file handles, and actors sending you messages are all pretty much the same thing: they are all producers of values.
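To make the "they are all producers" point concrete, here is a small Python 3 sketch. The helper name `drive` and the list `collected` are made up for illustration; `Event` is the same tiny class used throughout these posts. One callback is fed first by a list, then by a generator, then by an event, and it cannot tell the difference.

```python
class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

def drive(producer, callback):
    # hypothetical helper: push every value of an iterable into a callback
    for val in producer:
        callback(val)

collected = []
drive([1, 2], collected.append)                    # a list as producer
drive((n * 10 for n in (3, 4)), collected.append)  # a generator as producer

clicks = Event()
clicks.handle(collected.append)                    # an event as producer
clicks.fire(5)

print(collected)  # [1, 2, 30, 40, 5]
```

The only real difference is who is in charge: with the list and the generator the consumer pulls, with the event the producer pushes.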

Consumers and Receivers

Now what do you do with producers of values? You consume them, of course! Usually with something that looks like this (in python):
sum = 0
for val in vals:
    sum += val
    print(sum)
What we've created here is a consumer of vals. We can write it this way, as ordinary code, because vals is very flexible: it's anything that's iterable/enumerable. But what if instead of forcing the producer to be flexible, we forced the consumer to be flexible? What if we could write the consumer like this:
total = 0
while True:
    total += receive
    print(total)
Hmm... it sort of looks like the opposite of an iterator/generator/enumerator block. A mathematician might say something about "duals" at this point, but I'm not a mathematician, so let's just go ahead and try to implement it. In fact, we'll use Python generators to do just that. We'll call this a "receiver" and we'll spell "receive" as "yield receive":
class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)
receive = object()
def receiver(gen_rcvr):
    def gen_and_start_rcvr(*args, **kargs):
        rcvr = gen_rcvr(*args, **kargs)
        rcvr.send(None)
        return rcvr
    return gen_and_start_rcvr
@receiver
def sum_r(title):
    total = 0
    while True:
        total += yield receive
        print("%s: %d" % (title, total))
@receiver
def count_r(title):
    count = 0
    while True:
        yield receive
        count += 1
        print("%s: %d" % (title, count))
num_key = Event()
sum_nums = sum_r("total nums")
num_key.handle(sum_nums.send)
num_key.fire(1) #prints "total nums: 1"
num_key.fire(2) #prints "total nums: 3"
num_key.fire(3) #prints "total nums: 6"
It actually works! And because our consumer is very flexible, any producer, like an event, can use it. In fact, it's just a fancy event callback, just like everything else in Rx land.
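Since a receiver only cares that somebody calls send on it, the producer does not have to be an event at all. Here is a minimal Python 3 sketch of the same receiver idea driven by a plain list. The name `running_total_r` and the `totals` list are made up for this illustration; the decorator is the one from above, with `next(rcvr)` doing the job of `rcvr.send(None)`.

```python
receive = object()  # sentinel meaning "give me the next value"

def receiver(gen_rcvr):
    def gen_and_start_rcvr(*args, **kargs):
        rcvr = gen_rcvr(*args, **kargs)
        next(rcvr)  # run to the first yield so send() can deliver values
        return rcvr
    return gen_and_start_rcvr

@receiver
def running_total_r(totals):
    # hypothetical receiver: records the running total instead of printing it
    total = 0
    while True:
        total += yield receive
        totals.append(total)

totals = []
rcvr = running_total_r(totals)
for val in [1, 2, 3]:   # a plain list plays the producer
    rcvr.send(val)

print(totals)  # [1, 3, 6]
```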

Remitters

But if we take this one step further and make a receiver wrap an event, we can make a receiver that's also a producer. We'll call it a "remitter", which is sort of like a receiver and an emitter.
import time
class Remitter:
    def __init__(self, receiver_from_event_out):
        self.receiverFromEventOut = receiver_from_event_out
    def __rrshift__(self, event_in):
        event_out = Event()
        rcvr = self.receiverFromEventOut(event_out)
        event_in.handle(rcvr.send)
        return event_out
def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        def receiver_from_event_out(event_out):
            rcvr = gen_rcvr(event_out, *args, **kargs)
            rcvr.send(None)
            return rcvr
        return Remitter(receiver_from_event_out)
    return gen_remitter
@remitter
def double_detect_r(double_click_event, threshold):
    last_click_time = 0
    while True:
        yield receive
        current_click_time = time.time()
        if (current_click_time - last_click_time) < threshold:
            double_click_event.fire()
        last_click_time = current_click_time
@remitter
def print_r(_, message):
    while True:
        val = yield receive
        print(message)
mouse_click = Event()
mouse_click >> print_r("left")
mouse_click >> double_detect_r(.01) >> print_r("double left")
mouse_click.fire() #prints "left"
time.sleep(.02)
mouse_click.fire() #prints "left"
mouse_click.fire() #prints "left" and "double left"
Great. But it is kind of annoying passing in the event like that. What if we had the remitter yield values out and yield values in?

Remitters that yield out and in

We could do that using little state machines built from python generators. "yield receive" will mean receive and "yield" of anything else will mean "emit".
from collections import defaultdict
class Remitter:
    def __init__(self, ritr):
        self.ritr = ritr
        self.eventOut = Event()
    def send(self, val_in):
        ritr = self.ritr
        event_out = self.eventOut
        while True:
            val_out = ritr.send(val_in)
            if val_out is receive:
                break
            else:
                event_out.fire(val_out)
    def handle(self, handler):
        self.eventOut.handle(handler)
    def handlein(self, *events):
        for event in events:
            event.handle(self.send)
    def __rrshift__(self, event_in):
        try:
            self.handlein(*event_in)
        except:
            self.handlein(event_in)
        return self
    def __rshift__(self, other):
        #Python 3 skips __rrshift__ when both operands are Remitters, so forward explicitly
        return other.__rrshift__(self)
def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        ritr = gen_rcvr(*args, **kargs)
        ritr.send(None)
        return Remitter(ritr)
    return gen_remitter
@remitter
def double_detect_r(threshold):
    last_click_time = 0
    while True:
        yield receive
        current_click_time = time.time()
        if (current_click_time - last_click_time) < threshold:
            yield
        last_click_time = current_click_time
@remitter
def map_r(f, *args, **kargs):
    while True:
        val = yield receive
        yield f(val, *args, **kargs)
@remitter
def print_r(format):
    while True:
        val = yield receive
        print(format % val)
def label_r(label):
    return map_r(lambda val : (label, val))
@remitter
def label_count_r():
    count_by_label = defaultdict(int)
    while True:
        (label, val) = yield receive
        count_by_label[label] += 1
        yield count_by_label.copy()
def fix_click_counts(count_by_label, single_label, double_label):
    count_by_label[single_label] -= (count_by_label[double_label] * 2) #every double click "cancels" a single click
    return count_by_label.copy()
def print_label_counts(count_by_label, *labels):
    #sorted so the output order does not depend on dict ordering
    print(", ".join("%d %s" % (count, label) for (label, count) in sorted(count_by_label.items())))
mouse_clicks = Event()
([mouse_clicks >> label_r("single"),
  mouse_clicks >> double_detect_r(.01) >> label_r("double")]
 >> label_count_r() >> map_r(fix_click_counts, "single", "double") >> map_r(print_label_counts))
#prints
#0 double, 1 single
#0 double, 2 single
#0 double, 3 single
#1 double, 1 single
mouse_clicks.fire()
time.sleep(.02)
mouse_clicks.fire()
mouse_clicks.fire()
Sweet. That looks pretty nice. But, it relies on the fact that Python allows you to yield values in to a generator. What if we have a programming language that only allows yielding values out (like any enumerator)?

Remitters that yield in by yielding out

We'll introduce a simple hack to work around that. We'll yield out a mutable "receive" value that "receives" in the value for us.
class Receive:
    def __init__(self, val = None):
        self.d = val
class Remitter:
    def __init__(self, receive, ritr):
        self.receive = receive
        self.ritr = ritr
        self.eventOut = Event()
    def send(self, val_in):
        self.receive.d = val_in
        ritr = self.ritr
        event_out = self.eventOut
        while True:
            val_out = next(ritr)
            if isinstance(val_out, Receive):
                break
            else:
                event_out.fire(val_out)
    def handle(self, handler):
        self.eventOut.handle(handler)
    def handlein(self, *events):
        for event in events:
            event.handle(self.send)
    def __rrshift__(self, event_in):
        try:
            self.handlein(*event_in)
        except:
            self.handlein(event_in)
        return self
    def __rshift__(self, other):
        #Python 3 skips __rrshift__ when both operands are Remitters, so forward explicitly
        return other.__rrshift__(self)
def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        receive = Receive()
        ritr = gen_rcvr(receive, *args, **kargs)
        ritr.send(None)
        return Remitter(receive, ritr)
    return gen_remitter
@remitter
def double_detect_r(receive, threshold):
    last_click_time = 0
    while True:
        yield receive
        current_click_time = time.time()
        gap = current_click_time - last_click_time
        if gap < threshold:
            yield gap
        last_click_time = current_click_time
@remitter
def average_r(receive):
    total = 0.0
    count = 0
    while True:
        yield receive
        total += receive.d
        count += 1
        yield total/count
@remitter
def print_r(receive, format):
    while True:
        yield receive
        print(format % (receive.d))
mouse_clicks = Event()
mouse_clicks >> double_detect_r(.05) >> average_r() >> print_r("double click; average gap is %s seconds")
mouse_clicks.fire()
time.sleep(.1)
mouse_clicks.fire()
time.sleep(.01)
mouse_clicks.fire() #prints "double click; average gap is 0.01... seconds"
time.sleep(.02)
mouse_clicks.fire() #prints "double click; average gap is 0.015... seconds"
It works! And it should work in any language with iterator blocks. You could even use this in C# instead of using LINQ Rx, but then you'll have to type "yield return receive" :(.

Conclusion

Rx is all about making flexible consumers of values, which basically amounts to making event callbacks. LINQ Rx does so with monads, but that's not the only way. Here, we have shown how we can turn a generator or iterator block inside out and make it consume values rather than produce values. Using these is an alternative to LINQ Rx that might be more appropriate for your programming language. There are lots of other things to work out, like unhandling an event, error handling, catching the end of a stream, etc. But this is a pretty good, simple foundation to show that the essence of reactive programming is making it easy to make flexible value consumers (basically event handlers). In both the case of Rx and the code above, we've done so by making a little DSL in the host language.
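As one example of those loose ends, here is how "unhandling" might look. This is only a Python 3 sketch of one possible design, not something the code above already does: `handle` hands back a function (called `unhandle` here, a name I made up) that removes the handler again.

```python
class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
        # hypothetical addition: return a function that undoes this subscription
        def unhandle():
            if handler in self.handlers:
                self.handlers.remove(handler)
        return unhandle
    def fire(self, val=None):
        # iterate over a copy so a handler may unsubscribe while we are firing
        for handler in list(self.handlers):
            handler(val)

seen = []
clicks = Event()
unhandle = clicks.handle(seen.append)
clicks.fire("left")
unhandle()
clicks.fire("right")  # no longer delivered

print(seen)  # ['left']
```

Note that returning the unsubscribe function means `handle` can no longer return the event itself, so this design would not combine with the "return self" trick used for chaining elsewhere.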

Next time...

There are still other ways of making flexible consumers. If we had continuations or used CPS we could just use the current continuation as our consumer. So, Scheme and Ruby ought to have easy solutions to this problem. We can do a similar thing with macros in any Lisp language that doesn't have continuations, like Clojure. In fact, I'd like to explore how to do Rx in Clojure next time. And at some point, we need to figure out how concurrency fits into all of this.

P.S.

While I was researching all of this stuff, I was surprised to find that my friend, Wes Dyer, is right at the heart of it. You can see a video of him here. That was a surprise because I've never talked with him about this. In fact, I've only heard from him once in the last year because he's "gone dark". I'm sure his work on Rx has something to do with that :). I just want to make it clear that all of my thoughts are mine alone, and not based on any communication with him. Don't blame him for my crazy stuff :).

Rx Simplified (Reactive Programming in Python)

Lately, there's been interest in "reactive programming", especially with Rx. What is Rx? I've seen descriptions like "reactive programming with LINQ", "the dual of the enumerator/iterator" and even "a variation of the continuation monad". Oh right...uh...monad? dual? what's going on?

If you like things like "monads", "duals", and category theory, go watch this video, especially until the end. It's pretty funny.

But if those things make your eyes glaze over and you're left wondering what Rx really is, I want to give you a simple explanation of what Rx is all about. In fact, I'll show how you could have invented it yourself. We'll do so with simple event-based code written in Python.

Step 1: write simple event handlers

Imagine we have a mouse click event that fires either "left" or "right", and we want to make a new event that fires "double left" when there's a double left click. We might write something like this (including a simple Event class).
import time
class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)
def echo(val):
    print(val)
    return val
def left_double_click_from_click(click, threshold):
    dlclick = Event()
    last_lclick_time = [0] #closure hack
    def click_handler(click_value):
        if click_value == "left":
            lclick_time = time.time()
            if (lclick_time - last_lclick_time[0]) < threshold:
                dlclick.fire("double left")
            last_lclick_time[0] = lclick_time
    click.handle(click_handler)
    return dlclick
click = Event()
dlclick = left_double_click_from_click(click, .01)
dlclick.handle(echo)
click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"

Step 2: refactor event handlers

It works and it's pretty simple. But, we could refactor quite a bit. If we do so, we might write something like this (notice I like the suffix "_r" for "reactive"):
class EventFireRecord:
    def __init__(self, value, time):
        self.value = value
        self.time = time
def doubleize_r(evt, threshold, combine):
    double_evt = Event()
    last_fire = EventFireRecord(None, 0)
    def evt_handler(value):
        fire_time = time.time()
        if (fire_time - last_fire.time) < threshold:
            double_evt.fire(combine(last_fire.value, value))
        last_fire.__init__(value, fire_time)
    evt.handle(evt_handler)
    return double_evt
def filter_r(evt, predicate):
    filtered_evt = Event()
    def evt_handler(value):
        if predicate(value):
            filtered_evt.fire(value)
    evt.handle(evt_handler)
    return filtered_evt
click = Event()
dlclick = doubleize_r(filter_r(click, lambda value : value == "left"), .01, lambda l1, l2: "double left")
dlclick.handle(echo)
click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"
That looks better and is more generic. The logic of "double click" is now contained all on one line. But, we could do even better. Notice that we repeat ourselves a little with filter_r and doubleize_r. The pattern looks like this:
evt_out = Event()
def handler(value):
    ...
    evt_out.fire(value)
    ...
evt_in.handle(handler)
return evt_out
What if we refactor to pull out that common pattern by making a special handler that returns a value and a special "handle_with_result" that looks like this pattern?
def handler(value):
    ...
    return value
evt_out = handle_with_result(evt_in, handler)

Step 3: make a higher-level "handle" function

If we do that, our code might look like this:
def handle_with_result(evt, handler_with_result):
    evt_out = Event()
    def handler(value):
        result = handler_with_result(value)
        if result is not None:
            evt_out.fire(result)
    evt.handle(handler)
    return evt_out
def doubleize_event(evt, threshold, combine):
    last_fire = EventFireRecord(None, 0)
    def handler(value):
        fire_time = time.time()
        try:
            if (fire_time - last_fire.time) < threshold:
                return combine(last_fire.value, value)
        finally:
            last_fire.__init__(value, fire_time)
    return handle_with_result(evt, handler)
def filter_event(evt, predicate):
    def handler(value):
        if predicate(value):
            return value
    return handle_with_result(evt, handler)
click = Event()
dlclick = doubleize_event(filter_event(click, lambda value : value == "left"), .01, lambda l1, l2 : "double left")
dlclick.handle(echo)
click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"
It works, and our code looks better than ever. handle_with_result is very useful.
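To see just how useful, here is a small Python 3 sketch where a map and a filter each become a one-liner on top of handle_with_result. The names `map_event` and `keep_if` are made up for this illustration. One caveat that follows from the design: because None means "nothing to emit", a handler can never emit None itself.

```python
class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

def handle_with_result(evt, handler_with_result):
    evt_out = Event()
    def handler(value):
        result = handler_with_result(value)
        if result is not None:
            evt_out.fire(result)
    evt.handle(handler)
    return evt_out

def map_event(evt, f):
    # hypothetical helper: emit f(value) for every incoming value
    return handle_with_result(evt, f)

def keep_if(evt, predicate):
    # hypothetical helper: emit the value itself, or None to stay silent
    return handle_with_result(evt, lambda value: value if predicate(value) else None)

out = []
numbers = Event()
squares_of_evens = map_event(keep_if(numbers, lambda n: n % 2 == 0), lambda n: n * n)
squares_of_evens.handle(out.append)
for n in range(1, 7):
    numbers.fire(n)

print(out)  # [4, 16, 36]
```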

But, we are now missing something: what if we want to return multiple values? Or do something more interesting, like listen to a keyboard event and return left clicks if the user types "l", right clicks if they type "r", and two "fake" clicks if they type "f"? We'd like to write something like this:

def choose_clicks(keys, clicks):
    def key_handler(char):
        if char == "l":
            return filter_event(clicks, lambda value : value == "left")
        elif char == "r":
            return filter_event(clicks, lambda value : value == "right")
        elif char == "f":
            return ["fake", "fake"]
    return handle_with_result(keys, key_handler)
If we change handle_with_result to handle this, we might get something like this:
def handle_with_result(evt, handler_with_result):
    evt_out = Event()
    def handler(value):
        result = handler_with_result(value)
        if result is None:
            pass #ignore
        elif isinstance(result, Event):
            result.handle(evt_out.fire)
        elif isinstance(result, list):
            for value_out in result:
                evt_out.fire(value_out)
        else:
            evt_out.fire(result)
    evt.handle(handler)
    return evt_out
def filter_r(evt, predicate):
    def handler(value):
        if predicate(value):
            return value
    return handle_with_result(evt, handler)
def value_filter_r(evt, value):
    return filter_r(evt, lambda val : val == value)
def choose_clicks(keys, clicks):
    def key_handler(char):
        #TODO: unsubscribe from event after either "l" or "r"
        if char == "l":
            return value_filter_r(clicks, "left")
        elif char == "r":
            return value_filter_r(clicks, "right")
        elif char == "f":
            return ["fake", "fake"]
    return handle_with_result(keys, key_handler)
keys = Event()
clicks = Event()
choosen_clicks = choose_clicks(keys, clicks)
choosen_clicks.handle(echo)
clicks.fire("left")
keys.fire("a")
clicks.fire("right")
keys.fire("l")
clicks.fire("left") #prints "left"
clicks.fire("right")
clicks.fire("left") #prints "left"
keys.fire("f") #prints "fake" and then "fake" again
Great. Now if we just add a little bit of syntax sugar to this, we can make events look like streams:

Step 4: add some syntax sugar

class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
        return self #so += will work
    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)
    def bind(evt, handler_with_result):
        evt_out = Event()
        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                Event.unit(result).handle(evt_out.fire)
        evt.handle(handler)
        return evt_out
    @classmethod
    def unit(cls, val):
        if isinstance(val, cls):
            return val
        elif isinstance(val, list):
            return MockEvent(*val)
        else:
            return MockEvent(val)
    __rshift__ = bind
class MockEvent:
    def __init__(self, *vals):
        self.vals = vals
    def handle(self, handler):
        for val in self.vals:
            handler(val)
def doublize_r(threshold, combine):
    last_fire = EventFireRecord(None, 0)
    def handler(value):
        fire_time = time.time()
        try:
            if (fire_time - last_fire.time) < threshold:
                return combine(last_fire.value, value)
        finally:
            last_fire.__init__(value, fire_time)
    return handler
def filter_r(predicate):
    def handler(value):
        if predicate(value):
            return value
    return handler
def value_filter_r(value):
    return filter_r(lambda val : val == value)
def click_choose_r(**clicks_by_char):
    def key_handler(char):
        return clicks_by_char.get(char)
    return key_handler
clicks = Event()
keys = Event()
dlclicks = clicks >> value_filter_r("left") >> doublize_r(.01, lambda l1, l2: "double click")
keys >> click_choose_r(d = dlclicks, f = ["fake", "fake"]) >> echo
clicks.fire("left")
clicks.fire("left")
keys.fire("f") #prints "fake" and then "fake" again
keys.fire("d")
clicks.fire("right")
clicks.fire("right")
time.sleep(.02)
clicks.fire("left")
clicks.fire("left") #prints "double click"

So what have we made?

Wonderful. We've made events look like streams by making a slick way of creating event handlers. In fact, if you look closely at what I did in that last step, you'll notice that I renamed "handle_with_result" to "bind" and moved some code into a method called "unit". That's all it takes to turn Event into a monad, which is exactly what Rx does. Congratulations, we just reinvented monads and Rx, just by refactoring our event handler code. In the process we've discovered what Rx really is: a fancy way of writing event handlers, specifically event handlers that fire events that trigger other event handlers that fire events, and so on in a big chain that looks like a query. So when your eyes glaze over about "duals" and "monads" and "reactive programming", just say to yourself: I'm making a fancy event handler. Because in the end, that's all you're really doing.

In fact, if you want to do so in Python, now you have a basic implementation to start with! Of course, this is just a toy implementation. It lacks error handling, unsubscribing, end-of-stream, and concurrency. But it ain't bad for just 50 lines of code. And it lets you see the essence of Rx fairly easily.

Oh, and what's the big deal with monads, you ask? Nothing much. It's just that if you provide "bind" and "unit" (called "select many" and "select" in LINQ, I think), LINQ gives you some nice syntax sugar that makes your event handler look like a query. It's really pretty slick, especially now that they've added "extension methods".
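To give a feel for how query-style operators fall out of "bind", here is a Python 3 sketch that builds two of them on the Event from Step 4. LINQ does have operators named Select and Where, but the helpers `select` and `where` below are my own illustration and only borrow the names. Each one is just a handler that returns a list, which "unit" then replays into the output event.

```python
class MockEvent:
    # replays a fixed set of values to whoever subscribes
    def __init__(self, *vals):
        self.vals = vals
    def handle(self, handler):
        for val in self.vals:
            handler(val)

class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
        return self
    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)
    def bind(self, handler_with_result):
        evt_out = Event()
        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                Event.unit(result).handle(evt_out.fire)
        self.handle(handler)
        return evt_out
    @classmethod
    def unit(cls, val):
        if isinstance(val, cls):
            return val
        elif isinstance(val, list):
            return MockEvent(*val)
        else:
            return MockEvent(val)
    __rshift__ = bind

def select(f):
    # hypothetical helper: a one-element list means "emit exactly this value"
    return lambda value: [f(value)]

def where(predicate):
    # hypothetical helper: an empty list means "emit nothing"
    return lambda value: [value] if predicate(value) else []

out = []
nums = Event()
nums >> where(lambda n: n > 1) >> select(lambda n: n * 10) >> out.append
for n in (1, 2, 3):
    nums.fire(n)

print(out)  # [20, 30]
```

Wrapping results in a list also sidesteps the None problem: `select` can emit any value, including None, because the list itself is never None.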

And next time...

In future posts, I'll explore different ways of making slick event handlers, but without monads. And hopefully we'll get to make this handle concurrency, which is what asynchronous programming is all about. In fact, I expect we'll start to see a serious blurring of lines between Rx, message passing, and data flow programming.

For now, when you start working with Rx, just remember: I'm making a big, fancy event handler. An "Observable" is just like an event and an "Observer" is just like an event handler.
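If you want to see that correspondence spelled out, here is a tiny Python 3 sketch. The `Observable` and `Observer` classes below are stand-ins I wrote for this post, not the real Rx interfaces, and they only cover the "next value" callback. Subscribing an observer amounts to registering its `on_next` method as a plain event handler.

```python
class Event:
    def __init__(self):
        self.handlers = []
    def handle(self, handler):
        self.handlers.append(handler)
    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

class Observer:
    # hypothetical stand-in: only the "next value" callback is modelled
    def __init__(self, log):
        self.log = log
    def on_next(self, val):
        self.log.append(val)

class Observable:
    # hypothetical wrapper: an observable is an event with a different spelling
    def __init__(self, event):
        self.event = event
    def subscribe(self, observer):
        self.event.handle(observer.on_next)

log = []
clicks = Event()
Observable(clicks).subscribe(Observer(log))
clicks.fire("left")
clicks.fire("right")

print(log)  # ['left', 'right']
```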

P.S.

What's with the lame names? They started off with cool names like "Reactive" and "Rx" and then give us Observable, Observer, Subscribe, OnNext, OnDone, and OnError. Yuck. Think what an opportunity they missed! We could have had names like Emitter, Reactor, Chain, Emit, Extinguish, and Explode. Judge for yourself:
observable.Subscribe(observer)
observer.OnNext(val)
observer.OnDone()
observer.OnError(e)
or
emitter.chain(reactor)
reactor.emit("foo")
reactor.extinguish()
reactor.explode(e)
