Saturday, January 18, 2014

Understanding UMLS


I've been looking at Unified Medical Language System (UMLS) data over the last week. The medical taxonomy we use at work is partly populated from UMLS, so I am familiar with the data, but only after it has been processed by our Informatics team. The reason I was looking at it is that I am trying to understand Apache cTakes, an open source NLP pipeline for the medical domain, which uses UMLS as one of its inputs.

UMLS is provided by the National Library of Medicine (NLM), and consists of 3 major parts: the Metathesaurus, which contains over 1M medical concepts; a Semantic Network that categorizes concepts by semantic type; and a Specialist Lexicon containing data to help with NLP on medical text. In addition, I also downloaded the RxNorm database, which contains drug/medication information. I found that the biggest challenge was accessing the data, so I will describe that here, and point you to other web resources for the data descriptions.

Before getting the data, you have to sign up for a license with UMLS Terminology Services (UTS) - this is a manual process and can take a few days over email (I did this a couple of years ago so the details are hazy). UMLS data is distributed as .nlm files which can (as far as I can tell) be opened and expanded only by the MetamorphoSys (mmsys) downloader, available on the UMLS download page. You need to run the following sequence of steps to capture the UMLS data into a local MySQL database. You can use other databases as well, but you would have to do a bit more work.

  1. Download the MetamorphoSys (mmsys) tool. Navigate to the UMLS download page and click the link for mmsys.zip. Installation consists of unzipping it into some convenient directory. For example, if you installed under /opt, your mmsys install directory would be /opt/mmsys.
  2. Download the additional data files into your mmsys working directory. I chose the 2013AB UMLS Active Release Files set.
  3. Start up the mmsys tool by running the ./run_linux.sh script. A GUI screen appears - click the Install UMLS button. You will be prompted for an output directory location - I chose /opt/mmsys/data.
  4. The .nlm data is expanded out into three subdirectories under /opt/mmsys/data/2013AB - META, NET and LEX, which correspond to data for the UMLS Metathesaurus, Semantic Network and Specialist Lexicon respectively. Within each subdirectory, data is provided as .RRF files (basically pipe delimited text files).
  5. Log in to the MySQL client and create a database to hold this data. The command is: CREATE DATABASE umlsdb DEFAULT CHARACTER SET utf8;
  6. The schema and data load scripts for the Metathesaurus can be generated with the mmsys tool via the top-level menu: "Advanced", then "Copy Load Scripts to Hard Drive". You can specify a target database other than MySQL, but for the other data you would then have to adapt the provided MySQL schema and data load scripts.
  7. Copy the generated script files to the META subdirectory. Update the populate_mysql_db.sh file with the MySQL root directory (/usr for me - MySQL was installed using Ubuntu's apt-get install), the database name, the database user and password. I also had to add --local-infile=1 to the mysql calls in the script because my server was not configured to allow loading data with LOAD DATA LOCAL INFILE. This step will take a while (I left mine running over a weekend; it took more than a day to load the data and build the indexes on a 4 CPU box).
  8. The NET subdirectory comes with its own populate_net_mysql_db.sh script. As before, the MySQL root directory, database name, database user and password need to be updated in the script, and --local-infile=1 passed to the mysql calls. The data loads relatively quickly, taking about 5 minutes or so.
  9. The schema and data loading SQL for the LEX subdirectory can be found on this page, which also provides instructions similar to this post. Unlike the previous ones, you need to log into the MySQL client with --local-infile=1 on the mysql command, change to your database, and run the downloaded script using "SOURCE mysql_lex_tables.sql;". This is also very quick, taking about 5-10 minutes.
  10. For RxNorm, I downloaded the RxNorm_full_01062014.zip file and unzipped it into my /opt/mmsys/data/RxNorm directory. I then navigated to the scripts/mysql subdirectory, and updated the populate_mysql_rxn.sh script to add the MySQL root directory, database name, database user, password and dbserver (localhost). I also set the --local-infile=1 flag on the mysql command. Since the RRF files are in a different directory, all RRF file references in Load_scripts_mysql_rxn_unix.sql need to be prefixed with ../../rrf/. The load process runs for about 15 minutes.

One thing to note is that the database is not normalized. Information is repeated across tables and presented in different formats, and the user of the data must decide how to handle this for their application. So how you reorganize the data is very much application-dependent. I actually tried to generate a database schema diagram using SQLFairy in XFig format and modify it in Dia before I realized the futility of the exercise.

The table and column names are quite cryptic and the relationships are not evident from the tables. You will need to refer to the data dictionaries for each system to understand it before you do anything interesting with the data. Here are the links to the online references that describe the tables and their relationships for each system better than I can.


The tables in the Metathesaurus that are important from the cTakes point of view are MRCONSO, which contains concepts and their synonymous terms, and MRSTY, which contains their semantic types. Other tables that are important if you are looking for relationships between concepts are MRREL and MRHIER. Co-occurrences of concepts in external texts are found in MRCOC, and MRMAP and MRSMAP contain mappings between terminologies (a very complex mapping described in the docs). RxNorm seems to be structured similarly to the Metathesaurus, except that the contents are drugs (although I haven't looked at RxNorm much yet, so this may not be completely accurate). For example, the RxNorm analogs of the Metathesaurus tables MRCONSO and MRSTY are RXNCONSO and RXNSTY respectively.
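
To make this concrete, here is a small query sketch in Scala using plain JDBC (the MySQL Connector/J driver needs to be on the classpath) that pulls the English names and semantic types for a single concept from the database built above. This is just an illustration, not part of any pipeline - the connection settings and the example CUI are placeholders, and the column names (MRCONSO.CUI, MRCONSO.STR, MRCONSO.SAB, MRCONSO.LAT, MRSTY.STY) are as documented in the Metathesaurus data dictionary.

// Illustrative only: look up names and semantic types for one concept.
import java.sql.DriverManager

object UmlsConceptLookup extends App {
  // Placeholder connection settings - replace with your own.
  val conn = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/umlsdb", "umlsuser", "umlspass")
  try {
    val ps = conn.prepareStatement("""
      select c.CUI, c.STR, c.SAB, s.STY
      from MRCONSO c join MRSTY s on c.CUI = s.CUI
      where c.CUI = ? and c.LAT = 'ENG'""")
    ps.setString(1, "C0011849") // a CUI of interest (here, Diabetes Mellitus)
    val rs = ps.executeQuery()
    while (rs.next()) {
      println("%s|%s|%s|%s".format(rs.getString("CUI"), rs.getString("STR"),
        rs.getString("SAB"), rs.getString("STY")))
    }
    rs.close()
    ps.close()
  } finally {
    conn.close()
  }
}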

This exercise in trying to understand the UMLS data was quite interesting. While there is some intersection between what we use at work and what's available, there is a lot we don't yet use and which can potentially be used in many interesting ways. In retrospect, I wish I had done this sooner.

Saturday, January 11, 2014

Sentiment Analysis using Classification


In the Introduction to Data Science course I took last year on Coursera, one of our Programming Assignments was to do sentiment analysis by aggregating the positivity and negativity of words in the text against the AFINN word list - a list of words manually annotated with positive and negative valences representing the sentiment each word indicates.

At the time I wondered if the word list approach was perhaps too labor intensive, since one must go through a manual process for each domain to identify and score positive and negative words. I figured it may be better to just treat it as a classification problem - manually identifying documents (instead of words) as positive or negative, then using that to train a classifier that can predict the sentiment of unseen documents. But then I got busy with other things and forgot about this until a few days ago, when I came across this post, which describes using classification for sentiment analysis.

The author, Andy Bromberg, describes using NLTK and Python to classify movie reviews as positive or negative. He also refers to a previous attempt using R and the AFINN polarity wordlist, similar to the Programming Assignment I described earlier. In addition, the post describes how feature selection was used to increase the accuracy of the classifier.

As a learning exercise, I decided to do something similar with Scikit-Learn. I used the review training data from the Yelp Recruiting Competition on Kaggle, which I had entered as part of the Peer Assessments in the Intro to Data Science course. Part of the data consisted of 229,907 restaurant reviews in JSON format, each with user votes indicating the usefulness, funniness and coolness of the review. I treated the text as a bag of words and considered a review to be useful, funny or cool if it had more than 0 votes for that attribute. This was used to train 3 binary classifiers that can predict these attributes for new reviews. Following along with Andy's post, I then used the Chi-squared metric to find the most useful features and measured the accuracy, precision and recall of the classifiers for different feature sizes.

The code to build and test each classifier using 10-fold cross validation is shown below. We first read the review file, parsing each line into a JSON object, then extracting the text and the useful, funny and cool votes. We then convert the text into a sparse matrix where each word is a feature. In our first pass, we use every word (116,713 unique words) in our text. For each of the useful, funny and cool attributes, we use the matrix and the binarized vote vector for that attribute to build a Naive Bayes classifier. We then test the classifier and compute accuracy, precision and recall. Next we calculate the most informative features for the attribute using the Chi-squared test, then build models for the top 1000, 3000, 10000, 30000, and 100000 features and calculate their accuracy, precision and recall.

# Source: src/yelp_ufc/build_classifier.py
from sklearn.cross_validation import KFold
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_selection import chi2
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.naive_bayes import MultinomialNB
import json
import numpy as np
import operator

def read_data(fname):
  f = open(fname, 'rb')
  texts = []
  ys = []
  for line in f:
    rec = json.loads(line.strip())
    texts.append(rec["text"])
    ys.append([
      1 if int(rec["votes"]["useful"]) > 0 else 0,
      1 if int(rec["votes"]["funny"]) > 0 else 0,
      1 if int(rec["votes"]["cool"]) > 0 else 0])
  f.close()
  return texts, np.matrix(ys)

def vectorize(texts, vocab=[]):
  vectorizer = CountVectorizer(min_df=0, stop_words="english") 
  if len(vocab) > 0:
    vectorizer = CountVectorizer(min_df=0, stop_words="english", 
      vocabulary=vocab)
  X = vectorizer.fit_transform(texts)
  return vectorizer.vocabulary_, X

def cross_validate(ufc_val, X, y, nfeats):
  nrows = X.shape[0]
  kfold = KFold(nrows, 10)
  scores = []
  for train, test in kfold:
    Xtrain, Xtest, ytrain, ytest = X[train], X[test], y[train], y[test]
    clf = MultinomialNB()
    clf.fit(Xtrain, ytrain)
    ypred = clf.predict(Xtest)
    accuracy = accuracy_score(ytest, ypred)
    precision = precision_score(ytest, ypred)
    recall = recall_score(ytest, ypred)
    scores.append((accuracy, precision, recall))
  print ",".join([ufc_val, str(nfeats), 
    str(np.mean([x[0] for x in scores])),
    str(np.mean([x[1] for x in scores])),
    str(np.mean([x[2] for x in scores]))])

def sorted_features(ufc_val, V, X, y, topN):
  iv = {v:k for k, v in V.items()}
  chi2_scores = chi2(X, y)[0]
  top_features = [(x[1], iv[x[0]], x[0]) 
    for x in sorted(enumerate(chi2_scores), 
    key=operator.itemgetter(1), reverse=True)]
  print "TOP 10 FEATURES FOR:", ufc_val
  for top_feature in top_features[0:10]:
    print "%7.3f  %s (%d)" % (top_feature[0], top_feature[1], top_feature[2])
  return [x[1] for x in top_features]

def main():
  ufc = {0:"useful", 1:"funny", 2:"cool"}
  texts, ys = read_data("../../data/yelp_ufc/yelp_training_set_review.json")
  print ",".join(["attrtype", "nfeats", "accuracy", "precision", "recall"])
  for ufc_idx, ufc_val in ufc.items():
    y = ys[:, ufc_idx].A1
    V, X = vectorize(texts)
    cross_validate(ufc_val, X, y, -1)
    sorted_feats = sorted_features(ufc_val, V, X, y, 10)
    for nfeats in [1000, 3000, 10000, 30000, 100000]:
      V, X = vectorize(texts, sorted_feats[0:nfeats])
      cross_validate(ufc_val, X, y, nfeats)

if __name__ == "__main__":
  main()

The top 10 features for each classifier (i.e., the words that have the highest "polarity" for that particular attribute) are shown below. The first column is the Chi-squared score for the word, the second column is the word itself, and the third column is the index of the word in the sparse matrix.

TOP 10 FEATURES FOR: useful

5170.064  like (60636)
4835.884  just (56649)
2595.147  don (32684)
2456.476  know (58199)
2346.778  really (84130)
2083.032  time (104423)
2063.618  people (76776)
2039.718  place (78659)
1873.081  think (103835)
1858.230  little (61092)

TOP 10 FEATURES FOR: funny

9087.141  like (60636)
6049.875  just (56649)
4848.157  know (58199)
4664.542  don (32684)
3361.983  people (76776)
2649.594  think (103835)
2505.478  oh (72420)
2415.325  ll (61174)
2349.312  really (84130)
2345.851  bar (11472)

TOP 10 FEATURES FOR: cool

6675.123  like (60636)
4616.683  just (56649)
3173.775  know (58199)
3010.526  really (84130)
2847.494  bar (11472)
2715.794  little (61092)
2670.838  don (32684)
2300.151  people (76776)
2217.659  place (78659)
2216.888  ve (110157)

We also plot some graphs for each classifier showing how the accuracy, precision and recall vary with the number of features. The horizontal lines represent the accuracy, precision and recall achieved using the full data set. As can be seen, the metrics improve as more features are added but tend to flatten out eventually.




The code to build these graphs out of the metrics printed out by our classifier training code uses Pandas dataframe plotting functionality and is shown below:

# Source: src/yelp_ufc/plot_results.py
import matplotlib.pyplot as plt
import pandas as pd
import sys

def main():
  assert(len(sys.argv) == 2)
  df = pd.read_csv("all.csv")
  adf = df.ix[df.attrtype == sys.argv[1]]
  adf_all = adf.ix[adf.nfeats < 0]
  adf_rest = adf.ix[adf.nfeats > 0]
  print adf_all
  print adf_rest
  adf_rest = adf_rest.drop("attrtype", 1)
  adf_rest = adf_rest.set_index("nfeats")
  adf_rest["accuracy_all"] = adf_all[["accuracy"]].values[0][0]
  adf_rest["precision_all"] = adf_all[["precision"]].values[0][0]
  adf_rest["recall_all"] = adf_all[["recall"]].values[0][0]
  adf_rest.plot(title=sys.argv[1])
  plt.show()

if __name__ == "__main__":
  main()

That's all I have for today. Many thanks to Andy Bromberg for posting his analysis, without which mine would not have happened. The code for this post can also be found on my GitHub.

Saturday, January 04, 2014

Akka Content Ingestion Pipeline, Part V


The title for this post is a bit misleading, since the post is not really about Akka or Content Ingestion. This post describes a Cassandra CQL DAO for DelSym, i.e., a component in the content ingestion pipeline that I have been discussing under this title. The Cassandra CQL DAO is meant to be used instead of the current MongoDB layer. I chose MongoDB originally because a document-oriented database is a natural fit for a document pipeline, and I built the Cassandra CQL interface later mainly because we use Cassandra at work, and I wanted to see if a column-oriented database presented any special challenges. Plus, I wanted to learn Cassandra CQL, and this seemed a good way to do it.

Cassandra CQL is a DSL that looks almost like SQL, so it is easy for people with an RDBMS background to learn and use. Under the covers, Cassandra stores the data in column format using various translation rules (described by John Berryman in this YouTube presentation if you are interested). This approach gives you the best of both worlds - the performance of a column database and the simplicity and convenience of a familiar user interface. SAP HANA is another database that uses this strategy (i.e., SQL interface, column storage).

Setup involves downloading and untarring Cassandra to a given location, configuring and creating directories for logs, data, saved_caches and commitlog, and starting the Cassandra daemon (with "cd $CASSANDRA_HOME; bin/cassandra -f"). On another terminal, we need to create the delsymdb keyspace (the equivalent of a database in an RDBMS) and the documents table. The primary key on url is set up in the CREATE TABLE. The block below shows this being done in the CQL shell (invoked with "cd $CASSANDRA_HOME; bin/cqlsh").

cqlsh> create keyspace delsymdb
   ...   with replication = {'class':'SimpleStrategy', 
   ...   'replication_factor':1};
cqlsh> use delsymdb;
cqlsh:delsymdb> create table documents (
            ...   url text,
            ...   depth int,
            ...   content text,
            ...   fetchmeta map<text,text>,
            ...   fts timestamp,
            ...   text_content text,
            ...   parsemeta map<text,text>,
            ...   pts timestamp,
            ...   its timestamp,
            ...   primary key(url));

I use the Datastax Java driver to access this database from my Scala code. I followed the examples in the Instant Cassandra Query Language book, both to learn CQL (on the CQL shell) and to write the code. I also looked at the code in cassandra-samples. My DAO code is shown below; it extends the same BaseDbDao class as MongoDbDao does, so the methods are identical.

// Source: src/main/scala/com/mycompany/delsym/daos/CassandraDbDao.scala
package com.mycompany.delsym.daos

import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer

import com.datastax.driver.core.Cluster
import com.typesafe.config.ConfigFactory

class CassandraDbDao extends BaseDbDao {

  val conf = ConfigFactory.load()
  val host = conf.getString("delsym.cassandradb.host")
  val dbname = conf.getString("delsym.cassandradb.dbname")
  val tablename = conf.getString("delsym.cassandradb.collname")
  
  val cluster = Cluster.builder()
                       .addContactPoint(host)
                       .build()
  val session = cluster.connect()
  session.execute("USE %s".format(dbname))
  
  override def insertFetched(url: String, depth: Int, 
      fetchMetadata: Map[String,Any], 
      content: String): Either[FailResult,Unit] = {
    try {
      deleteByUrl(url) match {
        case Left(f) => Left(f)
        case _ => {}
      }
      val fmeta = fetchMetadata.map(kv => 
          "'%s':'%s'".format(kv._1, kv._2.toString))
        .mkString(", ")
      val insfetch = "insert into documents(" + 
        "url, depth, content, fetchmeta, fts) values (" +
        "'%s', %d, '%s', {%s}, '%s');"
        .format(url, depth, content, fmeta, iso8609(new Date()))
      session.execute(insfetch)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting fetch data", e))
    }
  }
  
  override def insertParsed(url: String, text: String, 
      parseMetadata: Map[String,Any]): 
      Either[FailResult,Unit] = {
    try {
      val pmeta = parseMetadata.map(kv => 
        "'%s':'%s'".format(kv._1, kv._2.toString))
        .mkString(", ")
      val insparse = "insert into documents(" +
        "url, text_content, parsemeta, pts) values (" +
        "'%s', '%s', {%s}, '%s');"
        .format(url, text, pmeta, iso8609(new Date()))
      session.execute(insparse)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting parse data", e))
    }
  }
  
  override def insertIndexed(url: String): 
      Either[FailResult,Unit] = {
    try {
      val insindex = "insert into documents(url, its) " +
        "values('%s', '%s')"
        .format(url, iso8609(new Date()))
      session.execute(insindex)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting index data", e))
    }
  }
  
  override def getByUrl(url: String, fields: List[String]):
      Either[FailResult,Map[String,Any]] = {
    try {
      val fldlist = if (fields.isEmpty) '*' 
                    else fields.mkString(",") 
      val query = "select %s from %s where url = '%s'"
        .format(fldlist, tablename, url) 
      val results = session.execute(query)
      val row = results.one()
      val colnames = if (fields.isEmpty)
        row.getColumnDefinitions()
          .asList()
          .map(coldef => coldef.getName())
          .toList
      else fields
      var colvals = ArrayBuffer[(String,Any)]()
      colnames.map(colname => colname match {
        case "url" => 
          colvals += (("url", row.getString("url")))
        case "content" => 
          colvals += (("content", row.getString("content")))
        case "depth" => 
          colvals += (("depth", row.getInt("depth")))
        case "fetchmeta" => {
          val fmeta = row.getMap("fetchmeta", 
            classOf[String], classOf[String])
          fmeta.map(kv => 
            colvals += (("f_" + kv._1, kv._2)))
        }
        case "fts" => 
          colvals += (("fts", row.getDate("fts")))
        case "text_content" => 
          colvals += (("textContent", row.getString("text_content")))
        case "parsemeta" => {
          val pmeta = row.getMap("parsemeta", 
            classOf[String], classOf[String])
          pmeta.map(kv => 
            colvals += (("p_" + kv._1, kv._2)))
        }
        case "pts" => 
          colvals += (("pts", row.getDate("pts")))
        case "its" => 
          colvals += (("its", row.getDate("its")))
        case _ => {}
      })
      Right(colvals.toMap)
    } catch {
      case e: NullPointerException => Right(Map.empty) 
      case e: Exception =>  
        Left(FailResult(e.getMessage(), e))
    }
  }
  
  override def close(): Unit = cluster.shutdown()

  def deleteByUrl(url: String): Either[FailResult,Unit] = {
    try {
      val delquery = 
        "delete from documents where url = '%s';"
        .format(url)
      session.execute(delquery)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error deleting by URL: [" + 
          url + "]", e))
    }
  }
  
  def iso8609(d: Date): String = {
    lazy val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    sdf.format(d)
  }
}

As you can see, the code for the getByUrl() method is considerably more verbose than its MongoDB counterpart. This is because the driver does not provide a generalized method to get an object; you must specify the data type (for example, getString, getInt, etc). A better approach may have been to use ColumnDefinitions (similar to ResultSetMetaData in regular JDBC), and maybe I will in a later iteration once I understand it better. For now, any time we want to alter the table to add or delete columns, we will need to update this method.
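
For what it's worth, here is an untested sketch of what such a generalization might look like, using only driver calls that already appear above (getColumnDefinitions(), getString(), getInt(), getDate(), getMap()) plus the column Definition's getType() accessor. It only handles the types present in the documents table, and it does not reproduce the f_/p_ key prefixes or the text_content to textContent renaming of the original method.

import scala.collection.JavaConversions.iterableAsScalaIterable

import com.datastax.driver.core.DataType
import com.datastax.driver.core.Row

// Untested sketch: convert a Row into a Map keyed by column name by
// switching on the column's DataType instead of hardcoding each column.
def rowToMap(row: Row): Map[String,Any] = {
  row.getColumnDefinitions().map(coldef => {
    val name = coldef.getName()
    val value: Any = coldef.getType().getName() match {
      case DataType.Name.TEXT | DataType.Name.VARCHAR => row.getString(name)
      case DataType.Name.INT => row.getInt(name)
      case DataType.Name.TIMESTAMP => row.getDate(name)
      case DataType.Name.MAP =>
        row.getMap(name, classOf[String], classOf[String])
      case _ => null // other types are not used in the documents table
    }
    (name, value)
  }).toMap
}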

Also, an additional method deleteByUrl() has to be defined. It is used by the insertFetched() method to delete all columns for the page before the new information is added. This was not necessary in MongoDB, since the entire document would be replaced during insert. Another issue is that the fetch and parse metadata can no longer be persisted in their native form - since the column type is map<text,text>, the values must be Strings.

CQL also supports PreparedStatements (based on examples in cassandra-samples), but I could not get them to work - no errors were thrown, the inserts just did not happen. I ended up composing the CQL in code and passing it to session.execute() - the added benefit of this approach is that you can try the statement in the CQL shell if it doesn't work, where the error messages are usually more informative. Another thing to note is that single and double quotes are not interchangeable the way they are in most SQL dialects.
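
For reference, this is roughly the prepare/bind style (from the driver docs and the cassandra-samples code) that I was attempting, shown here for insertIndexed() only as a sketch of the intended approach - not as working code, since as mentioned the inserts silently did not happen for me. It assumes the session val from the CassandraDbDao above.

import com.datastax.driver.core.PreparedStatement

// Prepared once (e.g. at DAO construction time), then bound per call.
val insIndexStmt: PreparedStatement = session.prepare(
  "insert into documents(url, its) values (?, ?)")

def insertIndexedPrepared(url: String): Unit = {
  // timestamp columns bind to java.util.Date in the Java driver
  session.execute(insIndexStmt.bind(url, new java.util.Date()))
}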

The test below expects a running Cassandra instance with the database and table created as described above. It runs the DAO methods and verifies the results by looking up data in the table at the end of each call.

// Source: src/test/scala/com/mycompany/delsym/daos/CassandraDbDaoTest.scala
package com.mycompany.delsym.daos

import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll

class CassandraDbDaoTest extends FunSuite
                         with BeforeAndAfterAll {

  val cassDao = new CassandraDbDao()
  
  override def afterAll() = cassDao.close()
  
  test("get non-existent record") {
    val url = "http://www.google.com"
    cassDao.deleteByUrl(url)
    val result = 
      cassDao.getByUrl(url, List.empty)
    result match {
      case Left(f) => fail("unexpected exception")
      case Right(r) => {
        Console.println("result=[" + r + "]")
        assert(r != null)
        assert(r.isEmpty)
      }
    }
  }
  
  test("insert after fetch") {
    val url = "http://www.google.com"
    val content = "<HTML>Some arbitary content</HTML>"
    val fetchMetadata = List(
       ("feed_title", "Some arbitary title"),
       ("feed_summary", "Jack the giant killer")).toMap
    cassDao.insertFetched(url, 0, fetchMetadata, content)
    cassDao.getByUrl(url, List.empty) match {
      case Right(row) => {
        Console.println("row (after fetch, #-cols:" + 
          row.size + ") = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 9)
      }
      case Left(f) => fail("unexpected exception")
    }
  }
  
  test("insert after parse") {
    val url = "http://www.google.com"
    val textContent = "Some arbitary content"
    val parseMetadata = List(
       ("title", "The real title"),
       ("author", "Jack G Killer")).toMap
    cassDao.insertParsed(url, textContent, parseMetadata)
    cassDao.getByUrl(url, List.empty) match {
      case Right(row) => {
        Console.println("row (after parse, #-cols:" + 
          row.size + ") = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 11)
      }
      case Left(f) => fail("unexpected exception")
    }
  }
  
  test("insert after index") {
    val url = "http://www.google.com"
    cassDao.insertIndexed(url)
    cassDao.getByUrl(url, List.empty) match {
      case Right(row) => {
        Console.println("row (after index, #-cols:" + 
          row.size + ") = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 11)
      }
      case Left(f) => fail("unexpected exception")
    }
  }

  test("get inserted record, selected fields") {
    val url = "http://www.google.com"
    val result = cassDao.getByUrl(url, List("pts", "parsemeta"))
    result match {
      case Right(row) => {
        Console.println("partial row get = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 3)
      }
      case Left(f) => {
        f.e.printStackTrace()
        fail("unexpected exception", f.e)
      }
    }
  }
}

And that's all I have for today. Hopefully next post I will talk about something else :-).

Wednesday, January 01, 2014

Akka Content Ingestion Pipeline, Part IV


Happy New Year! Over the last three posts, I've described an Akka-based Content Ingestion Pipeline modelled after the NutchGORA pipeline. This pipeline was my vehicle for learning Akka, something I've been planning to do for a while. In this post (the final installment in this series, at least so far), I explore how to distribute the application horizontally across multiple machines (scale out).

Akka is distributed by design. The code I've built so far can be regarded as a single-server version of a distributed system. According to the Akka remoting documentation (emphasis mine):

Everything in Akka is designed to work in a distributed setting: all interactions of actors use purely message passing and everything is asynchronous. This effort has been undertaken to ensure that all functions are available equally when running within a single JVM or on a cluster of hundreds of machines. The key for enabling this is to go from remote to local by way of optimization instead of trying to go from local to remote by way of generalization.

Akka can work with remote Actors in two ways: by looking them up in a remote ActorSystem, or by creating them in the remote ActorSystem. I use the latter approach. The components that do the heavy lifting in the pipeline are the workers, and scaling out to handle more incoming requests implies increasing the number of workers or making them faster, both of which can be done by giving them their own dedicated hardware.
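
To make the distinction concrete, here is a minimal sketch of the two styles, assuming the remoting-enabled configuration shown later in this post is on the classpath and a remote ActorSystem named "remote" is already running at 127.0.0.1:2553; the /user/fetchers path in the lookup is illustrative.

// Sketch only - not part of the DelSym code base.
import akka.actor.{ActorSystem, AddressFromURIString, Deploy, Props}
import akka.remote.RemoteScope
import com.mycompany.delsym.actors.FetchWorker

object RemoteStyles extends App {
  val system = ActorSystem("DelSym")

  // Style 1: look up an actor that already exists on the remote ActorSystem
  // (the /user/fetchers path here is illustrative).
  val lookedUp = system.actorSelection(
    "akka.tcp://remote@127.0.0.1:2553/user/fetchers")

  // Style 2: create the actor on the remote ActorSystem by deploying it
  // there - the approach this post takes, via RemoteRouterConfig.
  val remoteAddr = AddressFromURIString("akka.tcp://remote@127.0.0.1:2553")
  val fetcher = system.actorOf(Props[FetchWorker]
    .withDeploy(Deploy(scope = RemoteScope(remoteAddr))), name = "fetcher")
}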

The architecture diagram has been updated with the distribution boundaries, which are indicated by the gray boxes below. The master node is the large gray box on the top, and contains the REST Interface, the Controller and the Router actors. The worker nodes (there can be an array of nodes for each worker class) are the ones that wrap the Fetch, Parse and Index worker arrays.


Each of these nodes is wrapped in an Akka ActorSystem, which can be accessed by a URI from other ActorSystems. So in addition to the HTTP interface that the master node exposes to the outside world, it also exposes a host:port and has a name that other Akka ActorSystems can use to communicate with it.

For testing, I configured the pipeline with just 2 ActorSystems - the master node listening on localhost:2552 and identified by the URI akka.tcp://DelSym@localhost:2552, and one remote node listening on localhost:2553 and identified by the URI akka.tcp://remote@localhost:2553. Here is some code to create a named (name supplied from the command line) remote Akka ActorSystem using configuration parameters from the remote.conf file.

// Source: src/main/scala/com/mycompany/delsym/remote/RemoteAkka.scala
package com.mycompany.delsym.remote

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemoteAkka extends App {

  val name = if (args.isEmpty) "remote" else args(0)
  
  val conf = ConfigFactory.load("remote")
  val host = conf.getString("akka.remote.netty.tcp.hostname")
  val port = conf.getInt("akka.remote.netty.tcp.port")
  
  val system = ActorSystem(name, conf)
  Console.println("Remote system [%s] listening on %s:%d"
    .format(name, host, port))
  
  sys.addShutdownHook {
    Console.println("Shutting down Remote Akka")
    system.shutdown
  }
}

The remote.conf file looks like this. It is meant to be used to start up ActorSystems on multiple nodes in a network.

// Source: src/main/resources/remote.conf
akka {
  log-dead-letters-during-shutdown = off
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553
    }
  }
}

I then added a property in application.conf to specify a list of ActorSystem URIs for the routers. The routers are Round Robin routers, so giving them a list of ActorSystem URIs will cause them to cycle through the URIs, creating remote Actors and distributing work evenly across multiple remote ActorSystems. The Controller Actor code (which instantiates the routers) has been modified to create local workers if the node list is empty, and remote workers otherwise. The updated code for the Controller is shown below:

// Source: src/main/scala/com/mycompany/delsym/actors/Controller.scala
package com.mycompany.delsym.actors

import scala.collection.JavaConversions.asScalaBuffer
import scala.concurrent.duration.DurationInt

import com.mycompany.delsym.daos.HtmlOutlinkFinder
import com.mycompany.delsym.daos.MockOutlinkFinder
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigList

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.AddressFromURIString
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.actorRef2Scala
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RoundRobinRouter
import akka.routing.RouterConfig

class Controller extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy(
      maxNrOfRetries = 10,
      withinTimeRange = 1.minute) {
    case _: Exception => SupervisorStrategy.Restart
  }
  
  val reaper = context.actorOf(Props[Reaper], name="reaper")

  val conf = ConfigFactory.load()
  val numFetchers = conf.getInt("delsym.fetchers.numworkers")
  val fetchNodes = conf.getList("delsym.fetchers.nodes")
  
  val numParsers = conf.getInt("delsym.parsers.numworkers")
  val parseNodes = conf.getList("delsym.parsers.nodes")
  
  val numIndexers = conf.getInt("delsym.indexers.numworkers")
  val indexNodes = conf.getList("delsym.indexers.nodes")
  
  val testUser = conf.getBoolean("delsym.testuser")
  val outlinkFinder = if (testUser) new MockOutlinkFinder()
                      else new HtmlOutlinkFinder()
  
  val queueSizes = scala.collection.mutable.Map[String,Long]()
  
  val fetchers = context.actorOf(Props[FetchWorker]
    .withRouter(buildRouter(numFetchers, fetchNodes)), 
    name="fetchers")
  reaper ! Register(fetchers)
  queueSizes += (("fetchers", 0L))

  val parsers = context.actorOf(Props[ParseWorker]
    .withRouter(buildRouter(numParsers, parseNodes)), 
    name="parsers")
  reaper ! Register(parsers)
  queueSizes += (("parsers", 0L))
  
  val indexers = context.actorOf(Props[IndexWorker]
    .withRouter(buildRouter(numIndexers, indexNodes)),
    name="indexers")
  reaper ! Register(indexers)
  queueSizes += (("indexers", 0L))

  def receive = {
    case m: Fetch => {
      increment("fetchers")
      fetchers ! m
    }
    case m: FetchComplete => {
      decrement("fetchers")
      if (m.fwd) parsers ! Parse(m.url)
    }
    case m: Parse => {
      increment("parsers")
      parsers ! m
    }
    case m: ParseComplete => {
      decrement("parsers")
      outlinks(m.url).map(outlink => 
        fetchers ! Fetch(outlink._1, outlink._2, outlink._3))
      if (m.fwd) indexers ! Index(m.url)
    }
    case m: Index => {
      increment("indexers")
      indexers ! m
    }
    case m: IndexComplete => {
      decrement("indexers")
    }
    case m: Stats => {
      sender ! queueSize()
    }
    case m: Stop => {
      reaper ! Stop(0)
    }
    case _ => log.info("Unknown message received.")
  }
  
  def buildRouter(n: Int, nodes: ConfigList): RouterConfig = {
    if (nodes.isEmpty) RoundRobinRouter(n)
    else {
      val addrs = nodes.unwrapped()
        .map(node => node.asInstanceOf[String])
        .map(node => AddressFromURIString(node))
        .toSeq
      RemoteRouterConfig(RoundRobinRouter(n), addrs)
    }
  }
  
  def queueSize(): Stats = Stats(queueSizes.toMap)
  
  def outlinks(url: String): 
      List[(String,Int,Map[String,String])] = {
    outlinkFinder.findOutlinks(url) match {
      case Right(triples) => triples
      case Left(f) => List.empty
    }
  }
  
  def increment(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) + 1))
  }
  
  def decrement(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) - 1))
  }
}

The documentation indicates that a better approach would be to declare the routers in configuration, so the local configuration would differ from the distributed configuration. I did not do this because my test case refers to the routers as /controller/* while the actual code refers to them as /api/controller/* (I should probably change the test code but I was too lazy). In any case, changing from a local to a remote router configuration is simply a matter of wrapping the RouterConfig with a RemoteRouterConfig (the buildRouter function in the code above), so this approach works fine too.

Going from local to remote also requires you to think about serialization. I have chosen to use Java serialization, and I have configured Akka (via the application.conf file) to automatically use Java serialization for my messages. In addition, the distributed version of the master ActorSystem exposes its own address in the configuration and sets the provider to a remote ActorRef provider. The other important difference is the non-empty nodes list under the delsym namespace for each of the fetchers, parsers and indexers. The remote configuration is shown below:

// Source: src/main/resources/application.conf.remote
akka {
  loglevel = INFO
  stdout-loglevel = INFO
  akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
  log-dead-letters-during-shutdown = off
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
    serializers {
      java = "akka.serialization.JavaSerializer"
    }
    serialization-bindings {
      "com.mycompany.delsym.actors.DelsymMessage" = java
    }
  }
}

spray {
  can {
    server {
      server-header = "DelSym REST API"
    }
  }
}

delsym {
  testuser = true
  fetchers {
    numworkers = 5
    refreshIntervalDays = 30
    numRetries = 3
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  parsers {
    numworkers = 5
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  indexers {
    numworkers = 5
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  mongodb {
    host = "127.0.0.1"
    port = 27017
    dbname = "delsymdb"
    collname = "documents"
  }
  solr {
    server = "http://127.0.0.1:8983/solr/collection1/"
    dbfieldnames = "_id,url,p_title,p_author,textContent"
    solrfieldnames = "id,url,title,author,text"
    commitInterval = 10
  }
  rest {
    host = "127.0.0.1"
    port = 8080
    timeout = 1
  }
}

There are now 3 versions of application.conf in the DelSym repo on GitHub. You will have to link to the correct one depending on whether you want to run the mock tests, run in local mode (all actors in a single ActorSystem), or run in remote mode (multiple ActorSystems).

The effort to build the code for this part of the pipeline was mostly conceptual, i.e., understanding how the different components fit together. I found the following Akka reference pages very useful. The pages are all for Akka version 2.2.3 (the latest stable version), which I used for this work - the default pages that show up (in response to a Google search, for example) are for version 2.3, which is still in development. The 2.3 code is different enough for this detail to be annoying, so I am mentioning it here.


In addition, I also found the akka-sample-remote-scala project useful, although the pattern shown there is slightly different from what I used. Another useful source was the Remoting chapter of the Akka Essentials book.

I was able to run the ActorFlowTest unit test with Mock workers (minus the asserts, since the workers update counters on the remote ActorSystem, which I no longer have control over) and verify from the logs that the fetching, parsing and indexing happen on my remote ActorSystem at localhost:2553. The code also exits cleanly, which means DeathWatch works fine with remote workers. However, I saw lots of messages sent to the dead-letter mailbox which I haven't been able to figure out yet (they are not application messages) - I will post an update here (and a bugfix to the DelSym GitHub repo) once I do.