Recently I started using the mrjob package to write MapReduce jobs in Python. The official documentation includes examples of simple jobs and instructions for running jobs on Amazon's Elastic MapReduce.
For one of my projects I had to run MapReduce on a 19 GB log file and store the results in a database. Since CSV-formatted data is usually the simplest to import, I wrote a small procedure that outputs the MapReduce results in CSV format.
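In the input file, each row represents an interaction between two people on the observed day. For example, the following row means that person A contacted person B on January 17th, 2014:
A,B,2014-01-17
The row above is simplified; the job below splits fields on semicolons and expects a full timestamp in the form YYYY-MM-DD HH:MM:SS.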
Given data in this format, I wanted to build, for each pair of users, an array recording on which days they interacted. For each pair I therefore created an array holding the number of interactions on each day from January 1st to 31st. After reshaping the data this way, I stored the results in CSV format to simplify the database import.
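To make the reshaping step concrete, here is a minimal sketch of the idea outside of MapReduce. The helper names `daily_vector` and `merge_vectors` are hypothetical and used only for illustration; the window of 31 days starting on January 1st, 2014 follows the description above.

```python
from datetime import date

START = date(2014, 1, 1)  # first day of the observed window
DAYS = 31                 # January 1st to 31st


def daily_vector(day):
    """Return a 31-slot list with a 1 at the position of `day`."""
    index = (day - START).days
    if not 0 <= index < DAYS:
        raise ValueError("date outside the observed window: %s" % day)
    vector = [0] * DAYS
    vector[index] = 1
    return vector


def merge_vectors(vectors):
    """Sum several daily vectors element-wise."""
    return [sum(column) for column in zip(*vectors)]


if __name__ == "__main__":
    # Two contacts on January 17th and one on January 18th for the same pair.
    merged = merge_vectors([
        daily_vector(date(2014, 1, 17)),
        daily_vector(date(2014, 1, 17)),
        daily_vector(date(2014, 1, 18)),
    ])
    print(merged[16], merged[17])  # prints: 2 1
```

Each record becomes a vector with a single 1, and summing the vectors for a pair gives the number of interactions per day.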
The complete job, including the class that produces the CSV-formatted output, is shown below.
from datetime import date, datetime
import json

from mrjob.job import MRJob

# First day and length of the observed period (January 1st to 31st, 2014).
DATE_START = date(2014, 1, 1)
DAYS_OBSERVED = 31


class JSONProtocol(object):
    """Reads tab-separated JSON key/value lines and writes semicolon-separated rows."""

    def read(self, line):
        k_str, v_str = line.split('\t', 1)
        return json.loads(k_str), json.loads(v_str)

    def write(self, key, value):
        # key is [user_A, user_B]; value is the list of daily interaction counts.
        user_A = str(key[0])
        user_B = str(key[1])
        strinteractionsAll = ";".join(map(str, value))
        return '%s;%s;%s' % (user_A, user_B, strinteractionsAll)


class MRinteractionsCount(MRJob):

    OUTPUT_PROTOCOL = JSONProtocol

    def mapper(self, _, line):
        # Expected record: user_A;user_B;YYYY-MM-DD HH:MM:SS
        interactionRecord = line.split(';')
        if len(interactionRecord) != 3:
            return
        user_A, user_B, interaction_date = interactionRecord
        if len(interaction_date) != 19:
            return
        try:
            observationDate = datetime.strptime(interaction_date, '%Y-%m-%d %H:%M:%S').date()
        except ValueError:
            # Skip records whose timestamp is not YYYY-MM-DD HH:MM:SS.
            return
        dayIndex = (observationDate - DATE_START).days
        if not 0 <= dayIndex < DAYS_OBSERVED:
            # Skip dates outside the observed month instead of writing to a wrong slot.
            return
        dailyInteractions = [0] * DAYS_OBSERVED
        dailyInteractions[dayIndex] = 1
        yield [user_A, user_B], dailyInteractions

    def combiner(self, connection, interactions):
        # Pre-aggregate on the mapper side: element-wise sum of the daily vectors.
        yield connection, [sum(x) for x in zip(*interactions)]

    def reducer(self, connection, interactions):
        # Element-wise sum gives the number of interactions per day for the pair.
        yield connection, [sum(x) for x in zip(*interactions)]


if __name__ == '__main__':
    MRinteractionsCount.run()
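Each output line has the form user_A;user_B;count_1;...;count_31. As a rough sketch of the import step, the snippet below parses such lines and loads them into a database. The post does not name the database, so SQLite is an assumption here, and the table name `daily_interactions` and the helpers `parse_row` and `load_rows` are hypothetical.

```python
import sqlite3


def parse_row(line):
    """Split one 'user_A;user_B;c1;...;c31' output line into its parts."""
    fields = line.rstrip("\n").split(";")
    user_a, user_b = fields[0], fields[1]
    counts = [int(c) for c in fields[2:]]
    return user_a, user_b, counts


def load_rows(conn, lines):
    """Insert one record per pair and day that has at least one interaction."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_interactions "
        "(user_a TEXT, user_b TEXT, day INTEGER, interactions INTEGER)"
    )
    for line in lines:
        user_a, user_b, counts = parse_row(line)
        conn.executemany(
            "INSERT INTO daily_interactions VALUES (?, ?, ?, ?)",
            [(user_a, user_b, day, n)
             for day, n in enumerate(counts, start=1) if n],
        )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")  # in-memory database, for illustration only
    # Sample line: two interactions on day 17, none on the other days.
    sample = "A;B;" + ";".join(["0"] * 16 + ["2"] + ["0"] * 14)
    load_rows(conn, [sample])
    print(conn.execute("SELECT * FROM daily_interactions").fetchall())
```

Storing one record per active day keeps the table narrow; a table with 31 count columns would match the output file more directly if that suits the target schema better.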