Wednesday, September 10, 2014

mrjob: Writing Protocol for CSV Output File Format

I recently started using the mrjob package to write MapReduce jobs in Python. The official documentation includes examples of simple jobs and instructions for running them on Amazon's Elastic MapReduce.

For one of my projects I had to run MapReduce on a 19 GB log file and store the results in a database. Since CSV-formatted data is usually the simplest to import, I wrote a small output protocol that writes the MapReduce results in CSV format.

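In the input file each row represents an interaction between two persons on the observed day. For example, the following row, shown in simplified form (the job below expects semicolon-separated fields and a full YYYY-MM-DD HH:MM:SS timestamp), means that person A contacted person B on January 17th 2014: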

A,B,2014-01-17

Given data in this format, I wanted to describe on which days the observed pair of users interacted. For each pair I therefore created a 31-element array holding the number of interactions on each day from January 1st to 31st. After reshaping the data this way, I stored the results in CSV format to simplify the database import.

The complete job, including the protocol class that produces the CSV-formatted output, is shown below.

from mrjob.job import MRJob
from datetime import datetime
from datetime import date
import json

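class JSONProtocol(object):
    # Despite its name, this output protocol writes semicolon-separated
    # (CSV-style) lines; only read() still handles JSON.

    def read(self, line):
        k_str, v_str = line.split('\t', 1)
        return json.loads(k_str), json.loads(v_str)

    def write(self, key, value):
        # key is [user_A, user_B]; value is the list of 31 daily counts
        user_A = str(key[0])
        user_B = str(key[1])
        strinteractionsAll = ";".join(map(str, value))

        # One row per pair of users: user_A;user_B;count_day1;...;count_day31
        return '%s;%s;%s' % (user_A, user_B, strinteractionsAll)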


class MRinteractionsCount(MRJob):
    
    OUTPUT_PROTOCOL = JSONProtocol
    
    def mapper(self, _, line):
        # Expected input: user_A;user_B;YYYY-MM-DD HH:MM:SS
        interactionRecord = line.split(';')
        if len(interactionRecord) != 3:
            return
        user_A, user_B, interaction_date = interactionRecord
        if len(interaction_date) != 19:
            return
        try:
            observationDate = datetime.strptime(interaction_date, '%Y-%m-%d %H:%M:%S').date()
        except ValueError:
            # Skip rows whose timestamp is not in YYYY-MM-DD HH:MM:SS format
            return
        
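        dailyInteractions = [0] * 31

        dateStart = date(2014, 1, 1)
        delta = observationDate - dateStart
        dayIndex = delta.days
        # Ignore dates outside January 2014; a negative or too-large index
        # would otherwise hit the wrong day or raise IndexError
        if not 0 <= dayIndex < 31:
            return

        dailyInteractions[dayIndex] = 1

        yield [user_A, user_B], dailyInteractions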
        

    def combiner(self, connection, interactions):
        interactionsCountList = list(interactions)
        interactionsBetweenUsers = [sum(x) for x in zip(*interactionsCountList)]

        yield connection, interactionsBetweenUsers

    def reducer(self, connection, interactions):
        interactionsCountList = list(interactions)
        interactionsBetweenUsers = [sum(x) for x in zip(*interactionsCountList)]

        yield connection, interactionsBetweenUsers

if __name__ == '__main__':

    MRinteractionsCount.run()
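
To see what the job produces without running mrjob, the three steps (mapping a timestamp to a day index, summing the daily vectors, and formatting the row) can be sketched in plain Python. This is only an illustration: the helper names `day_vector`, `sum_vectors` and `format_row` and the two sample timestamps are made up for this example and are not part of the job above.

```python
from datetime import date, datetime

DATE_START = date(2014, 1, 1)  # first observed day, as in the job above


def day_vector(timestamp):
    """Return a 31-element list with a 1 on the day of the interaction."""
    day = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").date()
    index = (day - DATE_START).days
    vector = [0] * 31
    if 0 <= index < 31:
        vector[index] = 1
    return vector


def sum_vectors(vectors):
    """Element-wise sum, the same zip/sum idiom the combiner and reducer use."""
    return [sum(column) for column in zip(*vectors)]


def format_row(key, value):
    """Build the semicolon-separated line that the protocol's write() returns."""
    return ";".join([str(key[0]), str(key[1])] + [str(v) for v in value])


# Two hypothetical interactions between A and B on January 17th
totals = sum_vectors([
    day_vector("2014-01-17 09:30:00"),
    day_vector("2014-01-17 18:05:00"),
])
print(format_row(["A", "B"], totals))
```

The printed row has 33 fields: the two user identifiers followed by 31 daily counts, with a 2 in the position for January 17th.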

Sunday, June 10, 2012

neo4j: Using Cypher Query Language with .NET


After reading the neo4j REST documentation I created a JSONCypherQuery class with a few custom methods in order to run custom Cypher queries from a .NET application.

Creating Classes for Serialization


Below is an example request and response from the neo4j REST API documentation:


Example request
  • POST http://localhost:7474/db/data/cypher
  • Accept: application/json
  • Content-Type: application/json
{"query": "start x  = node(27) match x -[r]-> n return type(r), n.name?, n.age?", "params": {}}
Example response
  • 200: OK
  • Content-Type: application/json
{
  "data" : [ [ "know", "him", 25 ], [ "know", "you", null ] ],
  "columns" : [ "type(r)", "n.name?", "n.age?" ]
}

To create a request string formatted the same way as
{"query": "start x  = node(27) match x -[r]-> n return type(r), n.name?, n.age?","params": {}}

we can create a class


class JSONQueryCommand
    {
        [JsonProperty("query")]
        public string Query { get; set; }
        [JsonProperty("params")]
        public string Parameters { get; set; }
    }

and a method that posts the request and returns the response as a string.

Handling Requests and Responses

The method below turns the Cypher query string into an appropriate web request:


private static string CreateRequest(string query)
        {
            string response = "";
            try
            {
                //Connect
                //http://localhost:7474/db/data/ext/CypherPlugin/graphdb/execute_query
                HttpWebRequest req = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/ext/CypherPlugin/graphdb/execute_query");
                req.Method = "POST";
                req.ContentType = "application/json";
                string parameters = null;
                JSONQueryCommand cmd = new JSONQueryCommand();
                cmd.Query = query;
                cmd.Parameters = parameters;
                string json = JsonConvert.SerializeObject(cmd);
                using (var streamWriter = new StreamWriter(req.GetRequestStream()))
                {

                    streamWriter.Write(json);
                }
                var httpResponse = (HttpWebResponse)req.GetResponse();
                using (var streamReader = new StreamReader(httpResponse.GetResponseStream()))
                {
                    var responseText = streamReader.ReadToEnd();
                    response = responseText;
                }
            }
            catch (Exception ex)
            {
                // Report the failure instead of silently swallowing it;
                // the caller still receives an empty response string.
                Console.Error.WriteLine(ex.Message);
            }
            return response;
        }

To handle the received string and extract the data we need additional methods. Below is a simple method that extracts a scalar value:


public static object GetScalar(string request)
        {
            string response = CreateRequest(request);
            var joResponse = JObject.Parse(response);
            var jaData = (JArray)joResponse["data"];
            var dataArray = jaData.First();
            var firstDataElement = dataArray.First();
            JValue jResult = (JValue)firstDataElement;
            object result = jResult.Value;
            return result;
        }

Example

After creating these methods we can simply use custom Cypher queries.

object response = JSONCypherQuery.GetScalar("start a = node(1) MATCH (a)--(b) return count(b);");
int neighborsCount= Convert.ToInt32(response);


The code block

req.Method = "POST";
req.ContentType = "application/json";
string parameters = null;
JSONQueryCommand cmd = new JSONQueryCommand();
cmd.Query = query;
cmd.Parameters = parameters;
string json = JsonConvert.SerializeObject(cmd);


creates a web request with the content

{"query":"start a = node(1) MATCH (a)--(b) return count(b);","params":null}

which is sent to the server. If we know what kind of data to expect from the server, we can write an appropriate parsing/deserializing method. For the example query the result is a scalar, and the GetScalar method extracts it from the response string.
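
The same request/response handling can be sketched in a few lines of Python using only the standard `json` module. This is a rough illustration rather than a port of the class above: the function names and the sample response string are assumptions made for this example, and no HTTP call is made.

```python
import json


def build_cypher_payload(query, params=None):
    """Serialize a Cypher query into the {"query": ..., "params": ...} body."""
    return json.dumps({"query": query, "params": params or {}})


def extract_scalar(response_text):
    """Return the first column of the first row, like GetScalar above."""
    data = json.loads(response_text)["data"]
    return data[0][0]


payload = build_cypher_payload("start a = node(1) MATCH (a)--(b) return count(b);")

# Hypothetical response body in the documented {"data": ..., "columns": ...} shape
sample_response = '{"data": [[4]], "columns": ["count(b)"]}'
neighbors_count = int(extract_scalar(sample_response))
print(payload)
print(neighbors_count)
```

Unlike the C# version, this sketch sends an empty object for `params` when no parameters are given, matching the documented example request.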




Social Network Analysis with neo4j: Graphs

I have been analyzing social networks and recently switched from a standard relational database to neo4j. The latter is much better suited to working with graphs, and some basic insights about nodes and edges can be calculated in an instant.

The difference becomes obvious when dealing with a big dataset. Below is an example SQL query for retrieving all connections among users within a selected distance:


with recursive cluster (party, path, depth) 
 as ( select cast(@userId as character varying), cast(@userId as character varying), 1 
 union 
 ( 
 select (case 
 when this.party = amc.userA then amc.userB 
 when this.party = amc.userB then amc.userA 
 end), (this.path || '.' || (case 
 when this.party = amc.userA then amc.userB 
 when this.party = amc.userB then amc.userA 
 end)), this.depth + 1 
 from cluster this, chat amc 
 where ((this.party = amc.userA and position(amc.userB in this.path) = 0) 
 or (this.party = amc.userB and position(amc.userA in this.path) = 0)) AND this.depth < @depth + 1 ) ) 
 select party, path 
 from cluster 
 where not exists ( 
 select * 
 from cluster c2 where cluster.party = c2.party 
 and ( 
 char_length(cluster.path) > char_length(c2.path)
 or (char_length(cluster.path) = char_length(c2.path)) and (cluster.path > c2.path) 
 ) 
 ) 
 order by party, path;


Running such a query on a database with several million users and connections takes a very long time (hours on a personal computer).




Below is the Cypher query for the neo4j database that counts all friends of friends (equivalent to the SQL query above for depth = 2):



neo4j-sh (0)$ start b = node:User(UserId='9F56478E6CAFB9CFF8C720C5DFC392C49495C582') MATCH (b) --(friend)--(friendoffriend) RETURN count(friendoffriend)
==> +-----------------------+
==> | count(friendoffriend) |
==> +-----------------------+
==> | 131457                |
==> +-----------------------+
==> 1 row, 635 ms

The advantage in performance and simplicity is obvious.

Running queries in neo4j console



Below are some more example Cypher queries for working with graphs. You can try these and others on a simple example network on this website.

Graph Screenshot from neo4j Console


Find Neighbors


start a=node(*)
match (a)-->(b)
return a, b;
+-----------------------+
| a         | b         |
+-----------------------+
| Node[0]{} | Node[1]{} |
| Node[1]{} | Node[2]{} |
| Node[1]{} | Node[3]{} |
| Node[1]{} | Node[4]{} |
| Node[1]{} | Node[5]{} |
| Node[2]{} | Node[6]{} |
| Node[2]{} | Node[7]{} |
| Node[3]{} | Node[4]{} |
| Node[5]{} | Node[6]{} |
+-----------------------+
9 rows
0 ms

Find Mutual Connections


start a=node(*), b=node(*)
match (a)--(x)--(b)
return a, b, x
+-----------------------------------+
| a         | b         | x         |
+-----------------------------------+
| Node[0]{} | Node[2]{} | Node[1]{} |
| Node[0]{} | Node[3]{} | Node[1]{} |
| Node[0]{} | Node[4]{} | Node[1]{} |
| Node[0]{} | Node[5]{} | Node[1]{} |
| Node[1]{} | Node[3]{} | Node[4]{} |
| Node[1]{} | Node[4]{} | Node[3]{} |
| Node[1]{} | Node[6]{} | Node[2]{} |
| Node[1]{} | Node[6]{} | Node[5]{} |
| Node[1]{} | Node[7]{} | Node[2]{} |
| Node[2]{} | Node[0]{} | Node[1]{} |
| Node[2]{} | Node[3]{} | Node[1]{} |
| Node[2]{} | Node[4]{} | Node[1]{} |
| Node[2]{} | Node[5]{} | Node[6]{} |
| Node[2]{} | Node[5]{} | Node[1]{} |
| Node[3]{} | Node[0]{} | Node[1]{} |
| Node[3]{} | Node[1]{} | Node[4]{} |
| Node[3]{} | Node[2]{} | Node[1]{} |
| Node[3]{} | Node[4]{} | Node[1]{} |
| Node[3]{} | Node[5]{} | Node[1]{} |
| Node[4]{} | Node[0]{} | Node[1]{} |
| Node[4]{} | Node[1]{} | Node[3]{} |
| Node[4]{} | Node[2]{} | Node[1]{} |
| Node[4]{} | Node[3]{} | Node[1]{} |
| Node[4]{} | Node[5]{} | Node[1]{} |
| Node[5]{} | Node[0]{} | Node[1]{} |
| Node[5]{} | Node[2]{} | Node[6]{} |
| Node[5]{} | Node[2]{} | Node[1]{} |
| Node[5]{} | Node[3]{} | Node[1]{} |
| Node[5]{} | Node[4]{} | Node[1]{} |
| Node[6]{} | Node[1]{} | Node[2]{} |
| Node[6]{} | Node[1]{} | Node[5]{} |
| Node[6]{} | Node[7]{} | Node[2]{} |
| Node[7]{} | Node[1]{} | Node[2]{} |
| Node[7]{} | Node[6]{} | Node[2]{} |
+-----------------------------------+
34 rows
0 ms

Count Mutual Connections


start a=node(*), b=node(*)
match (a)--(x)--(b)
where id(a) < id(b)
return a, b, count(distinct x)
+-------------------------------------------+
| a         | b         | count(distinct x) |
+-------------------------------------------+
| Node[0]{} | Node[5]{} | 1                 |
| Node[2]{} | Node[5]{} | 2                 |
| Node[3]{} | Node[4]{} | 1                 |
| Node[6]{} | Node[7]{} | 1                 |
| Node[1]{} | Node[4]{} | 1                 |
| Node[0]{} | Node[3]{} | 1                 |
| Node[4]{} | Node[5]{} | 1                 |
| Node[1]{} | Node[6]{} | 2                 |
| Node[0]{} | Node[4]{} | 1                 |
| Node[1]{} | Node[7]{} | 1                 |
| Node[0]{} | Node[2]{} | 1                 |
| Node[1]{} | Node[3]{} | 1                 |
| Node[2]{} | Node[3]{} | 1                 |
| Node[2]{} | Node[4]{} | 1                 |
| Node[3]{} | Node[5]{} | 1                 |
+-------------------------------------------+
15 rows
0 ms
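
As a cross-check outside the database, the same counts can be reproduced in plain Python. The sketch below assumes the edge list read off the "Find Neighbors" output above and treats the edges as undirected, as the `--` pattern does; the function names are made up for this example.

```python
from itertools import combinations

# Edges read off the "Find Neighbors" output above (treated as undirected)
EDGES = [(0, 1), (1, 2), (1, 3), (1, 4), (1, 5), (2, 6), (2, 7), (3, 4), (5, 6)]


def build_adjacency(edges):
    """Map every node to the set of its neighbours."""
    adjacency = {}
    for a, b in edges:
        adjacency.setdefault(a, set()).add(b)
        adjacency.setdefault(b, set()).add(a)
    return adjacency


def mutual_connection_counts(edges):
    """Map each pair (a, b) with a < b to its number of shared neighbours."""
    adjacency = build_adjacency(edges)
    counts = {}
    for a, b in combinations(sorted(adjacency), 2):
        shared = adjacency[a] & adjacency[b]
        if shared:
            counts[(a, b)] = len(shared)
    return counts


counts = mutual_connection_counts(EDGES)
print(len(counts), counts[(2, 5)], counts[(1, 6)])  # prints: 15 2 2
```

The 15 pairs and the two pairs with two mutual connections agree with the query result above.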

Calculate Clustering Coefficient


start a = node(1)
match (a)--(b)
with a, b as neighbours
match (a)--()-[r]-()--(a)
where id(a) <> id(neighbours) and id(neighbours) <> 0
return count(distinct neighbours), count(distinct r)
+------------------------------------------------+
| count(distinct neighbours) | count(distinct r) |
+------------------------------------------------+
| 4                          | 1                 |
+------------------------------------------------+
1 row
0 ms

The clustering coefficient of a selected node is defined as the probability that two randomly selected neighbors of that node are connected to each other. Once we have the number of neighbors and the number of connections among them, we can calculate it:

The number of possible connections between n neighbors is n!/(2!(n-2)!). With n = 4 neighbors this gives 4!/(2!(4-2)!) = 24/4 = 6.

The actual number of connections among the neighbors is 1, therefore the clustering coefficient of node 1 is 1/6.
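
The calculation can also be sketched in plain Python. The example assumes the edge list from the "Find Neighbors" output and, like the Cypher query above, leaves node 0 out of the neighbor set; the function name and the `excluded` parameter are made up for this illustration.

```python
from fractions import Fraction
from itertools import combinations

# Edges read off the "Find Neighbors" output above (treated as undirected)
EDGES = [(0, 1), (1, 2), (1, 3), (1, 4), (1, 5), (2, 6), (2, 7), (3, 4), (5, 6)]


def clustering_coefficient(node, edges, excluded=frozenset()):
    """Connections among the node's neighbours divided by the possible ones."""
    adjacency = {}
    for a, b in edges:
        adjacency.setdefault(a, set()).add(b)
        adjacency.setdefault(b, set()).add(a)
    neighbours = adjacency.get(node, set()) - set(excluded)
    n = len(neighbours)
    possible = n * (n - 1) // 2  # same value as n!/(2!(n-2)!)
    if possible == 0:
        return Fraction(0)
    actual = sum(
        1 for a, b in combinations(sorted(neighbours), 2) if b in adjacency[a]
    )
    return Fraction(actual, possible)


# Node 0 is excluded to mirror the id(neighbours) <> 0 condition in the query
print(clustering_coefficient(1, EDGES, excluded={0}))  # prints: 1/6
```

If node 0 is counted as a neighbor as well, node 1 has five neighbors and ten possible connections, so the value changes to 1/10.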

References

Cypher Query Language

Networks, Crowds, and Markets: Reasoning About a Highly Connected World By David Easley and Jon Kleinberg







Saturday, June 9, 2012

neo4j: Creating .NET REST API

After spending several days trying to use some of the existing neo4j libraries for .NET without success, I decided to write a library of my own.

Personally I prefer the Cypher query language over Gremlin, so I went through the official neo4j documentation and started developing a REST client based on Newtonsoft's Json.NET.

Below is the code for adding a node to an index:


public string AddNodeToIndex(int nodeReference, string key, string value, string indexName)
        {
            string response = "";
            JSONAddNodeToIndex jsonObj = new JSONAddNodeToIndex();
            jsonObj.Uri = "http://localhost:7474/db/data/node/" + nodeReference.ToString();
            jsonObj.Value = value;
            jsonObj.Key = key;

            string json = JsonConvert.SerializeObject(jsonObj);
            //index name:favorites : http://localhost:7474/db/data/index/node/favorites
            HttpWebRequest req = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/index/node/" + indexName);
            req.Method = "POST";
            req.ContentType = "application/json";
            using (var streamWriter = new StreamWriter(req.GetRequestStream()))
            {

                streamWriter.Write(json);
            }
            var httpResponse = (HttpWebResponse)req.GetResponse();
            using (var streamReader = new StreamReader(httpResponse.GetResponseStream()))
            {
                var responseText = streamReader.ReadToEnd();
                response = responseText;
            }

            return response;
        }
    }


Hopefully soon there will be more code clean enough to share.

Wednesday, April 18, 2012

Detecting Influential Figures During the General Strike of the Public Sector in Slovenia

By writing a program that captures all tweets containing a selected keyword or hashtag, I was able to retrieve the tweets about the general strike of the public sector in Slovenia.

Using Gephi, I sped up the sequence of posts from the last two days and recorded the evolution of the social network. With all the data in hand, it is also possible to detect the most influential people and communities.

Here is a snapshot of the network created over the last two days: Public Strike on Twitter


 


Sunday, October 23, 2011

Netduino: Using Ethernet Shield to Read/Write to SD Card

After successfully running a program that writes a file to an SD card and then reads its content, I decided to summarize all the steps required to use a Netduino with an Ethernet Shield and an SD card.

First, you have to upgrade the firmware (I used v4.1.1.0 Beta 1, which can be found here). Detailed instructions for the firmware upgrade are available at the same address.

Next, you have to solder the ICSP header onto your Netduino board and connect D4 to D10 with a jumper wire, as shown in the pictures below.





After adding the ICSP header and the jumper wire, the hardware is ready and you can write a program that uses the SD card reader. Below is the example I used for testing; it worked fine.



using System;
using System.Threading;
using Microsoft.SPOT;
using Microsoft.SPOT.Hardware;
using SecretLabs.NETMF.Hardware;
using SecretLabs.NETMF.Hardware.Netduino;
using SecretLabs.NETMF.IO;
using System.IO;

namespace NetduinoSD
{
    public class Program
    {
        public static void Main()
        {
            StorageDevice.MountSD("SD", SPI.SPI_module.SPI1, Pins.GPIO_PIN_D10);
            using (var filestream = new FileStream(@"SD\dontpanic.txt", FileMode.Create))
            {
                StreamWriter streamWriter = new StreamWriter(filestream);
                streamWriter.WriteLine("This is a test of the SD card support on the netduino...This is only a test...");
                streamWriter.Close();
            }

            using (var filestream = new FileStream(@"SD\dontpanic.txt", FileMode.Open))
            {
                StreamReader reader = new StreamReader(filestream);
                Debug.Print(reader.ReadToEnd());
                reader.Close();
            }

            StorageDevice.Unmount("SD");
        }
    }
}



The whole solution for Visual Studio 2010 can be found here

Thursday, May 19, 2011

Retrieving Cell Information using Google Location API


Recently I examined the event records stored on my phone. The list contained the MCC (Mobile Country Code), MNC (Mobile Network Code), LAC (Location Area Code) and CellID, from which it is possible to determine the location of the mobile cell.

Below is the code that retrieves the data from the Google Location API as JSON.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Newtonsoft.Json;
using System.Net;
using System.IO;

namespace CellLocations
{
    public class GoogleService
    {
        public GoogleCell GetCellInfo(string lac, string mnc, string mcc, string cellID)
        {
            HttpWebRequest myReq = (HttpWebRequest)WebRequest.Create("https://www.google.com/loc/json");
            myReq.Method = "POST";
            myReq.ContentType = "application/jsonrequest";
            string postData = "{\"cell_towers\": [{\"location_area_code\": \"" + lac +"\", \"mobile_network_code\": \"" + mnc + "\", \"cell_id\": \"" + cellID + "\", \"mobile_country_code\": \"" + mcc + "\"}], \"version\": \"1.1.0\", \"request_address\": \"true\"}";
            myReq.ContentLength = postData.Length;

            StreamWriter stOut = new StreamWriter(myReq.GetRequestStream(), System.Text.Encoding.ASCII);
            stOut.Write(postData);
            stOut.Close();

            HttpWebResponse webresponse;
            webresponse = (HttpWebResponse)myReq.GetResponse();
            Encoding enc = System.Text.Encoding.UTF8;
            StreamReader loResponseStream = new StreamReader(webresponse.GetResponseStream(), enc);

            string Response = loResponseStream.ReadToEnd();
            loResponseStream.Close();
            webresponse.Close();

            // Deserialize the JSON response into the GoogleCell class
            GoogleCell cell = JsonConvert.DeserializeObject<GoogleCell>(Response);
            return cell;
        }
    }
}
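
The request body is assembled above by string concatenation, which is easy to get wrong when escaping quotes. As a small illustration of the same payload built with a JSON serializer, here is a Python sketch using the field names from `postData`; the function name and the sample identifiers are hypothetical, and no request is sent.

```python
import json


def build_cell_request(lac, mnc, mcc, cell_id):
    """Serialize the cell-tower lookup body with the same fields as postData."""
    body = {
        "cell_towers": [{
            "location_area_code": lac,
            "mobile_network_code": mnc,
            "cell_id": cell_id,
            "mobile_country_code": mcc,
        }],
        "version": "1.1.0",
        "request_address": "true",
    }
    return json.dumps(body)


# Hypothetical identifiers, for illustration only
request_body = build_cell_request("1234", "40", "293", "56789")
print(request_body)
```

Letting the serializer handle quoting keeps the body valid JSON even if one of the values contains a quote or backslash.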



In order to deserialize the received data an appropriate class has to be created:


public class GoogleCell
{
    public GoogleCell() { }
    public GoogleCell(string mnc, string mcc, string lac)
    {
        this.Mnc = mnc;
        this.Mcc = mcc;
        this.Lac = lac;
    }
    public string Mnc { get; set; }
    public string Mcc { get; set; }
    public string Lac { get; set; }
    public string CellID { get; set; }
    public Location location { get; set; }

    public class Location
    {
        public Location() { }
        public Location(string latitude, string longitude, string accuracy)
        {
            this.latitude = latitude;
            this.longitude = longitude;
            this.accuracy = accuracy;
        }
        public string latitude { get; set; }
        public string longitude { get; set; }
        public string accuracy { get; set; }
        public Address address { get; set; }

        public class Address
        {
            public Address() { }
            public string country { get; set; }
            public string country_code { get; set; }
            public string city { get; set; }
            public string street { get; set; }
            public string street_number { get; set; }
            public string postal_code { get; set; }
        }
    }
}


The JSON library for .NET is available here.