Sunday, July 29, 2012

More Fun with Hadoop In Action Exercises (Pig and Hive)

In my last post, I described a few Java based Hadoop Map-Reduce solutions from the Hadoop in Action (HIA) book. According to the Hadoop Fundamentals I course from Big Data University, being a Hadoop practitioner also includes knowing about the many tools that are part of the Hadoop ecosystem. The course briefly touches on the following four tools - Pig, Hive, Jaql and Flume.

Of these, I decided to focus (at least for the time being) on Pig and Hive (for the somewhat stupid reason that the HIA book covers these too). Both of these are high level DSLs that produce sequences of Map-Reduce jobs. Pig provides a data flow language called PigLatin, and Hive provides a SQL-like language called HiveQL. Both tools provide a REPL shell, and both can be extended with UDFs (User Defined Functions). The reason they coexist in spite of so much overlap is that they are aimed at different users - Pig appears to be aimed at the programmer types and Hive at the analyst types.

The appeal of both Pig and Hive lies in the productivity gains - writing Map-Reduce jobs by hand gives you control, but takes time. Once you master Pig and/or Hive, it is much faster to generate sequences of Map-Reduce jobs. In this post, I will describe three use cases (the first of which comes from the HIA book; the other two I dreamed up).

Patent (Jaccard) Similarity - Pig

Given the Patent Citation Data (cite75_99.zip), the objective is to find "similar" patents. The HIA book provides a worked example of finding patents with similar citations using the same dataset. However, I found the online Pig documentation much more useful for learning PigLatin.

For this problem, I decided to use the Jaccard similarity between the cited patents to indicate the similarity between two (citing) patents. The PigLatin script below first groups the cited patents by citing patent and then passes the groups into a Pig UDF, JaccardSimilarity (Java Code | JUnit test). You can find the script on GitHub here.

/* 
 * patent_similarity.pig
 * Given an input file of (citing_patent_number,cited_patent_number), this
 * script computes the Jaccard similarity between individual patents by
 * considering the overlap of the cited patents.
 */

-- pigudfs-1.0-SNAPSHOT.jar contains custom UDF JaccardSimilarity
REGISTER ./pigudfs-1.0-SNAPSHOT.jar; 

-- Load the text into a relation. Example:
-- (1,2)
-- (1,3)
-- (2,3)
-- (2,4)
-- (3,5)
-- citings = LOAD 'test.txt'
citings = LOAD 'input/cite75_99.txt' 
  USING PigStorage(',') AS (citing:int, cited:int);

-- Group by citing patent number. Example:
-- (1,{(1,2),(1,3)})
-- (2,{(2,3),(2,4)})
-- (3,{(3,5)})
citings_grpd = GROUP citings BY citing; 

-- Join previous 2 relations to include the cited patent in the relation
-- Tuple. Example:
-- (1,2,1,{(1,2),(1,3)})
-- (1,3,1,{(1,2),(1,3)})
-- (2,3,2,{(2,3),(2,4)})
-- (2,4,2,{(2,3),(2,4)})
-- (3,5,3,{(3,5)})
citings_joined = JOIN citings BY citing, citings_grpd BY group;

-- Eliminate the extra citings_grpd.group column and rename for sanity.
-- (1,2,{(1,2),(1,3)})
-- (1,3,{(1,2),(1,3)})
-- (2,3,{(2,3),(2,4)})
-- (2,4,{(2,3),(2,4)})
-- (3,5,{(3,5)})
citings_joined2 = FOREACH citings_joined 
  GENERATE $0 AS citing, $1 AS cited, $3 AS cite_pairs;

-- JOIN previous relation with citings_grpd to add the patents list
-- for the cited patent. We already have the patent list for the citing
-- patent. For reference, these relations are as follows:
-- citings_joined2: {citing: int,cited: int,cite_pairs: {(citing: int,cited: int)}}
-- citings_grpd: {group: int,citings: {(citing: int,cited: int)}}
-- Resulting data looks like this:
-- (1,2,{(1,2),(1,3)},2,{(2,3),(2,4)})
-- (1,3,{(1,2),(1,3)},3,{(3,5)})
-- (2,3,{(2,3),(2,4)},3,{(3,5)})
citings_joined3 = JOIN citings_joined2 BY cited, citings_grpd BY group;

-- Eliminate the extra citings_grpd.group value and rename cols for sanity.
-- Also eliminate the citing part of the tuples in both left_citeds and
-- right_citeds, so we can calculate similarity.
-- (1,2,{(2),(3)},{(3),(4)})
-- (1,3,{(2),(3)},{(5)})
-- (2,3,{(3),(4)},{(5)})
citings_joined4 = FOREACH citings_joined3 
  GENERATE $0 AS citing, $1 AS cited, 
           $2.cited AS left_citeds, $4.cited AS right_citeds;

-- Remove sim(A,A) because we know it's always 1.0
citings_joined5 = FILTER citings_joined4 BY citing != cited;

-- Project the relation through a UDF
-- (1,2,0.0)
-- (1,3,0.0)
-- (2,3,0.0)
citings_similarity = FOREACH citings_joined5 
  GENERATE citing, cited, 
  com.mycompany.pigudfs.JaccardSimilarity(left_citeds, right_citeds) AS jsim;

-- Remove entries with 0 similarity
citings_similarity2 = FILTER citings_similarity BY jsim > 0.0;

-- Order the output by descending order of similarity
citings_similarity_ordrd = ORDER citings_similarity2 BY jsim DESC;

-- Store the output in a comma-separated format into output
STORE citings_similarity_ordrd INTO 'patent_sim_output' USING PigStorage(',');

The above script results in a linear sequence of 5 Map-Reduce jobs. As you can imagine, writing the script above would probably take much less time than hand-coding a sequence of 5 Map-Reduce jobs.

This Data Chef Blog post has a solution to a similar problem that demonstrates how to calculate Jaccard similarity without a UDF. However, I find my solution simpler; in my (very limited) experience, UDFs make for easier-to-read PigLatin code.
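
As an aside, here is roughly what a Jaccard similarity EvalFunc could look like. The package and class names mirror the script above, but the body below is an illustrative sketch (assuming both arguments are bags of single-field tuples), not necessarily the exact code linked from GitHub.

package com.mycompany.pigudfs;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Computes |A intersect B| / |A union B| over two bags of single-field tuples.
public class JaccardSimilarity extends EvalFunc<Double> {

  @Override
  public Double exec(Tuple input) throws IOException {
    if (input == null || input.size() < 2) {
      return 0.0D;
    }
    Set<Object> left = toSet((DataBag) input.get(0));
    Set<Object> right = toSet((DataBag) input.get(1));
    Set<Object> union = new HashSet<Object>(left);
    union.addAll(right);
    if (union.isEmpty()) {
      return 0.0D;
    }
    Set<Object> intersection = new HashSet<Object>(left);
    intersection.retainAll(right);
    return (double) intersection.size() / (double) union.size();
  }

  // Collapse a bag of single-field tuples into a set of field values.
  private Set<Object> toSet(DataBag bag) throws IOException {
    Set<Object> set = new HashSet<Object>();
    Iterator<Tuple> it = bag.iterator();
    while (it.hasNext()) {
      set.add(it.next().get(0));
    }
    return set;
  }
}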

Another trick I found useful and wanted to share: while building Pig (or for that matter, Hive) programs, it is often helpful to work with a very small (often cooked up) subset of data. The data I used can be found in the comments above each block of code (it is the data produced by DUMPing the relation at the end of each block). As you can see, the data I used is unrelated in content to the real data, but it exhibits the characteristics that my code exploits to produce the results. Also, since the dataset is small, it's easy to do multiple quick runs during development.

Movie (Collaborative Filtering) Similarity - Pig

This example is something I dreamt up after reading a bit about collaborative filtering. The objective here is to find movies to recommend to a user given his choice of movies so far, using Collaborative Filtering against the ratings file in the MovieLens 1M dataset. For each movie, we find users who have rated the same movie, then we find the movies (call these related movies) rated by each of those users - the number of times a related movie is mentioned for a given movie constitutes its "similarity" to the original movie. Here is the script. It uses a UDF called OrderByCountDesc (Java Code | JUnit test) to deduplicate, rank and count occurrences of related movies. The script can be downloaded from GitHub here.

/*
 * movie_collab_filter.pig
 * Finds movies to recommend based on collaborative filtering.
 * Each movie is mapped to a user and then each user is mapped to other
 * movies rated highly by the user. The other movies are candidates for
 * recommendation, ordered by rating desc.
 */
-- Register custom UDF jar
REGISTER ./pigudfs-1.0-SNAPSHOT.jar;

-- load data
-- The field delimiter here is "::" which can't be parsed by PigStorage, so
-- we need to sed the input to replace "::" with "\t". Output:
-- (1,100,4,20120724)
-- (2,100,5,20120724)
-- (3,100,4,20120724)
-- (1,200,4,20120724)
-- (3,200,5,20120724)
-- (1,300,1,20120724)
-- (2,300,4,20120724)
ratings = LOAD 'input/ml-ratings.dat' USING PigStorage('\t') 
  AS (uid:int, mid:int, rating:int, timestamp:chararray);

-- since this is a recommender system we want to only consider entries whose
-- ratings >= 4 (on a 5-point rating scale). Also remove extraneous cols.
-- (1,100)
-- (2,100)
-- (3,100)
-- (1,200)
-- (3,200)
-- (2,300)
ratings2 = FILTER ratings BY rating > 3;
ratings3 = FOREACH ratings2 GENERATE uid, mid;

-- Build copy of ratings3 for self JOINs below
ratings3_copy = FOREACH ratings3 GENERATE *;

-- For each movie, first find all other users who have rated the movie
-- highly, then for each such movie, find all the other movies for the
-- same user. The other movies are the ones related to the original 
-- movie through the magic of collaborative filtering.
ratings_join_mid = JOIN ratings3 BY mid, ratings3_copy BY mid;
ratings_join_mid2 = FOREACH ratings_join_mid 
  GENERATE $0 AS uid, $1 AS mid, $3 AS tmid;
ratings_join_uid = JOIN ratings_join_mid2 BY uid, ratings3 BY uid;

-- Remove rows where the original movie and the "other" movie are the
-- same (because we don't want to recommend the same movie to the user).
-- Finally remove extraneous columns. Final output after this block:
-- (100,200)
-- (200,100)
-- (100,200)
-- (200,100)
-- (100,200)
-- (300,100)
-- (100,300)
-- (100,300)
-- (100,300)
-- (100,200)
-- (100,200)
-- (200,100)
-- (200,100)
-- (100,200)
ratings_join_uid2 = FILTER ratings_join_uid BY $1 != $4;
ratings_join_uid3 = FOREACH ratings_join_uid2 
  GENERATE $1 AS mid, $4 AS rmid;

-- Group the related movies so we can generate a count.
-- (100,{(100,200),(100,200),(100,200),(100,300),(100,300),
--        (100,300),(100,200),(100,200),(100,200)})
-- (200,{(200,100),(200,100),(200,100),(200,100)})
-- (300,{(300,100)})
ratings_cnt = group ratings_join_uid3 BY mid;

-- Use custom UDF to dedup and rerank related movie tuples by count.
-- (100,{(100,200),(100,300)})
-- (200,{(200,100)})
-- (300,{(300,100)})
ratings_cnt_ordrd = FOREACH ratings_cnt 
  GENERATE group AS mid, 
  com.mycompany.pigudfs.OrderByCountDesc($1) AS ordered_mids;

-- Flatten the result so data can be fed into a relational table.
-- (100,200)
-- (100,300)
-- (200,100)
-- (300,100)
ratings_cnt_flat = FOREACH ratings_cnt_ordrd
  GENERATE mid,
  FLATTEN(ordered_mids.mid_r) AS rmid;

-- Store output into HDFS
STORE ratings_cnt_flat INTO 'movie_collab_filter_output' 
  USING PigStorage('\t');

The script results in a non-linear sequence of 4 Map-Reduce jobs.
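
For completeness, here is a rough sketch of the kind of EvalFunc that OrderByCountDesc represents: dedup the (mid, rmid) tuples in the grouped bag and return them ordered by descending occurrence count. The real UDF (linked above) would also declare an output schema so the script can refer to ordered_mids.mid_r; that part is left out of this sketch.

package com.mycompany.pigudfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Dedups the (mid, rmid) tuples in the input bag and returns one tuple per
// distinct pair, ordered by how often the pair occurred (descending).
// Assumes the Tuple implementation has value-based equals/hashCode
// (Pig's DefaultTuple does).
public class OrderByCountDesc extends EvalFunc<DataBag> {

  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

  @Override
  public DataBag exec(Tuple input) throws IOException {
    DataBag inputBag = (DataBag) input.get(0);
    // count occurrences of each (mid, rmid) pair, preserving first-seen order
    Map<Tuple, Integer> counts = new LinkedHashMap<Tuple, Integer>();
    Iterator<Tuple> it = inputBag.iterator();
    while (it.hasNext()) {
      Tuple t = it.next();
      Integer count = counts.get(t);
      counts.put(t, count == null ? 1 : count + 1);
    }
    // sort distinct pairs by descending count
    List<Map.Entry<Tuple, Integer>> entries =
      new ArrayList<Map.Entry<Tuple, Integer>>(counts.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<Tuple, Integer>>() {
      public int compare(Map.Entry<Tuple, Integer> e1,
          Map.Entry<Tuple, Integer> e2) {
        return e2.getValue().compareTo(e1.getValue());
      }
    });
    DataBag output = BAG_FACTORY.newDefaultBag();
    for (Map.Entry<Tuple, Integer> entry : entries) {
      output.add(entry.getKey());
    }
    return output;
  }
}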

Movie (Collaborative Filtering) Similarity - Hive

While I was building the Pig solution above, it occurred to me that this would be better solved with Hive, since it is basically pivoting on two columns and grouping, so I decided to try doing it with Hive. If you are familiar with SQL, Hive's learning curve is almost non-existent. Here is the HiveQL script.

--
-- movie_collab_filter.q
-- Builds up a table of similar movie pairs ordered by count.
--

-- Create table to hold input data and load input data
-- Output:
-- 1 100 4 20120724
-- 2 100 5 20120724
-- 3 100 4 20120724
-- 1 200 4 20120724
-- 3 200 5 20120724
-- 1 300 1 20120724
CREATE TABLE ratings (uid INT, mid INT, rating INT, tstamp INT)
  ROW FORMAT DELIMITED                             
  FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '/tmp/test.txt' OVERWRITE INTO TABLE ratings;

-- Only use ratings which are > 3
CREATE TABLE ratings2 (uid INT, mid INT);
INSERT OVERWRITE TABLE ratings2
  SELECT uid, mid FROM ratings
  WHERE rating > 3;

-- For each (uid,mid) pair, find all users who have the same mid
-- Then for each such record, find all movies with the same uid.
-- Output:
-- 100 200
-- 100 300
-- 300 100
-- 300 200
-- 100 200
-- 100 300
-- 200 100
-- 200 300
-- 200 100
-- 200 300
-- 100 200
-- 100 300
-- 300 100
-- 100 300
-- 100 300
-- 100 300
-- 200 100
-- 100 200
-- 200 100
-- 100 200
-- 100 200
CREATE TABLE mid_pairs (mid INT, rmid INT);
INSERT OVERWRITE TABLE mid_pairs 
  SELECT a.mid, c.mid 
  FROM ratings2 a JOIN ratings2 b ON (a.mid = b.mid) 
                  JOIN ratings2 c ON (b.uid = c.uid);

-- Eliminate pairs where the source and related mid are identical.
CREATE TABLE mid_pairs2 (mid INT, rmid INT);
INSERT OVERWRITE TABLE mid_pairs2
  SELECT mid, rmid
  FROM mid_pairs
  WHERE mid != rmid;

-- Group by (mid, rmid) and count occurrences
-- 100 200 6
-- 100 300 6
-- 200 100 4
-- 200 300 2
-- 300 100 2
-- 300 200 1
CREATE TABLE mid_counts (mid INT, rmid INT, cnt INT);
INSERT OVERWRITE TABLE mid_counts
  SELECT mid, rmid, COUNT(rmid)
  FROM mid_pairs2
  GROUP BY mid, rmid;

DROP TABLE ratings2;
DROP TABLE mid_pairs;
DROP TABLE mid_pairs2;

This translates into 5 underlying Map-Reduce jobs. As you can see, it is not too hard to understand if you know SQL (which I am guessing most of you do).

That's it for today. See you again next week!

Friday, July 20, 2012

Fun With Hadoop In Action Exercises (Java)

As some of you know, I recently took some online courses from Coursera. Having taken these courses, I have come to the realization that my knowledge has some rather large blind spots. So far, I have gotten most of my education from books and websites, and I have tended to cherry-pick subjects I need at the moment for my work, as a result of which I tend to ignore stuff (techniques, algorithms, etc) that falls outside that realm. Obviously, this is Not A Good Thing™, so I have begun to seek ways to remedy that.

I first looked at Hadoop years ago, but never got much beyond creating proof of concept Map-Reduce programs (Java and Streaming/Python) for text mining applications. Lately, many subprojects (Pig, Hive, etc), about which I know nothing, have come up to make it easier to deal with large amounts of data using Hadoop. So in an attempt to ramp up relatively quickly, I decided to take some courses at Big Data University.

The course uses BigInsights (IBM's packaging of Hadoop), which runs only on Linux. VMWare images are available, but since I have a MacBook Pro, that wasn't much use to me without a VMWare player (not free for Mac OSX). I then installed VirtualBox and tried to run a Fedora 10 64-bit image on it and install BigInsights on Fedora, but it failed. I then tried to install Cloudera CDH4 (Cloudera's packaging of Hadoop) on it (it's a series of yum commands), but that did not work out either. Ultimately I decided to ditch VirtualBox altogether and do a pseudo-distributed installation of stock Apache Hadoop (1.0.3) directly on my Mac, following the instructions on Michael Noll's page.

The Hadoop Fundamentals I course which I was taking covers quite a few things, but I decided to stop and actually read all of Hadoop in Action (HIA) in order to get more thorough coverage. I had purchased it some years before as part of Manning's MEAP (Early Access) program, so it's a bit dated (examples are mostly in the older 0.19 API), but it's the only Hadoop book I possess, the concepts are explained beautifully, and it's not a huge leap to mentally translate code from the old API to the new, so it was well worth the read.

I also decided to tackle the exercises (in Java for now) and post my solutions on GitHub. Three reasons. First, it exposes me to a more comprehensive set of scenarios than I have seen previously, and forces me to use techniques and algorithms that I wouldn't otherwise. Second, hopefully some of my readers, who can walk circles around me where Hadoop is concerned, will be kind enough to provide criticism and suggestions for improvement. And third, there may be some who would benefit from having the HIA examples worked out. So anyway, here they are: my solutions to selected exercises from Chapters 4 and 5 of the HIA book, for your reading pleasure.

Top K Records

Using the cite75_99.txt file from the Citation Dataset discussed in the HIA examples, the objective is to find the top N (== 10 in this case) most frequently cited patents in descending order.

The format of the input data is CITING_PATENT,CITED_PATENT. The solution (TopKRecords.java) consists of two Map-Reduce jobs described below.

In the first job, the mapper extracts the CITED_PATENT and sends the pair (CITED_PATENT, 1) to the reducer, which aggregates it to (CITED_PATENT, n), where n is the number of times CITED_PATENT was cited. Each record is then fed into a fixed-size PriorityQueue with a custom Comparator that sorts records by ascending count. As the PriorityQueue grows beyond N, records are discarded from the head of the queue (which holds the lowest count). The cleanup method of the reducer writes out the (CITED_PATENT, n) records for the top N patents seen by that reducer to HDFS.

In the second job, the mapper reads the files written out by the previous job and writes out key-value pairs as (CITED_PATENT, n) to a single reducer, which now aggregates these into a fixed-size PriorityQueue to produce the top N cited patents across all the splits. Note that both the first and second jobs use the same Reducer implementation.
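
Since the same fixed-size queue drives both jobs, here is a minimal sketch of what that shared reducer could look like. The class and field names are mine, not necessarily those in TopKRecords.java.

import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Keeps a fixed-size min-heap of (patent, count) entries and emits the
// top K of them in cleanup(). Used as the reducer in both jobs.
public class TopKReducer
    extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {

  private static final int K = 10;

  // entry[0] = patent id, entry[1] = citation count; ordered by count ascending
  private PriorityQueue<long[]> queue;

  @Override
  protected void setup(Context context) {
    queue = new PriorityQueue<long[]>(K + 1, new Comparator<long[]>() {
      public int compare(long[] a, long[] b) {
        return (a[1] < b[1]) ? -1 : ((a[1] > b[1]) ? 1 : 0);
      }
    });
  }

  @Override
  protected void reduce(LongWritable key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    queue.offer(new long[] {key.get(), sum});
    if (queue.size() > K) {
      queue.poll(); // discard the current smallest count
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    while (!queue.isEmpty()) {
      long[] entry = queue.poll();
      context.write(new LongWritable(entry[0]), new IntWritable((int) entry[1]));
    }
  }
}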

Web Traffic Measurement

Given a set of webserver logs for a site, the objective here is to find the hourly web traffic to that site. The best I could do for a relatively high traffic site, though, were the Solr/Tomcat logs from our staging servers, which have a somewhat funky (multi-line) log format. Since Apache HTTPD logs are single-line, I decided to stick with the spirit of the problem and preprocess the Tomcat logs so they look like Apache access.log files. The script I used to do this is HourlyWebTraffic.py.

The solution is a single Map-Reduce job in HourlyWebTraffic.java. The mapper reads each log line, uses a regular expression to extract the timestamp, parses it into a Date object using SimpleDateFormat, and extracts the hour from it. The mapper emits the (HOUR, 1) pair to the reducer, which aggregates the counts per hour. I got lazy here and set my number of reducers to 1 so the results are sorted and suitable for graphing, but I could just as well have used a custom partitioner (described in the Time Series example below) to ensure that the reducer outputs are sorted (or used an external sort on the merged files).
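
Here is a minimal sketch of the mapper side of this job. The regular expression and timestamp format assume the standard Apache access.log layout, and the names are mine, so treat this as an illustration rather than the exact contents of HourlyWebTraffic.java.

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Pulls the timestamp out of an Apache-style log line, e.g.
// 127.0.0.1 - - [24/Jul/2012:13:55:36 -0700] "GET / HTTP/1.1" 200 2326
// and emits (hour-of-day, 1).
public class HourlyTrafficMapper
    extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

  private static final Pattern TIMESTAMP = Pattern.compile("\\[([^\\]]+)\\]");
  private static final IntWritable ONE = new IntWritable(1);

  private final SimpleDateFormat dateFormat =
      new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
  private final Calendar calendar = Calendar.getInstance();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    Matcher m = TIMESTAMP.matcher(value.toString());
    if (! m.find()) {
      return; // skip malformed lines
    }
    try {
      calendar.setTime(dateFormat.parse(m.group(1)));
    } catch (ParseException e) {
      return;
    }
    context.write(new IntWritable(calendar.get(Calendar.HOUR_OF_DAY)), ONE);
  }
}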

Sparse Matrix Dot Product

Given two matrices X and Y, the objective is to compute their dot product. The matrices are reshaped into column vectors and specified in sparse format, i.e., (col, value) pairs only where the value is non-zero. Each matrix is specified in its own data file. I used the DotProduct.py script to generate the two matrix files.

The solution is modeled as two Map-Reduce jobs in DotProduct.java. The mapper in the first job just parses the data into (col, value) pairs, and the reducer aggregates these values, calculating the product of the X and Y values for each column. If only one (col, value) pair is found in the reducer for a column, the product is 0. The reducer writes out (constant, product) values to HDFS.

The mapper in the next job reads the (constant, product) values from HDFS and the reducer sums them up. The sum is written out to HDFS.
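
As an illustration, the reducer in the first job could look something like the sketch below; the column products it emits under a single constant key are what the second job sums up. Names are mine, not necessarily those in DotProduct.java.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// For each column index, multiplies the (at most two) values coming from the
// X and Y vectors, and emits the product under a single constant key so the
// second job can sum everything in one reducer.
public class ColumnProductReducer
    extends Reducer<IntWritable, DoubleWritable, Text, DoubleWritable> {

  private static final Text CONSTANT_KEY = new Text("product");

  @Override
  protected void reduce(IntWritable col, Iterable<DoubleWritable> values,
      Context context) throws IOException, InterruptedException {
    double product = 1.0D;
    int numValues = 0;
    for (DoubleWritable value : values) {
      product *= value.get();
      numValues++;
    }
    // a column present in only one of the two vectors contributes nothing
    if (numValues < 2) {
      product = 0.0D;
    }
    context.write(CONSTANT_KEY, new DoubleWritable(product));
  }
}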

Moving Average of a Time Series

Given a time series data (a list of numbers), the objective here is to calculate a Simple Moving Average over the last N data points. In addition, the exercise asks to use a Combiner to reduce data shuffling across the network, and a Partitioner to ensure that the averaged data remains sorted by sequence across multiple Reducers.

For the data, I used TimeSeries.py to generate a list of numbers. The solution (TimeSeries.java) is modeled as a single Map-Reduce job, described below:

The mapper reads in the data from HDFS into a blocking queue. Whenever the size of the queue reaches N, all N readings are emitted to the reducer as (currentline, value). Thus, if N = 5, the mapper will emit (v1, v2, v3, v4, v5) as (5, v1), (5, v2), (5, v3), (5, v4) and (5, v5), then remove the oldest item from the queue. From that point on, as the mapper reads each additional line, it emits the last N values with the current line number as the key. The reducer sums these values and divides by N (the moving average) and writes it to HDFS.

The Combiner class here is the same as the Reducer class, and effectively computes the moving average before it is sent across the network to the reducer, so the reducer is just a pass through.

The Partitioner class splits up the key space into buckets. Thus each Reducer will deal with a subset of the range of keys and produce sorted lists of each subset as its output. When concatenated correctly, these sorted lists will also be sorted. The number of records, needed to compute the size of each partition, is passed in through the command line parameters and through the Configuration (our Partitioner implements Configurable).
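
Here is a rough sketch of such a range partitioner. The configuration property name is hypothetical; the actual implementation is in TimeSeries.java linked above.

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Range partitioner: keys (line numbers) are split into contiguous buckets so
// that each reducer's sorted output can simply be concatenated. The total
// number of records is passed in via the (hypothetical) "timeseries.numrecords"
// configuration property.
public class RangePartitioner
    extends Partitioner<LongWritable, DoubleWritable> implements Configurable {

  private Configuration conf;
  private long numRecords;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    this.numRecords = conf.getLong("timeseries.numrecords", 0L);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(LongWritable key, DoubleWritable value,
      int numPartitions) {
    if (numRecords == 0L || numPartitions == 1) {
      return 0;
    }
    long bucketSize = (numRecords / numPartitions) + 1;
    return (int) Math.min(key.get() / bucketSize, numPartitions - 1);
  }
}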

Disjoint Selection (Reduce-side join)

This is a slight variation on the problem posed in the HIA book, since I wasn't able to get the data it refers to. Instead I used the MovieLens 1M dataset, which has a similar parent-child structure.

With the MovieLens data, the objective is to find users who are not "power taggers", i.e., those who have tagged fewer than MIN_RATINGS (25) movies. The solution (DisjointSelector.java) is a single Map-Reduce job using a technique called Repartitioned Join or Reduce-Side Join.

The Mapper reads both the user data and the ratings data. The number of columns differs between the two datasets, so it classifies each record as User or Rating and emits (userID, recordType) to the reducer. The Reducer groups the records by recordType and sums up the occurrences. If the number of rating records is less than MIN_RATINGS, it writes out (userID, numberOfRatings) to HDFS.
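
A minimal sketch of the reduce side of this join is shown below. The "U"/"R" tag convention is my assumption for illustration; the actual code is in DisjointSelector.java.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reduce side of the repartitioned join: all records for a given userID land
// in one reduce() call, tagged "U" (user) or "R" (rating) by the mapper.
public class DisjointSelectorReducer
    extends Reducer<IntWritable, Text, IntWritable, IntWritable> {

  private static final int MIN_RATINGS = 25;

  @Override
  protected void reduce(IntWritable userId, Iterable<Text> recordTypes,
      Context context) throws IOException, InterruptedException {
    boolean isUser = false;
    int numRatings = 0;
    for (Text recordType : recordTypes) {
      if ("U".equals(recordType.toString())) {
        isUser = true;
      } else {
        numRatings++;
      }
    }
    if (isUser && numRatings < MIN_RATINGS) {
      context.write(userId, new IntWritable(numRatings));
    }
  }
}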

Ratio Calculation (Reduce-side join)

Once again, I changed the problem a bit because I could not get the data. The problem asks for the ratio between today's and yesterday's prices of the same stock. What I could find were daily closing prices for the last year for any stock ticker symbol at http://www.google.com/finance/historical?output=csv&q=[Symbol name]. So I changed my example to calculate the ratio between Google and IBM stock prices over the last year. I am not sure what the results mean, or if they even mean anything at all.

So this is just another example of reduce-side joining. The code can be found in StockPriceRatio.java.

The Mapper parses out the date, symbol and closing price from the CSV file and emits the key-value pair (date, symbol:closing_price) to the reducer. The reducer parses out the symbol and closing_price and uses the closing_price value as the numerator if symbol == GOOG or as the denominator if symbol == IBM. It then calculates the ratio and writes out (date, ratio) values to HDFS.
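
For illustration, the reducer could look something like the sketch below, assuming the mapper emits values in the SYMBOL:closing_price format described above (names are mine, not necessarily those in StockPriceRatio.java).

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// For each trading date, receives "SYMBOL:closing_price" strings for both
// tickers and emits the GOOG/IBM price ratio.
public class StockPriceRatioReducer
    extends Reducer<Text, Text, Text, DoubleWritable> {

  @Override
  protected void reduce(Text date, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    double numerator = 0.0D;
    double denominator = 0.0D;
    for (Text value : values) {
      String[] symbolAndPrice = value.toString().split(":");
      double price = Double.parseDouble(symbolAndPrice[1]);
      if ("GOOG".equals(symbolAndPrice[0])) {
        numerator = price;
      } else if ("IBM".equals(symbolAndPrice[0])) {
        denominator = price;
      }
    }
    if (denominator > 0.0D) {
      context.write(date, new DoubleWritable(numerator / denominator));
    }
  }
}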

Product of Vector and Matrix (Map-side join + Distributed Cache)

The objective here is to build a Map-Reduce job to compute the product of a vector and a matrix, both of which are sparse and are represented similarly to the Sparse Matrix Dot Product example. The matrix is specified as (row, col, value) triples and the vector as (row, value) pairs, for non-zero values only. Additionally, the vector should be held in the Distributed Cache.

For this, we use another join technique called Map-side join shown in MatrixMultiplyVector.java. The data was generated using MatrixMultiplyVector.py.

Each Mapper reads the vector file from the HDFS Distributed Cache and populates an internal HashMap. It then reads each matrix file entry and multiplies it with the corresponding vector entry (if one exists) where vector row == matrix column. The Mapper emits (row, product) pairs to the Reducer, which sums up all the product values for a given row. The end result is a file on HDFS representing the result vector in sparse format.
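
Here is a minimal sketch of that mapper, assuming both files are comma-separated sparse entries and the vector file has been pushed out via DistributedCache.addCacheFile(). The file format and names are my assumptions; the actual code is in MatrixMultiplyVector.java linked above.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-side join: the sparse vector is small enough to fit in memory, so each
// mapper loads it from the distributed cache in setup() and joins it against
// the matrix entries as they stream by. Emits (row, matrixValue * vectorValue).
public class MatrixVectorMapper
    extends Mapper<LongWritable, Text, IntWritable, DoubleWritable> {

  private Map<Integer, Double> vector = new HashMap<Integer, Double>();

  @Override
  protected void setup(Context context) throws IOException {
    // assumes the vector file was added via DistributedCache.addCacheFile()
    Path[] cacheFiles =
        DistributedCache.getLocalCacheFiles(context.getConfiguration());
    BufferedReader reader =
        new BufferedReader(new FileReader(cacheFiles[0].toString()));
    String line;
    while ((line = reader.readLine()) != null) {
      String[] cols = line.split(",");  // (row, value)
      vector.put(Integer.valueOf(cols[0]), Double.valueOf(cols[1]));
    }
    reader.close();
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] cols = value.toString().split(",");  // (row, col, value)
    int row = Integer.parseInt(cols[0]);
    int col = Integer.parseInt(cols[1]);
    double matrixValue = Double.parseDouble(cols[2]);
    Double vectorValue = vector.get(col);
    if (vectorValue != null) {
      context.write(new IntWritable(row),
          new DoubleWritable(matrixValue * vectorValue));
    }
  }
}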

Spatial Join (Semi-Join)

In this problem, given a 2-dimensional space where the x and y coordinates range over [-1 billion, +1 billion], and given a file of FOOs and BARs representing points in this space, the objective is to find FOOs which are less than 1 unit of distance from a BAR. Distance is measured as Euclidean distance. Also, the number of BARs is << the number of FOOs.

The idea here is to filter out join candidates in the Mapper and then join them in the Reducer, a technique called Semi-Join or Map-side filtering with Reduce-side Joining. The data for this was generated using SpatialJoin.py and the Java code for the Map-Reduce job can be seen in SpatialJoin.java.

This one had me racking my brains for a bit. I finally hit upon the idea of reducing a BAR to its cell (the 1x1 unit in which the point lies) plus all its immediate neighbor cells (a total of 9 cells). A FOO is reduced to only its containing cell. Now candidate neighbors for a FOO can be found by comparing its containing cell to the BAR cells. You still need to compute the actual Euclidean distance for each candidate FOO-BAR pair, but this is far cheaper than computing it across all FOO-BAR pairs.

So the solution is basically a single Map-Reduce job. The Mapper reads data from either the FOO or the BAR file. If it finds a FOO, it just computes its 1x1 cell and emits (cell, original_value) to the Reducer. In the case of a BAR, it computes its 1x1 cell and its immediate neighbors and emits all 9 (cell, original_value) pairs to the Reducer. The Reducer aggregates the FOOs and BARs into two lists, then loops through the FOO and BAR points, calculating the square of the distance between each pair. Pairs which are within 1 unit of each other are written out as (FOO-X, FOO-Y, BAR-X, BAR-Y) to HDFS.
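
A minimal sketch of the mapper half of this idea is shown below. The "FOO,x,y" / "BAR,x,y" input format is my assumption for illustration; the actual code is in SpatialJoin.java.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits one (cell, original_record) pair for each FOO and nine pairs (its own
// cell plus the eight neighbors) for each BAR, so that any FOO within 1 unit
// of a BAR is guaranteed to share at least one reduce key with it.
public class SpatialJoinMapper
    extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] cols = value.toString().split(",");
    long cellX = (long) Math.floor(Double.parseDouble(cols[1]));
    long cellY = (long) Math.floor(Double.parseDouble(cols[2]));
    if ("FOO".equals(cols[0])) {
      context.write(new Text(cellX + ":" + cellY), value);
    } else {
      // a BAR is replicated into its own cell and the 8 surrounding cells
      for (long dx = -1L; dx <= 1L; dx++) {
        for (long dy = -1L; dy <= 1L; dy++) {
          context.write(new Text((cellX + dx) + ":" + (cellY + dy)), value);
        }
      }
    }
  }
}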

Spatial Join with Bloom Filter (Semi-Join, Bloom Filter)

This is the same problem as the previous one, except that this time we have to use a Bloom Filter to hold the BARs. I used the Bloom Filter implementation that comes with Hadoop. This dynamic BloomFilter tutorial provided me with good information about how to size the filter given the number of records I had.

The solution (SpatialJoinWithBloomFilter.java) consists of two Map-Reduce jobs. The first creates the BloomFilter in a distributed manner and persists it to HDFS. The Mapper reads the BAR file and writes the container cell and its neighbors for each BAR point into the Bloom Filter. The Reducer aggregates (ORs) the BloomFilters produced by the Mapper into a single one and writes it to HDFS in its cleanup() method.

The Mapper in the second job loads the BloomFilter from HDFS in its setup() method, then reads the FOOs file. For each FOO, it computes the containing cell and checks whether a BAR with the same cell exists in the BloomFilter. If there is, it passes the FOO across. The Mapper passes all the BARs to the Reducer. In both cases, the data is passed as (cell, original_value). The Reducer thus gets the groups of FOOs and BARs that were found to be close together, groups them as in the previous solution, and computes the Euclidean distance between all candidate FOO-BAR pairs. Pairs within 1 unit of each other are written out to HDFS.
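
To make the Bloom Filter calls concrete, here is a tiny standalone sketch of the Hadoop BloomFilter API as it is used across the two jobs. The sizing numbers are placeholders and should really be derived from the number of BAR records and the acceptable false positive rate.

import java.io.UnsupportedEncodingException;

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

// Standalone demonstration of the BloomFilter calls: add() in the first job's
// mappers, or() in its reducer, and membershipTest() in the second job's mappers.
public class BloomFilterSketch {

  public static void main(String[] args) throws UnsupportedEncodingException {
    BloomFilter filter = new BloomFilter(
        1 << 22,            // vector size in bits (placeholder)
        7,                  // number of hash functions (placeholder)
        Hash.MURMUR_HASH);  // hash type

    // first job, mapper: record each cell covered by a BAR
    filter.add(new Key("12345:-678".getBytes("UTF-8")));

    // first job, reducer: OR the per-mapper filters into one
    BloomFilter combined = new BloomFilter(1 << 22, 7, Hash.MURMUR_HASH);
    combined.or(filter);

    // second job, mapper: keep a FOO only if its cell might contain a BAR
    System.out.println(
        combined.membershipTest(new Key("12345:-678".getBytes("UTF-8"))));
  }
}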

So there you have it. None of these is hugely complicated or needs a whole lot of domain knowledge, but working through them helped me explore and understand techniques that are standard fare for Hadoop coders. Hopefully it helps you too.