Word Count Program (in Java & Python)

The word count program is the "Hello World" of MapReduce.

Hadoop MapReduce is a software framework for easily writing applications that process vast amounts of data (multi-terabyte datasets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input dataset into independent chunks, which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them, and re-executing any that fail.
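To make this data flow concrete, here is a small in-memory sketch of the three phases (map, sort/shuffle, reduce) on toy input. The sample lines and variable names are illustrative only; a real job runs these phases distributed across a cluster rather than in one process.

```python
from itertools import groupby
from operator import itemgetter

# Toy input dataset, already split into two independent "chunks"
splits = [["the quick brown fox", "the lazy dog"],
          ["the fox jumps"]]

# Map phase: each mapper emits a (word, 1) pair for every word in its chunk
pairs = [(word, 1)
         for chunk in splits
         for line in chunk
         for word in line.split()]

# Sort/shuffle phase: the framework sorts map output by key,
# so all pairs for the same word end up adjacent
pairs.sort(key=itemgetter(0))

# Reduce phase: sum the counts for each distinct word
counts = {word: sum(count for _, count in group)
          for word, group in groupby(pairs, key=itemgetter(0))}

print(counts)  # {'brown': 1, 'dog': 1, 'fox': 2, 'jumps': 1, 'lazy': 1, 'quick': 1, 'the': 3}
```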

Word count example:

The WordCount example reads text files and counts how often words occur. The input is text files, and the output is text files, each line of which contains a word and the number of times it occurred, separated by a tab.

Each mapper takes a line as input and breaks it into words. It then emits a key/value pair of the word and a count of 1. Each reducer sums the counts for each word and emits a single key/value pair containing the word and the sum.

As an optimization, the reducer is also used as a combiner on the map outputs. This reduces the amount of data sent across the network by combining each word into a single record per mapper.
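The effect of the combiner can be sketched as follows: it performs the reducer's summation locally on one mapper's output, so only one record per distinct word leaves that machine. The sample data below is hypothetical, chosen just to show the record count shrinking.

```python
from collections import Counter

# Raw output of a single mapper: one (word, 1) record per word occurrence
map_output = [("the", 1), ("fox", 1), ("the", 1), ("the", 1), ("fox", 1)]

# Combiner: apply the reduce logic (summing) locally, before the
# records are sent over the network to the real reducers
combined = Counter()
for word, count in map_output:
    combined[word] += count

records_before = len(map_output)   # 5 records would cross the network
records_after = len(combined)      # only 2 records do: ("the", 3), ("fox", 2)
print(records_before, records_after)
```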

Word count code:

package org.myorg;
        
import java.io.IOException;
import java.util.*;
        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
        
public class WordCount {
        
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
        
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 } 
        
 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }
        
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    // the reducer doubles as a combiner, summing map output locally
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
 }
        
}

To run the example, the command syntax is:

bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>

All of the files in the input directory (called in-dir in the command line above) are read, and the counts of the words in the input are written to the output directory (called out-dir above). It is assumed that both inputs and outputs are stored in HDFS. If your input is not already in HDFS, but rather in a local file system somewhere, you need to copy the data into HDFS using a command like this:

bin/hadoop dfs -mkdir <hdfs-dir> //not required in hadoop 0.17.2 and later
bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>

Word count example in Python:

mapper.py

import sys

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # emit a tab-separated (word, 1) pair for every word
    for word in words:
        print('%s\t%s' % (word, 1))

reducer.py

import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # the input is sorted by key, so all counts for a word are adjacent
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# emit the last word, if there was any input at all
if current_word:
    print('%s\t%s' % (current_word, current_count))

The above programs can be run with: cat filename.txt | python mapper.py | sort -k1,1 | python reducer.py
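That shell pipeline can be sketched in a single Python process, which makes it clear why the sort -k1,1 step is essential: the reducer only sums counts of adjacent records, so it relies on all records for a word arriving together. The input lines below are made up for illustration.

```python
# Stand-in for the input file: two lines of text
lines = ["hello world", "hello hadoop"]

# mapper.py stage: emit "word<TAB>1" for every word
mapped = ['%s\t%s' % (word, 1) for line in lines for word in line.split()]

# sort -k1,1 stage: sort records by the first tab-separated field (the word)
mapped.sort(key=lambda rec: rec.split('\t', 1)[0])

# reducer.py stage: sum the counts of adjacent records with the same word
result = {}
current_word, current_count = None, 0
for rec in mapped:
    word, count = rec.split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            result[current_word] = current_count
        current_word, current_count = word, int(count)
if current_word is not None:
    result[current_word] = current_count

print(result)  # {'hadoop': 1, 'hello': 2, 'world': 1}
```

Without the sort stage, the two "hello" records would not be adjacent and the reducer would emit two separate partial counts instead of one total.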