Notes on MapReduce Multi-Directory Output

Use MultipleOutputs to write output to multiple directories/files.

org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

Add the following methods to your Mapper or Reducer class:

private MultipleOutputs<Text, NullWritable> mos;

@Override
protected void setup(Reducer<Text, NullWritable, Text, NullWritable>.Context context)
        throws IOException, InterruptedException {
    super.setup(context);
    mos = new MultipleOutputs<Text, NullWritable>(context); // initialize mos
}

@Override
protected void cleanup(Reducer<Text, NullWritable, Text, NullWritable>.Context context)
        throws IOException, InterruptedException {
    super.cleanup(context);
    mos.close(); // flush and close all named outputs
}
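
The same pattern applies in a Mapper; only the generic parameters of the Context change (see the full example below).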

Wherever you need to emit a record, use the mos handle defined above:

mos.write("outputName", key, value);
mos.write("outputName", key, value, "filePrefix");
mos.write("outputName", key, value, "path/filePrefix");//到文件夹

In the driver, register each named output on the Job before submission:

MultipleOutputs.addNamedOutput(job, "outputXName",
XXXOutputFormat.class, OutputXKey.class, OutputXValue.class);
MultipleOutputs.addNamedOutput(job, "outputYName",
YYYOutputFormat.class, OutputYKey.class, OutputYValue.class);
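
Note that the name passed to addNamedOutput must match the first argument of the corresponding mos.write call, and MultipleOutputs only accepts names made of letters and digits; any other character causes the job setup to fail.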

Suppressing empty part-r-00000 files: by default every reduce task creates its part file even when nothing is written through context.write. Wrapping the output format with LazyOutputFormat defers file creation until the first record is actually emitted:

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

A complete example:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DataCleanIconAndWeb {

    public static class QLMapper extends
            Mapper<LongWritable, Text, Text, NullWritable> {

        private String webGame = "网页游戏"; // "web game" category label
        Text outputValue = new Text();
        // handle for the multiple named outputs
        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            mos = new MultipleOutputs<Text, NullWritable>(context); // initialize mos
        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            super.cleanup(context);
            mos.close(); // flush and close all named outputs
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // receive one line of input
            String line = value.toString();
            // split the record into fields
            String[] words = line.split(" ");
            // String[] words = line.split("\t");
            boolean isWeb = false;
            boolean flag = true;

            // ... processing logic elided ...

            // emit an icon record under <output>/iconRecord/icon-m-xxxxx
            String action = words[1] + "\t" + words[0] + "\t" + words[2]
                    + "\t" + words[3] + "\t" + words[5];
            outputValue.set(action);
            mos.write("iconRecord", outputValue, NullWritable.get(), "iconRecord/icon");

            // emit a web record under <output>/webRecord/web-m-xxxxx
            action = words[1] + "\t" + words[0] + "\t"
                    + words[2] + "\t" + words[3] + "\t" + words[4]
                    + "\t" + words[5];
            outputValue.set(action);
            mos.write("webRecord", outputValue, NullWritable.get(), "webRecord/web");
        }
    }

    public static void run(String originalDataPath, String dataCleanOutputFile)
            throws Exception {
        // build the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // note: the class that contains the main method
        job.setJarByClass(DataCleanIconAndWeb.class);
        job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
        // lower the shuffle input buffer to reduce reducer memory pressure
        job.getConfiguration().setStrings(
                "mapreduce.reduce.shuffle.input.buffer.percent", "0.1");
        job.setNumReduceTasks(3);

        // Mapper settings
        job.setMapperClass(QLMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(originalDataPath));

        // Reducer settings (no reducer class set, so the identity reducer runs)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(dataCleanOutputFile));

        // register the named outputs used by mos.write(...)
        MultipleOutputs.addNamedOutput(job, "iconRecord",
                TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "webRecord",
                TextOutputFormat.class, Text.class, NullWritable.class);

        // input format
        job.setInputFormatClass(TextInputFormat.class);
        // suppress empty part-r-00000 output files
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        // job.setOutputFormatClass(TextOutputFormat.class);

        // submit the job and wait for completion
        job.waitForCompletion(true);
    }

}
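
For reference, a minimal sketch of an entry point that invokes run(); the driver class name is a hypothetical addition, not part of the original post:

public class DataCleanDriver {
    public static void main(String[] args) throws Exception {
        // expects: <input path> <output path>
        DataCleanIconAndWeb.run(args[0], args[1]);
    }
}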
