Tuesday, September 24, 2013

Reduce-side joins in Java map-reduce

1.0. About reduce-side joins

Joins of datasets done in the reduce phase are called reduce-side joins.  Reduce-side joins are easier to implement than map-side joins, which are more stringent in that they require the data to be sorted and partitioned the same way.  They are, however, less efficient, because both datasets have to go through the sort and shuffle phase.

What's involved:
1.  The key of the map output, for the datasets being joined, has to be the join key - so that matching records reach the same reducer.
2.  Each dataset has to be tagged with its identity in the mapper, to help differentiate between the datasets in the reducer so they can be processed accordingly.
3.  In each reducer, the data values from both datasets, for the keys assigned to that reducer, are available to be processed as required.
4.  A secondary sort needs to be done to ensure the ordering of the values sent to the reducer.
5.  If the input files are of different formats, we need separate mappers, and we need to use the MultipleInputs class in the driver to add each input path and associate the corresponding mapper with it (see the sketch below).
[MultipleInputs.addInputPath( job, (input path n), (inputformat class), (mapper class n));]
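Below is a minimal driver fragment illustrating point 5; the paths and mapper class names are placeholders, not the ones used in the sample program in this post.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Inside the driver, once the Job instance exists: each input path is tied to
// its own mapper, so differently formatted datasets can still be tagged and
// keyed consistently before the shuffle.
MultipleInputs.addInputPath(job, new Path("/data/employees"),      // hypothetical path
        TextInputFormat.class, EmployeeMapper.class);              // hypothetical mapper
MultipleInputs.addInputPath(job, new Path("/data/salaries"),       // hypothetical path
        TextInputFormat.class, SalaryMapper.class);                // hypothetical mapper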

Note:  The join between the datasets (employee and current salary - cardinality of 1..1) in the sample program below has also been demonstrated in my blog on map-side joins of large datasets.  I have used the same datasets here, as the purpose of this post is to demonstrate the concept.  Whenever possible, reduce-side joins should be avoided.

[Update - 10/15/2013]
I have added a Pig equivalent in the final section.

2.0. Sample datasets used in this gist

The datasets used are employees and salaries.  For salary data, there are two files - one with the current salary (1..1), and one with historical salary data (1..many).  Then there is the department data, a small reference dataset, which we will add to the distributed cache and look up in the reducer.


3.0. Implementing a reduce-side join 

The sample code is common to both the 1..1 and the 1..many join for the sample datasets.
The mapper is shared by both datasets, as their format is the same.

3.0.1. Components/steps/tasks:

1.  Map output key
The key will be empNo, as it is the join key for the employee and salary datasets
[Implementation: in the mapper]

2.  Tagging the data with the dataset identity
Add an attribute called srcIndex to tag the identity of the data (1=employee, 2=salary, 3=salary history)
[Implementation: in the mapper]

3.  Discarding unwanted attributes
[Implementation: in the mapper]

4. Composite key
Make the map output key a composite of empNo and srcIndex
[Implementation: create custom writable]

5.  Partitioner
Partition the data on the natural key, empNo
[Implementation: create custom partitioner class]

6.  Sorting
Sort the data on empNo first, and then on source index
[Implementation: create custom sort comparator class]

7.  Grouping
Group the data on the natural key
[Implementation: create custom grouping comparator class]

8. Joining
Iterate through the values for each key to complete the join of the employee and salary data, and look up the department name to include it in the output
[Implementation: in the reducer]

3.0.2a. Data pipeline for cardinality of 1..1 between employee and salary data:

[Diagram: data pipeline for the 1..1 employee-salary join]

3.0.2b. Data pipeline for cardinality of 1..many between employee and salary data:

[Diagram: data pipeline for the 1..many employee-salary join]

3.0.3. The Composite key

The composite key is a combination of the join key empNo and the source index (1=employee, 2=current salary, 3=salary history).
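A hedged sketch of such a composite key; the class and accessor names are mine, not necessarily those used in the gist.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

// Composite key: natural join key (empNo) plus the source index of the dataset.
public class CompositeKeyWritable implements WritableComparable<CompositeKeyWritable> {

    private String joinKey;   // empNo
    private int sourceIndex;  // 1=employee, 2=current salary, 3=salary history

    public CompositeKeyWritable() {
    }

    public CompositeKeyWritable(String joinKey, int sourceIndex) {
        this.joinKey = joinKey;
        this.sourceIndex = sourceIndex;
    }

    public String getJoinKey()  { return joinKey; }
    public int getSourceIndex() { return sourceIndex; }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, joinKey);
        WritableUtils.writeVInt(out, sourceIndex);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        joinKey = WritableUtils.readString(in);
        sourceIndex = WritableUtils.readVInt(in);
    }

    @Override
    public int compareTo(CompositeKeyWritable other) {
        int result = joinKey.compareTo(other.joinKey);
        return (result != 0) ? result : Integer.compare(sourceIndex, other.sourceIndex);
    }
}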


3.0.4. The mapper

In the setup method of the mapper -
1. Get the file name from the input split, and cross-reference it against the configuration (set in the driver) to derive the source index.  [Driver code: add configuration entries [key=filename of employee dataset, value=1], [key=filename of current salary dataset, value=2], [key=filename of historical salary dataset, value=3]]
2. Build a list of attributes we want to emit as map output for each data entity

The setup method is called only once, at the beginning of a map task, so it is the logical place to identify the source index.

In the map method of the mapper:
3. Build the map output based on the attributes required, as specified in the list from #2

Note:  For salary data, we include the "effective till" date even though it is not required in the final output, because this code is common to both the 1..1 and the 1..many join to salary data.  When the salary data is historical, we want the current salary only, that is, records with effective till date = 9999-01-01.
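A hedged sketch of the shared mapper; the comma-delimited record layout, the position of empNo, and the simplified attribute handling are assumptions made for illustration, not the exact gist code.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JoinMapper extends Mapper<LongWritable, Text, CompositeKeyWritable, Text> {

    private int srcIndex;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The driver is assumed to have added <filename, sourceIndex> entries to the configuration.
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        srcIndex = context.getConfiguration().getInt(fileName, 0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        // Assumed layout: empNo is the first comma-delimited attribute in every dataset.
        String empNo = record.substring(0, record.indexOf(','));
        // The real program keeps only the attributes required per source;
        // here, for brevity, everything after the join key is forwarded.
        String payload = record.substring(record.indexOf(',') + 1);
        context.write(new CompositeKeyWritable(empNo, srcIndex), new Text(payload));
    }
}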


3.0.5. The partitioner

Even though the map output key is composite, we want to partition on the natural join key empNo; therefore, a custom partitioner is in order.
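A minimal sketch of such a partitioner, built on the CompositeKeyWritable sketched above.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition on the natural key only, so every record for a given empNo,
// whatever its source index, lands on the same reducer.
public class JoinKeyPartitioner extends Partitioner<CompositeKeyWritable, Text> {

    @Override
    public int getPartition(CompositeKeyWritable key, Text value, int numPartitions) {
        return (key.getJoinKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}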


3.0.6. The sort comparator

To ensure that the input to the reducer is sorted on empNo first and then on sourceIndex, we need a sort comparator.  This guarantees that, for each key, the employee data is the first entry in the list of values, followed by the salary data.
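A sketch of the sort comparator, again assuming the CompositeKeyWritable sketched earlier.

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort on empNo first and sourceIndex second, so that within each key group
// the employee record always precedes the salary records.
public class JoinSortComparator extends WritableComparator {

    protected JoinSortComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKeyWritable key1 = (CompositeKeyWritable) a;
        CompositeKeyWritable key2 = (CompositeKeyWritable) b;
        int result = key1.getJoinKey().compareTo(key2.getJoinKey());
        return (result != 0) ? result : Integer.compare(key1.getSourceIndex(), key2.getSourceIndex());
    }
}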


3.0.7. The grouping comparator

This class indicates the group-by attribute - the natural join key, empNo.
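A sketch of the grouping comparator, in the same vein.

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Group values by the natural key only, so a single reduce() call sees the
// employee record together with all of its salary records.
public class JoinGroupingComparator extends WritableComparator {

    protected JoinGroupingComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((CompositeKeyWritable) a).getJoinKey()
                .compareTo(((CompositeKeyWritable) b).getJoinKey());
    }
}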


3.0.8. The reducer

In the setup method of the reducer (called only once per task) -
We check whether the side data - a map file with department data - is in the distributed cache, and if found, we initialize the map file reader.

In the reduce method -
While iterating through the value list:
1. If the data is employee data (sourceIndex=1), we look up the department name in the map file using the deptNo, which is the last attribute in the employee record, and append the department name to the employee data.
2. If the data is historical salary data, we emit only the salary whose last attribute is '9999-01-01'.

Key point -
We have set the sort comparator to sort on empNo and then on sourceIndex.
The sourceIndex of the employee data is lower than that of the salary data, as set in the driver.
Therefore, we are assured that the employee data always arrives first, followed by the salary data.
So for each distinct empNo, we iterate through the values, appending them, and emit the joined record as output.
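A hedged sketch of the reducer logic.  It relies on the fact that Hadoop reuses and updates the key instance as the values are iterated, so key.getSourceIndex() reflects the dataset of the current value; the record layouts are assumptions, and opening the department MapFile reader in setup() is omitted here (the map-side join posts below show that pattern).

import java.io.IOException;

import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<CompositeKeyWritable, Text, NullWritable, Text> {

    private MapFile.Reader deptReader;   // initialized in setup() from the distributed cache (omitted)

    @Override
    protected void reduce(CompositeKeyWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        for (Text value : values) {
            String record = value.toString();
            if (key.getSourceIndex() == 1) {
                // Employee record: the last attribute is deptNo; append the department name.
                String deptNo = record.substring(record.lastIndexOf(',') + 1);
                Text deptName = new Text();
                if (deptReader != null) {
                    deptReader.get(new Text(deptNo), deptName);
                }
                joined.append(record).append(',').append(deptName);
            } else if (key.getSourceIndex() == 3 && !record.endsWith("9999-01-01")) {
                // Historical salary: keep only the row that is effective till 9999-01-01.
                continue;
            } else {
                joined.append(',').append(record);
            }
        }
        context.write(NullWritable.get(), new Text(joined.toString()));
    }
}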



3.0.9. The driver

Besides the usual driver code, we -
1. Add the side data (department lookup data, in MapFile format, in HDFS) to the distributed cache.
2. Add key-value pairs to the configuration, each pair being a filename and its source index; this is used by the mapper to tag data with the sourceIndex.
3. And lastly, associate all the custom classes we created with the job.
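A hedged driver fragment tying the sketched classes together; the file names and paths are placeholders consistent with the sketches above, not the exact gist code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Tag each input file with its source index via the configuration.
Configuration conf = new Configuration();
conf.setInt("employees_active.txt", 1);      // hypothetical file names
conf.setInt("salaries_current.txt", 2);
conf.setInt("salaries_history.txt", 3);

Job job = new Job(conf, "reduce-side join");
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setMapOutputKeyClass(CompositeKeyWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(JoinKeyPartitioner.class);
job.setSortComparatorClass(JoinSortComparator.class);
job.setGroupingComparatorClass(JoinGroupingComparator.class);

// Department MapFile (a directory in HDFS) as side data.
DistributedCache.addCacheFile(new Path("formatProject/data/departments_map").toUri(),
        job.getConfiguration());

// Input/output paths and job submission omitted.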



4.0. The Pig equivalent



Pig script-version 1:



Pig script - version 2 - eliminating the reduce-side join:
In this script, we filter down to the most recent salary, and then use Pig's merge join optimization (map-side), which can be leveraged when the inputs to the join are sorted.


Output:

Sunday, September 22, 2013

Map-side join of large datasets using CompositeInputFormat

This post covers map-side joins of large datasets using CompositeInputFormat, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Hive and Pig rock and rule at joining datasets, but it helps to know how to perform joins in Java.

Update [10/15/2013]
I have added the Pig equivalent at the very bottom of the gist.

Feel free to share any insights or constructive criticism. Cheers!!

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2
3. Map-side join sample in Java of two large datasets, leveraging CompositeInputFormat

Sample program:

Friday, September 20, 2013

Handling small files using CombineFileInputFormat in Java MapReduce

This post covers CombineFileInputFormat, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

NLineInputFormat in Java MapReduce - use case, code sample

This post covers NLineInputFormat, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

Thursday, September 19, 2013

MultipleOutputs in Java MapReduce

This post covers MultipleOutputs, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

Wednesday, September 18, 2013

Secondary sort in Java MapReduce

This post covers secondary sort in Java MapReduce, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

Tuesday, September 17, 2013

Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

This post covers map-side joins in Java map-reduce, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!


What's in this blog?

A sample map-reduce program in Java that joins two datasets on the map side - an employee dataset and a department dataset, with the department number as the join key.  The department dataset is a very small dataset in MapFile format, resides in HDFS, and is added to the distributed cache.  The MapFile is referenced in the map method of the mapper to look up the department name and emit the employee records with the department name included.
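A hedged sketch of such a mapper, assuming the department MapFile directory is the only entry on the distributed cache and that deptNo is the last comma-delimited field of the employee record; the class name and path handling are illustrative, not the exact gist code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private MapFile.Reader deptReader;
    private Text deptName = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // Local copies of the files/directories placed on the distributed cache.
        Path[] cached = DistributedCache.getLocalCacheFiles(conf);
        deptReader = new MapFile.Reader(FileSystem.getLocal(conf), cached[0].toString(), conf);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        String deptNo = record.substring(record.lastIndexOf(',') + 1);
        deptReader.get(new Text(deptNo), deptName);                 // look up the department name
        context.write(NullWritable.get(), new Text(record + "," + deptName));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deptReader != null) {
            deptReader.close();
        }
    }
}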

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built in, and are optimized for them.  Joins hand-coded in Java can be more performant, but are time-consuming to code, test and support - and in some companies are considered an anti-pattern.

Sample program

Monday, September 16, 2013

Map-side join sample in Java using reference data (text file) from distributed cache - Part 1

This post covers map-side joins in Java map-reduce, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

A sample map-reduce program in Java that joins two datasets on the map side - an employee dataset and a department dataset, with the department number as the join key.  The department dataset is a very small reference dataset, resides in HDFS, and is added to the distributed cache.  The mapper retrieves the department data from the distributed cache and loads it into a HashMap in the setup() method, and the HashMap is referenced in the map method to get the department name and emit the employee records with the department name included.

Section 2 demonstrates a solution where a file in HDFS is added to the distributed cache in the driver code, and accessed in the mapper's setup method through the DistributedCache.getCacheFiles method.

Section 3 demonstrates a solution where a local file is added to the distributed cache at the command line, and accessed in the mapper's setup method.

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built in, and are optimized for them.  Joins hand-coded in Java can be more performant, but are time-consuming to code, test and support - and in some companies are considered an anti-pattern.

2.0. Sample program

In this program, the side data exists in HDFS, is added to the distributed cache in the driver code, and is referenced in the mapper using the DistributedCache.getCacheFiles method.
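A hedged sketch of that flow, as driver and mapper fragments (imports omitted for brevity); the HDFS path and the deptNo,deptName record layout are assumptions for illustration.

// Driver fragment: put the side-data file (already in HDFS) on the distributed cache.
DistributedCache.addCacheFile(new Path("/data/departments.txt").toUri(),   // hypothetical path
        job.getConfiguration());

// Mapper fragment: load the cached file into a HashMap in setup().
private Map<String, String> deptMap = new HashMap<String, String>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
    if (cacheFiles != null && cacheFiles.length > 0) {
        FileSystem fs = FileSystem.get(conf);
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheFiles[0].toString()))));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] parts = line.split(",");     // assumed layout: deptNo,deptName
            deptMap.put(parts[0], parts[1]);
        }
        reader.close();
    }
}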



3.0. Variation 

As a variation on the code in section 2.0, this section demonstrates how to add side data that is not in HDFS to the distributed cache through the command line, leveraging GenericOptionsParser.



   

Friday, September 13, 2013

Sequence File - construct, usage, code samples

This post covers the sequence file format, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?


1.  Introduction to sequence file format
2.  Sample code to create a sequence file (compressed and uncompressed) from a text file in a map-reduce program, and to read a sequence file.

2.0. What's a Sequence File?


2.0.1. About sequence files:
A sequence file is a persistent data structure for binary key-value pairs.

2.0.2. Construct:
Sequence files include sync points after every few records; these align with record boundaries and help the reader resynchronize.  The sync points also make the files splittable for mapreduce operations.  Sequence files support record-level and block-level compression.

Apache documentation: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

Excerpts from Hadoop: The Definitive Guide...
"A sequence file consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, which acts as a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker. 
Structure of a sequence file with and without record compression -

[Figure: sequence file structure, with and without record compression]

The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Note that keys are not compressed.
Structure of a sequence file with and without block compression -

[Figure: sequence file structure, with and without block compression]

Block compression compresses multiple records at once; it is therefore more compact than and should generally be preferred over record compression because it has the opportunity to take advantage of similarities between records. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values."

The uncompressed, record-compressed and block-compressed sequence files share the same header.  Details, from the Apache documentation on sequence files, are below.

SequenceFile Header
  • version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
  • keyClassName -key class
  • valueClassName - value class
  • compression - A boolean which specifies if compression is turned on for keys/values in this file.
  • blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.
  • compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
  • metadata - SequenceFile.Metadata for this file.
  • sync - A sync marker to denote end of the header.
Uncompressed SequenceFile Format
  • Header
  • Record
      • Record length
      • Key length
      • Key
      • Value
  • A sync-marker every few 100 bytes or so.
Record-Compressed SequenceFile Format
  • Header
  • Record
      • Record length
      • Key length
      • Key
      • Compressed Value
  • A sync-marker every few 100 bytes or so.
Block-Compressed SequenceFile Format
  • Header
  • Record Block
      • Uncompressed number of records in the block
      • Compressed key-lengths block-size
      • Compressed key-lengths block
      • Compressed keys block-size
      • Compressed keys block
      • Compressed value-lengths block-size
      • Compressed value-lengths block
      • Compressed values block-size
      • Compressed values block
  • A sync-marker every block.
2.0.3. Datatypes: 
The keys and values need not be instances of Writable - they just need to support serialization.

2.0.4. Creating sequence files: 
Uncompressed: Create an instance of SequenceFile.Writer and call append() to add key-value pairs, in order.  For record and block compression, refer to the Apache documentation.  When creating compressed files, the compression algorithm used for keys and/or values can be specified by providing the appropriate CompressionCodec.

2.0.5. Reading data in sequence files: 
Create an instance of SequenceFile.Reader and iterate through the entries using reader.next(key, value).
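A minimal sketch of both operations, using the FileSystem-based writer and reader constructors; the path, key/value types and sample records are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("formatProject/data/employees.seq");   // hypothetical path

        // Write: append key-value pairs, in order.
        SequenceFile.Writer writer =
                SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
        writer.append(new IntWritable(10001), new Text("Georgi,Facello,d005"));
        writer.append(new IntWritable(10002), new Text("Bezalel,Simmel,d007"));
        writer.close();

        // Read: iterate with next(key, value) until it returns false.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        IntWritable key = new IntWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key + "\t" + value);
        }
        reader.close();
    }
}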

2.0.6. Usage
- Data storage for key-value type data
- Container for other files
- Efficient from a storage perspective (binary), and from a mapreduce processing perspective (supports compression and splitting)

3.0. Creating a sequence file


4.0. Reading a sequence file

Covered already in the gist under section 3.

5.0. Any thoughts/comments

Any constructive criticism and/or additions/insights is much appreciated.

Cheers!!





Thursday, September 12, 2013

Map File - construct, usage, code samples

This post covers the map file format, and has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

1.  Introduction to map file
2.  Sample code to convert a text file to a map file
3.  Sample code to read a map file

2.0. What's a Map File?

2.0.1. Definition:
From Hadoop: The Definitive Guide...
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/MapFile.html 

2.0.2. Datatypes: 
The keys must be instances of WritableComparable, and the values, Writable.

2.0.3. Creating map files: 
Create an instance of MapFile.Writer and call append(), to add key-values, in order.

2.0.4. Looking up data in map files: 
Create an instance of MapFile.Reader, and call get(key,value).
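A minimal sketch of both operations; the directory name mirrors the department map file used elsewhere in this blog, and the Text/Text key-value types are assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        String dirName = "formatProject/data/departments_map";

        // Write: keys must be appended in sorted order.
        MapFile.Writer writer = new MapFile.Writer(conf, fs, dirName, Text.class, Text.class);
        writer.append(new Text("d001"), new Text("Marketing"));
        writer.append(new Text("d002"), new Text("Finance"));
        writer.close();

        // Look up: get() consults the in-memory index and returns null if the key is absent.
        MapFile.Reader reader = new MapFile.Reader(fs, dirName, conf);
        Text deptName = new Text();
        if (reader.get(new Text("d002"), deptName) != null) {
            System.out.println("d002 -> " + deptName);
        }
        reader.close();
    }
}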

2.0.5. Construct
The map file is actually a directory.  Within it, there is an "index" file and a "data" file.
The data file is a sequence file containing the keys and their associated values.
The index file is smaller; it holds key-value pairs in which the key is an actual key from the data and the value is that key's byte offset in the data file.  The index holds only a fraction of the keys, as determined by MapFile.Writer.getIndexInterval().

2.0.5.1. Directory structure:
$ hadoop fs -ls formatProject/data/departments_map | awk '{print $8}'
formatProject/data/departments_map/data
formatProject/data/departments_map/index

2.0.5.2. Content of the file 'data':
$ hadoop fs -text formatProject/data/departments_map/data
d001  Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service

2.0.5.3. Content of the file 'index':
$ hadoop fs -text formatProject/data/departments_map/index
d001 121
d002 152
d003 181
d004 218
d005 250
d006 283
d007 323
d008 350
d009 380

2.0.6. Behind the scenes of a look up
The index file is read into memory, a (binary) search finds the largest index key less than or equal to the one being looked up, and the reader seeks to that key's offset in the data file and reads forward until it reaches the key being looked up, then extracts and returns the associated value.  It returns null if the key is not found.

If the index is too large to load into memory, there are configuration properties that can be set to load only a fraction of the index keys.

2.0.7. Usage
Fast lookups - in joins, among others.
Can also be used as a container for small files, with the filename as the key.

3.0. Creating a map file


4.0. Looking up a key in a map file

Covered already in the gist under section 3.
The plan is to use the map file in a map-side join in a subsequent blog.

5.0. Any thoughts/comments

Any constructive criticism and/or additions/insights is much appreciated.

Cheers!!

Wednesday, September 11, 2013

Apache Oozie - Part 12: Oozie Shell Action + Passing output from one Oozie action to another

I had read about Oozie's ability to pass output from one action to another and had forgotten about it; sure enough, it came up at an interview.  Here's some sample code...


1.0. What's covered in the blog?

1. Documentation on the Oozie shell action
2. A sample Oozie workflow that includes a shell script action that echoes a count of the number of lines in a file glob, and an email action that captures the output of the shell action and emails it.

Version:
Oozie 3.3.0; Pig 0.10.0

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie shell action + passing output from one action to another


2.0. Documentation on the Oozie Shell Action


Apache documentation is available at - http://oozie.apache.org/docs/3.3.0/DG_ShellActionExtension.html


3.0. Sample program



4.0. Oozie web console screenshots