Sunday, January 29, 2012

Nutch/GORA - Delta Indexing

Background

Delta, or incremental, indexing is pretty much the main reason I am so excited about Nutch/GORA. Replacing segment files with a database opens up the possibility of external programs modifying metadata in the database, and thereby modifying the pipeline's behavior to support delta indexing.

For web crawling (Nutch's default use case), the Adaptive Fetch Schedule is an excellent choice. This component (set via db.fetch.schedule.class) decreases or increases a page's fetch interval depending on whether the page has changed since the last fetch. However, if you want to use Nutch as an indexing pipeline for, say, a Content Management System (CMS), you will need to offer the CMS folks a more deterministic way to control what appears in the index.
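
For reference, the adaptive schedule itself is enabled with a single property override in nutch-site.xml, something like this:

<property>
  <name>db.fetch.schedule.class</name>
  <value>org.apache.nutch.crawl.AdaptiveFetchSchedule</value>
  <description>Adjusts the fetch interval up or down depending on
  whether the page changed since the last fetch.</description>
</property>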

When a CMS publishes a piece of content, the content should (within some predictable time interval) make it into the index (so it starts appearing in search results). On the other hand, when a CMS unpublishes a piece of content, it should disappear from the index. In this post, I propose an approach that does this, using Nutch/GORA (with custom plugins) and some supporting infrastructure.

Infrastructure

I have previously described using an HTTP server to stage content on a local filesystem for Nutch. I use a similar strategy here, except that my seed file for a given provider (or CMS) is now a dynamically generated "index" file, also served by the HTTP server, that lists all the content available from that provider. Something like this:
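
The file names below are invented, but the structure is what the /provider_index handler in the script that follows generates:

<html><head></head><body><ul>
  <li><a href="http://localhost:8080/provider/prov1__1__000021.xml">http://localhost:8080/provider/prov1__1__000021.xml</a></li>
  <li><a href="http://localhost:8080/provider/prov1__1__000022.xml">http://localhost:8080/provider/prov1__1__000022.xml</a></li>
  ...
</ul></body></html>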

In line with this, my script that launches a CherryPy HTTP server now has an additional handler that dynamically generates the index page by recursively scanning the provider's directory (/provider_index), as well as one that serves a named file from disk (/provider). Here's the code:

#!/usr/bin/python
import cherrypy
import os
import os.path
import urllib

from cherrypy.lib.static import serve_file

SITES_DIR = "/path/to/your/sites"
SERVER_HOST = "localhost"
SERVER_PORT = 8080

def _accumulate_files(files, dirname, fnames):
  """
  This function gets called for every file (and directory) that is walked
  by os.path.walk. It accumulates the file names found into a flat array.
  The file names accumulated are relative to the providers directory.
  """
  for fname in fnames:
    abspath = os.path.join(dirname, fname)
    if os.path.isfile(abspath):
      abspath = abspath.replace(os.path.join(SITES_DIR, "providers"), "")[1:]
      files.append(abspath)

class Root:

  @cherrypy.expose
  def test(self, name):
    """
    Expose the mock site for testing.
    """
    return serve_file(os.path.join(SITES_DIR, "test", "%s.html" % (name)), \
      content_type="text/html")

  @cherrypy.expose
  def provider_index(self, name):
    """
    Builds an index page of links to all the files for the specified
    provider. The files are stored under sites/providers/$name. The
    function will recursively walk the filesystem under this directory
    and dynamically generate a flat list of links. Path separators in
    the filename are converted to "__" in the URL. The index page can
    be used as the seed URL for this content.
    """
    files = []
    os.path.walk(os.path.join(SITES_DIR, "providers", name), \
      _accumulate_files, files)
    index = "<html><head></head><body><ul>"
    for file in files:
      url = "http://%s:%s/provider/%s" % (SERVER_HOST, SERVER_PORT, \
        urllib.quote_plus(file.replace(os.path.sep, "__")))
      index += """
        <li><a href="%s">%s</a></li>
      """ % (url, url)
    index += "</ul></body></html>"
    return [index]

  @cherrypy.expose
  def provider(self, name):
    """
    Returns the contents of the XML file stored at the location 
    corresponding to the URL provided. The "__" in the URL are converted
    back to file path separators.
    """
    ct = None
    if name.endswith(".xml"):
      ct = "application/xml"
    elif name.endswith(".json"):
      ct = "application/json"
    if ct is None:
      return serve_file(os.path.join(SITES_DIR, "providers", \
        "%s" % name.replace("__", os.path.sep)), \
        content_type = "text/html")
    else:
      return serve_file(os.path.join(SITES_DIR, "providers", \
        "%s" % (urllib.unquote_plus(name).replace("__", os.path.sep))), \
        content_type = ct)

if __name__ == '__main__':
  current_dir = os.path.dirname(os.path.abspath(__file__))
  # Set up site-wide config first so we get a log if errors occur.
  cherrypy.config.update({'environment': 'production',
    'log.access_file': 'site.log',
    'log.screen': True,
    "server.socket_host" : SERVER_HOST,
    "server.socket_port" : SERVER_PORT})
  cherrypy.quickstart(Root(), '/')

My seed file now lists only the URL to the index file, as shown below:

http://localhost:8080/provider_index/prov1  u_idx=prov1

Delta Indexing Overview

As I mentioned earlier, the CMS can send either a publish or an unpublish request. A publish request translates to an Add (if it's a new page that doesn't exist in the database) or an Update. An unpublish request translates to a Delete (or Hide) at the database level.

For an Add request, we reset the fetch time of the provider index page to the current time minus the fetch interval, making it eligible for a fetch in the next crawl run. Since the index page is generated dynamically off the filesystem, the newly added file is guaranteed to show up in it. The first iteration of the next crawl refetches and reparses the index page, and the second iteration moves the newly added page(s) from the index page's outlinks into the fetch list, and from there into the index.
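
In terms of commands, assuming the standard NutchGora generate/fetch/parse/updatedb cycle (the exact flags may differ depending on your revision of the branch), the two iterations look something like this:

# iteration 1: refetch and reparse the provider index page
bin/nutch generate
bin/nutch fetch -all
bin/nutch parse -all
bin/nutch updatedb
# iteration 2: the new outlinks from the index page are now in the fetch list
bin/nutch generate
bin/nutch fetch -all
bin/nutch parse -all
bin/nutch updatedb
# finally, push the changes into Solr
bin/nutch solrindex http://localhost:8983/solr -reindex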

Of course, the index page itself should not make it into the index, so we have a custom Index Filter (described later) to filter these out by pattern.

An alternative approach relies on accurate Last-Modified headers and a more aggressive crawl schedule. With that approach you don't need an index file or any custom handling for Adds and Updates, but you do need to crawl more frequently. I prefer the event-driven approach described here.

For an Update request, we reset the fetch time of the page specified by the URL, in the same way as the index page. The first iteration of the next recrawl will pick up the changes and push them into the index.

For a Delete, we do two things: first, we delete the record from Solr; then we mark the record deleted in Cassandra by setting its status to STATUS_GONE. An indexing filter (described below) prevents pages with this status from making it back into the index.

Delta Indexing Tool

To perform the publish and unpublish operations against the index, I built a tool that can be called by its full class name from bin/nutch. I put my package under Nutch's src/java tree, and had to modify Nutch's build.xml slightly to get the code to compile: specifically, adding the com/mycompany/nutch/**/*.java path to the javac includes of the "compile-core" target, roughly as shown below.
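
For reference, the change amounts to appending my package to the includes attribute of the javac task in that target. Trimmed down (the other attributes of the task are unchanged and omitted here), it looks roughly like this:

<!-- "compile-core" target in Nutch's build.xml -->
<javac srcdir="${src.dir}"
       includes="org/apache/nutch/**/*.java com/mycompany/nutch/**/*.java"
       destdir="${build.classes}">
  <!-- remaining attributes and the nested classpath element stay as they were -->
</javac>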

I had initially planned on building it with GORA so it could be easily ported to other backends as well. The GORA tutorial has examples of retrieving and updating records by key. However, it turns out that CassandraStore (the gora-cassandra implementation of DataStore) has an empty implementation of get(String) that simply returns null.

So I finally settled on using the Hector API (which GORA also uses) to talk directly to the Cassandra database. I am using Hector's template API, which feels similar to (and is probably inspired by) Spring's JdbcTemplate. Here is the code for the tool. It is not as nice as using GORA (i.e., not database agnostic), but it will have to do for now.

// Source: src/java/com/mycompany/nutch/tools/DeltaHandler.java
package com.mycompany.nutch.tools;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.template.ColumnFamilyResult;
import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate;
import me.prettyprint.cassandra.service.template.ColumnFamilyUpdater;
import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.util.TableUtil;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;

public class DeltaHandler {
  
  private static final Log LOG = LogFactory.getLog(DeltaHandler.class);
  
  private static final String PROVIDER_INDEX_URLPREFIX_KEY =
    "mycompany.provider.index.urlprefix";
  private static final String SOLR_URL = "mycompany.solr.url";
  private static final Utf8 U_IDX = new Utf8("u_idx");
  
  private String providerIndexUrlPrefix;
  private CommonsHttpSolrServer solrServer;
  private Keyspace keyspace;
  private ColumnFamilyTemplate<String,String> template;
  
  public DeltaHandler() {
    try {
      init();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  
  private void init() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource("nutch-default.xml");
    conf.addResource("nutch-site.xml");
    this.providerIndexUrlPrefix = conf.get(PROVIDER_INDEX_URLPREFIX_KEY);
    this.solrServer = new CommonsHttpSolrServer(conf.get(SOLR_URL));
    Cluster cluster = HFactory.getOrCreateCluster("Test Cluster", 
      new CassandraHostConfigurator("localhost:9160"));
    this.keyspace = HFactory.createKeyspace("webpage", cluster);
    this.template = new ThriftColumnFamilyTemplate<String,String>(
      keyspace, "f", StringSerializer.get(), StringSerializer.get()
    );
  }
  
  private void destroy() {
  }
  
  public void publish(String url, String idx) throws Exception {
    String key = TableUtil.reverseUrl(url);
    ColumnFamilyResult<String,String> res = 
      template.queryColumns(key);
    String bas = res.getString("bas"); 
    if (StringUtils.isEmpty(bas)) {
      // requested URL does not exist, should be an "add",
      // reset fetchtime for the index page
      key = TableUtil.reverseUrl(StringUtils.join(new String[] {
        providerIndexUrlPrefix, idx}, "/"));
      res = template.queryColumns(key);
      bas = res.getString("bas");
    }
    if (StringUtils.isEmpty(bas)) return;
    int fetchInterval = Integer.valueOf(res.getString("fi"));
    // update the fetch time to current - fetchInterval so
    // it is eligible for crawling immediately
    ColumnFamilyUpdater<String,String> updater =
      template.createUpdater(key);
    updater.setString("ts", String.valueOf(
      System.currentTimeMillis() - fetchInterval));
    template.update(updater);
  }
  
  public void unpublish(String url, String idx, boolean commit) 
      throws Exception {
    String key = TableUtil.reverseUrl(url);
    ColumnFamilyResult<String,String> res = template.queryColumns(key);
    String bas = res.getString("bas");
    if (StringUtils.isNotEmpty(bas)) {
      System.out.println("found it!");
      ColumnFamilyUpdater<String,String> updater = 
        template.createUpdater(key);
      updater.setString("st", String.valueOf(
        CrawlStatus.STATUS_GONE));
      template.update(updater);
      deleteFromSolr(key, commit);
    }
  }
  
  private void deleteFromSolr(String key, boolean commit) 
      throws Exception {
    solrServer.deleteById(key);
    if (commit) {
      solrServer.commit();
    }
  }
  
  private static void usage() {
    System.out.println(
      "Usage: DeltaHandler publish|unpublish url idx [commit]");
    System.out.println("commit = true|false, only for unpublish");
    System.exit(-1);
  }
  
  public static void main(String[] args) {
    String command = null;
    String url = null;
    String idx = null;
    Boolean commit = null;
    if (args.length > 0) {
      command = args[0];
      if (!("publish".equals(command)) && 
          !("unpublish".equals(command))) {
        usage();
      }
    }
    if ("publish".equals(command)) {
      if (args.length > 2) {
        url = args[1];
        idx = args[2];
      } else {
        usage();
      }
    } else if ("unpublish".equals(command)) {
      if (args.length > 3) {
        url = args[1];
        idx = args[2];
        commit = Boolean.valueOf(args[3]);
      } else {
        usage();
      }
    }
    DeltaHandler handler = null;
    try {
      handler = new DeltaHandler();
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else {
        handler.unpublish(url, idx, commit);
      }
    } catch (Exception e) {
      LOG.error(e, e);
    } finally {
      if (handler != null) {
        handler.destroy();
      }
    }
  }
}

This is really only for testing. For high volume use, it would be fairly simple to put this logic behind a webservice that the CMS code could call; a sketch of such a wrapper follows the command line examples. In any case, here are examples of command line usage of the tool above.

sujit@cyclone:local$ # publishing a page
sujit@cyclone:local$ bin/nutch com.mycompany.nutch.tools.DeltaHandler \
  publish http://localhost:8080/provider/prov1__1__000022.xml prov1
sujit@cyclone:local$ # unpublishing a page
sujit@cyclone:local$ bin/nutch com.mycompany.nutch.tools.DeltaHandler \
  unpublish http://localhost:8080/provider/prov1__1__000022.xml prov1 \
  true
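
Coming back to the webservice idea: a minimal wrapper around DeltaHandler could be a plain servlet along the lines of the sketch below. This is only a hypothetical sketch; the class name and request parameter names are mine, and you would want some form of authentication in front of it.

// Hypothetical sketch: expose DeltaHandler.publish()/unpublish() over HTTP
// so the CMS can call it as a webservice.
package com.mycompany.nutch.tools;

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class DeltaHandlerServlet extends HttpServlet {

  private DeltaHandler handler;

  @Override
  public void init() {
    // DeltaHandler picks up nutch-default.xml/nutch-site.xml from the classpath
    handler = new DeltaHandler();
  }

  @Override
  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException {
    String command = req.getParameter("command"); // publish | unpublish
    String url = req.getParameter("url");
    String idx = req.getParameter("idx");
    boolean commit = Boolean.parseBoolean(req.getParameter("commit"));
    try {
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else if ("unpublish".equals(command)) {
        handler.unpublish(url, idx, commit);
      } else {
        resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad command");
        return;
      }
      resp.setStatus(HttpServletResponse.SC_OK);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}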

I also added a few new properties for this tool (and for the plugin described below). Here are the additional properties from my nutch-site.xml file.

<property>
  <name>mycompany.provider.index.urlprefix</name>
  <value>http://localhost:8080/provider_index</value>
  <description>The URL prefix of the content server hosting the provider index pages</description>
</property>

<property>
  <name>mycompany.solr.url</name>
  <value>http://localhost:8983/solr</value>
  <description>The SOLR URL to publish/unpublish to</description>
</property>

<property>
  <name>mycompany.provider.index.pattern</name>
  <value>provider_index</value>
  <description>Pattern for "meta" pages listing other local pages. This
  page is needed for delta indexing to manage on-demand add/delete/update
  of pages from collections which need this feature. It should not
  be indexed itself, so it needs to be removed during the indexing step.</description>
</property>

Delta Indexing Filter Plugin

As mentioned above, you probably don't want the index pages themselves showing up in the search index, since they are basically link farms with no content that is useful to the search user, so they need to be suppressed at indexing time.

The second class of pages is the ones marked GONE by the unpublish command. The unpublish deletes the record from the Solr index, but you want to make sure the page doesn't slip back in on the next solrindex call. So we build an indexing filter that filters out these two categories of pages.

The functionality above is built into the DeltaIndexingFilter, a simple Nutch IndexingFilter implementation. Here is the declaration for this filter from my plugin.xml file.

  <extension id="com.mycompany.nutch.indexer.provider"
      name="Delta Indexing related Page Index Suppressor"
      point="org.apache.nutch.indexer.IndexingFilter">
    <implementation id="mycompany-indexer-provider"
        class="com.mycompany.nutch.indexer.delta.DeltaIndexingFilter"/>
  </extension>
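
One thing to remember is that the filter is only loaded if the plugin matches the plugin.includes regex in nutch-site.xml. Assuming the plugin id is mycompany (matching its directory under src/plugin), the property would look something like this (the rest of the regex will differ based on the plugins you already use):

<property>
  <name>plugin.includes</name>
  <value>protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|urlnormalizer-(pass|regex|basic)|scoring-opic|mycompany</value>
</property>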

Here is the code for the DeltaIndexingFilter itself.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/indexer/delta/DeltaIndexingFilter.java
package com.mycompany.nutch.indexer.delta;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.indexer.IndexingException;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;

public class DeltaIndexingFilter implements IndexingFilter {

  private static final String PROVIDER_INDEX_PATTERN_KEY = 
    "mycompany.provider.index.pattern";
  
  private static final Set<WebPage.Field> FIELDS = 
    new HashSet<WebPage.Field>();
  static {
    FIELDS.add(WebPage.Field.STATUS);
    FIELDS.add(WebPage.Field.METADATA);
  }
  
  private Configuration conf;
  private String providerIndexPattern;
  
  @Override
  public NutchDocument filter(NutchDocument doc, String url, WebPage page)
      throws IndexingException {
    if (StringUtils.isEmpty(providerIndexPattern)) {
      return doc;
    } else {
      if (url.contains(providerIndexPattern) || 
          CrawlStatus.STATUS_GONE == page.getStatus()) {
        // do not index this page
        return null;
      } else {
        return doc;
      }
    }
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    this.providerIndexPattern = conf.get(
      PROVIDER_INDEX_PATTERN_KEY);
  }
}

And that's pretty much it! Delta indexing in just a few lines of code (relatively speaking, of course; this is Java we are talking about :-)), thanks to Nutch/GORA (which gives us the Cassandra database) and Hector (which gives us the API to write to it from outside Nutch).

Update - 2012-02-04: I managed to figure out how to implement the CassandraStore.get() method, so my DeltaHandler code no longer makes any (direct) Hector calls; here it is.

// Source: src/java/com/mycompany/nutch/tools/DeltaHandler.java
package com.mycompany.nutch.tools;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.TableUtil;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;

public class DeltaHandler {
  
  private static final Log LOG = LogFactory.getLog(DeltaHandler.class);
  
  private static final String PROVIDER_INDEX_URLPREFIX_KEY =
    "mycompany.provider.index.urlprefix";
  private static final String SOLR_URL = "mycompany.solr.url";
  private static final Utf8 U_IDX = new Utf8("u_idx");
  
  private String providerIndexUrlPrefix;
  private CommonsHttpSolrServer solrServer;
  private DataStore<String,WebPage> dataStore;
  
  public DeltaHandler() {
    try {
      init();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  
  private void init() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource("nutch-default.xml");
    conf.addResource("nutch-site.xml");
    this.providerIndexUrlPrefix = 
      conf.get(PROVIDER_INDEX_URLPREFIX_KEY);
    this.solrServer = new CommonsHttpSolrServer(
      conf.get(SOLR_URL));
    this.dataStore = StorageUtils.createDataStore(
      conf, String.class, WebPage.class);
  }

  private void destroy() {
    try {
      dataStore.close();
    } catch (Exception e) {
      LOG.error(e);
    }
  }
  
  /**
   * Could be an addition or update. If the URL is present
   * in the database, then it is considered to be an update
   * and the fetch time is reset to current time - fetch
   * interval. If the URL is not present in the database, it
   * is considered to be an add operation, and the meta index
   * page corresponding to the idx is reset, so it is recrawled.
   * @param url the URL to publish.
   * @param idx the value of the u_idx metadata.
   * @throws Exception if thrown.
   */
  public void publish(String url, String idx) throws Exception {
    LOG.info("Starting publish for url=" + url + ", u_idx=" + idx);
    String key = TableUtil.reverseUrl(url);
    LOG.info("key=" + key);
    WebPage page = dataStore.get(key);
    if (page == null) {
      // record does not exist, reset the index page
      String indexKey = TableUtil.reverseUrl(
        StringUtils.join(new String[] {
        providerIndexUrlPrefix, idx}, "/"));
      LOG.info("index key=" + indexKey);
      WebPage indexPage = dataStore.get(indexKey);
      LOG.info("is indexpage null?" + (indexPage == null));
      resetFetchTime(indexPage);
      dataStore.put(indexKey, indexPage);
      LOG.info("Completed publish for url=" + url + 
        ", u_idx=" + idx + ", reset fetch time for index page");
    } else {
      // record exists, reset its fetch time
      resetFetchTime(page);
      dataStore.put(key, page);
      LOG.info("Completed publish for url=" + url + 
        ", u_idx=" + idx + ", reset fetch time for page");
    }
  }

  private void resetFetchTime(WebPage page) {
    int fetchInterval = page.getFetchInterval();
    LOG.info("after get fetch interval=" + fetchInterval);
    page.setFetchTime(System.currentTimeMillis() - fetchInterval);
  }

  /**
   * Checks to see if the record exists in the database with
   * the specified u_idx metadata value. If so, deletes the
   * record from SOLR, then marks the record as GONE in database.
   * @param url the URL to unpublish.
   * @param idx the value of the u_idx parameter.
   * @throws Exception if thrown.
   */
  public void unpublish(String url, String idx, boolean commit) 
      throws Exception {
    LOG.info("Starting unpublish for url=" + url + 
      ", u_idx=" + idx + ", commit=" + commit);
    String key = TableUtil.reverseUrl(url);
    WebPage page = dataStore.get(key);
    if (page != null) {
      if (page.getMetadata().containsKey(U_IDX)) {
        String uIdx = Bytes.toString(Bytes.toBytes(
          page.getMetadata().get(U_IDX)));
        if (idx.equals(uIdx)) {
          page.setStatus(CrawlStatus.STATUS_GONE);
          dataStore.put(key, page);
          deleteFromSolr(key, commit);
          LOG.info("Completed unpublish for url=" + url + 
            ", u_idx=" + idx);
        }
      }
    }
  }

  private void deleteFromSolr(String key, boolean commit) 
      throws Exception {
    solrServer.deleteById(key);
    if (commit) {
      solrServer.commit();
    }
  }
  
  private static void usage() {
    System.out.println(
      "Usage: DeltaHandler publish|unpublish url idx [commit]");
    System.out.println("commit = true|false, only for unpublish");
    System.exit(-1);
  }
  
  public static void main(String[] args) {
    String command = null;
    String url = null;
    String idx = null;
    Boolean commit = null;
    if (args.length > 0) {
      command = args[0];
      if (!("publish".equals(command)) && 
          !("unpublish".equals(command))) {
        usage();
      }
    }
    if ("publish".equals(command)) {
      if (args.length > 2) {
        url = args[1];
        idx = args[2];
      } else {
        usage();
      }
    } else if ("unpublish".equals(command)) {
      if (args.length > 3) {
        url = args[1];
        idx = args[2];
        commit = Boolean.valueOf(args[3]);
      } else {
        usage();
      }
    }
    DeltaHandler handler = null;
    try {
      handler = new DeltaHandler();
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else {
        handler.unpublish(url, idx, commit);
      }
    } catch (Exception e) {
      LOG.error(e, e);
    } finally {
      if (handler != null) {
        handler.destroy();
      }
    }
  }
}

The patch for CassandraStore.get() is available at GORA-93. Since it is applied against GORA 0.2-incubating, you will also need to patch your NutchGora branch with the patch from NUTCH-1205. Note that neither patch has been committed yet because of some failing unit tests traceable to the gora-sql module. The patches worked fine for me, but I am only using gora-cassandra, so YMMV.

2 comments (moderated to prevent spam):

Lina said...

Hi Sujit,
I am working on Nutch and your blog posts are really helpful, but I have a somewhat different problem. I want to crawl a particular site, fetch the meta tags and other tags from the crawled HTML documents, and then store the URL->metadata mappings in a MySQL database.
Can you please tell me how I should approach this problem? I am new to this so I don't know much yet, but I have successfully crawled the site.

Sujit Pal said...

Hi Lina, if you are using Nutch, you could write a custom parsing plugin that parses the contents of the page to produce the meta tags and writes out the (URL => metatag) mappings to a MySQL database (in addition to writing out the parse segments). This happens in line with your normal Nutch crawl.

If you are using Nutch/GORA, you can also do this offline by running a MapReduce job against your datastore to retrieve the URL and content, extract the (URL+metatag, metadata) pairs in the map phase, and write them out to the MySQL database in the reduce phase. If you are not using Nutch/GORA, you can use the parse segments as input instead. This would probably be your preferred approach, since you have already finished crawling the site.