1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.util.Arrays;
// 求解共同好友 public class FridensDemo { public static class StepOneMapper extends Mapper<LongWritable, Text, Text, Text> { Text k = new Text(); Text v = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // <A B,C,D,F,E,O> String[] fields = value.toString().split(":"); String person = fields[0]; String[] friends = fields[1].split(","); for (String friend : friends) { k.set(friend); v.set(person); context.write(k, v); }
// 输出多个<firend,person> } }
public static class StepOneReducer extends Reducer<Text, Text, Text, Text> { Text v = new Text();
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 接收,就是一个friend,多个person StringBuilder sb = new StringBuilder(); for (Text value : values) { if (sb.length() != 0) { sb.append(","); } sb.append(value.toString()); } v.set(sb.toString()); // 人,人,人.. context.write(key, v); } }
public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text> { Text k = new Text(); Text v = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // <好友 人,人,人,人...> String[] fields = value.toString().split("\t"); String friend = fields[0]; String[] persons = fields[1].split(",");
// 排序 Arrays.sort(persons);
// 输出<人-人,好友> for (int i = 0; i < persons.length - 2; i++) { for (int j = i + 1; j < persons.length - 1; j++) { k.set(persons[i] + "-" + persons[j]); v.set(friend); context.write(k, v); } } } }
public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> { Text v = new Text();
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 接收多个<人-人,好友>对 StringBuilder sb = new StringBuilder(); for (Text value : values) { if (sb.length() != 0) { sb.append(","); } sb.append(value.toString()); } // 人,人,人.. v.set(sb.toString()); context.write(key, v); } }
public static void main(String[] args) throws IOException, InterruptedException { Configuration conf = new Configuration(); Job oneJob = Job.getInstance(conf);
oneJob.setJarByClass(FridensDemo.class);
oneJob.setMapperClass(StepOneMapper.class); oneJob.setMapOutputKeyClass(Text.class); oneJob.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(oneJob, new Path(args[0]));
oneJob.setReducerClass(StepOneReducer.class); oneJob.setOutputKeyClass(Text.class); oneJob.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(oneJob, new Path(args[1]));
Job twoJob = Job.getInstance(conf);
twoJob.setJarByClass(FridensDemo.class);
twoJob.setMapperClass(StepTwoMapper.class); twoJob.setMapOutputKeyClass(Text.class); twoJob.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(twoJob, new Path(args[1]));
twoJob.setReducerClass(StepTwoReducer.class); twoJob.setOutputKeyClass(Text.class); twoJob.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(twoJob, new Path(args[2]));
// 我觉得最最主要的技术点为,用到了job链 ControlledJob one = new ControlledJob(oneJob.getConfiguration()); ControlledJob two = new ControlledJob(twoJob.getConfiguration());
two.addDependingJob(one);
JobControl jc = new JobControl("friend"); jc.addJob(one); jc.addJob(two);
Thread th = new Thread(jc); th.start();
/*if (jc.allFinished()) { Thread.sleep(2000); jc.stop(); th.stop(); System.exit(0); }*/ } }
|