Overcoming the frustrations of distributed computing

Apache Hadoop and Pig are the kind of tools comparable to nixcraft with wings, even though pigs don’t fly. In some sense, the ability to wield thousands of machines for highly (or not so highly) parallelizable algorithms is pretty close to the cutting edge of human computing power. With a well-designed pipeline, for instance, one could in principle compute more digits of pi than have ever been recorded. It is often tempting, however, to translate Unix commands directly into their grid equivalents, which can be catastrophic, since the feedback loop is usually fairly long, mostly due to job startup overhead.

The main frustration I had to deal with today, even though I have encountered it many times in the past, is the curious trailing tab at the end of each line of my tab-separated input file. Locally, when I split such lines in a Python script, I can be conservative and strip the line only of the newline character. Once the input is fed through a Hadoop streaming job, however, I also need to strip out a trailing tab, whose origin has been a mystery to me. Pig streaming seems immune to this travesty.
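For concreteness, here is a minimal sketch of the defensive stripping I have in mind inside a streaming mapper; the echoing of fields at the end is just a placeholder for whatever the real script does.

#!/usr/bin/env python
# minimal mapper sketch: strip the newline and any trailing tab that the
# streaming job may have appended, before splitting into fields
import sys

for line in sys.stdin:
    line = line.rstrip('\n').rstrip('\t')  # the extra rstrip('\t') is the fix
    fields = line.split('\t')
    # ... process fields as usual; here we simply echo them back out
    print('\t'.join(fields))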

Other caveats of working with the grid that I might as well record here fall into two categories. The first has to do with importing external libraries in Python streamers. It is fairly straightforward to package an entire Python executable along with all its installed packages as a tarball and ship it from HDFS in a grid job; so far this is exactly like invoking a local, uninstalled copy of Python. But when an external Python script is imported, the remote machines seem unable to locate it, even when it is explicitly included in the streaming configuration. The trick is to include the following line before any external library is imported:

sys.path.insert(0, '/'.join(__file__.split('/')[:-1]))
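In context, the line needs to sit at the top of the streamer, before the import it is meant to rescue. Here is a minimal sketch, where mylib and transform are hypothetical stand-ins for whatever module and function get shipped alongside the job.

#!/usr/bin/env python
# streamer sketch: put the directory containing this script on sys.path
# so the remote task can find modules unpacked next to it
import sys

sys.path.insert(0, '/'.join(__file__.split('/')[:-1]))

import mylib  # hypothetical helper module shipped with the job

for line in sys.stdin:
    print(mylib.transform(line.rstrip('\n').rstrip('\t')))  # hypothetical function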

The second set of caveats has to do with maintaining the order of records in the input files. A common situation I run into is applying the same script to 100 input files in parallel. The only problem is that, by default, MapReduce jobs scramble the records according to some internal sorting and partitioning rule. In general there is no way to enforce an ordering on the mapper output unless the input is already sorted by some column, and furthermore there is another column whose value is identical within each input file. Once those two columns are in place, the mapper can simply be cat, and before the results are sent to the reducers they can be partitioned and sorted using the following flags:

-Dstream.num.map.output.key.fields=$numinputcolumns
-Dmapred.text.key.partitioner.options=-k"$fileidentifyingcolumn"
-Dmapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
-Dmapred.text.key.comparator.options="-k$fileidentifyingcolumn,$sortingcolumn"
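With those flags in place, the reducer sees each file's records grouped together and in their original order. Below is a minimal Python reducer sketch under the assumption that the file-identifying column is field 0; that position is made up purely for illustration.

#!/usr/bin/env python
# reducer sketch: relies on the partitioner/comparator flags above, so that
# records from the same input file arrive consecutively and already sorted
import sys

current_file_id = None
for line in sys.stdin:
    fields = line.rstrip('\n').rstrip('\t').split('\t')
    file_id = fields[0]  # assumed position of the file-identifying column
    if file_id != current_file_id:
        current_file_id = file_id
        # a new input file's records start here, in their original order
    # process the record; here we just echo it back out
    print('\t'.join(fields))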
