Mahout Source Code Analysis: DistributedLanczosSolver (Part 7, Summary)
Mahout version: 0.7, Hadoop version: 1.0.4, JDK: 1.7.0_25 64-bit.
The SVD example on the official Mahout site is run on Amazon's cloud platform, but it does show how to invoke the algorithm. The question is what to do once the eigenVectors have been computed. For example, if the original data is a 600x60 matrix (600 rows, 60 columns) and the computed eigenVectors form a 24x60 matrix (24 being some value no larger than rank), then multiplying original_data by the transpose of eigenVectors yields a 600x24 matrix, which is exactly the desired dimensionality reduction.
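To make the shape bookkeeping concrete, here is a minimal in-memory sketch of that final multiplication using Mahout's math API. It is only an illustration: the class and method names (SvdProjectionSketch, project) are hypothetical helpers, not part of the tool, and it assumes the data fits in memory, whereas the MapReduce job shown later does the same projection row by row with dot products.

import org.apache.mahout.math.Matrix;

public class SvdProjectionSketch {
    // originalData: n x m (e.g. 600 x 60); eigenVectors: k x m (e.g. 24 x 60, k <= rank)
    // returns:      n x k (e.g. 600 x 24) = originalData * eigenVectors^T
    public static Matrix project(Matrix originalData, Matrix eigenVectors) {
        return originalData.times(eigenVectors.transpose());
    }
}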
This post introduces a ready-to-use SVD tool, which can be downloaded from http://download.csdn.net/detail/fansy1990/6479451.
The download contains three files: the data file synthetic_control.data, svd.jar, and crunch-0.5.0-incubating.jar (which must be placed in the cluster's lib directory).
How to run it:
1) Copy crunch-0.5.0-incubating.jar into Hadoop's lib directory and restart the cluster;
2) Upload synthetic_control.data to HDFS;
3) Run svd.jar. The source code of the final dimension-reduction job inside it, SvdReductionTranform, is listed below, followed by an example invocation:
package mahout.fansy.svd;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import mahout.fansy.utils.read.ReadArbiKV;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
 * Dimension Reduction<br>
 *
 * The last job: transforms the input vectors into the reduced space.
 * @author fansy
 */
public class SvdReductionTranform extends AbstractJob {

    private final static String EIGENPATH = "/eigenPath";
    private final static String VECTORCOLUMN = "vectorColumn";
    private static final Logger log = LoggerFactory.getLogger(SvdReductionTranform.class);

    @Override
    public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();
        addOption("numCols", "nc", "Number of columns of the input matrix");
        addOption("eigenPath", "e", "eigen vectors path");
        if (parseArguments(args) == null) {
            return -1;
        }
        Path input = getInputPath();
        Path output = getOutputPath();
        String eigenPath = getOption("eigenPath");
        String column = getOption("numCols");

        Configuration conf = new Configuration(getConf() != null ? getConf() : new Configuration());
        conf.set(EIGENPATH, eigenPath);
        try {
            int col = Integer.parseInt(column);
            conf.setInt(VECTORCOLUMN, col);
        } catch (Exception e) {
            return -2;  // number format exception: -2
        }

        log.info("delete file " + output);
        HadoopUtil.delete(conf, output);  // delete the output path if it already exists

        Job job = new Job(conf, "prepare svd vector from " + input.toUri());
        job.setJarByClass(SvdReductionTranform.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(VectorWritable.class);
        SequenceFileInputFormat.addInputPath(job, input);
        SequenceFileOutputFormat.setOutputPath(job, output);
        job.setMapperClass(TransMapper.class);
        job.setNumReduceTasks(0);  // map-only job

        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IllegalStateException("Job failed!");
        }
        return 0;
    }

    public static class TransMapper extends Mapper<LongWritable, VectorWritable, NullWritable, VectorWritable> {

        List<Vector> list = Lists.newArrayList();  // eigenvectors, one per reduced dimension
        int column;    // number of columns of the original input
        int transCol;  // number of reduced dimensions

        @Override
        public void setup(Context cxt) throws IOException {
            log.info("in the first row in setup()");
            column = cxt.getConfiguration().getInt(VECTORCOLUMN, -1);
            String eigenPath = cxt.getConfiguration().get(EIGENPATH);
            log.info("eigenPath:" + eigenPath);
            log.info("cxt.getConfiguration().get(\"mapred.job.tracker\")"
                    + cxt.getConfiguration().get("mapred.job.tracker"));
            Map<Writable, Writable> eigenMap = null;
            try {
                eigenMap = ReadArbiKV.readFromFile(eigenPath, cxt.getConfiguration().get("mapred.job.tracker"));
            } catch (Exception e) {
                log.info("failed to read the eigenvector data?");
                // e.printStackTrace();
            }
            // initialize the eigenvectors, keeping only those whose size matches the input column count
            Iterator<Entry<Writable, Writable>> eigenIter = eigenMap.entrySet().iterator();
            while (eigenIter.hasNext()) {
                Map.Entry<Writable, Writable> set = eigenIter.next();
                VectorWritable eigenV = (VectorWritable) set.getValue();
                if (eigenV.get().size() == column) {
                    list.add(eigenV.get());
                }
            }
            log.info("the last row in setup(): " + list.size());
            transCol = list.size();
        }

        @Override
        public void map(LongWritable key, VectorWritable value, Context cxt)
                throws IOException, InterruptedException {
            // project the input vector onto each eigenvector: one dot product per reduced dimension
            Vector transVector = new DenseVector(transCol);
            for (int i = 0; i < transCol; i++) {
                double d = value.get().dot(list.get(i));  // dot product
                transVector.setQuick(i, d);
            }
            VectorWritable vector = new VectorWritable(transVector);
            cxt.write(NullWritable.get(), vector);
        }
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new SvdReductionTranform(), args);
    }
}
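For reference, a possible invocation of this job is sketched below. It is only an assumption-laden example: the HDFS paths are placeholders, 60 is the column count of the 600x60 example above, the input is assumed to already be a SequenceFile of VectorWritable rows, and the class name must be given explicitly if the jar manifest does not declare a main class. The option names --input, --output, --numCols, and --eigenPath come from the run() method shown above.

hadoop jar svd.jar mahout.fansy.svd.SvdReductionTranform \
    --input /user/hadoop/svd/input \
    --output /user/hadoop/svd/reduced \
    --numCols 60 \
    --eigenPath /user/hadoop/svd/eigenVectors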
Finally, a note on the rank parameter: it should be set to a value that is smaller than, but close to, the number of columns of the original data; this should give a reasonably good result. The number of rows of the resulting eigenVectors becomes the number of columns after reduction, and that row count is an integer no larger than rank, so rank has to be chosen with care.
Sharing, growing, enjoying.
Please credit the original blog when reposting: http://blog.csdn.net/fansy1990