1. Machine-Learning-Tutorials
  2. mapreduce
Notebook

Introduction

mrjob lets you write MapReduce jobs in Python 2.5+ and run them on several platforms. You can:

  • Write multi-step MapReduce jobs in pure Python
  • Test on your local machine
  • Run on a Hadoop cluster
  • Run in the cloud using Amazon Elastic MapReduce (EMR)

Setup

From PyPI:

pip install mrjob

From source:

python setup.py install

See Sample Config File section for additional config details.

Processing S3 Logs

Sample mrjob code that processes log files on Amazon S3 based on the S3 logging format:

In [ ]:
%%file mr_s3_log_parser.py
importtime
frommrjob.jobimport MRJob
frommrjob.protocolimport RawValueProtocol, ReprProtocol
importre
classMrS3LogParser(MRJob):
"""Parses the logs from S3 based on the S3 logging format:
 http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

 Aggregates a user's daily requests by user agent and operation

 Outputs date_time, requester, user_agent, operation, count
 """
 LOGPATS = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
 r'(\S+) (\S+) (\S+) ("([^"]+)"|-) ' \
 r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
 r'("([^"]+)"|-) ("([^"]+)"|-)'
 NUM_ENTRIES_PER_LINE = 17
 logpat = re.compile(LOGPATS)
 (S3_LOG_BUCKET_OWNER, 
 S3_LOG_BUCKET, 
 S3_LOG_DATE_TIME,
 S3_LOG_IP, 
 S3_LOG_REQUESTER_ID, 
 S3_LOG_REQUEST_ID,
 S3_LOG_OPERATION, 
 S3_LOG_KEY, 
 S3_LOG_HTTP_METHOD,
 S3_LOG_HTTP_STATUS, 
 S3_LOG_S3_ERROR, 
 S3_LOG_BYTES_SENT,
 S3_LOG_OBJECT_SIZE, 
 S3_LOG_TOTAL_TIME, 
 S3_LOG_TURN_AROUND_TIME,
 S3_LOG_REFERER, 
 S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)
 DELIMITER = '\t'
 # We use RawValueProtocol for input to be format agnostic
 # and avoid any type of parsing errors
 INPUT_PROTOCOL = RawValueProtocol
 # We use RawValueProtocol for output so we can output raw lines
 # instead of (k, v) pairs
 OUTPUT_PROTOCOL = RawValueProtocol
 # Encode the intermediate records using repr() instead of JSON, so the
 # record doesn't get Unicode-encoded
 INTERNAL_PROTOCOL = ReprProtocol
 defclean_date_time_zone(self, raw_date_time_zone):
"""Converts entry 22/Jul/2013:21:04:17 +0000 to the format
 'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into
 a database such as Redshift or RDS
 Note: requires the chars "[ ]" to be stripped prior to input
 Returns the converted datetime annd timezone
 or None for both values if failed
 TODO: Needs to combine timezone with date as one field
 """
 date_time = None
 time_zone_parsed = None
 # TODO: Probably cleaner to parse this with a regex
 date_parsed = raw_date_time_zone[:raw_date_time_zone.find(":")]
 time_parsed = raw_date_time_zone[raw_date_time_zone.find(":") + 1:
 raw_date_time_zone.find("+") - 1]
 time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find("+"):]
 try:
 date_struct = time.strptime(date_parsed, "%d/%b/%Y")
 converted_date = time.strftime("%Y-%m-%d", date_struct)
 date_time = converted_date + " " + time_parsed
 # Throws a ValueError exception if the operation fails that is
 # caught by the calling function and is handled appropriately
 except ValueError as error:
 raise ValueError(error)
 else:
 return converted_date, date_time, time_zone_parsed
 defmapper(self, _, line):
 line = line.strip()
 match = self.logpat.search(line)
 date_time = None
 requester = None
 user_agent = None
 operation = None
 try:
 for n in range(self.NUM_ENTRIES_PER_LINE):
 group = match.group(1 + n)
 if n == self.S3_LOG_DATE_TIME:
 date, date_time, time_zone_parsed = \
 self.clean_date_time_zone(group)
 # Leave the following line of code if 
 # you want to aggregate by date
 date_time = date + " 00:00:00"
 elif n == self.S3_LOG_REQUESTER_ID:
 requester = group
 elif n == self.S3_LOG_USER_AGENT:
 user_agent = group
 elif n == self.S3_LOG_OPERATION:
 operation = group
 else:
 pass
 except Exception:
 yield (("Error while parsing line: %s", line), 1)
 else:
 yield ((date_time, requester, user_agent, operation), 1)
 defreducer(self, key, values):
 output = list(key)
 output = self.DELIMITER.join(output) + \
 self.DELIMITER + \
 str(sum(values))
 yield None, output
 defsteps(self):
 return [
 self.mr(mapper=self.mapper,
 reducer=self.reducer)
 ]
