
Hadoop Streaming

    Hadoop Streaming uses UNIX standard streams as the interface between Hadoop and your program, so you can write MapReduce programs in any language that can read from standard input and write to standard output. Hadoop offers several mechanisms to support non-Java development.

    The primary mechanisms are Hadoop Pipes, which provides a native C++ interface to Hadoop, and Hadoop Streaming, which permits any program that uses standard input and output to be used for map tasks and reduce tasks.

    With this utility one can create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.

    Hadoop Streaming Example using Python

    Hadoop Streaming supports any programming language that can read from standard input and write to standard output. To illustrate it, consider the classic word-count problem: the mapper and the reducer are written as Python scripts to be run under Hadoop.

    Mapper Code


    #!/usr/bin/python
    import sys

    for intellipaatline in sys.stdin:             # Input is taken from standard input
        intellipaatline = intellipaatline.strip() # Remove whitespace on either side
        words = intellipaatline.split()           # Break the line into words
        for myword in words:                      # Iterate over the words list
            print('%s\t%s' % (myword, 1))         # Write the result to standard output

    Reducer Code

    #!/usr/bin/python
    from operator import itemgetter
    import sys

    current_word = ""
    current_count = 0
    word = ""

    for intellipaatline in sys.stdin:
        # Input is taken from standard input
        intellipaatline = intellipaatline.strip()
        # Split the input we got from the mapper
        word, count = intellipaatline.split('\t', 1)
        try:
            # Convert count variable to integer
            count = int(count)
        except ValueError:
            # Count was not a number, so silently ignore this line
            continue
        if current_word == word:
            current_count += count
        else:
            if current_word:
                # Write result to standard output
                print('%s\t%s' % (current_word, current_count))
            current_count = count
            current_word = word

    # Do not forget to output the last word if needed!
    if current_word == word:
        print('%s\t%s' % (current_word, current_count))

    The mapper and reducer scripts should be saved in the Hadoop home directory.
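    Because streaming scripts only read STDIN and write STDOUT, the whole word-count pipeline can be sanity-checked locally, without Hadoop, by reproducing the map → sort → reduce sequence in-process. The helper names below are ours, but the logic mirrors the two scripts above:

```python
def map_lines(lines):
    """Mapper logic: emit one 'word\t1' record per word."""
    records = []
    for line in lines:
        for word in line.strip().split():
            records.append('%s\t%s' % (word, 1))
    return records

def reduce_sorted(records):
    """Reducer logic: sum counts of consecutive identical keys.

    The input must already be sorted by key, which is exactly what the
    Hadoop shuffle (or a plain `sort`) guarantees."""
    results = []
    current_word, current_count = None, 0
    for record in records:
        word, count = record.split('\t', 1)
        count = int(count)
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, count
    if current_word is not None:
        results.append((current_word, current_count))
    return results

print(reduce_sorted(sorted(map_lines(["the cat sat", "the cat ran"]))))
# [('cat', 2), ('ran', 1), ('sat', 1), ('the', 2)]
```

    On the command line, the same check is the classic `cat input | <mapper> | sort | <reducer>` pipeline.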

    WordCount Execution

    $ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
        -input input_dirs \
        -output output_dir \
        -mapper <path/to/mapper script> \
        -reducer <path/to/reducer script>

    Here "\" is used for line continuation for clear readability.

    How Hadoop Streaming Works

    • Both the mapper and the reducer read their input from standard input and emit their output to standard output. The utility creates a Map/Reduce job, submits the job to an appropriate cluster, and monitors the progress of the job until completion.
    • When a script is specified for mappers, every mapper task launches the script as a separate process when the mapper is initialized. The mapper task's inputs are converted into lines and fed to the standard input of the process; line-oriented outputs are collected from its standard output, and every line is turned into a key/value pair, which is collected as the output of the mapper.
    • When a script is specified for reducers, each reducer task launches the script as a separate process when the reducer is initialized. As the reducer task runs, its input key/value pairs are converted into lines and fed to the standard input (STDIN) of the process.
    • Each line of the line-oriented output collected from the standard output (STDOUT) of the process is converted into a key/value pair, which is then collected as the output of the reducer.
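    By default, the framework splits each such line at the first tab character: the prefix becomes the key and the rest becomes the value, while a line with no tab is treated as a key with a null value. A small sketch of that convention (the helper name is ours, and the null value is modeled here as an empty string):

```python
def split_streaming_line(line):
    """Split a streaming output line the way the framework does by default:
    key = text before the first tab, value = everything after it.
    A line with no tab is modeled here as (whole line, '')."""
    line = line.rstrip('\n')
    if '\t' in line:
        key, value = line.split('\t', 1)
    else:
        key, value = line, ''
    return key, value

print(split_streaming_line('cat\t1'))      # ('cat', '1')
print(split_streaming_line('a\tb\tc'))     # ('a', 'b\tc') -- only the first tab splits
print(split_streaming_line('just-a-key'))  # ('just-a-key', '')
```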

    Important Commands

    -input directory/file-name : Input location for the mapper. (Required)
    -output directory-name : Output location for the reducer. (Required)
    -mapper executable or script or JavaClassName : Mapper executable. (Required)
    -reducer executable or script or JavaClassName : Reducer executable. (Required)
    -file file-name : Makes the mapper, reducer, or combiner executable available locally on the compute nodes.
    -inputformat JavaClassName : The class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default.
    -outputformat JavaClassName : The class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default.
    -partitioner JavaClassName : Class that determines which reduce a key is sent to.
    -combiner streamingCommand or JavaClassName : Combiner executable for map output.
    -inputreader : For backwards compatibility: specifies a record reader class instead of an input format class.
    -verbose : Verbose output.
    -lazyOutput : Creates output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect or Context.write.
    -numReduceTasks : Specifies the number of reducers.
    -mapdebug : Script to call when a map task fails.
    -reducedebug : Script to call when a reduce task fails.
    -cmdenv name=value : Passes the environment variable to streaming commands.
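    As an illustration, several of these options can be combined in a single invocation; the script names and values below are placeholders of our own, not from the original text:

```shell
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input input_dirs \
    -output output_dir \
    -mapper \
    -reducer \
    -file \
    -file \
    -numReduceTasks 2 \
    -cmdenv LANG=en_US.UTF-8
```

    The -file options ship the scripts to the compute nodes, so the mapper and reducer can be referenced by bare file name.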

    Hadoop Pipes

    It is the name of the C++ interface to Hadoop MapReduce. Unlike Hadoop Streaming, which uses standard I/O to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function. JNI is not used.

    That's all for this section of the Hadoop tutorial. Let's move on to the next one on Pig!

    Setting Up A Multi Node Cluster In Hadoop

    Installing Java

    Syntax of the java version command:

    $ java -version

    The following output is presented:

    java version "1.7.0_71"
    Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
    Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

    Creating User Account

    A system user account should be created on both the master and slave systems to use the Hadoop installation.

    # useradd hadoop 
    # passwd hadoop

    Mapping the nodes

    The hosts file in the /etc/ folder should be edited on all nodes; the IP address of each system followed by its host name must be specified.

    # vi /etc/hosts

    Enter one line for each node in the /etc/hosts file: the IP address followed by the host name, for hadoop-master, hadoop-slave-1, and hadoop-slave-2.
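    Each entry takes the form "IP-address hostname"; for example, with illustrative addresses of our own:

```
192.168.1.15 hadoop-master
192.168.1.16 hadoop-slave-1
192.168.1.17 hadoop-slave-2
```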

    Configuring Key Based Login

    SSH should be set up on each node so that the nodes can talk to one another without any prompt for a password.

    # su hadoop 
    $ ssh-keygen -t rsa
    $ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-master
    $ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-slave-1
    $ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-slave-2
    $ chmod 0600 ~/.ssh/authorized_keys
    $ exit

    Installing Hadoop

    Hadoop should be downloaded on the master server.

    # mkdir /opt/hadoop
    # cd /opt/hadoop/
    # wget <URL of the hadoop-1.2.0.tar.gz release>
    # tar -xzf hadoop-1.2.0.tar.gz 
    # mv hadoop-1.2.0 hadoop 
    # chown -R hadoop /opt/hadoop 
    # cd /opt/hadoop/hadoop/

    Configuring Hadoop

    The Hadoop server must be configured as follows.

    core-site.xml should be edited.
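    The file contents themselves are not shown above; a minimal core-site.xml sketch for this layout, assuming the NameNode runs on hadoop-master on port 9000 (both values are illustrative assumptions), would be:

```xml
<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://hadoop-master:9000</value>
   </property>
</configuration>
```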


    The hdfs-site.xml file should be edited.
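    A minimal hdfs-site.xml sketch for a Hadoop 1.x cluster of this shape; the paths and replication factor are assumptions, not from the original text:

```xml
<configuration>
   <property>
      <name>dfs.replication</name>
      <value>2</value>
   </property>
   <property>
      <name></name>
      <value>/opt/hadoop/hadoop/dfs/name</value>
   </property>
   <property>
      <name></name>
      <value>/opt/hadoop/hadoop/dfs/data</value>
   </property>
</configuration>
```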


    The mapred-site.xml file should be edited.
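    In Hadoop 1.x, mapred-site.xml points the cluster at the JobTracker; a minimal sketch (host and port here are assumptions) would be:

```xml
<configuration>
   <property>
      <name>mapred.job.tracker</name>
      <value>hadoop-master:9001</value>
   </property>
</configuration>
```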


    JAVA_HOME, HADOOP_CONF_DIR, and HADOOP_OPTS should be set in hadoop-env.sh.

    export JAVA_HOME=/opt/jdk1.7.0_17 
    export HADOOP_CONF_DIR=/opt/hadoop/hadoop/conf 

    Installing Hadoop on Slave Servers
    Hadoop should be installed on all the slave servers

    # su hadoop 
    $ cd /opt/hadoop 
    $ scp -r hadoop hadoop-slave-1:/opt/hadoop
    $ scp -r hadoop hadoop-slave-2:/opt/hadoop

    Configuring Hadoop on Master Server
    The master server should be configured.

    # su hadoop 
    $ cd /opt/hadoop/hadoop

    Master Node Configuration

    $ vi etc/hadoop/masters  

    Slave Node Configuration

    $ vi etc/hadoop/slaves 
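    The file contents are not shown above; based on the hostnames used earlier in this section, the two files would plausibly contain:

```shell
$ cat etc/hadoop/masters
$ cat etc/hadoop/slaves
```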

    Name Node format on Hadoop Master

    # su hadoop 
    $ cd /opt/hadoop/hadoop 
    $ bin/hadoop namenode -format
    11/10/14 10:58:07 INFO namenode.NameNode: STARTUP_MSG:
    STARTUP_MSG: Starting NameNode
    STARTUP_MSG: host = hadoop-master/
    STARTUP_MSG: args = [-format]
    STARTUP_MSG: version = 1.2.0
    STARTUP_MSG: build = -r 1479473; compiled by 'hortonfo' on Mon May 6 06:59:37 UTC 2013
    STARTUP_MSG: java = 1.7.0_71
    11/10/14 10:58:08 INFO util.GSet: Computing capacity for map BlocksMap editlog=/opt/hadoop/hadoop/dfs/name/current/edits
    11/10/14 10:58:08 INFO common.Storage: Storage directory /opt/hadoop/hadoop/dfs/name has been successfully formatted.
    11/10/14 10:58:08 INFO namenode.NameNode: SHUTDOWN_MSG: /************************************************************
    SHUTDOWN_MSG: Shutting down NameNode at hadoop-master/

    Hadoop Services

    Starting Hadoop services on the Hadoop-Master:

    $ cd $HADOOP_HOME/sbin
    $ ./

    Addition of a New DataNode in the Hadoop Cluster


    Add new nodes to an existing Hadoop cluster with a suitable network configuration. Suppose the following network configuration.

    For the new node, the configuration is:

    IP address :
    netmask : 
    hostname :

    Adding a User and SSH Access

    Add a User

    A "hadoop" user must be added, and the password of the Hadoop user can be set to anything one wants.

    useradd hadoop
    passwd hadoop

    To be executed on master

    mkdir -p $HOME/.ssh
    chmod 700 $HOME/.ssh
    ssh-keygen -t rsa -P '' -f $HOME/.ssh/id_rsa
    cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
    chmod 644 $HOME/.ssh/authorized_keys

    Copy the public key to the new slave node in the hadoop user's $HOME directory:

    scp $HOME/.ssh/id_rsa.pub hadoop@slave3:/home/hadoop/

    To be executed on slaves

    su hadoop
    ssh -X hadoop@slave3

    The content of the public key must be copied into the file "$HOME/.ssh/authorized_keys", and then the permissions for that file must be changed.

    cd $HOME
    mkdir -p $HOME/.ssh
    chmod 700 $HOME/.ssh
    cat id_rsa.pub >> $HOME/.ssh/authorized_keys
    chmod 644 $HOME/.ssh/authorized_keys

    The ssh login must be checked from the master machine. Verify that it is possible to ssh to the new node without a password from the master.

    ssh hadoop@slave3

    Set Hostname of New Node
    The hostname is set in the file /etc/sysconfig/network on the new slave3 machine.

    The machine must be restarted, or the hostname command must be run on the new machine with the respective hostname, to make the change effective.
    On the slave3 node machine:
    /etc/hosts must be updated on all machines of the cluster with an entry for slave3.

    Ping the machine with its hostname to check whether it resolves to the IP.


    Start the DataNode on New Node

    The DataNode daemon should be started manually using the $HADOOP_HOME/bin/ script. It will automatically contact the master (NameNode) and join the cluster. The new node should also be added to the conf/slaves file in the master server so that script-based commands will recognize it.

    Login to new node

    su hadoop or ssh -X hadoop@slave3

    Start HDFS on the newly added slave node:

    ./bin/ start datanode

    The jps command output must be checked on the new node.

    $ jps 
    7141 DataNode 
    10312 Jps

    Removing a DataNode

    A node can be removed from a cluster while it is running, without any data loss. HDFS provides a decommissioning feature which ensures that removing a node is performed safely.

    Step 1
    Login to master machine user where Hadoop is installed.

    $ su hadoop

    Step 2
    Before starting the cluster, an exclude file must be configured. A key named dfs.hosts.exclude should be added to our $HADOOP_HOME/etc/hadoop/hdfs-site.xml file.
    This key supplies the NameNode with the full path to a file on its local file system which contains the list of machines that are not permitted to connect to HDFS.

    <property>
       <name>dfs.hosts.exclude</name>
       <value>/home/hadoop/hadoop-1.2.1/hdfs_exclude.txt</value>
       <description>DFS exclude</description>
    </property>

    Step 3
    Determine the hosts to decommission.
    Each machine to be decommissioned should be added to the file identified by hdfs_exclude.txt, one per line, which will prevent those machines from connecting to the NameNode.
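    For example, to decommission the slave3 node added earlier, the exclude file would contain just that node's hostname:

```shell
$ cat /home/hadoop/hadoop-1.2.1/hdfs_exclude.txt
```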

    Step 4
    Force a configuration reload.
    "$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes" should be run:

    $ $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes

    The NameNode will be forced to re-read its configuration, including the newly updated 'excludes' file. Nodes will be decommissioned over a period of time, allowing time for each node's blocks to be replicated onto machines which are scheduled to remain active.
    The jps command output should be checked on the decommissioned node; the DataNode process will shut down automatically.
    Step 5
    Shut down nodes.
    The decommissioned hardware can be safely shut down for maintenance after the decommission process has finished. The report command shows the decommission status of the nodes:

    $ $HADOOP_HOME/bin/hadoop dfsadmin -report

    Step 6
    Edit the excludes file again: once the machines have been decommissioned, they can be removed from the 'excludes' file. Running "$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes" again will read the excludes file back into the NameNode, allowing the DataNodes to rejoin the cluster after the maintenance has been completed, or if additional capacity is needed in the cluster again.
    To stop/start the tasktracker:

    $ $HADOOP_HOME/bin/ stop tasktracker
    $ $HADOOP_HOME/bin/ start tasktracker