Wednesday, April 04, 2012

Generating Unigrams and Bigrams into MySQL from Hadoop SequenceFiles

In my previous post, I described how I used GNU Parallel to read a fairly large Lucene index into a set of Hadoop SequenceFiles. The objective is to use the data in the index to build a Unigram and Bigram Language Model for a spelling corrector. Since the spelling correction code is going to be called from a web application, I figured a good place to store the unigrams and bigrams would be a MySQL database.

This is a fairly trivial task from the point of view of writing Map-Reduce code (the unigram writer is just a minor variation of the WordCount example), but this is the first time I was using Map-Reduce to crunch through a reasonably large dataset. I was also running Hadoop on a single large machine in pseudo-distributed mode, unlike previously where I mostly used it in local mode to build little proofs of concept. So there were certain things I learned about running Hadoop, which I will mention as they come up. But first, the code.

Java Code

As stated above, the UnigramCounter and BigramCounter are both fairly trivial examples of Map-Reduce code. But I include them anyway, for completeness.

UnigramCounter.java

The UnigramCounter Mapper splits up the input text into words and writes them out to the context, where the Reducer picks them up and aggregates the counts, computes the soundex and metaphone values for the word, and writes the record out to a database table. The soundex and metaphones are for finding sound-alikes - I am not sure which one will give me the best results, so I compute both.

package com.mycompany.spell3.train;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.codec.language.Metaphone;
import org.apache.commons.codec.language.Soundex;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class UnigramCounter extends Configured implements Tool {

  private static final String PROP_DBNAME = "dbname";
  private static final String PROP_DBUSER = "dbuser";
  private static final String PROP_DBPASS = "dbpass";

  private static final String NULL_PATH = "/prod/hadoop/dummy";

  public static class MapClass extends 
      Mapper<LongWritable,Text,Text,IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    
    @Override
    protected void map(LongWritable key, Text value, 
        Context context) throws IOException, 
        InterruptedException {
      String s = StringUtils.lowerCase(value.toString());
      String[] words = s.split("[^a-z]+");
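      for (String word : words) {
        if (StringUtils.isEmpty(word)) {
          // split() yields a leading empty token when the text
          // starts with a non-letter; do not count it as a word
          continue;
        }
        context.write(new Text(word), ONE);
      }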
    }
  }
  
  public static class ReduceClass extends 
      Reducer<Text,IntWritable,Text,IntWritable> {

    private static final String MYSQL_DB_DRIVER = "com.mysql.jdbc.Driver";

    private Connection conn;
    private PreparedStatement ps;
    private AtomicInteger counter = new AtomicInteger(0);
    private Soundex soundex;
    private Metaphone metaphone;
    
    @Override
    protected void setup(Context context) 
        throws IOException, InterruptedException {
      try {
        Class.forName(MYSQL_DB_DRIVER);
        Configuration conf = context.getConfiguration();
        conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/" + conf.get(PROP_DBNAME),
          conf.get(PROP_DBUSER), conf.get(PROP_DBPASS));
        conn.setAutoCommit(false);
        ps = conn.prepareStatement(
          "insert into unigram_counts(word,soundex,metaphone,cnt) " +
          "values (?,?,?,?)");
        soundex = new Soundex();
        metaphone = new Metaphone();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, 
        Context context) throws IOException, 
        InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      insertToDb(key.toString(), sum);
    }
    
    private void insertToDb(String word, int count) 
        throws IOException {
      try {
        ps.setString(1, word);
        ps.setString(2, soundex.soundex(word));
        ps.setString(3, metaphone.metaphone(word));
        ps.setInt(4, count);
        ps.execute();
        int current = counter.incrementAndGet();
        if (current % 1000 == 0) {
          conn.commit();
        }
      } catch (SQLException e) {
        System.out.println("Failed to insert unigram: " + word);
        e.printStackTrace();
      }
    }

    @Override
    protected void cleanup(Context context) 
        throws IOException, InterruptedException {
      if (ps != null) {
        try { ps.close(); } catch (SQLException e1) {}
      }
      if (conn != null) {
        try {
          conn.commit();
          conn.close();
        } catch (SQLException e) {
          throw new IOException(e);
        }
      }
    }
  }
  
  @Override
  public int run(String[] args) throws Exception {
    Path input = new Path(args[0]);
    Path output = new Path(NULL_PATH);
    
    Configuration conf = getConf();
    conf.set(PROP_DBNAME, args[1]);
    conf.set(PROP_DBUSER, args[2]);
    conf.set(PROP_DBPASS, args[3]);
    
    Job job = new Job(conf, "Unigram-Counter");
    
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
    
    job.setJarByClass(UnigramCounter.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(5);
    
    boolean succ = job.waitForCompletion(true);
    if (! succ) {
      System.out.println("Job failed, exiting");
      return -1;
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 4) {
      System.out.println(
        "Usage: UnigramCounter path_to_seqfiles output_db db_user db_pass");
      System.exit(-1);
    }
    int res = ToolRunner.run(new Configuration(), 
      new UnigramCounter(), args);
    System.exit(res);
  }
}
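
The tokenize-and-count logic can be tried outside Hadoop. The following is a minimal sketch, not the job itself: the class name UnigramSketch and its count() method are made up for illustration, and an in-memory map stands in for the shuffle and the Reducer. It also shows why empty tokens need to be skipped, since String.split() returns a leading empty string when the text starts with a non-letter.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch of the unigram counting logic.
final class UnigramSketch {

  // Lowercases the text, splits on runs of non-letters (same regex
  // as the Mapper) and sums up occurrences per word.
  static Map<String, Integer> count(String text) {
    Map<String, Integer> counts = new TreeMap<>();
    String lower = text.toLowerCase(Locale.ROOT);
    for (String word : lower.split("[^a-z]+")) {
      if (word.isEmpty()) {
        // leading empty token, e.g. when the text starts with a digit
        continue;
      }
      counts.merge(word, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(count("2012: the cat and the hat"));
  }
}
```

Calling UnigramSketch.count() on a handful of documents from the index is a cheap way to check the tokenization before committing to a multi-day run.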

BigramCounter.java

The BigramCounter Mapper uses a Sentence BreakIterator to break the input up into sentences, computes bigrams of word pairs within each sentence and writes them out to the context, where the Reducer picks them up, aggregates the counts and writes the bigram and count to another database table.

package com.mycompany.spell3.train;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.BreakIterator;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BigramCounter extends Configured implements Tool {

  private static final String PROP_DBNAME = "dbname";
  private static final String PROP_DBUSER = "dbuser";
  private static final String PROP_DBPASS = "dbpass";

  private static final String NULL_PATH = "/prod/hadoop/dummy";

  public static class MapClass extends 
      Mapper<LongWritable,Text,Text,IntWritable> {
    
    private static final IntWritable ONE = new IntWritable(1);
    private static final String SENTENCE_START = "<s>";
    private static final String SENTENCE_END = "</s>";
    private static final String WORD_SEPARATOR = "__";
    
    @Override
    protected void map(LongWritable key, Text value, 
        Context context) 
        throws IOException, InterruptedException {
      String s = value.toString();
      BreakIterator sit = BreakIterator.getSentenceInstance();
      sit.setText(s);
      int start = sit.first();
      int end = -1;
      while ((end = sit.next()) != BreakIterator.DONE) {
        String sentence = StringUtils.lowerCase(s.substring(start, end));
        start = end;
        String[] words = sentence.split("[^a-z]+");
        // Chain each word to its predecessor, starting from the
        // sentence-start marker, so that no adjacent pair is skipped.
        String prevWord = SENTENCE_START;
        for (String word : words) {
          if (StringUtils.isEmpty(word)) {
            // split() yields a leading empty token when the
            // sentence begins with a non-letter
            continue;
          }
          context.write(new Text(StringUtils.join(
            new String[] {prevWord, word}, WORD_SEPARATOR)), ONE);
          prevWord = word;
        }
        if (! SENTENCE_START.equals(prevWord)) {
          // close the sentence after its last real word
          context.write(new Text(StringUtils.join(
            new String[] {prevWord, SENTENCE_END}, WORD_SEPARATOR)), ONE);
        }
      }
    }
  }
  
  public static class ReduceClass extends 
    Reducer<Text,IntWritable,Text,IntWritable> {

    private static final String MYSQL_DB_DRIVER = "com.mysql.jdbc.Driver";

    private Connection conn;
    private PreparedStatement ps;
    private AtomicInteger counter = new AtomicInteger(0);
    
    @Override
    protected void setup(Context context) 
        throws IOException, InterruptedException {
      try {
        Class.forName(MYSQL_DB_DRIVER);
        Configuration conf = context.getConfiguration();
        conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/" + conf.get(PROP_DBNAME), 
          conf.get(PROP_DBUSER), conf.get(PROP_DBPASS));
        conn.setAutoCommit(false);
        ps = conn.prepareStatement(
          "insert into bigram_counts(bigram,cnt) values (?,?)");
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, 
        Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      insertToDb(key.toString(), sum);
    }
    
    private void insertToDb(String bigram, int sum) 
        throws IOException {
      try {
        ps.setString(1, bigram);
        ps.setInt(2, sum);
        ps.execute();
        int current = counter.incrementAndGet();
        if (current % 1000 == 0) {
          conn.commit();
        }
      } catch (SQLException e) {
        System.out.println("Failed to insert bigram: " + bigram);
        e.printStackTrace();
      }
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (ps != null) {
        try { ps.close(); } catch (SQLException e) {}
      }
      if (conn != null) {
        try {
          conn.commit();
          conn.close();
        } catch (SQLException e) {
          throw new IOException(e);
        }
      }
    }
  }
  
  @Override
  public int run(String[] args) throws Exception {
    Path input = new Path(args[0]);
    Path output = new Path(NULL_PATH);
    
    Configuration conf = getConf();
    conf.set(PROP_DBNAME, args[1]);
    conf.set(PROP_DBUSER, args[2]);
    conf.set(PROP_DBPASS, args[3]);
    
    Job job = new Job(conf, "Bigram-Counter");
    
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
    
    job.setJarByClass(BigramCounter.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(5);
    
    boolean succ = job.waitForCompletion(true);
    if (! succ) {
      System.out.println("Job failed, exiting");
      return -1;
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 4) {
      System.out.println(
        "Usage: BigramCounter path_to_seqfiles output_db db_user db_pass");
      System.exit(-1);
    }
    int res = ToolRunner.run(new Configuration(), 
      new BigramCounter(), args);
    System.exit(res);
  }
}
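
Bigram generation is easy to get subtly wrong, so it is worth checking on a sample sentence before starting a long job. Below is a minimal stand-alone sketch of the sentence-bounded bigram extraction described above, using only the JDK. The class name BigramSketch and its bigrams() method are hypothetical, and the sketch simply collects the bigrams in a list instead of writing them to a Hadoop context.

```java
import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-alone sketch of the bigram extraction logic.
final class BigramSketch {
  static final String SENTENCE_START = "<s>";
  static final String SENTENCE_END = "</s>";
  static final String WORD_SEPARATOR = "__";

  // Returns every adjacent word pair per sentence, with the
  // sentence start and end markers acting as pseudo-words.
  static List<String> bigrams(String text) {
    List<String> out = new ArrayList<>();
    BreakIterator sit = BreakIterator.getSentenceInstance(Locale.US);
    sit.setText(text);
    int start = sit.first();
    int end;
    while ((end = sit.next()) != BreakIterator.DONE) {
      String sentence = text.substring(start, end).toLowerCase(Locale.ROOT);
      start = end;
      String prev = SENTENCE_START;
      for (String word : sentence.split("[^a-z]+")) {
        if (word.isEmpty()) {
          continue; // leading empty token from split()
        }
        out.add(prev + WORD_SEPARATOR + word);
        prev = word;
      }
      if (!SENTENCE_START.equals(prev)) {
        // only close sentences that contained at least one word
        out.add(prev + WORD_SEPARATOR + SENTENCE_END);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(bigrams("The cat sat."));
  }
}
```

A sentence with n words should produce n + 1 bigrams, which makes for a quick sanity check: "The cat sat." has three words and should yield four bigrams, including the__cat and cat__sat.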

Hadoop Configuration Changes

Hadoop is built to run on clusters of many medium size machines. What I had instead was one large 16-CPU machine, so I wanted to make sure that its processing power was utilized to the maximum possible. So I made the following changes to mapred-site.xml based on the advice in this StackOverflow page.

<!-- Source: $HADOOP_HOME/conf/mapred-site.xml -->
<configuration>
...
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>10</value>
  <description/>
</property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>10</value>
  <description/>
</property>
</configuration>

In core-site.xml, I changed hadoop.tmp.dir from its default location to a large, relatively unused partition on the box. This was in response to a job failure where Hadoop ran out of HDFS space. Since I had to rerun the job at that point anyway, I shut down Hadoop, deleted the old hadoop.tmp.dir, and then restarted Hadoop and reformatted the namenode.

<!-- Source: $HADOOP_HOME/conf/core-site.xml -->
<configuration>

<property>
  <name>hadoop.tmp.dir</name>
  <value>/prod/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
...
</configuration>

Since I have only a single data node, I set the dfs.replication in hdfs-site.xml to 1.

<!-- $HADOOP_HOME/conf/hdfs-site.xml -->
<configuration>

<property>
  <name>dfs.replication</name>
  <value>1</value>
  <description/>
</property>

</configuration>

MySQL Configuration Changes

The default location of the MySQL data directory was in /var/lib/mysql, which was in the "/" partition, too small for my purposes. I actually ran out of disk space in this partition while writing bigrams to MySQL (the job just hangs at a fixed map-reduce completion status). I had to kill the job, shut down MySQL, reconfigure the data directory and the socket location, move the contents over to the new location, and restart MySQL. Here are the configuration changes:

# Source: /etc/my.cnf
[mysqld]
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
datadir=/prod/mysql_db
socket=/prod/mysql_db/mysql.sock
...

Deployment

Before this, I used to write shell scripts that put the JARs required by Hadoop and my application on the classpath and then called Java. While working on this, I discovered that you can use the $HADOOP_HOME/bin/hadoop script to run your custom Map-Reduce jobs as well, so I decided to use that.

However, I needed to set a few custom JAR files that Hadoop did not have (or need) in its classpath. I was using commons-codec which provided me implementations of Soundex and Metaphone, and I was writing to a MySQL database for which I needed the JDBC driver JAR, plus a few others for functionality I was too lazy to implement on my own.

There are two ways to supply these extra JAR files to the bin/hadoop script. One is by specifying their paths in the -libjars parameter. I thought this was nice, but it didn't work for me - for some reason it could not see the parameters I was passing to my Map-Reduce job via the command line. The second way is to package your custom JARs in the lib subdirectory of your application's JAR file, a so-called fat jar. The fat JAR approach was the one I took, creating it using the simple Ant target shown below:

  <target name="fatjar" depends="compile" description="Build JAR to run in Hadoop">
    <mkdir dir="${maven.build.output}/lib"/>
    <copy todir="${maven.build.output}/lib">
      <fileset dir="${custom.jars.dir}">
        <include name="commons-lang3-3.0.1.jar"/>
        <include name="commons-codec-1.3.jar"/>
        <include name="mysql-connector-java-5.0.5-bin.jar"/>
        ...
      </fileset>
    </copy>
    <jar jarfile="${maven.build.directory}/${maven.build.final.name}-fatjar.jar"
        basedir="${maven.build.output}" excludes="**/package.html"/>
  </target>

Once this is done, the script to run either job is quite simple. I show the script to run the BigramCounter below; simply substitute UnigramCounter for the other one.

#!/bin/bash
# Source: bin/bigram_counter.sh
HADOOP_HOME=/opt/hadoop-1.0.1
$HADOOP_HOME/bin/hadoop fs -rmr /prod/hadoop/dummy
$HADOOP_HOME/bin/hadoop jar /path/to/my-fatjar.jar \
  com.mycompany.spell3.train.BigramCounter "$@"

To run this script from the command line:

hduser@bigmac:spell3$ nohup ./bigram_counter.sh /prod/hadoop/spell \
  spelldb spelluser spelluser &

Job Killing

I needed to kill the job midway multiple times, either because I discovered I had goofed on some programming issue (incorrect database column names, etc.) and the job would start throwing all kinds of exceptions down the line, or because (as mentioned previously) MySQL ran out of disk space. To do this, you need to use bin/hadoop's job -kill command.

hduser@bigmac:hadoop-1.0.1$ # list out running jobs
hduser@bigmac:hadoop-1.0.1$ bin/hadoop job -list
hduser@bigmac:hadoop-1.0.1$ # kill specific job
hduser@bigmac:hadoop-1.0.1$ bin/hadoop job -kill ${job-id}

Even I had enough sense to not do a kill -9 on the hadoop daemon itself, but there was one time when I did a stop-all.sh and ended up having to throw away all my data because Hadoop got all choked up.

Another little tip is to avoid throwing exceptions from your Mapper or Reducer. A better option is to log them. This is true for any batch job, of course, but I once had one of the jobs fail after about 2 days of processing because of too many exceptions thrown by the Reducer. In the code above, I just used System.out.println() to log SQLExceptions if they occur, but it's better to use a real logger.
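
As a rough illustration of the logging advice, here is a minimal sketch that logs each failed insert and keeps a running count, so the caller can decide to give up once failures pile up (for example, when every insert fails because of a wrong column name). Everything in it is an assumption made for illustration: the class name InsertFailureLog, the threshold idea, and the choice of java.util.logging, which was picked only to keep the example free of external dependencies.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: logs insert failures instead of throwing,
// and reports when a caller-chosen failure budget is exhausted.
final class InsertFailureLog {
  private static final Logger LOG =
    Logger.getLogger(InsertFailureLog.class.getName());

  private final int maxFailures;
  private int failures = 0;

  InsertFailureLog(int maxFailures) {
    this.maxFailures = maxFailures;
  }

  // Logs the failure with its stack trace. Returns true while the
  // number of failures is still within the budget, false afterwards.
  boolean record(String item, Exception cause) {
    failures++;
    LOG.log(Level.WARNING, "Failed to insert: " + item, cause);
    return failures <= maxFailures;
  }

  int failures() {
    return failures;
  }
}
```

In a Reducer, the catch block in insertToDb() could call record() in place of the println and only rethrow when it returns false. Whether a failure budget makes sense at all depends on how much bad data the job can tolerate.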

So anyway, after about a week and a half of processing (including all sorts of silly but expensive mistakes), I ended up with approximately 400 million unigrams and 600 million bigrams in the database. Now to figure out how to actually use this information :-).

Update - 2012-04-09: I had a bug in my bigram generation code, which caused bad results, so I reran it. This time the job failed two times in a row, caused by (I suspect) extremely high loads on the MySQL database server. The first time I discovered that the mysql.sock file disappeared, so I terminated the job manually. The second time I found that the mysql.sock file would disappear and then reappear after a while once the load came back down (this is the only place I have found another mention of this) - however, ultimately this job failed as well. I ended up writing the bigrams and counts to text files in HDFS and the job completed in a fraction of the time it took before. So another lesson learned - avoid writing out to external datastores from within Hadoop.
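
For cases where the database write has to stay, JDBC batching (suggested in the comments on this post) is one way to cut down on round trips. The sketch below is untested against a real MySQL server and is not what the jobs above do. The class name BatchedBigramWriter and its methods are hypothetical, and it assumes the caller has already turned off auto-commit and prepared a two-parameter insert statement like the one in the BigramCounter Reducer.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper: buffers rows with addBatch() and sends them
// to the database in groups instead of one statement per row.
final class BatchedBigramWriter {
  private final Connection conn;
  private final PreparedStatement ps;
  private final int batchSize;
  private int pending = 0;

  // Assumes conn.setAutoCommit(false) was called and ps is an
  // "insert ... values (?,?)" statement created from conn.
  BatchedBigramWriter(Connection conn, PreparedStatement ps,
      int batchSize) {
    this.conn = conn;
    this.ps = ps;
    this.batchSize = batchSize;
  }

  // Queues one row; flushes once batchSize rows are pending.
  void add(String bigram, int count) throws SQLException {
    ps.setString(1, bigram);
    ps.setInt(2, count);
    ps.addBatch();
    pending++;
    if (pending >= batchSize) {
      flush();
    }
  }

  // Sends any queued rows and commits; a no-op if nothing is queued.
  void flush() throws SQLException {
    if (pending == 0) {
      return;
    }
    ps.executeBatch();
    conn.commit();
    pending = 0;
  }
}
```

The Reducer would call add() from reduce() and flush() from cleanup(), before closing the statement and the connection. How much this helps depends on the driver and the server configuration, so it is worth measuring on a small sample first.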

2 comments (moderated to prevent spam):

David Smiley said...

This was an interesting read. But I just cringed looking at what your Hadoop reduce job was doing -- committing to a database after each row! You ultimately concluded to not even write to a database, and sure, that's a possibility, but another is to optimize your reducer so that it uses JDBC batches. I'm not even sure if you need to have all the intermediate commits -- it depends on your database.

Sujit Pal said...

Hi David, it would indeed be cringeworthy if I did that :-). I am actually committing every 1000 rows (see inside the insertToDb() method in both cases). Although come to think of it, 1000 is probably a bit on the low side, I could probably increase it and see better performance. I like the idea of using JDBC batches, thanks for that, I will try it next time.