读书人

MapReduce实例-内连接

发布时间: 2013-10-21 17:02:52 作者: rapoo

MapReduce实例--内连接

输入文件:

Tom Lucy

Tom Jack

Jone Lucy

Jone jack

Lucy Marry

Lucy Ben

Jack Alice

Jack Jesse

Terry Alice

Terry Jesse

Philip Terry

Philip Alma

Mark Terry

Mark Alma


输出结果:

Tom Jesse

Tom Alice

Tom Ben

Tom Marry

Jone Ben

Jone Marry

Philip Jesse

Philip Alice

Mark Jesse

Mark Alice


要求:输入文件的左列是child,右列是parent。要求通过对该文件自身做连接(child-parent表与自己关联),得出由grandchild列和grandparent列组成的输出文件。


程序代码:

package join;


import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;




/**
 * MapReduce job that joins a child/parent table with itself to derive
 * grandchild/grandparent pairs.
 *
 * <p>Each input line is expected to hold two space-separated names: the child
 * in the left column and the parent in the right column. The mapper emits every
 * relation twice, once keyed by the child and once keyed by the parent, so that
 * the reducer for a given person receives both that person's parents and that
 * person's children and can pair them up.
 */
public class STjoin extends Configured implements Tool {

    /** Tag prepended to a map output value that holds a parent of the key. */
    private static final char PARENT_TAG = '1';

    /** Tag prepended to a map output value that holds a child of the key. */
    private static final char CHILD_TAG = '2';

    /**
     * Counters used to tally malformed input.
     */
    enum Counter
    {
        LINESKIP, // lines that could not be parsed and were skipped
    }

    /**
     * Map task: turns one "child parent" line into two tagged records.
     */
    public static class Map extends Mapper<Object, Text, Text, Text>
    {
        /**
         * Emits {@code (child, "1" + parent)} and {@code (parent, "2" + child)}
         * for one input line.
         *
         * <p>A line that yields fewer than two space-separated fields is not
         * emitted; it increments {@link Counter#LINESKIP} instead.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of the input file
         * @param context task context used to emit records and update counters
         */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString(); // raw input line
            String[] lineSplit = line.split(" ");

            // Explicit length check instead of catching ArrayIndexOutOfBoundsException:
            // the same lines are skipped and counted, without exceptions as control flow.
            if (lineSplit.length < 2)
            {
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }

            String child = lineSplit[0];
            String parent = lineSplit[1];

            // Output 1: keyed by the child, value is the tagged parent.
            context.write(new Text(child), new Text(PARENT_TAG + parent));
            // Output 2: keyed by the parent, value is the tagged child.
            context.write(new Text(parent), new Text(CHILD_TAG + child));
        }
    }

    /**
     * Reduce task: pairs every child of the key with every parent of the key.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        /**
         * Writes one {@code (grandchild, grandparent)} record for each
         * combination of a child of {@code key} and a parent of {@code key}.
         *
         * <p>Values whose first character is {@code '1'} are treated as parents
         * of the key; all other values are treated as children of the key.
         *
         * @param key     the person through whom the two generations are linked
         * @param values  tagged parent and child names produced by {@link Map}
         * @param context task context used to emit the joined records
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            // Growable lists: a fixed-size array would overflow as soon as one
            // person had more relatives than the array could hold.
            List<String> grandparents = new ArrayList<String>();
            List<String> grandchildren = new ArrayList<String>();

            for (Text val : values)
            {
                String tagged = val.toString();
                // Drop only the leading tag character. Replacing every "1"/"2"
                // in the string would also corrupt names containing those digits.
                String name = tagged.substring(1);
                if (tagged.charAt(0) == PARENT_TAG)
                {
                    grandparents.add(name);
                }
                else
                {
                    grandchildren.add(name);
                }
            }

            // Cross product: each child of the key is a grandchild of each parent of the key.
            for (String grandchild : grandchildren)
            {
                for (String grandparent : grandparents)
                {
                    context.write(new Text(grandchild), new Text(grandparent));
                }
            }
        }
    }

    /**
     * Configures and runs the job, then prints a short completion report.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job reports success, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = getConf();

        Job job = new Job(conf, "STjoin"); // job name
        job.setJarByClass(STjoin.class); // class used to locate the job jar

        FileInputFormat.addInputPath(job, new Path(args[0])); // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

        job.setMapperClass(Map.class); // use the Map class above as the map task
        job.setReducerClass(Reduce.class); // use the Reduce class above as the reduce task
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class); // output key type
        job.setOutputValueClass(Text.class); // output value type

        job.waitForCompletion(true);

        // Report how the job went.
        System.out.println("任务名称:" + job.getJobName());
        System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
        System.out.println("输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue());
        System.out.println("输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Command-line entry point: validates the arguments, runs the job through
     * {@link ToolRunner}, prints start time, end time and elapsed minutes, and
     * exits with the job's result code.
     *
     * @param args expected to be exactly two values: input path and output path
     */
    public static void main(String[] args) throws Exception
    {
        // Anything other than exactly two arguments prints the usage text and exits.
        if (args.length != 2)
        {
            System.err.println("");
            System.err.println("Usage: STjoin < input path > < output path > ");
            System.err.println("Example: hadoop jar ~/STjoin.jar hdfs://localhost:9000/home/james/test hdfs://localhost:9000/home/james/join_sort");
            System.err.println("Counter:");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are too short");
            System.exit(-1);
        }

        // Record the start time.
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();

        // Run the job.
        int res = ToolRunner.run(new Configuration(), new STjoin(), args);

        // Report the elapsed time in minutes.
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("任务开始:" + formatter.format(start));
        System.out.println("任务结束:" + formatter.format(end));
        System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");

        System.exit(res);
    }
}


读书人网 >云计算

热点推荐