Continuing from my previous post on sentiment analysis, let's explore how to perform the same analysis using RHadoop.

What will be covered?

  • Environment & Pre-requisites
  • RHadoop in action
    • Setting RHadoop environment variables
    • Setting working folder paths
    • Loading data
    • Scoring function
    • Writing Mapper
    • Writing Reducer
    • Run your Map-Reduce program
    • Read data output from hadoop to R data frame

Environment:

I performed this analysis on a single-node Hadoop 2.4.0 installation on Ubuntu, using R with the RHadoop packages.

Pre-requisites:

Ensure that all Hadoop processes are running. You can start them by running the following commands in your terminal:

start-dfs.sh and start-yarn.sh

Then run the command jps in your terminal. The output should list the running Hadoop daemons (typically NameNode, DataNode, SecondaryNameNode, ResourceManager and NodeManager, alongside Jps itself).

RHadoop In Action:

Set up the environment variables. Note that the paths may differ depending on your Ubuntu version and Hadoop installation (I’m using Hadoop 2.4.0).

Sys.setenv("HADOOP_CMD"="/usr/local/hadoop/bin/hadoop")
Sys.setenv("HADOOP_STREAMING"="/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar")

Setting folder paths:

To ease testing during development, it is useful to be able to switch between the local and Hadoop environments. In the code below, setting LOCAL = T uses files from the local folder, while LOCAL = F uses files from Hadoop.

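# Load the RHadoop packages (after the environment variables above are set)
library(rmr2)   # mapreduce(), keyval(), make.input.format(), from.dfs()
library(rhdfs)  # hdfs.put()
hdfs.init()     # initialise the rhdfs connection

# Root folder path
setwd('/home/manohar/example/Sentiment_Analysis')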

# Set "LOCAL" variable to T to execute using rmr's local backend.
# Otherwise, use Hadoop (which needs to be running, correctly configured, etc.)
LOCAL=F

if (LOCAL)
{
  rmr.options(backend = 'local')
  
  # we have smaller extracts of the data in this project's 'local' subdirectory
  hdfs.data.root = '/home/manohar/example/Sentiment_Analysis/'
  hdfs.data = file.path(hdfs.data.root, 'data', 'data.csv')
  
  hdfs.out.root = hdfs.data.root
  
} else {
  rmr.options(backend = 'hadoop')
  
  # assumes 'Sentiment_Analysis/data' input path exists on HDFS under /home/manohar/example
  
  hdfs.data.root = '/home/manohar/example/Sentiment_Analysis/'
  hdfs.data = file.path(hdfs.data.root, 'data')
  
  # writes output to 'Sentiment_Analysis' directory in user's HDFS home (e.g., /home/manohar/example/Sentiment_Analysis/)
  hdfs.out.root = 'Sentiment_Analysis'
}

hdfs.out = file.path(hdfs.out.root, 'out')

 

Loading Data:

The code below copies the file from the local file system to Hadoop. If the file already exists, it returns TRUE.

# equivalent to hadoop dfs -copyFromLocal
hdfs.put(hdfs.data,  hdfs.data)

 

Our data is in a CSV file, so we define an input format that labels the fields for better code readability, especially in the mapper stage.

# asa.csv.input.format() - read CSV data files and label field names
# for better code readability (especially in the mapper)
#
asa.csv.input.format = make.input.format(format='csv', mode='text', streaming.format = NULL, sep=',',
                                         col.names = c('ID', 'Name', 'Gender', 'Age','OverAllRating',                                             
                                                       'ReviewType', 'ReviewTitle', 'Benefits', 'Money', 'Experience', 
                                                       'Purchase', 'claimsProcess', 'SpeedResolution', 'Fairness',            
                                                       'ReviewDate', 'Review', 'Recommend', 'ColCount'),
                                         stringsAsFactors=F)
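
To illustrate what labelling the fields buys us, here is a minimal base-R sketch. It assumes, as noted in the mapper section, that the CSV chunks end up being parsed by read.table(). The three-column data and the name toy_csv are hypothetical; the real file has the 18 columns listed above.

```r
# Hypothetical three-column extract; the real file has 18 columns
toy_csv <- "1,Very good experience,Yes\n2,Slow claims process,No"

# Parse the text with explicit column names, much like the input format does
toy_df <- read.table(text = toy_csv, sep = ",",
                     col.names = c("ID", "Review", "Recommend"),
                     stringsAsFactors = FALSE)

# Fields can now be addressed by name instead of by position
print(toy_df$Review)
```

With named columns, the mapper can refer to val.df$Review rather than to a column index.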

 

Load the opinion lexicons. The files and the paper on opinion lexicons can be found here.

pos_words <- scan('/home/manohar/example/Sentiment_Analysis/data/positive-words.txt', what='character', comment.char=';')
neg_words <- scan('/home/manohar/example/Sentiment_Analysis/data/negative-words.txt', what='character', comment.char=';')

 

Scoring Function:

Below is the main function that calculates the sentiment score, written by Jeffrey Breen (source here!)

score.sentiment = function(sentence, pos.words, neg.words)
{
  require(plyr)
  require(stringr)
  
  score = laply(sentence, function(sentence, pos.words, neg.words) {
    # clean up sentences with R's regex-driven global substitute, gsub():
    sentence = gsub('[[:punct:]]', '', sentence)
    sentence = gsub('[[:cntrl:]]', '', sentence)
    sentence = gsub('\\d+', '', sentence)
    # and convert to lower case:
    sentence = tolower(sentence)
    
    # split into words. str_split is in the stringr package
    word.list = str_split(sentence, '\\s+')
    # sometimes a list() is one level of hierarchy too much
    words = unlist(word.list)
    
    # compare our words to the dictionaries of positive & negative terms
    pos.matches = match(words, pos.words)
    neg.matches = match(words, neg.words)
    
    # match() returns the position of the matched term or NA
    # we just want a TRUE/FALSE:
    pos.matches = !is.na(pos.matches)
    neg.matches = !is.na(neg.matches)
    
    # and conveniently enough, TRUE/FALSE will be treated as 1/0 by sum():
    score = sum(pos.matches) - sum(neg.matches)
    
    return(score)
  }, pos.words, neg.words)
  
  score.df = data.frame(score)
  return(score.df)
}
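
To make the scoring rule concrete, here is a simplified base-R sketch of the same idea (count of positive matches minus count of negative matches). It is not Jeffrey Breen's function: score_one and the toy lexicons toy_pos and toy_neg are hypothetical names used only for illustration, and it avoids plyr and stringr so it can run on its own.

```r
# Hypothetical toy lexicons, for illustration only
toy_pos <- c("good", "happy", "great")
toy_neg <- c("bad", "slow", "poor")

# Score one sentence: positive matches minus negative matches
score_one <- function(sentence, pos.words, neg.words) {
  cleaned <- gsub("[[:punct:]]", "", sentence)   # drop punctuation
  cleaned <- gsub("[[:cntrl:]]", "", cleaned)    # drop control characters
  cleaned <- gsub("\\d+", "", cleaned)           # drop digits
  words <- unlist(strsplit(tolower(cleaned), "\\s+"))
  sum(words %in% pos.words) - sum(words %in% neg.words)
}

toy_scores <- vapply(
  c("Good price, good service, happy to help!", "Slow and poor support."),
  score_one, integer(1), pos.words = toy_pos, neg.words = toy_neg,
  USE.NAMES = FALSE
)
print(toy_scores)  # 3 and -2
```

The first sentence matches "good" twice and "happy" once, giving 3. The second matches "slow" and "poor", giving -2.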

 

Mapper:

This is the first stage in the map-reduce process. In the classic word-count example, the mapper splits each line into separate words (also called tokenizing the string) and, for each word seen, outputs the word and a count of 1 to indicate that it has seen the word once. The mapper phase runs in parallel, because Hadoop uses a divide-and-conquer approach to solve the problem so that your code executes as fast as possible. In this phase the computation, processing and distribution of data take place. In our case we are using a single node, and the code and logic are fairly simple: the mapper drops the header rows and emits each review text as both key and value.

The mapper gets keys and values from the input formatter. In our case, the key is NULL and the value is a data.frame from read.table()

mapper = function(key, val.df) {  
  # Remove header lines
  val.df = subset(val.df, Review != 'Review')
  output.key = data.frame(Review = as.character(val.df$Review),stringsAsFactors=F)
  output.val = data.frame(val.df$Review)
  return( keyval(output.key, output.val) )
}
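
The header filter in the mapper can be tried on its own. The sketch below assumes that each input file's header line arrives as an ordinary data row, which is presumably why the mapper removes rows where Review equals 'Review'. The data frame toy_chunk is hypothetical.

```r
# Hypothetical chunk: two CSV files read back to back, so the header row repeats
toy_chunk <- data.frame(
  ID     = c("ID", "1", "ID", "2"),
  Review = c("Review", "Very good experience", "Review", "Slow claims process"),
  stringsAsFactors = FALSE
)

# Same filter as in the mapper: keep only rows that are real reviews
toy_body <- subset(toy_chunk, Review != "Review")
print(toy_body$Review)
```

Only the two genuine reviews remain after the filter.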

 

Reducer:

In the word-count example, the reduce phase sums up the number of times each word was seen and writes that count together with the word as output. In our case, the reducer receives the reviews grouped by key and computes the sentiment score for each group.

Two sub-steps run internally before the reducer is called: shuffle and sort. Shuffle collects the values that share the same key into a single group, and sort puts the data into key order.

reducer = function(key, val.df) {  
  output.key = key
  output.val = data.frame(score.sentiment(val.df, pos_words, neg_words))
  return( keyval(output.key, output.val) )  
}
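
Shuffle, sort and reduce can be imitated in base R to see what the framework does between the mapper and the reducer. This is only a conceptual sketch, not how Hadoop or rmr2 implement it; toy_pairs and the summing reducer are hypothetical.

```r
# Hypothetical mapper output: key/value pairs in arrival order
toy_pairs <- data.frame(
  key   = c("review b", "review a", "review b"),
  value = c(1, 2, 3),
  stringsAsFactors = FALSE
)

# Shuffle and sort: group the values by key; split() returns groups in key order
toy_groups <- split(toy_pairs$value, toy_pairs$key)

# Reduce: the reducer is called once per group
toy_reduced <- vapply(toy_groups, sum, numeric(1))
print(toy_reduced)
```

Each distinct key produces one reducer call, which is why the job counters further down report the same number for "Reduce input groups" and "reduce calls".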

 

Running your Map-Reduce:

Now execute the map-reduce program:

mr.sa = function (input, output) {
  mapreduce(input = input,
            output = output,
            input.format = asa.csv.input.format,
            map = mapper,
            reduce = reducer,
            verbose=T)
}
out = mr.sa(hdfs.data, hdfs.out)

------- output on screen ------
> out = mr.sa(hdfs.data, hdfs.out)
16/09/09 10:11:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/usr/local/hadoop/data/hadoop-unjar2099064477903127749/] [] /tmp/streamjob6583314935744487158.jar tmpDir=null
16/09/09 10:11:33 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8050
16/09/09 10:11:33 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8050
16/09/09 10:11:36 INFO mapred.FileInputFormat: Total input paths to process : 3
16/09/09 10:11:36 INFO mapreduce.JobSubmitter: number of splits:4
16/09/09 10:11:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1473394634383_0002
16/09/09 10:11:39 INFO impl.YarnClientImpl: Submitted application application_1473394634383_0002
16/09/09 10:11:39 INFO mapreduce.Job: The url to track the job: http://manohar-dt:8088/proxy/application_1473394634383_0002/
16/09/09 10:11:39 INFO mapreduce.Job: Running job: job_1473394634383_0002
16/09/09 10:11:58 INFO mapreduce.Job: Job job_1473394634383_0002 running in uber mode : false
16/09/09 10:11:58 INFO mapreduce.Job:  map 0% reduce 0%
16/09/09 10:12:27 INFO mapreduce.Job:  map 48% reduce 0%
16/09/09 10:12:37 INFO mapreduce.Job:  map 100% reduce 0%
16/09/09 10:13:22 INFO mapreduce.Job:  map 100% reduce 100%
16/09/09 10:13:35 INFO mapreduce.Job: Job job_1473394634383_0002 completed successfully
16/09/09 10:13:36 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=1045750
        FILE: Number of bytes written=2580683
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=679293
        HDFS: Number of bytes written=578577
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=4
        Launched reduce tasks=1
        Data-local map tasks=4
        Total time spent by all maps in occupied slots (ms)=148275
        Total time spent by all reduces in occupied slots (ms)=53759
        Total time spent by all map tasks (ms)=148275
        Total time spent by all reduce tasks (ms)=53759
        Total vcore-seconds taken by all map tasks=148275
        Total vcore-seconds taken by all reduce tasks=53759
        Total megabyte-seconds taken by all map tasks=151833600
        Total megabyte-seconds taken by all reduce tasks=55049216
    Map-Reduce Framework
        Map input records=9198
        Map output records=1818
        Map output bytes=1037580
        Map output materialized bytes=1045768
        Input split bytes=528
        Combine input records=0
        Combine output records=0
        Reduce input groups=1616
        Reduce shuffle bytes=1045768
        Reduce input records=1818
        Reduce output records=1720
        Spilled Records=3636
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=1606
        CPU time spent (ms)=22310
        Physical memory (bytes) snapshot=1142579200
        Virtual memory (bytes) snapshot=5270970368
        Total committed heap usage (bytes)=947912704
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=678765
    File Output Format Counters 
        Bytes Written=578577
    rmr
        reduce calls=1616
16/09/09 10:13:36 INFO streaming.StreamJob: Output directory: /Sentiment_Analysis/out

 

Load output from hadoop to R data frame:

Read the output from the Hadoop folder into an R variable and convert it to a data frame for further processing.

results = from.dfs(out)

# put the result in a dataframe
df = sapply(results,c)
df = data.frame(df) # convert to dataframe
colnames(df) <- c('Review', 'score') # assign column names

print(head(df))

------- Result -----
                                             Review score
1                              Very good experience     1
2                         It was a classic scenario     1
3              I have chosen  for all my insurances     0
4           As long as customers understand the t&c     0
5    time will tell if  live up to our expectations     0
6 Good price  good customer service happy to help..     3

 

Now we have the sentiment score for each text. This opens up opportunities for further analysis, such as classifying emotion and polarity, and a whole lot of visualization for insight. Please see my previous post here to learn more about this.
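
As a small taste of such follow-up analysis, here is a minimal sketch that turns scores into polarity labels. It uses the six scores printed in the result above; the thresholds (above zero is positive, below zero is negative) and the names head_scores and head_polarity are assumptions for illustration.

```r
# Scores of the six reviews shown in the result above
head_scores <- c(1, 1, 0, 0, 0, 3)

# Assumed rule: > 0 positive, < 0 negative, otherwise neutral
head_polarity <- ifelse(head_scores > 0, "positive",
                 ifelse(head_scores < 0, "negative", "neutral"))

# Count how many reviews fall into each class
print(table(head_polarity))
```

For these six reviews, three come out positive and three neutral.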

You can find the full working code in my GitHub account here!

 
