Wednesday, September 10, 2014

mrjob: Writing a Protocol for CSV Output

I recently started using the mrjob package to write MapReduce jobs in Python. The official documentation includes examples of simple jobs and instructions for running jobs on Amazon's Elastic MapReduce.

For one project I had to run MapReduce on a 19 GB log file and store the results in a database. Since CSV-formatted data is usually the simplest to import, I wrote a small protocol class that outputs the MapReduce results in CSV format.

In the input file, each row represents an interaction between two people on a given day. For example, the following row means that person A contacted person B on January 17th, 2014:

A,B,2014-01-17

Given data in this format, I wanted an array describing on which days each pair of users interacted. For each pair, I therefore built a 31-element array holding the number of interactions between the two users on each day from January 1st to 31st. After reshaping the data this way, I stored the results in CSV format to simplify the database import.
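The reshaping step can be sketched outside of mrjob as follows. This is only an illustration under stated assumptions: it uses the simplified comma-separated row shown above, and the helper names `one_hot_day` and `add_vectors` are hypothetical, not part of the job.

```python
from datetime import date, datetime

DAYS_IN_JANUARY = 31
START = date(2014, 1, 1)


def one_hot_day(day_string):
    """Return a 31-element list with a 1 at the index of the given day."""
    day = datetime.strptime(day_string, "%Y-%m-%d").date()
    vector = [0] * DAYS_IN_JANUARY
    vector[(day - START).days] = 1
    return vector


def add_vectors(vectors):
    """Sum several daily vectors element by element."""
    return [sum(column) for column in zip(*vectors)]


# Three interactions between A and B: two on the 17th, one on the 3rd.
rows = ["A,B,2014-01-17", "A,B,2014-01-17", "A,B,2014-01-03"]
per_row = [one_hot_day(row.split(",")[2]) for row in rows]
totals = add_vectors(per_row)

print(totals[16], totals[2])  # index 16 is January 17th, index 2 is January 3rd
```

Each row becomes a vector with a single 1, and summing the vectors for one pair gives the daily interaction counts for that pair.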

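The complete job, including the protocol class that produces the CSV-formatted output, is shown below. Note that the mapper splits each input row on semicolons and expects a full timestamp (YYYY-MM-DD HH:MM:SS), so the example row above is a simplified illustration. The output rows are semicolon-separated as well.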

from mrjob.job import MRJob
from datetime import date, datetime
import json

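class JSONProtocol(object):
    # Despite its name, this protocol writes semicolon-separated rows;
    # only read() still parses JSON.
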
    def read(self, line):
        k_str, v_str = line.split('\t', 1)
        return json.loads(k_str), json.loads(v_str)

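    def write(self, key, value):
        # key is the [user_A, user_B] pair, value is the list of 31 daily counts
        user_A = str(key[0])
        user_B = str(key[1])
        strinteractionsAll = ";".join(map(str, value))

        # The JSON equivalent of this output would be:
        # return '%s\t%s' % (json.dumps(key), json.dumps(value))
        return '%s;%s;%s' % (user_A, user_B, strinteractionsAll)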


class MRinteractionsCount(MRJob):
    
    OUTPUT_PROTOCOL = JSONProtocol
    
    def mapper(self, _, line):
        # Expected input row: user_A;user_B;YYYY-MM-DD HH:MM:SS
        interactionRecord = line.split(';')
        if len(interactionRecord) != 3:
            return
        user_A, user_B, interaction_date = interactionRecord
        if len(interaction_date) != 19:
            return
        try:
            observationDate = datetime.strptime(interaction_date, '%Y-%m-%d %H:%M:%S').date()
        except ValueError:
            # Skip rows whose timestamp is not in YYYY-MM-DD HH:MM:SS format
            return
        
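        # One slot per day from January 1st to 31st, 2014
        dailyInteractions = [0] * 31

        dateStart = date(2014, 1, 1)
        delta = observationDate - dateStart
        dayIndex = delta.days
        # Skip dates outside January 2014: a negative index would silently
        # wrap around and a too-large index would raise IndexError
        if not 0 <= dayIndex < 31:
            return

        dailyInteractions[dayIndex] = 1

        yield [user_A, user_B], dailyInteractions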

    def combiner(self, connection, interactions):
        # Pre-aggregate on the mapper side: sum the daily vectors element-wise
        interactionsCountList = list(interactions)
        interactionsBetweenUsers = [sum(x) for x in zip(*interactionsCountList)]

        yield connection, interactionsBetweenUsers

    def reducer(self, connection, interactions):
        # Same element-wise sum, producing the final daily counts per pair
        interactionsCountList = list(interactions)
        interactionsBetweenUsers = [sum(x) for x in zip(*interactionsCountList)]

        yield connection, interactionsBetweenUsers

if __name__ == '__main__':
    MRinteractionsCount.run()
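
To see what a single output row looks like, the formatting done in the protocol's write method can be reproduced in isolation. The sketch below is a standalone mirror of that method, not part of the job itself; the function name `format_row` and the sample counts are assumptions made for illustration.

```python
import csv


def format_row(key, value):
    """Mirror of the write() formatting: user_A;user_B;count_1;...;count_31."""
    user_a, user_b = key
    return "%s;%s;%s" % (user_a, user_b, ";".join(map(str, value)))


# Hypothetical result for the pair (A, B): two interactions on January 17th.
counts = [0] * 31
counts[16] = 2
line = format_row(["A", "B"], counts)

# The row can be read back with the csv module by setting the delimiter.
fields = next(csv.reader([line], delimiter=";"))
print(len(fields))  # two user columns followed by 31 daily counts
```

Because the separator is a semicolon rather than a comma, the database import has to be told to use that delimiter.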