Hadoop The Definitive Guide Eclipse Environment Setup

If you like Tom White’s book Hadoop: The Definitive Guide, you will enjoy it even more once you try out the code yourself. You could use Ant or Maven to copy the source code into your own project and configure it by hand. However, the low-hanging fruit is to use git to clone his source code onto your local machine, where it works almost out of the box. Here I took a few screenshots of loading his code in Eclipse, and I hope they are helpful.

1. Get Source Code.

The source code for Tom’s book is hosted on GitHub. You can submit issues or ask the author himself if you have further questions. I cloned the project with git into my Eclipse workspace, a brand-new workspace called EclipseTest.

Image

 

2. Load Existing Maven Project into Eclipse.

Then open Eclipse and click File -> Import -> Maven -> Existing Maven Projects. Every chapter can be a separate Maven project, so to save time I imported the whole book: every chapter, plus the tests and example code.

Image

 

When you load the Maven projects, Eclipse might report errors about missing plugins, etc. Check quickly whether the Eclipse Marketplace offers a fix that makes the problem go away; if not, just keep importing with errors. In my case, I was missing Maven plugin 1.5, etc., which left me unable to build chapter 4 only. That is good enough for me, since I can at least get started with the other chapters and examples.

I also took a screenshot of the output so you can get a rough idea of what it should look like.

Image

3. Run Code.

Now you can test any example that built successfully within Eclipse without worrying about the environment. For example, I am reading Chapter 7, MapReduce Types and Formats, in which he explains how to subclass RecordReader and treat every single file as a record. He also provides a piece of code that concatenates a list of small files into a sequence file: SmallFilesToSequenceFileConverter.java. I had already run start-all.sh from the bin folder of the Hadoop binary distribution, and I could see that the Hadoop services (DataNode, ResourceManager, SecondaryNameNode, etc.) were running. You need to set up a Java Run Configuration so the code knows where to find the input files and where to write the output files. After that you can just click Run, and bang! The code finishes successfully.

Image

Image

STUDY SOURCE CODE: EPISODE 5 – HADOOP.MAPREDUCE.JOBTRACKER

There is a huge difference between MapReduce 1 and MapReduce 2 (YARN). MapReduce 1 uses a job tracker and task trackers to manage the process, while YARN uses the resource manager, application master, etc. to improve performance.

Since MapReduce 1 has already played its part in Hadoop’s history, we will study how MapReduce works from a historical perspective.

I downloaded the older Hadoop release 1.2.1. You can also browse the source code online in Apache Subversion.

Once you have set all of this up and dive into the directory mapred.org.apache.hadoop.mapred, you will be amazed at how many classes it holds: about 200. From this we can see that this is the central place where all the MapReduce magic happens. Now let’s start this journey with the JobTracker, which is the “central location for submitting and tracking MR jobs in a network environment”. It has about 5000 lines of code. Since this is not a tutorial but only some random study notes by a Java layman, I will first post the basic structure of this class, like a book’s table of contents, based on the author’s comments and marked by line number.

JobTracker:

  • 0 Prelude
  • 1500 Real JobTracker
      - properties
      - constructors
  • 2300 Lookup tables for JobInProgress and TaskInProgress
      - create
      - remove
      - mark
  • 2515 Accessors
      - jobs
      - tasks
  • 2909 InterTrackerProtocol
      - heartbeat
      - update..
  • 3534 JobSubmissionProtocol
      - getjobid
      - submitjob
      - killjob
      - setjobpriority
      - getjobprofile
      - status
  • 4354 JobTracker methods
  • 4408 Methods to track TaskTrackers
      - update
      - lost
      - refresh
  • 4697 Main (debug)
      - 4876 Check the job if it has invalid requirements
      - 4995 MXBean implementation
          - blacklist
          - graylist
      - 5110 JobTracker SafeMode

 

STUDY SOURCE CODE: EPISODE 5 – HADOOP.MAPREDUCE.JOBSUBMITTER

In Chapter 6 of Tom White’s book, under Classic MapReduce, he describes how, from a macro perspective, a whole MapReduce job can be mapped onto six logical steps: job submission, initialization, task assignment, execution, progress and status updates, and finally job completion.

We will start by looking at the job submission step. There is a class called JobSubmitter in hadoop.mapreduce and, as the name indicates, it handles how the job gets submitted to the jobtracker from the client.

Image

This method gets the source file system and the destination file system and checks whether they are the same one by comparing their host names and even their port numbers.
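The comparison can be pictured with plain `java.net.URI` objects. This is only a minimal sketch of the idea: the class and method names (`SameFileSystemCheck`, `sameFileSystem`) are made up for illustration, and it skips any host name resolution that the real Hadoop code may perform.

```java
import java.net.URI;

final class SameFileSystemCheck {

    /** Returns true when both URIs share the same scheme, host and port. */
    static boolean sameFileSystem(URI src, URI dst) {
        String srcScheme = src.getScheme();
        if (srcScheme == null || !srcScheme.equalsIgnoreCase(dst.getScheme())) {
            return false;
        }
        String srcHost = src.getHost();
        String dstHost = dst.getHost();
        if (srcHost == null || dstHost == null) {
            // e.g. file:/// has no host; treat as equal only if both lack one
            return srcHost == null && dstHost == null;
        }
        return srcHost.equalsIgnoreCase(dstHost) && src.getPort() == dst.getPort();
    }

    public static void main(String[] args) {
        URI local = URI.create("file:///tmp/job.jar");
        URI remote = URI.create("hdfs://namenode1:8020/staging/job.jar");
        // Different file systems, so the file would have to be copied over.
        System.out.println(sameFileSystem(local, remote));
    }
}
```

When the check returns true there is nothing to ship, which is why the copy step can be skipped in that case.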

Image

Clearly, submitting a job involves some files that need to be sent to the job tracker, like the code files, the jar files, the customized configuration files, etc. Before sending a file out, we need to check whether the job tracker is actually on the same machine as the client; if it is, we don’t need to send the data at all. The method returns the new Path (or the old path, if it already exists).

Image

Of course, after checking whether the local file system is actually the same as the remote one, the smart and cautious Hadoop developers copy the files to the job tracker’s file system. This step checks and loads the files, libjars and archives. I omitted several lines of code to fit the whole method into one screenshot, so the reader can get a rough idea of what this method looks like.

Image

Now we are already halfway through this class, and I folded the rest so you can get an overview of what is left. As you can see, submitJobInternal might be the longest method in the code. The idea behind submitJobInternal is actually fairly straightforward, and it may help to summarize it in five words: “1. Configure 2. Secret 3. Split 4. Queue 5. Submit”.

(1). Configure:

Image

Basically, this configures the job based on the client attributes and sets up all the configuration properties like user name, job ID, host address, etc.

(2). Secret:

Image

Job submission cannot happen without some kind of credential behind it; the job ID alone is not enough when distributing the job. This step gets the delegation token for the job directory from the namenode and also generates a secret key.

(3). Split

Image

This step splits the job’s input logically.

(4). Queue

Image

This sets up which job queue the job needs to be sent to and gets the administrators of that job queue.

(5). Submit

Image

In the end, the code calls the submitClient.submitJob method to submit the job to the jobtracker using the job ID, the submission directory and the secret key/credentials. If the submission fails, the staging directory is cleaned up; otherwise the job has been submitted successfully.

Of those five words, Split is an especially interesting topic, and three of the methods that follow are directly related to it. There are two scenarios for splitting the input files for the mappers. If the job uses the old mapred API, the code reads the configured number of map tasks and asks the input format to split the input based on that number. If the job uses the new API, the input format works out the splits another way. After splitting, the code sorts the splits by size so that the biggest split comes first.

Image

Note: The split is a logical split of the inputs, which means the files won’t be physically cut into chunks. Instead, each split might just be a tuple that records the file name, the start offset and the length.
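To make the idea of a logical split concrete, here is a minimal sketch. Everything in it is an assumption made for illustration: the class `LogicalSplits`, the `Split` tuple and the fixed split size are not Hadoop's own types, and Hadoop's input formats apply more rules than this. It only shows that a split is a description of a byte range and that sorting puts the biggest one first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class LogicalSplits {

    /** A split only describes a byte range of a file; no data is copied. */
    static final class Split {
        final String file;
        final long start;
        final long length;

        Split(String file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    /** Cuts a file of fileLength bytes into ranges of at most splitSize bytes. */
    static List<Split> split(String file, long fileLength, long splitSize) {
        List<Split> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new Split(file, start, Math.min(splitSize, fileLength - start)));
        }
        return splits;
    }

    /** Orders the splits so that the biggest one comes first. */
    static void sortBiggestFirst(List<Split> splits) {
        Collections.sort(splits, new Comparator<Split>() {
            @Override
            public int compare(Split a, Split b) {
                return Long.compare(b.length, a.length);
            }
        });
    }

    public static void main(String[] args) {
        List<Split> all = new ArrayList<>(split("small.txt", 50, 128));
        all.addAll(split("big.txt", 300, 128));
        sortBiggestFirst(all);
        for (Split s : all) {
            System.out.println(s.file + " start=" + s.start + " length=" + s.length);
        }
    }
}
```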

 

 

STUDY SOURCE CODE: EPISODE 4 – HADOOP.FS.SHELL

Have you ever asked yourself what is really going on behind the command `hdfs dfs -copyFromLocal`: how does it split the file into chunks and store them on different data nodes?

To answer that question, you need to start with the command `hdfs`. Once you find the code of the hdfs shell command, it will look something like this:

Image

That was the last part of the script; it basically runs the Java class that corresponds to the subcommand you typed after hdfs. For example, the command we are interested in, “copyFromLocal”, belongs to the subcommand dfs, and dfs maps to “org.apache.hadoop.fs.FsShell”. If you go to the shell directory, which is in the same folder as FsShell, you will see that most of the commands you are familiar with map to a separate Java class: ‘Mkdir.java’ for ‘dfs -mkdir’, and so on. I took a quick look at the source code of FsShell.java.

Image

 

As you can see, depending on your command, the shell instance (client) reads in the local Hadoop configuration, sets up a job based on it, and then runs the corresponding command. To me, how a command ends up mapped to its Java class (say, ‘Mkdir.java’) is still a mystery. It usually involves CommandFactory and an abstract class that describes a command, etc., which I don’t understand at this stage, so I will jump straight to CopyCommands.java.

Image

As you can see, the commands CopyFromLocal and CopyToLocal are actually wrappers around Put and Get. If you trace down the path, you will find that the Put command ends up using IOUtils.copyBytes. It is interesting that real-world usage ends up in a function that belongs to the `utility tool kit`. So far, I am still confused about how the job knows how to break a file down into chunks and which chunk to send where. I guess it might be determined by the file system configuration, with the job client making all the magic happen. In other words, all the coordination happens behind that one-liner, ‘ToolRunner.run’. That might be the next focus point in learning the Java source code of Hadoop.
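For readers who have not seen such a utility before, a stream copy helper usually boils down to a read/write loop like the one below. This is a generic sketch using only the JDK, not the body of Hadoop's IOUtils.copyBytes; the names `CopyBytesSketch`, `copy` and `copyInMemory` are invented for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class CopyBytesSketch {

    /** Copies everything from in to out through a fixed-size buffer. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        byte[] buf = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buf)) >= 0) {   // read returns -1 at end of stream
            out.write(buf, 0, n);
            total += n;
        }
        return total;
    }

    /** Convenience wrapper for trying the loop on in-memory data. */
    static byte[] copyInMemory(byte[] data, int bufferSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            copy(new ByteArrayInputStream(data), out, bufferSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] copied = copyInMemory(new byte[] {1, 2, 3, 4, 5}, 2);
        System.out.println(copied.length + " bytes copied");
    }
}
```

Note that nothing in such a loop knows about blocks or datanodes; in this picture that knowledge has to live in the output stream the bytes are written to.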

 

OMG – Java in Eclipse

  1. syso, CTRL+SPACE (stdout print)
  2. Alt + Up/Down (move lines)
  3. Alt+Shift+Up/Down (move hierarchy)
  4. Alt+Shift+R (refactor)
  5. Ctrl+Shift+O (organize)
  6. Ctrl+1 (fix)
  7. Ctrl+Q (last edit)
  8. Alt+s, r (accessor)
  9. Ctrl+j (incremental search)
  10. Ctrl+h (Java search)
  11. Ctrl+O (show all methods)
  12. Ctrl+PageUp/PageDown (switch opened files)
  13. Ctrl+E (switch opened files)
  14. Ctrl+M (maximize window)
  15. Ctrl+3 (search cmds)
  16. Ctrl+F8 (switch perspective)
  17. Ctrl+L (jump to line)
  18. Ctrl+Shift+G (search infile reference)
  19. Mylyn
  20. Ctrl+Shift+num_/ or num_* (fold and expanding code)

STUDY SOURCE CODE: EPISODE 3 – HADOOP.HDFS.SERVER.DATANODE.BlockSender/BlockReceiver

How to read a block from, and write a block to, a datanode.

Let’s start with BlockSender, which “Reads a block from the disk and sends it to a recipient.”

BlockReceiver_comment

Two methods probably interest people the most: sendBlock and sendPacket. Since sendBlock actually uses sendPacket, I will go through the code of sendPacket first.

BlockSender_sendPacket1

All the input arguments are fairly straightforward except for “transferTo”, which is documented as “use transferTo to send data”. So what is “transferTo”? It was added to Hadoop by Raghu Angadi in a JIRA ticket, where he claimed that it might reduce datanode CPU usage by 50% or more. Clearly, it is some sort of Java function that transfers data in a much more efficient way; a more detailed explanation of how transferTo works is available online. It basically moves the data from one fileChannel to another one directly without “copying” it, which is better than writing a loop that moves the data piece by piece from one channel to another.
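The JDK method behind this is `FileChannel.transferTo(position, count, target)`. The sketch below copies one file to another with it; it is a standalone illustration, not the datanode code, and the class and helper names (`TransferToSketch`, `transferFully`, `roundTrip`) and the temporary files are assumptions made for the example. Because `transferTo` may move fewer bytes than requested, it is called in a loop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToSketch {

    /** Moves the whole source file into target using channel-to-channel transfer. */
    static long transferFully(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long position = 0;
            long size = in.size();
            while (position < size) {
                // transferTo may transfer fewer bytes than asked for, so keep going
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }

    /** Writes data to a temp file, transfers it to a second one and reads it back. */
    static byte[] roundTrip(byte[] data) {
        try {
            Path source = Files.createTempFile("transfer-src", ".bin");
            Path target = Files.createTempFile("transfer-dst", ".bin");
            try {
                Files.write(source, data);
                transferFully(source, target);
                return Files.readAllBytes(target);
            } finally {
                Files.deleteIfExists(source);
                Files.deleteIfExists(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] copy = roundTrip("block data".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(copy, StandardCharsets.UTF_8));
    }
}
```

Notice that the code never allocates a byte array for the payload; that is the point of the technique.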

BlockSender_sendPacket2

The code above is how the data node actually sends data out, in two different modes. The “normal transfer” basically uses “hadoop.io.IOUtils.readFully” to read the data from the input stream in a while loop and write it to the output buffer.

If the “transferTo” flag is turned on, the code goes straight to the “socket” part: it first writes the header and checksum, then writes the data using the transferToFully function. If the “transferTo” flag is turned off, it does the normal transfer: it reads the data first so it can verify the checksum, then writes the header, checksum and data together to the buf. Beyond the code shown here, there is some exception handling and logging code that I won’t explain.

 

BlockSender_sendBlock

It is much easier to understand the “sendBlock” method after going through “sendPacket”, because it basically starts by determining what the size of `pktBufSize` should be. There is an interesting detail here. When the flag transferTo is true, `pktBufSize` is only the sum of `PacketHeader.PKT_MAX_HEADER_LEN` and `checksumSize * maxChunksPerPacket`. The first term is a placeholder large enough to hold the biggest possible header. The second term, however, is only the number of bytes for all the checksums. Yes, it doesn’t create a packet buffer big enough to hold the data. This may be confusing if you have not followed `sendPacket`: with `transferTo`, the data moves directly from one fileChannel to another, so you don’t even need a temporary buffer to hold the data being “copied”. So it not only reduces CPU usage but also saves a lot of buffer space when transferring data. Of course, if you are not using `transferTo`, it creates a buffer big enough to hold both the chunk data and the checksum data for all the chunks in a packet.
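The two sizing rules can be written down as a small function. This is a sketch of the arithmetic described above, not the BlockSender source; the class name `PacketBufferSize` is invented, and the numbers used in `main` (a 33-byte maximum header, 512-byte chunks, 4-byte checksums, 128 chunks per packet) are assumed values chosen only to show the difference in scale.

```java
final class PacketBufferSize {

    /**
     * With transferTo the buffer holds only the header and the checksums;
     * otherwise it must also hold the chunk data itself.
     */
    static int packetBufferSize(boolean transferTo, int maxHeaderLen,
                                int chunkSize, int checksumSize, int maxChunksPerPacket) {
        if (transferTo) {
            return maxHeaderLen + checksumSize * maxChunksPerPacket;
        }
        return maxHeaderLen + (chunkSize + checksumSize) * maxChunksPerPacket;
    }

    public static void main(String[] args) {
        // Illustrative values only; see the lead-in for the assumptions.
        int withTransferTo = packetBufferSize(true, 33, 512, 4, 128);
        int withoutTransferTo = packetBufferSize(false, 33, 512, 4, 128);
        System.out.println(withTransferTo + " bytes vs " + withoutTransferTo + " bytes");
    }
}
```

With these assumed numbers the transferTo buffer is a few hundred bytes, while the normal buffer is tens of kilobytes.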

Once the buffer size has been determined, Java NIO allocates enough space to create pktBuf, and the block is then sent packet by packet.

In the end:

(1) Why is ‘transferTo’ so fast? Answer: DMA(Direct Memory Access)

(2) How buffers work in Java.

exercise_NIOByteBuffer
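For anyone who wants to try the buffer question at home, here is a small self-contained experiment with `java.nio.ByteBuffer`. It is not the exercise from the screenshot, just a minimal sketch (the class name `ByteBufferBasics` is made up) of the position/limit bookkeeping: writes advance the position, `flip()` prepares the buffer for reading, and reads advance the position again.

```java
import java.nio.ByteBuffer;

final class ByteBufferBasics {

    /** Returns {position after writing, limit after flip, value read, bytes remaining}. */
    static int[] writeFlipRead() {
        ByteBuffer buf = ByteBuffer.allocate(16);   // capacity 16, position 0, limit 16
        buf.putInt(42);                             // an int takes 4 bytes
        buf.putLong(7L);                            // a long takes 8 bytes
        int positionAfterWrite = buf.position();

        buf.flip();                                 // limit = old position, position = 0
        int limitAfterFlip = buf.limit();

        int value = buf.getInt();                   // reads the first 4 bytes back
        int remaining = buf.remaining();            // the long is still unread
        return new int[] {positionAfterWrite, limitAfterFlip, value, remaining};
    }

    public static void main(String[] args) {
        int[] r = writeFlipRead();
        System.out.println("position after write = " + r[0]);
        System.out.println("limit after flip     = " + r[1]);
        System.out.println("value read           = " + r[2]);
        System.out.println("remaining            = " + r[3]);
    }
}
```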

 

STUDY SOURCE CODE: EPISODE 2 – HADOOP.HDFS.PROTOCOL.BLOCK

A Block is a hadoop FS primitive, identified by a long.

If you go to the data directory of a Hadoop datanode, you can usually see a whole bunch of files whose names start with blk_ followed by a long number, along with similarly named files that end with .meta. There are also subdirectories that recursively contain more block files. These are the block files. If you open them in a text editor (vi), you can even see part of the data in plain text; if the data is compressed, you will see its binary representation instead. Anyway, the Block is the unit of data, and understanding the Block class will help in understanding how Hadoop distributes data later.

This post will walk through the source code of the hadoop.hdfs.protocol.Block class.

First of all, the Block class implements the interfaces Writable and Comparable, the two interfaces that the key and value types of a MapReduce job build on (values must be Writable; keys must also be Comparable). So let’s first take a look at the methods that Block implements from Writable and Comparable.

Image

Here there are basically three pairs of read and write methods: one for the fields, one helper pair, and one for the block ID.

Image

These two methods indicate that two blocks might have different contents, but when judging whether they are the same block, blockId is the only variable that matters. BTW, the `?:` ternary operator really simplifies the code and is interesting to read.
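A stripped-down class shows the pattern. The sketch below is not the Hadoop Block class: `BlockLike` and its two fields are stand-ins chosen for illustration. It only demonstrates identity resting on the ID alone, with the comparison written using nested `?:` expressions.

```java
final class BlockLike implements Comparable<BlockLike> {
    final long blockId;
    final long numBytes;   // deliberately ignored by equals and compareTo

    BlockLike(long blockId, long numBytes) {
        this.blockId = blockId;
        this.numBytes = numBytes;
    }

    @Override
    public int compareTo(BlockLike other) {
        // nested ternaries: smaller id first, equal ids compare as 0
        return blockId < other.blockId ? -1 : blockId == other.blockId ? 0 : 1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BlockLike)) {
            return false;
        }
        return compareTo((BlockLike) o) == 0;
    }

    @Override
    public int hashCode() {
        // must agree with equals, so it is derived from the id only
        return (int) (blockId ^ (blockId >>> 32));
    }

    public static void main(String[] args) {
        BlockLike a = new BlockLike(7L, 100L);
        BlockLike b = new BlockLike(7L, 999L);
        System.out.println(a.equals(b));   // same id, different length
    }
}
```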

There are also a few routine Java methods like the accessors (setters/getters), constructors, etc. Beyond those, there are a few methods worth mentioning that use regular expressions to extract the ID and generation stamp from a file name.

Image

And here is a screenshot of a data node, with a block file, a block meta file, and the current working directory highlighted in light yellow.

Image

You can write a test class, add hadoop-core to your Maven pom file, and see whether those regular expression functions can parse out the block ID and the generation stamp.
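If you would rather experiment without the Hadoop dependency first, the parsing idea can be tried with `java.util.regex` alone. The patterns below are my own guesses at the naming scheme described above (blk_ plus a number, and blk_ plus a number, an underscore, a generation stamp and .meta); they are not copied from the Block class, and the names `BlockFileNames`, `blockId` and `generationStamp` are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class BlockFileNames {
    // Assumed naming scheme, e.g. "blk_1073741825" and "blk_1073741825_1001.meta"
    private static final Pattern BLOCK_FILE = Pattern.compile("^blk_(-?\\d+)$");
    private static final Pattern META_FILE = Pattern.compile("^blk_(-?\\d+)_(\\d+)\\.meta$");

    /** Extracts the block id from a block file name or a meta file name. */
    static long blockId(String name) {
        Matcher block = BLOCK_FILE.matcher(name);
        if (block.matches()) {
            return Long.parseLong(block.group(1));
        }
        Matcher meta = META_FILE.matcher(name);
        if (meta.matches()) {
            return Long.parseLong(meta.group(1));
        }
        throw new IllegalArgumentException("not a block file name: " + name);
    }

    /** Extracts the generation stamp from a meta file name. */
    static long generationStamp(String metaName) {
        Matcher meta = META_FILE.matcher(metaName);
        if (!meta.matches()) {
            throw new IllegalArgumentException("not a meta file name: " + metaName);
        }
        return Long.parseLong(meta.group(2));
    }

    public static void main(String[] args) {
        System.out.println(blockId("blk_1073741825"));
        System.out.println(generationStamp("blk_1073741825_1001.meta"));
    }
}
```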

 

 

Study Source Code: Episode 1 – hadoop.hdfs.DFSOutputStream.java

Whenever I talk to other people, I say “I love open source software and hate proprietary software”. But have you ever read the source code yourself? I rarely read the source code of the tools I use, yet I have benefited so much whenever I did take a look. Also, these days our team is stuck with our Hadoop environment. Some people complain that it is all screwed up, but they cannot identify what is actually going wrong. Other people say everything is fine, but they cannot face the fact that all the Hive queries take far longer than anyone expected. For these two reasons, I decided to take a look at the source of Hadoop, HDFS, etc. I am more of a Python programmer, and my knowledge of Java goes no further than `System.out.println()` and `java -jar` to run a jar file. However, since I have already followed this tutorial to set up the Hadoop Maven project in Eclipse, I think it will be a good start.

This series of posts is a set of random study notes on how I try to understand the nuts and bolts of Hadoop, in the hope that I will figure out what is really going on behind the scenes of `hdfs dfs -copyFromLocal source destination`, etc.

Today it is all about DFSOutputStream.java. I started with this class because of Tom White’s book, “Hadoop: The Definitive Guide”, in which he describes the process behind an HDFS write; DFSOutputStream is one of the most important classes involved in that process. To describe the role of DFSOutputStream in a nutshell, the comments at the beginning of the source code are the most authoritative source.

Image

 

5 MB = 5 * 1024KB = 5120 KB = 80 Packets * 64 (KB/Packet) = 80 Packets * 512 (B/Chunk) * (128 Chunk/Packet).

As we can see from the class definition, DFSOutputStream is a subclass of FSOutputSummer; in other words, DFSOutputStream extends the class FSOutputSummer: “This is a generic output stream for generating checksums for data before it is written to the underlying stream”.

Going further down the code, we can see many declared attributes that are worth a look first.

Image

 

  • MAX_PACKETS=80 and PACKET=64KB give 5MB. There is also another attribute, “chunksPerPacket”: a packet doesn’t have to be completely filled up to be sent out, and it can hold at most 128 chunks.
  • Three variables are super interesting: “dataQueue” and “ackQueue”, which are both LinkedLists whose elements are Packet objects, and “streamer”, the object that grabs available data nodes from the pipeline and distributes the data the user writes to the different data nodes.
  • A few other variables are either user arguments (‘fileId, src, ..’) or variables that describe the queuing behavior (‘currentSeqno, lastQueuedSeqno, …’).
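The interplay of the two queues can be sketched with plain `LinkedList`s. This is a toy model under stated assumptions, not the DFSOutputStream code: packets are reduced to their sequence numbers, there are no threads or datanodes, and the class and method names (`PacketQueues`, `enqueue`, `sendNext`, `ack`) are invented. The idea it illustrates is that a packet waits in the data queue, moves to the ack queue once it has been sent, and leaves the ack queue only when it has been acknowledged.

```java
import java.util.LinkedList;

final class PacketQueues {
    final LinkedList<Long> dataQueue = new LinkedList<>();  // packets waiting to be sent
    final LinkedList<Long> ackQueue = new LinkedList<>();   // packets sent, not yet acknowledged
    private long nextSeqno = 0;

    /** The writer side: queue a new packet and return its sequence number. */
    long enqueue() {
        long seqno = nextSeqno++;
        dataQueue.addLast(seqno);
        return seqno;
    }

    /** The streamer side: "send" the oldest packet and park it in the ack queue. */
    Long sendNext() {
        Long seqno = dataQueue.pollFirst();
        if (seqno != null) {
            ackQueue.addLast(seqno);
        }
        return seqno;
    }

    /** The response side: drop the packet once its ack arrives, in order. */
    boolean ack(long seqno) {
        Long head = ackQueue.peekFirst();
        if (head != null && head == seqno) {
            ackQueue.removeFirst();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        PacketQueues q = new PacketQueues();
        q.enqueue();
        q.enqueue();
        q.sendNext();
        System.out.println("waiting=" + q.dataQueue + " unacked=" + q.ackQueue);
        q.ack(0);
        System.out.println("waiting=" + q.dataQueue + " unacked=" + q.ackQueue);
    }
}
```

Keeping sent packets around until they are acknowledged is what would allow them to be resent if something in the pipeline fails.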

After the variables comes the definition of the most important class, ‘Packet’. There are two constructors for the class `Packet`: one creates a heartbeat packet, and the other creates a new data packet. ‘pktSize’ is the maximum size of the packet, including checksum data and actual data; ‘chunksPerPkt’ is the maximum number of chunks per packet (128); and ‘offsetInBlock’ is the offset in bytes into the HDFS block (our default block size is 128MB).

Image

 

Here it may help to take out a sheet of paper and draw the buffer to understand what a packet really looks like. First, the packet needs to declare a buffer whose length is the sum of PKT_MAX_HEADER_LEN and pktSize. The variable PKT_MAX_HEADER_LEN is defined in another class called `hdfs.protocol.datatransfer.PacketHeader`.

Image

 

Instead of diving into another class, I will just paste the related code here and keep the scope inside the class `DFSOutputStream`. Basically, the order is 1. header, 2. checksum, 3. data. When I first looked at the diagram-like description of the buf at the beginning of the class, I thought the checksum and data were not contiguous, somehow cut off by something represented by underscores. I guess the underscores are there because the checksum and the data might not fill every byte of their regions: each starts at the beginning of its region and might leave the later part empty.

After the Packet has been constructed, there are three methods containing the word ‘write’:

  1. writeData(byte[] inarray, int off, int len) reads the input data from the buffer inarray and validates the size of the input to make sure it will not exceed the limit set by the constructor (dataPos + len <= buf.length). It then uses System.arraycopy to copy the input content into the packet buffer.
  2. writeChecksum(byte[] inarray, int off, int len) has almost exactly the same code structure as writeData, but it writes the checksum to the packet buffer.
  3. writeTo(DataOutputStream stm) will “Write the full packet, including the header, to the given output stream.” It first gets the absolute lengths of the data and the checksum, then builds pktLen by adding them up, plus a few extra bytes (I am not sure what they are for). Then it builds the header from several pieces of information: (1) pktLen, (2) offsetInBlock, (3) seqno, (4) lastPacketInBlock, (5) dataLen, (6) syncBlock, which matches the comment in the PacketHeader class. Once the header has been built, the interesting part comes: how to arrange the three parts (header, checksum, data) before sending. If checksumPos is not equal to dataStart, there is a gap between the checksum and the actual data in the buffer. The code then moves the checksum to the right so that the checksum and data sit right next to each other, which also leaves more room for the header. After all of this, a few “assert” statements check that everything is fine, and the contiguous full packet is sent to the output stream.
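The gap-closing step in writeTo is easier to see on a tiny array. The sketch below is only a model of that one move, with invented names (`PacketLayout`, `closeGap`) and an assumed layout of header room, then checksums, then a possible gap, then data; it is not the Packet class itself.

```java
final class PacketLayout {

    /**
     * Layout assumed here: [ header room | checksums | gap | data ].
     * Moves the checksum bytes to the right so they end where the data starts,
     * and returns the new start index of the checksums.
     */
    static int closeGap(byte[] buf, int checksumStart, int checksumPos, int dataStart) {
        int checksumLen = checksumPos - checksumStart;
        if (checksumPos == dataStart) {
            return checksumStart;               // already contiguous, nothing to do
        }
        int newStart = dataStart - checksumLen;
        // arraycopy copes with overlapping source and destination ranges
        System.arraycopy(buf, checksumStart, buf, newStart, checksumLen);
        return newStart;
    }

    public static void main(String[] args) {
        // 2 bytes of header room, 2 checksum bytes, 2 unused bytes, 3 data bytes
        byte[] buf = {0, 0, 1, 2, 0, 0, 9, 9, 9};
        int newStart = closeGap(buf, 2, 4, 6);
        System.out.println("checksums now start at index " + newStart);
    }
}
```

After the move, everything from the new checksum start to the end of the data is one contiguous run, and the space in front of it is free for the header.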

So far we have gone through the Packet definition in DFSOutputStream, which is very helpful for understanding what the atomic packet looks like when data is sent. Next we will take a close look at the “DataStreamer”. In terms of line count, DFSOutputStream has about 2000 lines of code in total, and about 1000 of them are dedicated to defining the data streamer, which I would say is a good indication of how important this inner class is.

First, let’s take a look at the DataStreamer constructor:

DFSOutputStream_datastreamerconstructor

To build the DataStreamer, you need: (1) last block of the file to be appended (2) status of the file to be appended (3) number of bytes per checksum

The DataStreamer constructor is very interesting. It first reads the information about the previously written block and does some arithmetic to work out whether there is free space left in that block. Here we plug in a few numbers just to prove the concept. For example, assume the blockSize is 128 MB, which is the default value for CDH, and assume we are working with a file of roughly 2 GB (I will use 2000 MB in the calculation). The variable bytesPerChecksum is 512 bytes by default. Let’s walk through these few lines and do the calculations:

DFSOutputStream_constructorcheckfreespace

stat.getLen() will be 2000 MB and blockSize is 128 MB, so

usedInLastBlock = 2000 % 128 = 80 MB. In other words, the file is 15 complete blocks plus one more block in which only 80 MB of the 128 MB is populated. Then freeInLastBlock turns out to be (int)(blockSize - usedInLastBlock) = (int)(128 - 80) = 48 MB.

Say bytesPerChecksum = 512 bytes (0.5 KB). Since 2000 MB is an exact multiple of 512 bytes, usedInCksum = (int)(stat.getLen() % bytesPerChecksum) = 0, and freeInCksum = 512 - 0 = 512. Next comes an if statement that checks “if there is space in the last partial chunk, then set up in such a way that the next packet will have only one chunk that fills up the partial chunk.” Before moving on, however, there is a function called “computePacketChunkSize” that we need to look at.
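The same arithmetic can be checked in a few lines of Java. This is a sketch of the calculation only, using the 2000 MB, 128 MB and 512-byte figures from the walkthrough; the class and method names (`LastBlockMath` and friends) are made up and do not come from the Hadoop source.

```java
final class LastBlockMath {
    static final long MB = 1024L * 1024L;

    /** Bytes already written into the last (partial) block. */
    static long usedInLastBlock(long fileLen, long blockSize) {
        return fileLen % blockSize;
    }

    /** Bytes still free in the last block. */
    static int freeInLastBlock(long fileLen, long blockSize) {
        return (int) (blockSize - usedInLastBlock(fileLen, blockSize));
    }

    /** Bytes already written into the last (partial) checksum chunk. */
    static int usedInChecksumChunk(long fileLen, int bytesPerChecksum) {
        return (int) (fileLen % bytesPerChecksum);
    }

    /** Bytes still free in the last checksum chunk. */
    static int freeInChecksumChunk(long fileLen, int bytesPerChecksum) {
        return bytesPerChecksum - usedInChecksumChunk(fileLen, bytesPerChecksum);
    }

    public static void main(String[] args) {
        long fileLen = 2000 * MB;
        long blockSize = 128 * MB;
        System.out.println("used in last block (MB): " + usedInLastBlock(fileLen, blockSize) / MB);
        System.out.println("free in last block (MB): " + freeInLastBlock(fileLen, blockSize) / MB);
        System.out.println("used in checksum chunk:  " + usedInChecksumChunk(fileLen, 512));
        System.out.println("free in checksum chunk:  " + freeInChecksumChunk(fileLen, 512));
    }
}
```

Trying a length that is not a multiple of 512, for example 2000 MB plus 100 bytes, gives a non-zero usedInCksum, which is the case the partial-chunk branch is about.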

DFSOutputStream_computePacketChunkSize

This function calculates the packetSize based on psize and csize. You can plug a few random numbers into these variables to get a feel for how it works. After all of this, the constructor triggers another function, ‘setPipeline(lastBlock)’…
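To play with numbers without opening the Hadoop source, here is one plausible shape for such a function. Treat it as a guess made for experimentation: the assumption is that a chunk on the wire is csize data bytes plus a checksum, that at least one chunk always fits, and that the packet size is a whole number of chunks. The 4-byte checksum size and the names `PacketChunkSize` and `compute` are my own choices, not taken from the post or the library.

```java
final class PacketChunkSize {

    /**
     * Returns {chunksPerPacket, packetSize} for a requested packet size (psize),
     * a chunk data size (csize) and a per-chunk checksum size, all in bytes.
     */
    static int[] compute(int psize, int csize, int checksumSize) {
        int chunkSize = csize + checksumSize;                  // data plus its checksum
        int chunksPerPacket = Math.max(psize / chunkSize, 1);  // always at least one chunk
        int packetSize = chunkSize * chunksPerPacket;
        return new int[] {chunksPerPacket, packetSize};
    }

    public static void main(String[] args) {
        int[] full = compute(64 * 1024, 512, 4);   // a 64 KB packet of 512-byte chunks
        int[] partial = compute(0, 412, 4);        // a single chunk that tops up a partial one
        System.out.println(full[0] + " chunks, " + full[1] + " bytes");
        System.out.println(partial[0] + " chunk, " + partial[1] + " bytes");
    }
}
```

Under these assumptions a psize of 0 still yields a one-chunk packet, which fits the idea of a packet that holds only the chunk needed to fill up a partial one.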

To be honest, I am running out of patience, and I really don’t think I can finish all the code this afternoon. But I won’t leave this very first post unfinished. I went through the functions, and I think “transfer” is the most interesting one to me. I will take a look at the other functions later!

DFSOutputStream_transfer

Sender(out).transfer(block, blockToken, dfsClient.clientName, targets);

out.flush();

Done!

 

 

 

 

Kickstart configuration

 

kickstartconfigurator

ssh -Y Enables trusted X11 forwarding. Trusted X11 forwardings are not
subjected to the X11 SECURITY extension controls. You need to have XQuartz/X11 installed.

On the server, you run system-config-kickstart, and it actually opens the app window on your local Mac through the ssh tunnel. Score!

For Windows users, you can run the command export DISPLAY=yourlocalcomputerip:0.0, and if you have a tool like Reflection, it will work too.