if __name__ == '__main__':
 MrS3LogParser.run()

Running Amazon Elastic MapReduce Jobs

Run an Amazon Elastic MapReduce (EMR) job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):

In [ ]:
!pythonmr_s3_log_parser.py-remrs3://bucket-source/--output-dir=s3://bucket-dest/

Run a MapReduce job locally on the specified input file, sending the results to the specified output file:

In [ ]:
!pythonmr_s3_log_parser.pyinput_data.txt>output_data.txt

Unit Testing S3 Logs

Accompanying unit test:

In [ ]:
%%file test_mr_s3_log_parser.py
fromStringIOimport StringIO
importunittest2asunittest
frommr_s3_log_parserimport MrS3LogParser
classMrTestsUtil:
 defrun_mr_sandbox(self, mr_job, stdin):
 # inline runs the job in the same process so small jobs tend to
 # run faster and stack traces are simpler
 # --no-conf prevents options from local mrjob.conf from polluting
 # the testing environment
 # "-" reads from standard in
 mr_job.sandbox(stdin=stdin)
 # make_runner ensures job cleanup is performed regardless of
 # success or failure
 with mr_job.make_runner() as runner:
 runner.run()
 for line in runner.stream_output():
 key, value = mr_job.parse_output_line(line)
 yield value
 
classTestMrS3LogParser(unittest.TestCase):
 mr_job = None
 mr_tests_util = None
 RAW_LOG_LINE_INVALID = \
 '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
 '00000388225bcc00000 ' \
 's3-storage [22/Jul/2013:21:03:27 +0000] ' \
 '00.111.222.33 ' \
 RAW_LOG_LINE_VALID = \
 '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
 '00000388225bcc00000 ' \
 's3-storage [22/Jul/2013:21:03:27 +0000] ' \
 '00.111.222.33 ' \
 'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \
 'REST.HEAD.OBJECT user/file.pdf ' \
 '"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \
 '00000SDZk ' \
 'HTTP/1.1" 200 - - 4000272 18 - "-" ' \
 '"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0" ' \
 '00000XMHZJp6DjM9x5JVEAMo8MG00000'
 DATE_TIME_ZONE_INVALID = "AB/Jul/2013:21:04:17 +0000"
 DATE_TIME_ZONE_VALID = "22/Jul/2013:21:04:17 +0000"
 DATE_VALID = "2013-07-22"
 DATE_TIME_VALID = "2013-07-22 21:04:17"
 TIME_ZONE_VALID = "+0000"
 def__init__(self, *args, **kwargs):
 super(TestMrS3LogParser, self).__init__(*args, **kwargs)
 self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])
 self.mr_tests_util = MrTestsUtil()
 deftest_invalid_log_lines(self):
 stdin = StringIO(self.RAW_LOG_LINE_INVALID)
 for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
 self.assertEqual(result.find("Error"), 0)
 deftest_valid_log_lines(self):
 stdin = StringIO(self.RAW_LOG_LINE_VALID)
 for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
 self.assertEqual(result.find("Error"), -1)
 deftest_clean_date_time_zone(self):
 date, date_time, time_zone_parsed = \
 self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)
 self.assertEqual(date, self.DATE_VALID)
 self.assertEqual(date_time, self.DATE_TIME_VALID)
 self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)
 # Use a lambda to delay the calling of clean_date_time_zone so that
 # assertRaises has enough time to handle it properly
 self.assertRaises(ValueError,
 lambda: self.mr_job.clean_date_time_zone(
 self.DATE_TIME_ZONE_INVALID))
if __name__ == '__main__':
 unittest.main()

Running S3 Logs Unit Test

Run the mrjob test:

In [ ]:
!pythontest_mr_s3_log_parser.py-v

Sample Config File

In [ ]:
runners:
 emr:
 aws_access_key_id: __ACCESS_KEY__
 aws_secret_access_key: __SECRET_ACCESS_KEY__
 aws_region: us-east-1
 ec2_key_pair: EMR
 ec2_key_pair_file: ~/.ssh/EMR.pem
 ssh_tunnel_to_job_tracker: true
 ec2_master_instance_type: m3.xlarge
 ec2_instance_type: m3.xlarge
 num_ec2_instances: 5
 s3_scratch_uri: s3://bucket/tmp/
 s3_log_uri: s3://bucket/tmp/logs/
 enable_emr_debugging: True
 bootstrap:
 - sudo apt-get install -y python-pip
 - sudo pip install --upgrade simplejson

AltStyle によって変換されたページ (->オリジナル) /