My MapReduce tester is clearly ported from Shell. Short of adding `args=None` and looping with `for line in args or read_input()`, what's a better way of importing and testing the function outside of `subprocess`?
Or does it not matter, i.e., is my "hack" fine?
test_mapreduce.py
    from unittest import TestCase, main as unittest_main
    from subprocess import check_output as run
    from os import path


    class TestMapReduce(TestCase):
        top_path = ''
        map_reduce = lambda self, mapper_name, reducer_name, datafile_name: run(
            ['python', path.join(self.top_path, reducer_name),  # Reduce
             run(['sort',  # Shuffle, could be replaced with python `sorted`
                  run(['python', path.join(self.top_path, mapper_name),  # Map
                       path.join(self.top_path, 'data', datafile_name)])])])

        @classmethod
        def setUpClass(cls):
            if not path.isfile('setup.py'):
                cls.top_path = path.join('..', '..')
                if not path.isfile(path.join(cls.top_path, 'setup.py')):
                    raise AssertionError("Haven't found right directory to `cd` into")

        def test_with_student_test_posts(self):
            print self.map_reduce('mapper.py', 'reducer.py', 'student_test_posts.csv')


    if __name__ == '__main__':
        unittest_main()
mapper.py
    #!/usr/bin/env python
    from fileinput import input as read_input


    def mapper():
        for line in read_input():
            data = line.strip().split('\t')
            if len(data) != 6:
                continue
            date, time, store, item, cost, payment = data
            print "{0}\t{1}".format(store, cost)


    if __name__ == '__main__':
        mapper()
PS: Should I refactor to use the built-in `map` and `reduce` functions?
2 Answers
It's strange to use `map_reduce = lambda ...` to define a method. This is the same, written the common way:
    def map_reduce(self, mapper_name, reducer_name, datafile_name):
        # `return` is needed here: the lambda returned the output implicitly
        return run(
            ['python', path.join(self.top_path, reducer_name),  # Reduce
             run(['sort',  # Shuffle, could be replaced with python `sorted`
                  run(['python', path.join(self.top_path, mapper_name),  # Map
                       path.join(self.top_path, 'data', datafile_name)])])])
And this hack of calling `python -> sort -> python` is not fine at all. Python can certainly sort. Then your pipeline would become `python -> python -> python`, and at that point it's beyond silly to call subprocesses for this. You should do the whole thing in a single Python process, instead of 3 different processes.
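As a rough sketch of what a single-process version could look like: the question doesn't show `reducer.py`, so this assumes the reducer is meant to total the cost per store. The names `map_lines`, `reduce_pairs`, and this standalone `map_reduce` are made up for illustration, and the code is written to run on both Python 2.7 and Python 3.

```python
from itertools import groupby
from operator import itemgetter


def map_lines(lines):
    """Map step: yield (store, cost) for every well-formed 6-column line."""
    for line in lines:
        cols = line.strip().split('\t')
        if len(cols) == 6:
            yield cols[2], float(cols[4])


def reduce_pairs(sorted_pairs):
    """Reduce step (assumed): total cost per store.

    groupby only merges adjacent keys, so the input must be sorted by store.
    """
    return [(store, sum(cost for _, cost in group))
            for store, group in groupby(sorted_pairs, key=itemgetter(0))]


def map_reduce(lines):
    # sorted() stands in for the external `sort` call (the shuffle step)
    return reduce_pairs(sorted(map_lines(lines)))
```

Any iterable of lines works as input: an open file, `fileinput.input()`, or a plain list in a test, so no subprocess is needed.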
Refactoring with `map` and `reduce`
Here's one way to refactor `mapper` to use Python's `map` function:
    def line2cols(line):
        return line.strip().split('\t')


    def has6cols(cols):
        return len(cols) == 6


    def cols2out(cols):
        # store is column 2 and cost is column 4, matching the original mapper
        return '{}\t{}'.format(cols[2], cols[4])


    def mapper():
        return map(cols2out, filter(has6cols, map(line2cols, read_input())))
And here's an example reducer using Python's `reduce`:
    def reducer(seq):
        def f(a, b):
            if len(a) > len(b):
                return a
            return b
        return reduce(f, seq, '')
This is quite stupid: it just finds the longest string in the sequence.
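A reducer closer to the question's data might look like the sketch below. It assumes the reducer receives the mapper's `store\tcost` lines and should sum the cost per store; that behaviour is a guess, since `reducer.py` isn't shown. `functools.reduce` is used so the same code runs on Python 2.7 and Python 3.

```python
from functools import reduce


def reducer(lines):
    """Fold 'store\\tcost' lines into a dict of total cost per store."""
    def add(totals, line):
        store, cost = line.rstrip('\n').split('\t')
        totals[store] = totals.get(store, 0.0) + float(cost)
        return totals
    return reduce(add, lines, {})
```

Because the accumulator is a dict, this version does not depend on the input being sorted first.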
I hope this helps.
UPDATE
It's a bit difficult to understand what you're trying to do.
My MapReduce tester is clearly ported from Shell, short of args=None for line in args or read_input(), what's a better way of importing->testing the function outside of subprocess?
When I read this, I didn't quite get what you meant about shell. In the code, I saw that you're calling Python twice, which is clearly not fine.
Let me try again, to guess what you're trying to do. Maybe you have a Python mapper script, and you have a Python reducer script, which you use in some framework? And you want to write some unit tests to check that these scripts in fact work? I mean the scripts as black boxes, as in, you want to test the complete scripts, rather than the underlying Python functions / classes? I'm really just guessing here, maybe I'm completely wrong.
If this is indeed what you want, then don't. Don't try to test the scripts, test the underlying implementation. If the implementation passes, the scripts should print correct output too. If you want to test the script outputs in addition to the underlying implementation, then you'd be just testing the basic ability to print, which seems rather pointless.
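To illustrate testing the implementation rather than the script, here is a minimal sketch. It assumes the mapper is rewritten to take an iterable of lines and return a list instead of printing; this `mapper`, the `TestMapper` cases, and the `run_tests` helper are hypothetical, not the question's code. It runs on Python 2.7 and Python 3.

```python
import unittest


def mapper(lines):
    """Return 'store<TAB>cost' strings for each well-formed 6-column line."""
    out = []
    for line in lines:
        cols = line.strip().split('\t')
        if len(cols) == 6:
            out.append('{0}\t{1}'.format(cols[2], cols[4]))
    return out


class TestMapper(unittest.TestCase):
    def test_well_formed_line(self):
        line = "2012-01-01\t09:00\tSan Jose\tMen's Clothing\t214.05\tAmex\n"
        self.assertEqual(mapper([line]), ['San Jose\t214.05'])

    def test_malformed_line_is_skipped(self):
        self.assertEqual(mapper(['not\tenough\tcolumns\n']), [])


def run_tests():
    """Run the suite in-process and return the unittest result object."""
    suite = unittest.TestLoader().loadTestsFromTestCase(TestMapper)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

The script's `__main__` block then only has to print what `mapper(read_input())` returns, which leaves very little untested.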
Comments:
- Referenced python's `sorted` in my comment, and isn't `lambda` vs `def` merely a combination of personal style + one-liners vs multi-liners in Python? – A T, Aug 17, 2014 at 8:21
- If you could show me how to rework my flow to not use `subprocess` at all that'd be great =) – A T, Aug 17, 2014 at 8:22
- Just because you can do something doesn't mean you should. `lambda` is useful and ergonomic in some situations, but this is not one of those. If you really prefer this way, you can, but I advise against it. – janos, Aug 17, 2014 at 8:52
- To rework without `subprocess`, that would take a rewrite. Code Review is about reviewing code, not rewriting it. This article looks like a good starting point: mikecvet.wordpress.com/2010/07/02/parallel-mapreduce-in-python – janos, Aug 17, 2014 at 9:02
- Here is the solution I touched upon in my initial question - codereview.stackexchange.com/a/60281/13407 - is that as bad a solution as I implied? – A T, Aug 17, 2014 at 9:26
Forgot all about this problem, only took a look at my project today, and figured out this solution:
test_mapper.py
    from unittest import TestCase, main as unittest_main
    from StringIO import StringIO

    from map_reduce_udacity.mapper import mapper


    class TestMapper(TestCase):
        def test_one_line(self):
            # Columns must be tab-separated, because mapper splits on '\t'
            self.assertEqual(
                mapper(StringIO("2012-01-01\t09:00\tSan Jose\tMen's Clothing\t214.05\tAmex")),
                ['San Jose\t214.05']
            )


    if __name__ == '__main__':
        unittest_main()
mapper.py
    #!/usr/bin/env python
    from fileinput import input as read_input
    from os.path import abspath, join as path_join, dirname
    from StringIO import StringIO
    from collections import OrderedDict


    def pick(r, ks, headers=None):
        # Parenthesised so that a caller-supplied `headers` is used as-is;
        # without the parentheses, `or` binds only to the first element
        headers = headers or ('date', 'time', 'store', 'item', 'cost', 'payment')
        return filter(lambda v: v is not None,
                      map(lambda t: t[0] in ks and t[1] or None,
                          OrderedDict(zip(headers, r)).iteritems()))


    def mapper(args=None):
        out = map(lambda row: '\t'.join(pick(row, ('store', 'cost'))),
                  filter(lambda data: len(data) == 6,
                         map(lambda line: line.strip().split('\t'),
                             args or read_input())))
        print 'out =', out
        return out
Just to show the advantage of the function written this way more explicitly:
    from os.path import abspath, join as path_join, dirname
    from StringIO import StringIO

    if __name__ == '__main__':
        # First way of running (tab-separated, because mapper splits on '\t'):
        mapper(StringIO("2012-01-01\t09:00\tSan Jose\tMen's Clothing\t214.05\tAmex"))

        # Second way of running:
        with open(abspath(path_join(dirname(__file__), '..', 'data', 'head_50_purchases.txt'))) as f:
            mapper(f.readlines())

        # Third way of running:
        # [from your e.g. cmd.exe or bash or subprocess call]
        # $ python mapper.py ../data/head_50_purchases.txt
`reducer.py`? Some sample `student_test_posts.csv` would be nice too