MapReduce Chinese Word Segmentation + Word Frequency Count + Sorting

  • Approach & steps for word frequency counting and sorting:

    1. SpliterMapper: word segmentation

      Segmentation can be delegated to an existing tokenizer. Once a line is segmented, each word is written as a key-value pair (word, 1).

      In other words, for every word in the segmentation result, an intermediate key-value record is emitted with the word as the key and a count of 1 as the value.
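
      A minimal sketch of such a mapper; a naive whitespace split stands in for the segmenter here (the full SpliterMapper at the end of this post calls IK Analyzer instead):

      public static class SpliterMapper extends Mapper<Object, Text, Text, IntWritable> {
          private static final IntWritable ONE = new IntWritable(1);
          private final Text word = new Text();

          @Override
          protected void map(Object key, Text value, Context context)
                  throws IOException, InterruptedException {
              // Naive whitespace split as a stand-in; the full source replaces this
              // with IK Analyzer so that Chinese text is segmented properly.
              for (String w : value.toString().split("\\s+")) {
                  if (w.isEmpty()) continue;
                  word.set(w);
                  context.write(word, ONE); // emit (word, 1)
              }
          }
      }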

    2. CounterReducer: counting word frequencies

      The reducer aggregates the mapper output: it iterates over values (an Iterable of IntWritable) to collect all the values for one key. Each value is 1, so summing them yields the frequency of that key.

      At this point the word frequency count is done.
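
      A minimal sketch of that reducer (it is essentially the CounterReducer in the full source below):

      public static class CounterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
              int counter = 0;
              for (IntWritable v : values) {
                  counter += v.get(); // each value is 1, so the sum is the word's frequency
              }
              context.write(key, new IntWritable(counter));
          }
      }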

    3. Sorting

      MapReduce sorts by key, and in ascending order by default, but a descending result is usually what we want. To get it, just write a class extending IntWritable.Comparator, as shown below.

      /**
       * IntWritableDecreasingComparator extends IntWritable.Comparator.
       * Hadoop sorts IntWritable keys in ascending order by default,
       * so negating compare() gives descending order.
       */
      private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
          public int compare(WritableComparable a, WritableComparable b) {
              return -super.compare(a, b);
          }
          public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
              return -super.compare(b1, s1, l1, b2, s2, l2);
          }
      }
      

      Because MapReduce sorts by key, the sort job's mapper uses Hadoop's built-in InverseMapper class to swap key and value, so the count becomes the key.

      Then, in the job, pass the comparator above to setSortComparatorClass().

      With that, sorting is done; a sketch of the job wiring follows.
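
      A rough sketch of how the sort job can be wired up, assuming the count job already wrote its (word, count) output as a SequenceFile. countOutputPath and sortedOutputPath are placeholder variables, and the full sorterJob() in the source below also plugs in a custom SortReducer:

      Job sortJob = Job.getInstance(new Configuration());
      sortJob.setJarByClass(Main.class);
      sortJob.setInputFormatClass(SequenceFileInputFormat.class);     // read the count job's output
      FileInputFormat.addInputPath(sortJob, new Path(countOutputPath));
      sortJob.setMapperClass(InverseMapper.class);                    // (word, count) -> (count, word)
      sortJob.setNumReduceTasks(1);                                   // one reducer gives a single, globally sorted file
      sortJob.setOutputKeyClass(IntWritable.class);
      sortJob.setOutputValueClass(Text.class);
      sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class); // descending by count
      FileOutputFormat.setOutputPath(sortJob, new Path(sortedOutputPath));
      sortJob.waitForCompletion(true);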

  • "Polishing" the sorted output

    Because key and value were swapped for sorting, the sorted output has the frequency first and the word second:

    92    发展
    65    推进
    63    建设
    59    创新
    48    经济
    46    改革
    

    If you want it the other way around, you can refer to: hadoop MapReduce sort by value only

    But after a few detours I found that simply writing a custom SortReducer for the sort job solves the problem easily!

    When reducing the sorted data, just write the key and value back out in swapped order:

    public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        private Text result = new Text();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(result, key); // swap back: write (word, count)
            }
        }
    }
    

    Reference: hadoop中文分词、词频统计及排序 (Hadoop Chinese word segmentation, word frequency counting and sorting)


Full source code:

package com.company;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
import java.io.IOException;
import java.io.StringReader;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Random;
public class Main {
    static final String HDFS_URL = "hdfs://192.168.43.235:9000";
    static final String USER = "hadoop";

    public static void main(String[] args) {
        // spliterTest();
        try {
            HDFS hdfs = new HDFS();
            hdfs.Init(HDFS_URL, USER);
            // hdfs.UpLoad("D://2017政府工作报告全文.txt", "hdfs://192.168.43.235:9000/mapreduce/input/2017.txt");
            Date start = new Date();
            run("hdfs://192.168.43.235:9000/mapreduce/input", "/mapreduce/output");
            Date end = new Date();
            float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
            System.out.println("Job finished in " + String.valueOf(time) + " minutes");
            hdfs.Down("/mapreduce/output/part-r-00000", "f://工作报告分词词频");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * Quick test of the tokenizer.
     */
    public static void spliterTest() {
        String[] temp = IKAnalyzerSpliter("\n" +
                "  过去一年,我国发展面临国内外诸多矛盾叠加、风险隐患交汇的严峻挑战。在以习近平同志为核心的党中央坚强领导下,全国各族人民迎难而上,砥砺前行,推动经济社会持续健康发展。党的十八届六中全会正式明确习近平总书记的核心地位,体现了党和人民的根本利益,对保证党和国家兴旺发达、长治久安,具有十分重大而深远的意义。各地区、各部门不断增强政治意识、大局意识、核心意识、看齐意识,推动全面建成小康社会取得新的重要进展,全面深化改革迈出重大步伐,全面依法治国深入实施,全面从严治党纵深推进,全年经济社会发展主要目标任务圆满完成,“十三五”实现了良好开局。\n"); // note: English periods are not recognized
        for (String s : temp) {
            System.out.println(s);
        }
    }
    /**
     * Tokenize text with the IK Analyzer segmenter and return the result as a String[].
     * @param text input text
     * @return array of tokens
     */
    public static String[] IKAnalyzerSpliter(String text) {
        ArrayList<String> list = new ArrayList<>();
        StringReader sr = new StringReader(text);
        IKSegmenter ik = new IKSegmenter(sr, true);
        Lexeme lex = null;
        try {
            while ((lex = ik.next()) != null) {
                list.add(lex.getLexemeText());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list.toArray(new String[list.size()]);
    }
    /**
     * Clear the temp/output folders and run the jobs.
     * @param inputPath
     * @param outputPath
     * @throws Exception
     */
    public static void run(String inputPath, String outputPath) throws Exception {
        String tempPath = "/mapreduce/count-temp-" + new Random().nextInt(1000); // temporary location for the count results
        HDFS hdfs = new HDFS();
        hdfs.Init(HDFS_URL, USER);
        if (counterJob(inputPath, tempPath)) {
            sorterJob(tempPath, outputPath);
        }
        hdfs.Del(tempPath);
    }
    /**
     * Run the segmentation and counting job.
     * @param inputPath
     * @param outputPath
     * @return true if the job succeeded
     * @throws Exception
     */
    public static boolean counterJob(String inputPath, String outputPath) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Main.class);
        job.setMapperClass(SpliterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        job.setReducerClass(CounterReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class); // SequenceFile output feeds the sort job
        checkPath(outputPath);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        return job.waitForCompletion(true);
    }
    /**
     * Run the job that sorts the count results.
     * @param inputPath
     * @param outputPath
     * @throws Exception
     */
    public static void sorterJob(String inputPath, String outputPath) throws Exception {
        Job sortJob = Job.getInstance(new Configuration());
        sortJob.setJarByClass(Main.class);
        FileInputFormat.addInputPath(sortJob, new Path(inputPath));
        sortJob.setInputFormatClass(SequenceFileInputFormat.class);
        sortJob.setMapperClass(InverseMapper.class); // swap key and value of each (word, count) pair after map()
        sortJob.setNumReduceTasks(1); // number of reducers; one gives a single sorted file
        sortJob.setReducerClass(SortReducer.class);
        sortJob.setOutputKeyClass(IntWritable.class);
        sortJob.setOutputValueClass(Text.class);
        checkPath(outputPath);
        FileOutputFormat.setOutputPath(sortJob, new Path(outputPath));
        sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class); // descending sort on the count
        sortJob.waitForCompletion(true);
    }
    /**
     * Check whether the output path exists; delete it if it does.
     * @param outputPath
     * @throws Exception
     */
    private static void checkPath(String outputPath) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path output = new Path(outputPath);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
    }
    public static class SpliterMapper extends Mapper<Object, Text, Text, IntWritable> {
        public SpliterMapper() {}

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = IKAnalyzerSpliter(line); // segment the line into words
            for (String w : words) {
                context.write(new Text(w), new IntWritable(1)); // emit (word, 1)
            }
        }
    }

    public static class CounterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public CounterReducer() {}

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            for (IntWritable l : values) {
                counter += l.get(); // sum the 1s to get the word frequency
            }
            context.write(key, new IntWritable(counter));
        }
    }
    public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        private Text result = new Text();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(result, key); // swap back: write (word, count)
            }
        }
    }
    /**
     * IntWritableDecreasingComparator extends IntWritable.Comparator.
     * Hadoop sorts IntWritable keys in ascending order by default,
     * so negating compare() gives descending order.
     */
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
}
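
The HDFS helper class used in main() and run() is not part of the listing above. As a rough idea of what it might contain, here is a hypothetical reconstruction with only the four methods the code calls (Init, UpLoad, Down, Del), built on the standard FileSystem API; the real class may differ:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

/** Hypothetical reconstruction of the HDFS helper; only the methods used above. */
public class HDFS {
    private FileSystem fs;

    /** Connect to the cluster as the given user. */
    public void Init(String hdfsUrl, String user) throws Exception {
        fs = FileSystem.get(new URI(hdfsUrl), new Configuration(), user);
    }

    /** Upload a local file to HDFS. */
    public void UpLoad(String localPath, String hdfsPath) throws Exception {
        fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
    }

    /** Download an HDFS file to the local file system. */
    public void Down(String hdfsPath, String localPath) throws Exception {
        fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath));
    }

    /** Recursively delete an HDFS path. */
    public void Del(String path) throws Exception {
        fs.delete(new Path(path), true);
    }
}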