读书人

MapReduce实现reduce端join（多数据源）

发布时间: 2013-03-26 09:54:34 作者: rapoo

MapReduce实现reduce端join,多数据源
public class EfcOrderProRangeOdJob extends Configured implements Tool {//TODO 路径private final static String INTPUT_A = "D:/order/order/";private final static String INTPUT_B = "D:/order/address/";private final static String OUTPUT = "D:/testAAAAA/";//private final static String OUTPUT = "/warehouse/tmp/pt_eft_order_pro_range/";private final static String OUTPUT_TABLE = "fct_pt_icr_trade_day";public static void main(String[] args) {try {int res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args);System.exit(res);} catch (Exception e) {e.printStackTrace();}}@Overridepublic int run(String[] args) throws Exception {try {String start = "20130217"; //TODOConfiguration conf = ConfUtil.getConf(getConf());conf.set("start", start);Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first");Path pathOrder = new Path(INTPUT_A);Path pathAddress = new Path(INTPUT_B);Path output = new Path(OUTPUT + start + "/");FileSystem fs = FileSystem.get(conf);if(fs.exists(output)){fs.delete(output,true);}job1.setMapOutputKeyClass(TextPair.class);job1.setMapOutputValueClass(Text.class);FileOutputFormat.setOutputPath(job1, output);MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class);MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class);job1.setReducerClass(EfcOrderProRangeReducer.class);job1.setJarByClass(EfcOrderProRangeOdJob.class);Job job2 = Job.getInstance(conf,"pt_eft_order_pro_range_second");FileInputFormat.setInputPaths(job2, output);job2.setMapperClass(EfcOrderProRangeSecondMapper.class);job2.setMapOutputKeyClass(Text.class);job2.setMapOutputValueClass(IntWritable.class);TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2);return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range");} catch (Exception e) {e.printStackTrace();return 0;}}public static class TextPair implements 
WritableComparable<TextPair> {private Text first;private Text second;public TextPair() {set(new Text(), new Text());}public TextPair(String first, String second) {set(new Text(first), new Text(second));}public TextPair(Text first, Text second) {set(first, second);}public void set(Text first, Text second) {this.first = first;this.second = second;}public Text getFirst() {return first;}public Text getSecond() {return second;}public void write(DataOutput out) throws IOException {first.write(out);second.write(out);}public void readFields(DataInput in) throws IOException {first.readFields(in);second.readFields(in);}public int compareTo(TextPair tp) {return first.compareTo(tp.first);}}}

?

mapper1类

public class EfcOrderProRangeOrderMapper extends Mapper<LongWritable, Text, TextPair, Text>{private static final int ORDER_ID_INDEX = 2;private static final int ORDER_STATUS_INDEX = 5;private static final String EFFECTIVE_STATUS = "3";private static final String COL_SPLITER = "\001";@Overridepublic void map(LongWritable key, Text value, Context context) {try {String [] order = value.toString().split(COL_SPLITER);String orderId = order[ORDER_ID_INDEX];String status = order[ORDER_STATUS_INDEX];if(!EFFECTIVE_STATUS.equals(status)){return;}TextPair textPair = new TextPair(new Text(orderId),new Text("order"));context.write(textPair, new Text(status));} catch (Exception e) {e.printStackTrace();}}}

?

mapper2类

public class EfcOrderProRangeAddressMapper extends Mapper<LongWritable, Text, TextPair, Text>{//TODO 通过hivemeta去取indexprivate static final int ORDER_ID_INDEX = 0;private static final int PROVINCE_ID_INDEX = 1;private static final String COL_SPLITER = "\001";@Overridepublic void map(LongWritable key, Text value, Context context) {try {String [] address = value.toString().split(COL_SPLITER);String orderId = address[ORDER_ID_INDEX];String province = address[PROVINCE_ID_INDEX];TextPair textPair = new TextPair(new Text(orderId),new Text("address"));context.write(textPair, new Text(province));} catch (Exception e) {e.printStackTrace();}}}

?

在reducer端做join操作：通过TextPair中的second判断每条记录的来源（order或address），从而取出各数据源中需要的字段。

public class EfcOrderProRangeReducer extends Reducer<TextPair,Text,Text,Text>{private static final String COL_SPLITER = "\001";@Overrideprotected void reduce(TextPair key, Iterable<Text> values, Context context) {try {Text tag = key.getSecond();Text orderId = key.getFirst();String status = null;String province = null;StringBuilder out = new StringBuilder();for (Text value : values) {if(tag.toString().equals("order")){status = value.toString();}if(tag.toString().equals("address")){province = value.toString();}}if (province != null && status != null){out.append(orderId.toString()).append(COL_SPLITER).append(status).append(COL_SPLITER).append(province);context.write(null, new Text(out.toString()));}} catch (Exception e) {e.printStackTrace();}}}

?

读书人网 >其他数据库

热点推荐