手记

MapReduce实现wordcount

Mapper程序

import  sysdef read_input(file):
    for line in file:        yield  line.split()def main():
    data = read_input(sys.stdin)    for words in data:        for word in words:
            print("%s%s%d" %(word,'\t',1))if __name__ == "__main__":
    main()

分割单词,以一下形式输出

a   1
b   1
c   1
a   1

reducer程序,统计词语频率

import sysfrom operator import itemgetterfrom  itertools import groupbydef read_mapper_output(file,separator='\t'):
    for line in file:        yield  line.rstrip().split(separator,1)def main():
    data =read_mapper_output(sys.stdin)    for current_word,group in groupby(data,itemgetter(0)):
        total_count = sum(int(count) for current_word,count in group)
        print("%s %s %d" %(current_word,'\t',total_count))if __name__ =='__main__':
    main()

本地运行测试,命令行输入

echo "a b c d e"|python MapTest.py|python ReduceTest

确认无误后提交到集群上执行,输入命令

/usr/local/hadoop/hadoop-2.8.3/bin/hadoop 
jar 
/usr/local/hadoop/hadoop-2.8.3/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar  
-files "/home/tobin/PycharmProjects/untitled/MapTest.py,/home/tobin/PycharmProjects/untitled/ReduceTest.py" -input /LICENSE 
-output /tmp/wordcounttest 
-mapper "python MapTest.py" -reducer "python ReduceTest.py"

-files :将map和reduce程序(这里最好使用绝对路径,不然可能出错)提交到集群中,-input-output指定的输入输出文件都在hdfs中,-reducer指定reduce程序,-mapper指定map程序

/tmp/wordcounttest文件夹下有两个文件,一个是输出文件,另一个是状态信息

结果类似下面:

own      4
owner    4
owner.   1
ownership    2
page"    1
part     4
patent   5
patent,      1
percent      1
perform,     1
permission   1
permissions      3
perpetual,   2
pertain      2
places:      1
possibility      1
power,   1
preferred    1
prepare      1
product      1
prominent    1
provide      1
provided     5
provides     2
publicly     2
purpose      2
purposes     4
readable     1
reason   1
reasonable   1
received     1



作者:dpengwang
链接:https://www.jianshu.com/p/8cf3c207470e


0人推荐
随时随地看视频
慕课网APP