本文使用多种计算引擎实现词频统计
MapReduce实现
编写MapReduce程序分成三部分:Mapper、Reducer、Driver
业务逻辑
MapTask阶段处理每个数据分块的单词统计分析,每遇到一个单词,将其转换为一个k-v对,如<hello, 1>的形式,发送给ReduceTask进行汇总ReduceTask阶段接受MapTask的结果,做汇总计数
Mapper接受的四个泛型
KEYIN:输入的键的类型,在这里指的是每一行起始的偏移量VALUEIN:输入的值的类型,在这里指的是一行的内容KEYOUT:输出的键的类型,这里指的是单词,允许重复的VALUEOUT:输出的值的类型
Reducer接受的四个泛型
KEYIN:map输出的key,指的就是单词VALUEIN:map输出的value,指的就是1KEYOUT:输出的key的类型,这里指的就是单词,这里的key不可以重复VALUEOUT:输出的value的类型,这里指的就是总的词频
Hadoop中自定义的序列化和反序列化的接口
Java中的序列化和反序列化接口Serializable,将类结构一并进行序列化和反序列化,过于臃肿
long——LongWritableint——IntWritabledouble——DoubleWritablefloat——FloatWritablenull——NullWritablestring——Text
Map实现
|
|
Reduce实现
|
|
Driver实现
|
|
Scala实现
定义数据:array = Array("a b", "c c", "b c")
第一种方式实现
|
|
中间结果详解
array.map(_.split(" "))输出:Array(Array("a","b"), Array("c","c"), Array("b","c"))- 使用
flatMap(_.split(" "))输出:Array("a","b", "c","c", "b","c") - 再使用
map((_,1))输出:Array((a,1), (b,1), (c,1), (c,1), (b,1), (c,1)) - 再使用
groupBy(_._1)输出:(a,1),(b,1),(b,1),(c,1),(c,1),(c,1),即Map(b -> Array((b,1), (b,1)), a -> Array((a,1)), c -> Array((c,1), (c,1), (c,1))) - 在进行计数:
array.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map( x => (x._1, x._2.length)) - 从大到小排序:
array.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map( x => (x._1, x._2.length)).toList.sortBy(_._2).reverse
其他方式实现
|
|
Spark-Shell实现
第一种方式实现
|
|
详解
sc是SparkContext对象,该对象是提交spark程序的入口textFile("hdfs://myha/spark/wc/input/words.txt")是从HDFS中读取数据flatMap(_.split(" "))先map再压平map((_,1))将单词和1构成元组reduceByKey(_+_)按照key进行reduce,并将value累加saveAsTextFile("hdfs://myha/spark/wc/output")将结果写入到HDFS中- 其中:
reduceByKey = groupByKey + reduce = groupBy + reduce = groupBy + map
其他方式实现
|
|