Hadoop MapReduce Study Notes (2): Preface and Preparation, Part 2
This blog post is original work. Please credit the source when reposting: http://guoyunsky.iteye.com/blog/1233714
Welcome to join the Hadoop super group: 180941958
This blog has moved to my personal site: http://www.yun5u.com/articles/hadoop-mapreduce-sql-2.html
Please read first:
    1. Hadoop MapReduce Study Notes (1): Preface and Preparation

Next: Hadoop MapReduce Study Notes (3): Implementing a SQL-like SELECT MAX(ID) with MapReduce

Next come the two test subclasses. The main difference between them is the test data they generate. I want the series to move from the simple to the complex: the first MapReduce program most of us meet is WordCount, which counts occurrences of individual words. There the word is the only column; in database terms, the word table has a single word field. Real data usually has more columns, for example a user table: ID INT, USER_NAME VARCHAR(32), AGE INT. So I introduce two subclasses, going from simple to complex.
1. A test class modelled on the word table above, with only one field (a short usage sketch follows the code).
package com.guoyun.hadoop.mapreduce.study;

import java.io.File;
import java.io.FileWriter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce test for single-column data, similar to the table:
 * CREATE TABLE TABLE_NAME(
 *   ID INT
 * )
 * Different subclasses implement different functions, such as finding the
 * max/min value or sorting.
 */
public class MyMapReduceSIngleColumnTest extends MyMapReduceTest{
  public static final Logger log=LoggerFactory.getLogger(MyMapReduceSIngleColumnTest.class);

  public MyMapReduceSIngleColumnTest(long dataLength, String inputPath, String outputPath) throws Exception {
    super(dataLength, inputPath, outputPath);
  }

  public MyMapReduceSIngleColumnTest(long dataLength) throws Exception {
    super(dataLength);
  }

  public MyMapReduceSIngleColumnTest(String inputPath, String outputPath) {
    super(inputPath, outputPath);
  }

  public MyMapReduceSIngleColumnTest(String outputPath) {
    super(outputPath);
  }

  /**
   * Generate length random long values in [1, length], one per line, into
   * inputPath, tracking the largest and smallest values generated.
   */
  protected void generateDatas(long length) throws Exception{
    FileWriter fw=null;
    File file=null;
    long generateValue=0;

    file=new File(inputPath);
    if(!file.getParentFile().exists()){
      if(!file.getParentFile().mkdirs()){
        throw new Exception("generate datas error,can not create dir:"+file.getParentFile().getAbsolutePath());
      }
    }

    try {
      fw=new FileWriter(file);
      for(int i=0;i<length;i++){
        generateValue=(long)(Math.random()*length)+1;
        if(generateValue>this.maxValue){
          this.maxValue=generateValue;
        }else if(generateValue<this.minValue){
          this.minValue=generateValue;
        }
        fw.write(generateValue+NEW_LINE);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }finally{
      if(fw!=null){
        fw.flush();
        fw.close();
      }
    }
  }
}
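As a quick sanity check, this is roughly how the class can be driven. It assumes, as in note (一), that the MyMapReduceTest base class runs generateDatas() when it is constructed with a data length; the main() of the multi-column class below does the same thing. The demo class name is only illustrative:

package com.guoyun.hadoop.mapreduce.study;

/**
 * Illustrative driver (not part of the original code): generates 1000 rows of
 * single-column test data, assuming the MyMapReduceTest base class from
 * note (一) calls generateDatas() when constructed with a data length.
 */
public class SingleColumnDataDemo {
  public static void main(String[] args) throws Exception {
    // Writes 1000 random long values, one per line, to the test input path.
    MyMapReduceTest test = new MyMapReduceSIngleColumnTest(1000);
  }
}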
2. A test class modelled on the user table above, with multiple columns. I only generate two columns here; you can download the code and modify it yourself.
package com.guoyun.hadoop.mapreduce.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce test for rows with multiple columns, similar to the table:
 * CREATE TABLE TABLE_NAME(
 *   ID INT,
 *   NAME VARCHAR(32),
 *   ...
 * )
 * Different subclasses implement different functions, such as finding the
 * max/min ID or sorting by ID.
 */
public class MyMapReduceMultiColumnTest extends MyMapReduceTest {
  public static final Logger log=LoggerFactory.getLogger(MyMapReduceMultiColumnTest.class);

  public static final String DEFAULT_INPUT_PATH="testDatas/mapreduce/MRInput_MultiColumn";
  public static final String DEFAULT_OUTPUT_PATH="testDatas/mapreduce/MROutput_MultiColumn";
  public static final String SPLIT_TAB="\t";

  private static final List<String> frameworkNames=new ArrayList<String>();
  static{
    frameworkNames.add("Hadoop");
    frameworkNames.add("Hbase");
    frameworkNames.add("Pig");
    frameworkNames.add("Zookeeper");
    frameworkNames.add("Chukwa");
    frameworkNames.add("Avro");
    frameworkNames.add("Sqoop");
    frameworkNames.add("Cassandra");
    frameworkNames.add("Hive");
    frameworkNames.add("Mahout");
    frameworkNames.add("Nutch");
    frameworkNames.add("Lucene");
    frameworkNames.add("Solr");
    frameworkNames.add("Heritrix");
    frameworkNames.add("Netty");
    frameworkNames.add("Tomcat");
    frameworkNames.add("Thrift");
    frameworkNames.add("Ant");
    frameworkNames.add("Log4j");
    frameworkNames.add("CouchDB");
    frameworkNames.add("Maven");
    frameworkNames.add("Mina");
    frameworkNames.add("OpenJPA");
    frameworkNames.add("POI");
    frameworkNames.add("Struts");
    frameworkNames.add("Spring");
    frameworkNames.add("Subversion");
    frameworkNames.add("Tika");
  }

  public MyMapReduceMultiColumnTest(long dataLength) throws Exception {
    super(dataLength);
  }

  public MyMapReduceMultiColumnTest(String outputPath) throws Exception {
    super(outputPath);
  }

  public MyMapReduceMultiColumnTest(String inputPath, String outputPath) {
    super(inputPath, outputPath);
  }

  public MyMapReduceMultiColumnTest(long dataLength, String inputPath, String outputPath) throws Exception {
    super(dataLength, inputPath, outputPath);
  }

  /**
   * Generate length rows of "frameworkName TAB number" into inputPath,
   * tracking the largest and smallest numbers generated.
   */
  @Override
  protected void generateDatas(long length) throws Exception {
    FileWriter fw=null;
    File file=null;
    long generateValue=0;

    file=new File(inputPath);
    if(!file.getParentFile().exists()){
      if(!file.getParentFile().mkdirs()){
        throw new Exception("generate datas error,can not create dir:"+file.getParentFile().getAbsolutePath());
      }
    }

    try {
      fw=new FileWriter(file);
      for(int i=0;i<length;i++){
        generateValue=(long)(Math.random()*length)+1;
        if(generateValue>this.maxValue){
          this.maxValue=generateValue;
        }else if(generateValue<this.minValue){
          this.minValue=generateValue;
        }
        fw.write(this.generateFrameWork()+SPLIT_TAB+generateValue+NEW_LINE);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }finally{
      if(fw!=null){
        fw.flush();
        fw.close();
      }
    }
  }

  /**
   * Pick a random framework name for the first column.
   */
  private String generateFrameWork(){
    int index=(int)(Math.random()*frameworkNames.size());
    return frameworkNames.get(index);
  }

  /**
   * A Writable holding one row: frameworkName and number.
   */
  public static class MultiColumnWritable implements WritableComparable{
    private String frameworkName="";
    private long number=-1;

    public String getFrameworkName() {
      return frameworkName;
    }

    public void setFrameworkName(String frameworkName) {
      this.frameworkName = frameworkName;
    }

    public long getNumber() {
      return number;
    }

    public void setNumber(long number) {
      this.number = number;
    }

    public MultiColumnWritable() {
      super();
    }

    public MultiColumnWritable(String frameworkName, long number) {
      super();
      this.frameworkName = frameworkName;
      this.number = number;
    }

    @Override
    public int compareTo(Object obj) {
      // Orders by number only; frameworkName is ignored.
      int result=-1;
      if(obj instanceof MultiColumnWritable){
        MultiColumnWritable mcw=(MultiColumnWritable)obj;
        if(mcw.getNumber()<this.getNumber()){
          result=1;
        }else if(mcw.getNumber()==this.getNumber()){
          result=0;
        }
      }
      return result;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      frameworkName=in.readUTF();
      number=in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(frameworkName);
      out.writeLong(number);
    }

    @Override
    public String toString() {
      return frameworkName+"\t"+number;
    }

    public static MultiColumnWritable copy(MultiColumnWritable obj){
      return new MultiColumnWritable(obj.getFrameworkName(),obj.getNumber());
    }
  }

  /**
   * Mapper: parses each source line into a MultiColumnWritable.
   */
  protected static class MultiSupMapper extends Mapper<LongWritable,Text,Text,MultiColumnWritable>{
    private final Text writeKey=new Text("K");
    private MultiColumnWritable writeValue=new MultiColumnWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      log.debug("begin to map");
      String[] split=null;
      try {
        split=value.toString().split("\\t");
        if(split!=null&&split.length==2){
          writeValue.setFrameworkName(split[0].trim());
          writeValue.setNumber(Long.parseLong(split[1].trim()));
        }
      } catch (NumberFormatException e) {
        log.error("map error:"+e.getMessage());
      }
      context.write(writeKey, writeValue);
    }
  }

  public static void main(String[] args) throws Exception{
    // Generate 1000 rows of two-column test data.
    MyMapReduceTest test=new MyMapReduceMultiColumnTest(1000);
  }
}
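Before moving on to note (三), it is worth seeing how MultiColumnWritable travels between map and reduce: Hadoop serializes it with write(DataOutput) and rebuilds it with readFields(DataInput), so both methods must handle the fields in the same order. Below is a small round-trip sketch; only MultiColumnWritable comes from the code above, and the driver class name is made up for illustration:

package com.guoyun.hadoop.mapreduce.study;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.guoyun.hadoop.mapreduce.study.MyMapReduceMultiColumnTest.MultiColumnWritable;

/**
 * Hypothetical helper (not part of the original code): serializes a
 * MultiColumnWritable the way Hadoop would (write() into a byte stream,
 * readFields() back out) and prints both copies.
 */
public class MultiColumnWritableRoundTrip {
  public static void main(String[] args) throws Exception {
    MultiColumnWritable original = new MultiColumnWritable("Hadoop", 42L);

    // Serialize: Hadoop calls write(DataOutput) when shuffling the value.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Deserialize: Hadoop calls readFields(DataInput) on the receiving side.
    MultiColumnWritable restored = new MultiColumnWritable();
    restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(original);  // frameworkName, a tab, then the number
    System.out.println(restored);  // same output: the fields are restored in the order they were written
  }
}

Note also that compareTo() looks only at number, so whenever these objects are compared, for example when sorting by ID later in the series, the frameworkName column plays no part in the ordering.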