EECS-4415M Big Data Systems Assignment #2 Frequency


Write a MapReduce job for Hadoop to find the tf-idf of words in documents. Use Python (3) to implement your map & reduce tasks, as in the examples in class. (You will use Hadoop’s streaming facility for this.)

The tf-idf metric is a measure of how important a word is within a document for helping establish the document’s relevance with respect to a term-based search which employs the word. The metric tf stands for “term frequency”, and the metric idf for “inverse document frequency”. See the Wikipedia article, Wikipedia: tf-idf, for more background. (The Wikipedia write up as of 2021 January 24 was quite decent.) The measures were introduced in 1972 by Karen Spärck Jones, and have been a main staple in information retrieval ever since. There is much written about the tf and idf metrics.

Your MapReduce job should read in twenty books from The Project Gutenberg as text-file documents, and output a word,document, tf-idf score entry per word per document (the exact output format is specified under Deliverables below).

Parsing

You will need to tokenize the input into “words”. For this assignment, let us choose the convention that we split on whitespace and “_” characters, as in Assignment #1. E.g.,

tokens = filter(None, re.split('[\W+_]', line))

And further, we lowercase all letters in each token, and we throw away all but letters and digits. E.g.,

word = re.sub('[^a-zA-Z0-9]+', '', token.lower())

Let us call the resulting tokens words.
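
To sanity check this convention, you can try the two steps together on a sample line in the Python interpreter. (The sample text below is just an illustration; it is not from the assignment’s input.) E.g.,

import re

line = "Hello, world_wide-web!  It's 2021..."   # sample text, for illustration only
tokens = filter(None, re.split('[\W+_]', line))
words = [re.sub('[^a-zA-Z0-9]+', '', t.lower()) for t in tokens]
words = [w for w in words if w]                 # drop tokens that became empty
print(words)   # ['hello', 'world', 'wide', 'web', 'it', 's', '2021']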

tf & idf

For tf for this assignment, let us simply use the count of the number of occurrences of the word in the document. So this is very similar to an example MapReduce job that we did in class.

Thus, tf is with respect to a word W and a document D. Let

tf(W, D) = # of occurrences of W in D

A better version of the tf measure is the number of occurrences divided by the total number of word occurrences in the document. However, this would make our MapReduce job a bit more involved. Well, maybe for next time.

Define the idf of a word W as follows. Let d(W) be the count of the number of documents that word W appears within. E.g., say that “parke” appears in 17 of the 20 documents. Then d(parke) = 17. Let N be the total number of documents that we are processing; e.g., 20. Then

idf(W) = ln(N / d(W))

where ln is the natural log. Thus, idf(parke) = ln(20/17) = 0.162519.

Ideally, we would compute the number of total documents involved as part of our MapReduce job. However, again, this would make our MapReduce job a bit more involved. So, this is also for next time. For this assignment, just hardcode N, the total number of documents being processed (e.g., 20), into your mappers or reducers.

The tf-idf score for a word W in a document D is then just the product of the tf measure of the word and document and the idf measure of the word:

tf-idf(W, D) = tf(W, D) × idf(W)
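
In plain Python, the computation is just the following (a minimal sketch, with N hardcoded as discussed above; the function and variable names are illustrative, not prescribed):

import math

N = 20   # total number of documents, hardcoded as the assignment allows

def tf_idf(count, n_docs_with_word):
    # count: tf(W,D), the number of occurrences of the word in the document
    # n_docs_with_word: d(W), the number of documents the word appears within
    idf = math.log(float(N) / n_docs_with_word)   # natural log
    return count * idf

# E.g., if "parke" occurs 3 times in a document and appears in 17 of the 20
# documents, then tf_idf(3, 17) = 3 * ln(20/17) = 0.487557 (approximately).
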
Hadoop

We shall use a Docker container image distribution of Apache Hadoop for the assignment. This is the same one I used for examples in class.

Installation: Docker & the Hadoop Image

Install Docker, the community edition, on your box (your computer). The Hadoop image we shall use is “sequenceiq/hadoop-docker:2.7.0”. Install the image for Docker by

mybox% docker pull sequenceiq/hadoop-docker:2.7.0

You can also find sequenceiq/hadoop-docker:2.7.0 at Docker Hub, along with some instructions: sequenceiq/hadoop-docker — Docker Hub

Hadoop Container

Create an Hadoop container in which you will run your job.

mybox% docker run -it --name Hadoop sequenceiq/hadoop-docker:2.7.0 /etc/bootstrap.sh -bash

The --name flag above gives a name to your container, to make it easier to access. You can give it whatever name you want; “Hadoop” is a reasonable name.

On running that command, your shell will then drop you into a bash shell running within the Hadoop container.

You can suspend the container any time by stopping it from some other shell / window.

mybox% docker stop Hadoop

To start it up again at some point:

mybox% docker start Hadoop

To get a bash shell within the container again, start up a shell / window on your box and type

mybox% docker attach Hadoop

Read some about Docker for managing your images and containers.

Inception: Inside the container

Your access to the container is via a simple bash shell. The container is running a quite barebones linux with Apache Hadoop installed within it.

You are running effectively as root within the container. I recommend using /home within the container for storing your files. You can copy your job directory from your computer’s filesystem — say freq/ — into /home to have access to it.

mybox% docker cp freq Hadoop:/home/freq

Docker’s cp command can copy in both directions.

For your Hadoop job, the input files have to be placed within Hadoop’s hdfs filesystem. To issue Hadoop commands, it is easiest to be in the directory of the Hadoop installation. An environment variable, $HADOOP_PREFIX, is set that has the directory’s location. Thus,

bash$ cd $HADOOP_PREFIX

The directory happens to be /usr/local/hadoop in this installation. So,

bash$ cd /usr/local/hadoop

is the same thing. We shall be using the Hadoop commands hdfs and hadoop to manage the distributed file system and to run map-reduce jobs, respectively. We can copy files into (-put) and out of (-get) Hadoop’s distributed file system (hdfs). Say you have a directory /home/freq/Gutenberg/ containing 20 text files. To copy it into a directory named Gutenberg/ in hdfs,

bash$ bin/hdfs dfs -put /home/freq/Gutenberg Gutenberg

The “bin/hdfs dfs” call has many (sub)commands that mimic the standard *nix shell commands for dealing with files and directories. E.g., to list,

bash$ bin/hdfs dfs -ls Gutenberg

To remove, say, directories One and Two that might contain files,

bash$ bin/hdfs dfs -rm -r One Two

To see the contents of file Out/part-00000,

bash$ bin/hdfs dfs -cat Out/part-00000

And so forth.

To execute an Hadoop MapReduce job — the job invocation — one calls

bash$ bin/hadoop jar …

providing the appropriate execution wrapper jar, the mappers and reducers (your Python programs, in this case), and the input and output directories. Refer to the class example as a template.

This image of Hadoop seems picky about allowing one to create hdfs directories via MapReduce jobs by default. I had to run

bash$ bin/hdfs dfsadmin -safemode leave

to turn off this “safe mode” before running my MapReduce job, so new directories could be created on the fly.

To test your Hadoop container, the image has a built-in stock example. You can run

bash$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.0.jar grep input output 'dfs[a-z.]+'

After a successful run, you can see the results in the hdfs directory output/.

bash$ bin/hdfs dfs -cat output/*

A note on output directories: the job invocation creates the output directory / directories. It fails if there is already a directory with that name in the hdfs. So clean up (i.e., remove) these output directories between runs.

Python 2

Sadly, the distribution of Hadoop we shall use is bundled with Python 2, not Python 3, the latter being our preference for the class. But we can live with it. Just be aware. Most things written for Python 3 will run in Python 2 without trouble. (Writing print with the parentheses — a Python 3 thing — works fine for Python 2, although not vice versa.) Just test your code this time around with the Python 2 interpreter.
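
E.g., two quick things worth checking under the Python 2 interpreter (an illustrative note, not from the class examples):

# print with parentheses works under both Python 2 and Python 3:
print('%s\t%s' % ('some,key', 42))

# One difference that can bite when computing tf-idf: "/" between two ints is
# integer division under Python 2 (20/17 == 1).  Force float division explicitly:
import math
idf = math.log(float(20) / 17)   # 0.162519 under both interpreters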

Class Examples

In the example we ran in class, we first did a simple one-stage MapReduce job to parse out the words from a set of input text documents — we used book text files from The Project Gutenberg, as we did for Assignment #1 and as we are doing for this assignment — to generate as output each word paired with how many occurrences of the word were seen in total. Nickname this job WordCount. We were able to do this in one stage; that is, with one map-reduce pair.

Our mapper parsed the input text files to words, and emitted each word (as the key) paired with the value “1” (the count), to indicate an occurrence of the word. Then our reducer aggregated for each key (a word) the count to yield the total number of occurrences of that word.
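
For reference, such a streaming sum-reducer generally follows the pattern below. (This is a minimal sketch; the class example files remain the authoritative template and may differ in details.)

#!/usr/bin/env python
# Sketch of a streaming sum-reducer: input lines are "word<TAB>1", already
# sorted by key, so all lines for a given word arrive contiguously.
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    word, count = line.rstrip('\n').split('\t', 1)
    count = int(count)
    if word == current_word:
        current_count += count
    else:
        if current_word is not None:
            print('%s\t%d' % (current_word, current_count))
        current_word = word
        current_count = count

if current_word is not None:
    print('%s\t%d' % (current_word, current_count))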

The second example extended WordCount to count the number of occurrences of words starting with each given letter (“a”–“z”). Nickname this job LetterCount. We did this just by adding a second stage after the “WordCount” stage. Our second-stage mapper read as its input the output files from stage one (WordCount’s reducer) and output as its key the first letter of each word along with the word’s count. Then our second-stage reducer aggregated the counts for each letter key.

Note that it would have been easy to write LetterCount to use just a single mapper-reducer stage. We did it in two stages in the example in class simply to illustrate constructing multi-stage MapReduce jobs.

See the files for job LetterCount as a template for your work. The file lct.sh is the bash script to invoke the Hadoop job for LetterCount.

Strategy

While LetterCount could have been done in one stage, the freq job will most likely need to be multi-stage.

The first stage is almost the same as WordCount. The only difference is that here, you need to count the number of occurrences of a word within a document, not of a word overall. Thus, you can adapt WordCount’s mapper and reducer for this.

In this case, the output key-value should be, say, word,doc▭count (letting “▭” denote the tab character). Thus, the key is word,doc and the value is count.
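
E.g., a stage-one mapper might look something like the following. (A sketch only, assuming the word,doc key convention above and the parsing convention from §Parsing; adapt the class template as you see fit.)

#!/usr/bin/env python
# Sketch of a stage-one mapper: emit "word,doc<TAB>1" per word occurrence.
import os
import re
import sys

# The document name is metadata supplied by Hadoop streaming (see the FAQ).
doc = os.environ.get('mapreduce_map_input_file', 'unknown')

for line in sys.stdin:
    tokens = filter(None, re.split('[\W+_]', line))
    for token in tokens:
        word = re.sub('[^a-zA-Z0-9]+', '', token.lower())
        if word:
            print('%s,%s\t1' % (word, doc))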

The idf metric counts how many of the documents a word appears within. Thus, idf is with respect to a word W alone, not a word W, document D pair. Somehow, you need to get idf(W) associated as a value with the W,D key so that you can compute each tf-idf(W,D). That is the crux of this job.

Avoid

It may be tempting to collect all the word,document pairs per word in a reducer instance so that it can output something like word,doc▭count,#docs. After having collected all the key-values for a particular word, the reducer can know how many documents (#docs) the word has appeared in. And it needs to know both the count — the number of occurrences of the word in the document — and the #docs — how many documents overall the word has appeared in — to compute the tf-idf for each word,document pair. The reducer could then iterate over the word,document pairs it collected (cached) for that word, and write out, say, word,doc▭count,#docs for each. (Or write out word,doc▭count,tf-idf, since at this point, you have the information to compute the tf-idf.)

However, doing it this way would be bad practice. Why? Imagine that instead of 20 books, we run the job with an input of 10 million books. A reducer instance could run out of memory attempting to cache up to 10 million pairs in main memory per word it processes. In other words, implemented this way, the job does not scale. And the whole idea of MapReduce is that such jobs scale. Furthermore, even if the number of books in the input is “small” enough that such a reducer does not run into memory bounds, it would be quite inefficient.

Warning. If you implement your job for the assignment this way, it is a 20% penalty.

Instead, we want our mappers and reducers to be lightweight. Each should only need to look at one key-value pair at a time to generate the output pairs. For this, think of a way to get the #docs information into the key-value stream so that it is available per word as needed.

Debugging

Debugging code for a distributed runtime can be difficult. And the ship-in-a-bottle nature of things here — your host machine running a linux container running Hadoop — complicates things further.

Hadoop usually commandeers ports so one can use a web browser to access web portals that show parameters of the system and allow tracing of specific tasks. This is a bit involved to set up to pass through when we are working within a container, though… So, for this assignment, which is quite contained (pun!), we don’t.

You will be able to get what you need from the runtime output that spills to the screen when you launch an Hadoop job. (See the FAQ for how to catch a copy of this into a log file.) You ought also to test your code initially just as a pipeline, to see that your Python mappers and reducers are working. It is best to do this in the container, since that is where the code needs to run for the Hadoop job.

Say that you have a two-stage Hadoop job; that is, two map-reduce pairs with the second pair reading as its input the output of the first pair. Let aMap.py and aReduce.py be your first pair, and let bMap.py and bReduce.py be your second pair. Let bookA.txt be an input file for the job. You can run the sequence from the command line.

bash# cat bookA.txt | aMap.py | sort -k1,1 | aReduce.py | bMap.py | sort -k1,1 | bReduce.py

The sort calls here are stand-ins for Hadoop’s shuffle step.

Does your aMap.py break because it needed to see the input “file”’s name? Fake it by first setting (and exporting) that environment variable.

bash# export mapreduce_map_input_file="bookA.txt"

But that does not test for more than one input file with different names. For this, we can do the pipeline in steps.

bash# export mapreduce_map_input_file="bookA.txt"
bash# cat bookA.txt | aMap.py > aBtwn.txt
bash# export mapreduce_map_input_file="bookB.txt"
bash# cat bookB.txt | aMap.py >> aBtwn.txt
bash# export mapreduce_map_input_file="bookC.txt"
bash# cat bookC.txt | aMap.py >> aBtwn.txt
bash# cat aBtwn.txt | sort -k1,1 | aReduce.py | bMap.py | sort -k1,1 | bReduce.py

Have you done things so that the input to your reducers needs to be sorted by both the key (primary) and the value (secondary)? E.g., then

bash# export mapreduce_map_input_file="bookA.txt"
bash# cat bookA.txt | aMap.py | sort -k1,1 -k2,2 | aReduce.py | bMap.py | sort -k1,1 -k2,2 | bReduce.py
Deliverables

Input

Test your job on the twenty books (text documents) in Gutenberg/. Note that I removed the preamble and licence information from these twenty files from The Project Gutenberg (TPG), so as not to bias the tf-idf scores that we compute. Please refer to the original book files at TPG for the proper text if you wish to distribute them.

Output

Your output is the output of your job’s final stage. Let “▭” denote the tab character. Your key-value pairs should be of the format word,doc▭tf-idf-score, one key-value pair per line.

Submit

Use the “submit” command on a PRISM machine to turn in your program.

% submit 4415 freq freq/

For the call to the “submit” utility above, “4415” is the code name for the class, “freq” is the nickname of the assignment, and “freq/” is the directory containing the materials that you are turning in.

In your setup for your job, name the last hdfs output directory Out.

Include the following in your assignment directory freq/.

  • cover.txt
    A cover-page text file named cover.txt with your name and student number.

    Optionally, include any special observations. And include instructions, should your solution deviate from the instructions herein. (Note, however, any deviations that require instructions different than herein will lead to a reduction in marks, unless extremely well justified.)

  • mappers & reducers
    Your Python programs for your mappers and your reducers named in a clear, logical way.
  • job.sh
    Your bash job-invocation script for Hadoop.
  • output.txt
    The first 100 lines of one of the final output files (in Out/) produced by your job in the hdfs; say, from part-00000.

Enough should be included that I can painlessly run your job myself.

Due:

  • before midnight Friday 5 February
  • before midnight Monday 8 February

If you run into issues with running Docker, launching images into containers of any kind, launching the Hadoop container, or getting it to work as documented on your machine, don’t despair. Get your MapReduce job working in “command-line & pipe” mode as described above under Debugging, and document briefly the issues you encountered in your cover file (cover.txt), along with what type of computer you are using and the OS you are running. (This will help me debug container usage for the future.) No points off because you were not able to do the impossible.

So … don’t kill yourself trying to get Docker, etc., working on your box if you are running into issues. The assignment is not meant to assess you on that. That said, the assignment is also meant to be a way to get exposure to the Hadoop platform and to using it. So do give it a good try. See below in the FAQ, which I will update periodically.

FAQ

Running Docker & Hadoop on your machine

MacOS

Apple is making things progressively harder for developers, both with bugs and with quite aggressive security measures.

My current primary box is a 2015 iMac running Catalina (10.15.7). I had issues with current versions of Docker. The latest version of Docker Desktop for Mac (Community) is 3.1.0. Version 2.4.0.0 has been working for me. Docker changed the platform for Mac at some point to make better use of hyperkit for container virtualization. But this seems to fail in ways for older Mac boxes…

When you install Docker, you will need to grant it access to the host’s file system, if your OS is Catalina or later. To do this, in System Preferences, go to Security & Privacy, then to Full Disk Access, and add the Docker app to the list.

Forums also say not to have any volume mounts for Docker under the directory Documents. MacOS, Catalina and beyond, is especially protective of that directory.

Windows 10 Home edition

It does not seem that the Home edition of Windows is willing to offer you the full capabilities of your computer for virtualization. And the Docker Community Edition will not work on top of the Home edition. (Docker now employs Hyper-V, Microsoft’s built-in virtualization technology.)

You can try installing Docker Desktop on Windows Home from the Docker site. This uses the WSL 2 backend (Windows Subsystem for Linux), instead, and so this is a bit of a two-step process / workaround. This, in essence, runs your Docker in a Linux virtual machine behind the scenes.

To install / activate WSL 2 on your machine, see Windows Subsystem for Linux Installation Guide for Windows 10.

Some people have had success running a virtual Linux (say Ubuntu) on their machine, then installing Docker in the Linux. Inception! But you could run into various port mapping and permissions issues. This way is somewhat fraught.

Windows 10 Pro / Enterprise / Education

Generally, it is smooth sailing. See Docker Desktop on Windows.

Linux

Linux was made for this kind of thing!

On old hardware

If your computer is old enough (pre 2012, say?), there may be no way. Virtualization needs hardware hooks to run.


Thanks to students for help, knowledge, and suggestions for the above!

Q. What is the separator character for key-value pairs in Hadoop “streaming”? If I want to have a “composite” key or value, what should I use as a separator for the components?

A. The key-value separator for Hadoop streaming is the tab character (“▭”), by default.

Hadoop has a way to set up something called composite keys that has a lot of functionality, but becomes a bit involved. For our purposes here, we can make a composite key ourselves by sticking several value strings together — say, a word and a document name — with a separator character in between. We should not choose tab as the separator, of course! We can pick a character that does not appear in any of the key values. (Also, for when the keys are sorted as input to a reducer, this separator should be a smaller value than any letter appearing in component values; that way, one is guaranteed a nested sort with respect to the key components.) For us for this assignment, comma (“,”) suffices.

For a separator between components in a composite value, one could use anything convenient, as long as it is guaranteed to not appear as a character within a value. (Also with the caveat that the separator should precede any characters used within the values’ strings, if somehow the value portion is to be sorted too.) It is common to use tab (“▭”). What appears before the first tab is considered the key. (However, there may be good reason not to use tab here, depending on how you implement.) But comma (“,”) suffices for our purposes here, too.
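
E.g., building and taking apart such a composite key in Python might look like this (the names are illustrative only):

# Build a composite "word,doc" key and emit it, tab-separated from its value:
key = '%s,%s' % ('parke', 'bookA.txt')
print('%s\t%s' % (key, 1))

# Take it apart again in a downstream mapper or reducer:
line = 'parke,bookA.txt\t1'
key, value = line.rstrip('\n').split('\t', 1)
word, doc = key.split(',', 1)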

Q. Wait… how can we know the document name? We get the text on standard input. The document’s name is not part of that!

A. Good point! It is not part of the initial input, which is just the text of the documents coming on standard input. It is “metadata”. Since it is metadata that is common for MapReduce applications to need, Hadoop provides a mechanism to fetch it. Hadoop puts the document’s name in the environment variable mapreduce_map_input_file within the local environment of the relevant mapper task.

Python can read environment variables’ values via the os package. E.g.,

import os
    ⋮ 
fName = os.environ['mapreduce_map_input_file']
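
The value is the full hdfs path of the mapper’s input file. If you would rather carry just the file name in your keys (the FAQ below notes that trimming the names this way is fine), something like the following, continuing the snippet above, would do:

docName = os.path.basename(fName)   # e.g., trims ".../Gutenberg/bookA.txt" to "bookA.txt"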

Q. I know that the input to a reducer task is sorted by key (ascending). Is the input also sorted by value within each key group?

A. No, Hadoop just sorts by key. Having the input further sorted by value is called a secondary sort. It is sometimes useful to have the input additionally sorted by value — the secondary sort. Hadoop allows for this to be specified, however. (Note this adds negligible overhead; I am not certain why this is not a default.) This is specified via -D flags on the bin/hadoop call. E.g.,

bin/hadoop jar $STREAMINGJAR                            \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D stream.num.map.output.key.fields=2               \
    -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
    -D mapred.text.key.partitioner.options="-k1,1"      \
    -files   $JOBDIR/wdMap.py,$JOBDIR/wdReduce.py       \
    -mapper  $JOBDIR/wdMap.py                           \
    -reducer $JOBDIR/wdReduce.py                        \
    -input   WC/'*'                                     \
    -output  WD

Q. We need to get for each word W its idf value, idf(W), and to associate it with each word,document key for word W. How do I do this without “joining”, which would require the caching that I am told to avoid in §Strategy ¶Avoid?

A. Think about being a bit clever with your keys. The key of the output from a mapper is partitioned (“shuffled”) as input for its reducer. (The key that comes into the mapper and the key that goes out of the corresponding reducer need not be the same as this intermediate key!) You likely need to partition on just word at some point, to be able to count the number of documents in which it appears, whereas most of your keys will be word,document.

Q. I have installed Docker and the Hadoop image on my Windows machine. I can run an Hadoop container fine. However, I am getting strange behaviour and things failing when I try to run an Hadoop job. What could be wrong?

A. One thing could be the “curse of DOS”. Windows — inherited all the way back from DOS days — formats text files differently than everyone else on the planet (e.g., *-nix). The EOL (end-of-line) in Windows’ text files consists of two characters: cr lf (“carriage return”, “line feed”). Or, in C parlance, \r\n. In *-nix, an EOL is just \n. Unfortunately, DOS-format text files typically break *-nix programs, including bash and hadoop.

How can you check whether a text file is in DOS format (EOL=\r\n) or in “standard” format (EOL=\n)? Usually, one would use the file command that comes with the shell environment. E.g.,

% file mytext.txt

This makes an educated guess at what the file is (e.g., a shell script, a Python script, etc.), and says whether the line terminators are CRLF (\r\n). Unfortunately, the file utility in our Hadoop image does not show this clearly. Another way is to open the file with vi. E.g.,

bash# vi mytext.txt

At the bottom of the shell window, it will say “[dos]” if the file is DOS format. (To escape vi without having made changes, type “:q!” in the editor.)

How to convert a file to *-nix format? My favourite way — because it is nearly universal — is with Perl. And yes, Perl is installed within the container. E.g.,

bash# perl -pi -e 's/\r\n/\n/' mytext.txt

(If you ever wanted to convert the other way into DOS format, replace the -e flag’s value with 's/\n/\r\n/'.)

Any of your files — your shell scripts, python programs, or the text input files — being in DOS format will mess up Hadoop. Of course, feel free to edit your files under Windows. Just convert them within the container before use.

Q. I want to save into a file the Hadoop job runtime output. How can I?

A. I use tee for this. This way, the runtime output still spills to screen, which I find useful, and also goes into a file. E.g.,

bash# cd /usr/local/hadoop
bash# sh /home/freq/job.sh 2>&1 | tee /home/runLog

The “2>&1” directive is Bourne-shell (thus bash) syntax; it merges STDERR into STDOUT. This way, anything that goes onto STDERR from the Hadoop runtime output goes into the file /home/runLog too.

Q. Can we see the first 100 lines of your output to compare?

A. If you must: output-100.txt. (It is not that interesting, in that the words in the first lines are all numeric.)

Note that you might not get the same word / document pairs shown in your first 100 in the output file. That will depend on how the job was composed, and choices Hadoop makes on the shuffle, etc. Look to see that you are producing the correct tf-idf values.

Q. My Hadoop job is hanging after some percentage of map tasks completed and it never comes back! What is wrong?

A. I am still tracking this one down. However, I am now able to recreate it myself on occasion.

Some people have solved this by trimming the document names to be just the file names, not the whole hdfs path. One person was getting a clearer memory-exceeded error. Trimming also seems to fix the stall-out problem. If you have trimmed the file names, that is fine.

The problem seems related to improper garbage collecting — or something along those lines — in the Hadoop installation. Killing your job (“^C”), shutting down the container and starting it up again seems to clear up the resources.

Q. Help! I get errors that the commands in my “job.sh” are not right.

A. Each command in the “job.sh” is a call to bin/hadoop to run a MapReduce task. Each is long due to the various flags and such. So in my examples, I have written each over several lines, backslashing (“\”) at the end of each line to “hide” the newline from bash, so that it treats it as a one-line command.

Check the syntax to make certain you don’t have whitespace after any such backslash. Or put it all on one line (at the expense of readability).

Also ensure that after the “-files” flag is a single argument not broken by spaces. Or quote the argument. E.g.,

-files $JOBDIR/wdMap.py,$JOBDIR/wdReduce.py \

and not

-files $JOBDIR/wdMap.py, $JOBDIR/wdReduce.py \
What’s the frequency, Kenneth?

PG hanko