Hadoop Streaming

Hadoop Streaming uses UNIX standard streams as the interface between Hadoop and your program, so you can write MapReduce programs in any language that can read from standard input and write to standard output. Hadoop offers several mechanisms to support non-Java development.

The primary mechanisms are Hadoop Pipes, which provides a native C++ interface to Hadoop, and Hadoop Streaming, which permits any program that uses standard input and output to be used for map tasks and reduce tasks.

With the streaming utility, one can create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
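For example, ordinary UNIX utilities can serve directly as the mapper and the reducer. The sketch below assumes the Hadoop 1.2.1 streaming jar layout used later in this section; myInputDirs and myOutputDir are placeholder HDFS paths, not directories created by this tutorial.

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /usr/bin/wc

Here /bin/cat simply passes each input line through as a map output record, and /usr/bin/wc counts the lines, words, and characters it receives on standard input.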

Hadoop Streaming Example using Python

Hadoop Streaming supports any programming language that can read from standard input and write to standard output. To illustrate Hadoop Streaming, consider the word-count problem: the mapper and the reducer are written as Python scripts to be run under Hadoop.

Mapper Code

#!/usr/bin/python

import sys

for intellipaatline in sys.stdin:              # Input is taken from standard input
    intellipaatline = intellipaatline.strip()  # Remove whitespace on either side
    words = intellipaatline.split()            # Break the line into words
    for myword in words:                       # Iterate over the words list
        print('%s\t%s' % (myword, 1))          # Write the results to standard output

Reducer Code

#!/usr/bin/python

from operator import itemgetter
import sys

current_word = ""
current_count = 0
word = ""

for intellipaatline in sys.stdin:              # Input is taken from standard input
    intellipaatline = intellipaatline.strip()  # Remove whitespace on either side
    word, count = intellipaatline.split('\t', 1)  # Split the input we got from mapper.py
    try:
        count = int(count)                     # Convert the count variable to an integer
    except ValueError:
        continue                               # Count was not a number, so silently ignore this line
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('%s\t%s' % (current_word, current_count))  # Write the result to standard output
        current_count = count
        current_word = word

if current_word == word:                       # Do not forget to output the last word if needed!
    print('%s\t%s' % (current_word, current_count))

The mapper and reducer code should be saved as mapper.py and reducer.py in the Hadoop home directory.
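As a quick sanity check before submitting the job to Hadoop, the two scripts can be exercised locally with a plain UNIX pipeline, where sort stands in for the framework's shuffle phase. The sample input and output below are only illustrative:

$ echo "deer bear river deer" | python mapper.py | sort | python reducer.py
bear    1
deer    2
river   1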

WordCount Execution

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input input_dirs \
    -output output_dir \
    -mapper <path>/mapper.py \
    -reducer <path>/reducer.py

Here "\" is used for line continuation for readability.

How Hadoop Streaming Works

  • The mapper and the reducer read their input from standard input and emit their output to standard output. The streaming utility creates a Map/Reduce job, submits the job to an appropriate cluster, and monitors the progress of the job until completion.
  • When a script is specified for the mappers, every mapper task launches the script as a separate process when the mapper is initialized. The mapper task converts its inputs into lines and feeds them to the standard input of the process; line-oriented outputs are collected from the standard output of the process, and every line is turned into a key/value pair, which is collected as the output of the mapper.
  • When a script is specified for the reducers, each reducer task launches the script as a separate process when the reducer is initialized. As the reducer task runs, its input key/value pairs are converted into lines and fed to the standard input (STDIN) of the process.
  • Each line of the line-oriented output is converted into a key/value pair after it is collected from the standard output (STDOUT) of the process, and these pairs are collected as the output of the reducer. By default, everything up to the first tab character in a line is taken as the key and the remainder of the line as the value; a line with no tab character becomes a key with an empty value. The local pipeline shown earlier mimics exactly this map, sort, reduce flow.

Important Commands

The parameters and their descriptions are listed below:

  • -input directory/file-name : Input location for the mapper. (Required)
  • -output directory-name : Output location for the reducer. (Required)
  • -mapper executable or script or JavaClassName : Mapper executable. (Required)
  • -reducer executable or script or JavaClassName : Reducer executable. (Required)
  • -file file-name : Makes the mapper, reducer, or combiner executable available locally on the compute nodes.
  • -inputformat JavaClassName : The class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default.
  • -outputformat JavaClassName : The class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default.
  • -partitioner JavaClassName : Class that determines which reduce task a key is sent to.
  • -combiner streamingCommand or JavaClassName : Combiner executable for map output.
  • -inputreader : For backwards compatibility: specifies a record reader class instead of an input format class.
  • -verbose : Verbose output.
  • -lazyOutput : Creates output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect or Context.write.
  • -numReduceTasks : Specifies the number of reducers.
  • -mapdebug : Script to call when a map task fails.
  • -reducedebug : Script to call when a reduce task fails.
  • -cmdenv name=value : Passes the environment variable to streaming commands.
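As a hedged illustration of how several of these parameters combine in a single invocation (the file paths under /home/hadoop, the reducer count, and the MY_LOG_LEVEL variable are placeholders, not values taken from this tutorial):

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input input_dirs \
    -output output_dir \
    -mapper mapper.py \
    -reducer reducer.py \
    -file /home/hadoop/mapper.py \
    -file /home/hadoop/reducer.py \
    -numReduceTasks 2 \
    -cmdenv MY_LOG_LEVEL=INFO

The -file options ship the scripts to the compute nodes, so the -mapper and -reducer options can refer to them by name.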

Hadoop Pipes

Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Unlike Hadoop Streaming, which uses standard input and output to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function. JNI is not used.

That’s all for this section of the Hadoop tutorial. Let’s move on to the next one, on Pig!

Setting Up A Multi Node Cluster In Hadoop

Installing Java

Syntax of java version command

$ java -version

The following output is presented:

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Creating User Account

A system user account should be created on both the master and slave systems to use the Hadoop installation.

# useradd hadoop 
# passwd hadoop

Mapping the nodes

The hosts file in the /etc/ folder should be edited on all nodes, and the IP address of each system followed by its hostname must be specified.

# vi /etc/hosts

Enter the following lines in the /etc/hosts file.

192.168.1.109 hadoop-master
192.168.1.145 hadoop-slave-1
192.168.56.1 hadoop-slave-2
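As a quick check that the mapping works, each hostname can be pinged from the master (output omitted):

# ping -c 1 hadoop-slave-1
# ping -c 1 hadoop-slave-2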

Configuring Key Based Login

SSH should be set up on each node so that the nodes can communicate with one another without being prompted for a password.

# su hadoop 
$ ssh-keygen -t rsa
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-master
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-slave-1
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-slave-2
$ chmod 0600 ~/.ssh/authorized_keys
$ exit
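Passwordless login can then be verified from the master as the hadoop user; each of the following should open a shell on the target node without prompting for a password (assuming the keys were copied as above):

$ ssh hadoop@hadoop-slave-1
$ ssh hadoop@hadoop-slave-2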

Installing Hadoop

Hadoop should be downloaded on the master server.

# mkdir /opt/hadoop
# cd /opt/hadoop/
# wget http://apache.mesi.com.ar/hadoop/common/hadoop-1.2.1/hadoop-1.2.0.tar.gz 
# tar -xzf hadoop-1.2.0.tar.gz 
# mv hadoop-1.2.0 hadoop 
# chown -R hadoop /opt/hadoop 
# cd /opt/hadoop/hadoop/

Configuring Hadoop

The Hadoop server must be configured by editing the following files.

The core-site.xml file should be edited.

<configuration> 
<property> 
<name>fs.default.name</name><value>hdfs://hadoop-master:9000/</value> 
</property> 
<property> 
<name>dfs.permissions</name> 
<value>false</value> 
</property> 
</configuration>

 
The hdfs-site.xml file should be edited.

<configuration> 
<property> 
<name>dfs.data.dir</name> 
<value>/opt/hadoop/hadoop/dfs/name/data</value> 
<final>true</final> 
</property> 
<property> 
<name>dfs.name.dir</name> 
<value>/opt/hadoop/hadoop/dfs/name</value> 
<final>true</final> 
</property> 
<property> 
<name>dfs.replication</name> 
<value>1</value> 
</property> 
</configuration>

 
The mapred-site.xml file should be edited.

<configuration> 
<property> 
<name>mapred.job.tracker</name><value>hadoop-master:9001</value> 
</property> 
</configuration> 

JAVA_HOME, HADOOP_CONF_DIR, and HADOOP_OPTS should be edited in hadoop-env.sh as follows.

export JAVA_HOME=/opt/jdk1.7.0_17 
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true
export HADOOP_CONF_DIR=/opt/hadoop/hadoop/conf 

 
Installing Hadoop on Slave Servers
Hadoop should be installed on all the slave servers.

# su hadoop 
$ cd /opt/hadoop 
$ scp -r hadoop hadoop-slave-1:/opt/hadoop
$ scp -r hadoop hadoop-slave-2:/opt/hadoop

 
Configuring Hadoop on Master Server
The master server should be configured.

# su hadoop 
$ cd /opt/hadoop/hadoop

 
Master Node Configuration

$ vi conf/masters
hadoop-master

 
Slave Node Configuration

$ vi conf/slaves
hadoop-slave-1
hadoop-slave-2

 
Name Node format on Hadoop Master

# su hadoop 
$ cd /opt/hadoop/hadoop 
$ bin/hadoop namenode -format
 
11/10/14 10:58:07 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = hadoop-master/192.168.1.109
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 1.2.0
STARTUP_MSG: build =
https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1479473; compiled by 'hortonfo' on Mon May 6 06:59:37 UTC 2013
STARTUP_MSG: java = 1.7.0_71
************************************************************/
11/10/14 10:58:08 INFO util.GSet: Computing capacity for map BlocksMap editlog=/opt/hadoop/hadoop/dfs/name/current/edits
………………………………………………….
………………………………………………….
………………………………………………….
11/10/14 10:58:08 INFO common.Storage: Storage directory /opt/hadoop/hadoop/dfs/name has been successfully formatted.
11/10/14 10:58:08 INFO namenode.NameNode: SHUTDOWN_MSG: /************************************************************
SHUTDOWN_MSG: Shutting down NameNode at hadoop-master/192.168.1.15
************************************************************/

Hadoop Services

Starting Hadoop services on the Hadoop-Master.

$ cd $HADOOP_HOME/bin 
$ start-all.sh
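The jps command can be used to confirm that the daemons came up. On a Hadoop 1.x master configured as above, the output should look roughly like the following (the process IDs are illustrative and will differ); the slave nodes should list DataNode and TaskTracker instead:

$ jps
4825 NameNode
5046 SecondaryNameNode
5126 JobTracker
5211 Jps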

Addition of a New DataNode in the Hadoop Cluster

Networking

New nodes can be added to an existing Hadoop cluster with a suitable network configuration. Assume the following network configuration.

For the new node configuration:

IP address : 192.168.1.103
netmask : 255.255.255.0 
hostname : slave3.in

Adding a User and SSH Access

Add a User

A "hadoop" user must be added, and the password of the hadoop user can be set to anything one wants.

useradd hadoop
passwd hadoop

To be executed on the master:

mkdir -p $HOME/.ssh
chmod 700 $HOME/.ssh 
ssh-keygen -t rsa -P '' -f $HOME/.ssh/id_rsa 
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 644 $HOME/.ssh/authorized_keys 
# Copy the public key to the new slave node in the hadoop user's $HOME directory
scp $HOME/.ssh/id_rsa.pub hadoop@192.168.1.103:/home/hadoop/

 
To be executed on the slaves:

su hadoop or ssh -X hadoop@192.168.1.103

The content of the public key must be copied into the file "$HOME/.ssh/authorized_keys" on the new node, and then the permissions for that file must be changed.

cd $HOME 
mkdir -p $HOME/.ssh
chmod 700 $HOME/.ssh  
cat id_rsa.pub >>$HOME/.ssh/authorized_keys
chmod 644 $HOME/.ssh/authorized_keys

SSH login should now be checked from the master machine. Verify that it is possible to SSH to the new node without a password from the master.

ssh hadoop@192.168.1.103 or hadoop@slave3.in 

Set Hostname of New Node
The hostname is set in the file /etc/sysconfig/network.

On new slave3 machine 
NETWORKING=yes 
HOSTNAME=slave3.in

The machine must be restarted, or the hostname command should be run on the new machine with the respective hostname, to make the change effective.

On the slave3 node machine:

hostname slave3.in

/etc/hosts must be updated on all machines of the cluster:

192.168.1.103 slave3.in slave3

Ping the machine with its hostname to check whether it resolves to the correct IP address.

ping master.in 

 
Start the DataNode on New Node

The DataNode daemon should be started manually using the $HADOOP_HOME/bin/hadoop-daemon.sh script. It will automatically contact the master (NameNode) and join the cluster. The new node should also be added to the conf/slaves file on the master server so that the script-based commands will recognize it.

Log in to the new node:

su hadoop or ssh -X hadoop@192.168.1.103

Start HDFS on the newly added slave node by using the following command:

./bin/hadoop-daemon.sh start datanode

The output of the jps command must be checked on the new node.

$ jps 
7141 DataNode 
10312 Jps

Removing a DataNode

A node can be removed from a cluster while it is running, without any data loss. HDFS provides a decommissioning feature which ensures that removing a node is performed safely.

Step 1
Log in as the master machine user where Hadoop is installed.

$ su hadoop

 
Step 2
Before starting the cluster, an exclude file must be configured. A key named dfs.hosts.exclude should be added to the $HADOOP_HOME/etc/hadoop/hdfs-site.xml file.
The value associated with this key is the full path to a file on the NameNode's local file system which contains a list of machines that are not permitted to connect to HDFS.

<property>
<name>dfs.hosts.exclude</name>
<value>/home/hadoop/hadoop-1.2.1/hdfs_exclude.txt</value>
<description>DFS exclude</description>
</property> 

 
Step 3
Determine the hosts to decommission.
Every machine to be decommissioned should be added to the file identified by hdfs_exclude.txt, which will prevent those machines from connecting to the NameNode.

slave2.in

 
Step 4
Force a configuration reload.
"$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes" should be run:

$ $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes

 
The NameNode will be forced to re-read its configuration, including the newly updated ‘excludes’ file. The nodes will be decommissioned over a period of time, allowing each node’s blocks to be replicated onto machines which are scheduled to remain active.
The jps command output should be checked on slave2.in; the DataNode process will shut down automatically.
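For example, once decommissioning of slave2.in has completed, jps on that node should no longer list a DataNode process (the process ID is illustrative):

$ jps
10312 Jps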
 
Step 5
Shut down nodes.
After the decommission process has finished, the decommissioned hardware can be safely shut down for maintenance. The dfsadmin -report command can be run to check the status of the decommissioned nodes:

$ $HADOOP_HOME/bin/hadoop dfsadmin -report

 
Step 6
Edit the excludes file again. Once the machines have been decommissioned, they can be removed from the ‘excludes’ file. Running "$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes" again will read the excludes file back into the NameNode, allowing the DataNodes to rejoin the cluster after the maintenance has been completed, or if additional capacity is needed in the cluster again.
To stop or start the tasktracker:

$ $HADOOP_HOME/bin/hadoop-daemon.sh stop tasktracker
$ $HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker