Okay, in my last post I wrote about all that we need to do to get our environment ready for some serious development with Hadoop. If you haven’t yet read that post, please do so. There’s a lot to be done to actually start working with Hadoop using Vagrant. https://techsagabyjig.wordpress.com/2017/03/20/first-blog-post/

To summarize what was described there:

  1. If you wish to try out Hadoop and MapReduce on your system, first make sure you have enough RAM and disk space: a minimum of 8 GB RAM with an i3 processor (an i5 or i7 lets you go higher – 16 GB/24 GB) and a 500 GB hard disk. The more, the better: you will be able to create more VMs and run more programs at the same time, with better speed!
  2. Install VirtualBox and Vagrant on your host machine, and Hadoop on your Vagrant box.
  3. Create the shared folder for transferring data to and from the guest machine.
  4. Get the VM ready for pseudo distributed architecture of Hadoop with configurations.
  5. Insert data into HDFS, the Hadoop Distributed File System.

If you are done with these steps, then you are ready to get more knowledge about this technology while practically trying it out on your computer. That’s the best way to learn without getting bored even before you start!

So, you must already know a bit about this technology that is ruling the data segment in IT right now. It is well suited to managing semi-structured and unstructured data, with the processing done locally on the nodes that hold the data. It follows a batch-processing approach and works on the entire dataset rather than just a part of it, unlike an RDBMS. So, is it better than relational databases? The answer is no. It has gained popularity because its approach is revolutionary and solves problems that matter right now and in the coming days of data management. It is simply another tool that solves a different kind of problem. Relational database management systems work on structured data with a predefined schema; they can do some of what Hadoop does, but with far less efficiency at this scale. Hadoop is what we need to analyze data from streaming sources like IoT devices, multimedia, search engines and healthcare systems. It is not interactive: data is written once and read many times, and updating data in HDFS is difficult, though possible. To blend in RDBMS-like features, many sub-products of Hadoop are out there in the market – you must have heard of Apache Spark, Pig, Hive, HBase etc. We'll discuss these later; they are related projects that process a variety of data types in different ways. You can find more details about the background of Hadoop in the book Hadoop: The Definitive Guide by Tom White.

Now, imagine that you get an assignment from a science department that hands you loads and loads of data collected over time and asks you to calculate some statistical information from it. You have not a few MBs but several GBs of data in text form. What would you do? They haven't given you any schema, and the data is all alphanumeric rows. Say it's weather data and you need to calculate the maximum temperature recorded.

A plain SQL approach can be very slow: you first have to break each record into discrete fields, load the values into tables, and only then run the calculation. You could write scripts instead, but those too are efficient only up to a point.

Doug Cutting, the creator of Hadoop and the Lucene project, built Hadoop around an approach (described by Google as MapReduce) that divides the whole problem-solving process into two steps: mapping and reducing. The data to be processed is structured as keys and values. Raw rows are first converted into key-value pairs, and the result is then computed over batches of records until the final answer is produced.

The raw data may look like this:

0067011990999991950051507004…9999999N9+00001+99999999999…
0043011990999991950051512004…9999999N9+00221+99999999999…
0043011990999991950051518004…9999999N9-00111+99999999999…
0043012650999991949032412004…0500001N9+01111+99999999999…
0043012650999991949032418004…0500001N9+00781+99999999999…
The MapReduce framework turns this into key-value pairs before handing it to our map function:

(0, 0067011990999991950051507004…9999999N9+00001+99999999999…)
(106, 0043011990999991950051512004…9999999N9+00221+99999999999…)
(212, 0043011990999991950051518004…9999999N9-00111+99999999999…)
(318, 0043012650999991949032412004…0500001N9+01111+99999999999…)
(424, 0043012650999991949032418004…0500001N9+00781+99999999999…)

Here, the key (the first column) is the byte offset of the line within the file; our map function will simply ignore it.

After this, the map function extracts the year and the temperature from each line and emits:

(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)

These are the (year, temperature) readings extracted from the raw records.

Now, the task of calculating the maximum temperature for each year is done by the reducer.

Before our reduce function runs, the framework sorts these pairs and groups the temperatures by year:

(1949, [111, 78])
(1950, [0, 22, −11])

and finally calculate and give the output as:

(1949, 111)
(1950, 22)
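If it helps to see the same flow without any Hadoop machinery, here is a rough, framework-free sketch in plain Java (my own illustration, not how Hadoop actually executes a job) that groups the (year, temperature) pairs above by year and keeps the maximum for each:

import java.util.*;

public class MaxTemperatureSketch {
    public static void main(String[] args) {
        // "Map" output: the (year, temperature) pairs from the example above
        int[][] pairs = { {1950, 0}, {1950, 22}, {1950, -11}, {1949, 111}, {1949, 78} };

        // "Shuffle": group the temperatures by year
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (int[] p : pairs) {
            grouped.computeIfAbsent(p[0], y -> new ArrayList<>()).add(p[1]);
        }

        // "Reduce": keep the maximum temperature per year
        for (Map.Entry<Integer, List<Integer>> e : grouped.entrySet()) {
            System.out.println("(" + e.getKey() + ", " + Collections.max(e.getValue()) + ")");
        }
    }
}

Hadoop does essentially this, but spreads the map work across many machines and lets the framework do the grouping (the "shuffle") for you.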

For our programs, you can download and use the datasets from the GitHub repo below:

https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all

Writing our first MapReduce program in Java

The Mapper:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
// The Writable interface is specific to Hadoop and we will talk about it in detail later.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) { // skip missing or poor-quality readings
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
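Before going anywhere near the cluster, you can sanity-check the mapper with a unit test. One common way is Apache MRUnit (an extra dependency, not part of Hadoop itself); the sketch below is only a suggestion, and the synthetic record is one I pad out by hand so that the fixed column positions line up:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class MaxTemperatureMapperTest {

  @Test
  public void extractsYearAndTemperature() throws Exception {
    // Build a synthetic record: zeros everywhere, with the year at columns 15-19,
    // a signed temperature at 87-92 and a valid quality code at column 92.
    StringBuilder record = new StringBuilder();
    for (int i = 0; i < 100; i++) record.append('0');
    record.replace(15, 19, "1950");   // year
    record.replace(87, 92, "-0011");  // temperature: -1.1 degrees C, stored in tenths
    record.replace(92, 93, "1");      // quality code accepted by the mapper

    new MapDriver<LongWritable, Text, Text, IntWritable>()
        .withMapper(new MaxTemperatureMapper())
        .withInput(new LongWritable(0), new Text(record.toString()))
        .withOutput(new Text("1950"), new IntWritable(-11))
        .runTest();
  }
}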

The Reducer for max. temperature:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
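The reducer can be checked the same way; here is a similar MRUnit sketch (again, just a suggested test of mine) feeding it the grouped 1950 values from the worked example above:

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MaxTemperatureReducerTest {

  @Test
  public void returnsMaximumTemperaturePerYear() throws Exception {
    new ReduceDriver<Text, IntWritable, Text, IntWritable>()
        .withReducer(new MaxTemperatureReducer())
        .withInput(new Text("1950"),
            Arrays.asList(new IntWritable(0), new IntWritable(22), new IntWritable(-11)))
        .withOutput(new Text("1950"), new IntWritable(22))
        .runTest();
  }
}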

The driver that configures and runs the job:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
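One optional tweak, since taking a maximum is associative: the same reducer class can be reused as a combiner, so each map task pre-aggregates its own output and less data is shuffled to the reducers. If you want to try it, add this line to main() next to setReducerClass (the job works fine without it):

job.setCombinerClass(MaxTemperatureReducer.class);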

We will discuss the line-by-line semantics of the above three programs in the next post. For now, we will run them in Hadoop and check the output.

Once you have created a local folder in your guest machine and copied these programs there, compile them using the following command:

hadoop com.sun.tools.javac.Main *.java

If you get any errors, check the Java version (it should be 8 or lower), that the PATH variable contains the Java bin folder, and that HADOOP_CLASSPATH points to the tools.jar of Java 8. Please refer to the previous post.
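If the com.sun.tools.javac.Main route keeps giving trouble, an alternative – assuming the JDK 8 javac is on your PATH – is to compile directly against the classpath that Hadoop reports:

javac -classpath $(hadoop classpath) *.java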

If that succeeds, the .class files are created, and we then need to create a jar out of the class files:

jar cvf mt.jar *.class
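Optionally, you can record the main class in the jar's manifest while packaging (the e flag sets Main-Class), so that you don't have to name the class when running the job later:

jar cvfe mt.jar MaxTemperature *.class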

Now you can insert the latest weather data into HDFS. Note that you can download it to your host computer and copy it to the "share" folder, which is in sync with the "host" folder on the guest.

hadoop fs -copyFromLocal  ~/host/1990.txt  /user/$USER/data  

To check that it was inserted:

hadoop fs -ls /user/$USER/data

Next, we will run the MapReduce program on our data.

hadoop jar mt.jar MaxTemperature /user/$USER/data /user/$USER/output/1

$USER is your username. To check the output, list the files:

hadoop fs -ls /user/$USER/output/1

You will see two files: part-r-00000 and _SUCCESS. The first one contains the result as tab-separated (year, maximum temperature) pairs.
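To look at the result itself, print the part file straight from HDFS:

hadoop fs -cat /user/$USER/output/1/part-r-00000

Each line holds a year and its maximum temperature (in tenths of a degree Celsius, as stored in the NCDC records), separated by a tab.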

While the job runs, the console shows the map and reduce progress as percentages, followed by statistics for the job: tasks, counters, reads/writes, memory and disk used, and most importantly the time taken. For a few GBs of data it might take just a few minutes.

If you are lucky, everything will go smoothly. But there is a high chance that you will run into errors and exceptions. I have listed the common ones below:

  1. SASL errors: check your Java version – it shouldn't be 9. OpenJDK 8 is the recommended version.
  2. Make sure that all the servers are running. Use jps to check that the NameNode, DataNode and JobHistoryServer (along with the YARN daemons) are running before inserting data or running the programs; if any of them has stopped, you will need to restart it, otherwise the hadoop commands will throw exceptions. All of them need to run, not just one or two (see the commands after this list).
  3. ChecksumException: Checksum Error: this is caused by data corruption. Try inserting the data into HDFS again; if that does not help, you may need to destroy the VM and recreate it 😦 Sorry!!
  4. Connection closed… This is a very common scenario. You may need to halt the VM and restart it: exit from the VM, then run "vagrant halt" followed by "vagrant up".
  5. hdfs.DFSClient: DataStreamer Exception org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1486087445115_0003/job.jar could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and no node(s) are excluded in this operation. Your HDFS disk is full and you need to add more space, or simply use another VM. At any point in time, you can check the free space on the file system using the hadoop fs -df command.
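For point 2 above, a quick way to check and restart the daemons – assuming a Hadoop 2.x pseudo-distributed setup with Hadoop's sbin scripts on your PATH, as set up in the previous post (your paths may differ) – is:

jps                                           # should typically list NameNode, DataNode, ResourceManager, NodeManager, JobHistoryServer
start-dfs.sh                                  # restart the HDFS daemons if they are down
start-yarn.sh                                 # restart the YARN daemons
mr-jobhistory-daemon.sh start historyserver   # restart the job history server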

So, we ran a sample MapReduce program to calculate the maximum temperature.

The next blog post will describe the structure of the program in detail and how it actually runs on the cluster. Please go through the following link for more details:

http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

Happy Reading!

