本文使用多种计算引擎实现词频统计
MapReduce实现
编写MapReduce程序分成三部分:Mapper
、Reducer
、Driver
业务逻辑
MapTask
阶段处理每个数据分块的单词统计分析,每遇到一个单词,将其转换为一个k-v
对,如<hello, 1>
的形式,发送给ReduceTask
进行汇总ReduceTask
阶段接受MapTask
的结果,做汇总计数
Mapper接受的四个泛型
KEYIN
:输入的键的类型,在这里指的是每一行起始的偏移量VALUEIN
:输入的值的类型,在这里指的是一行的内容KEYOUT
:输出的键的类型,这里指的是单词,允许重复的VALUEOUT
:输出的值的类型
Reducer接受的四个泛型
KEYIN
:map输出的key,指的就是单词VALUEIN
:map输出的value,指的就是1KEYOUT
:输出的key的类型,这里指的就是单词,这里的key不可以重复VALUEOUT
:输出的value的类型,这里指的就是总的词频
Hadoop中自定义的序列化和反序列化的接口
Java中的序列化和反序列化接口Serializable,将类结构一并进行序列化和反序列化,过于臃肿
long——LongWritable
int——IntWritable
double——DoubleWritable
float——FloatWritable
null——NullWritable
string——Text
Map实现
|
|
Reduce实现
|
|
Driver实现
|
|
Scala实现
定义数据:array = Array("a b", "c c", "b c")
第一种方式实现
|
|
中间结果详解
array.map(_.split(" "))
输出:Array(Array("a","b"), Array("c","c"), Array("b","c"))
- 使用
flatMap(_.split(" "))
输出:Array("a","b", "c","c", "b","c")
- 再使用
map((_,1))
输出:Array((a,1), (b,1), (c,1), (c,1), (b,1), (c,1))
- 再使用
groupBy(_._1)
输出:(a,1),(b,1),(b,1),(c,1),(c,1),(c,1)
,即Map(b -> Array((b,1), (b,1)), a -> Array((a,1)), c -> Array((c,1), (c,1), (c,1)))
- 在进行计数:
array.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map( x => (x._1, x._2.length))
- 从大到小排序:
array.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map( x => (x._1, x._2.length)).toList.sortBy(_._2).reverse
其他方式实现
|
|
Spark-Shell实现
第一种方式实现
|
|
详解
sc
是SparkContext
对象,该对象是提交spark
程序的入口textFile("hdfs://myha/spark/wc/input/words.txt")
是从HDFS
中读取数据flatMap(_.split(" "))
先map
再压平map((_,1))
将单词和1构成元组reduceByKey(_+_)
按照key
进行reduce
,并将value
累加saveAsTextFile("hdfs://myha/spark/wc/output")
将结果写入到HDFS
中- 其中:
reduceByKey = groupByKey + reduce = groupBy + reduce = groupBy + map
其他方式实现
|
|