读书人

Hadoop 中 DistributedCache 的施用

发布时间: 2012-08-11 20:50:31 作者: rapoo

Hadoop 中 DistributedCache 的应用
前一段时间在写一个MapReduce程序时,需要用到一些第三方的Jar包,于是运行程序时需要把这些Jar包一一上传到所有Hadoop节点。
当集群中增加一个节点或换一套集群环境时,又需要重新布置一遍这些Jar包,这样不仅费时,而且还容易遗漏、出错。并且,当我不再需要这些Jar包时,又得一个一个节点去清理。非常耗时。
后来查阅相关资料,发现Hadoop提供一个DistributedCache,用它可以把HDFS中的文件加载到DistributedCache中,当我们需要这些文件时,DistributedCache自动把这些文件下载到集群中节点的本地存储上(mapred.local.dir)。这样就不需要一一布置第三方的Jar包,并且Hadoop集群增加节点也不需要再上传了。
DistributedCache的使用方法,参考http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/filecache/DistributedCache.html。

// Setting up the cache for the application          1. Copy the requisite files to the FileSystem:          $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat       $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip       $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz          2. Setup the application's JobConf:          JobConf job = new JobConf();     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),                                    job);     DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);          3. Use the cached files in the Mapper     or Reducer:          public static class MapClass extends MapReduceBase       implements Mapper<K, V, K, V> {            private Path[] localArchives;       private Path[] localFiles;              public void configure(JobConf job) {         // Get the cached archives/files         localArchives = DistributedCache.getLocalCacheArchives(job);         localFiles = DistributedCache.getLocalCacheFiles(job);       }              public void map(K key, V value,                        OutputCollector<K, V> output, Reporter reporter)        throws IOException {         // Use data from the cached archives/files here         // ...         // ...         output.collect(k, v);       }     }

DistributedCache 不光对第三方Jar有用,对于Read-Only的数据都可以,这样也扩展了我们编写MapReduce程序的思路。

读书人网 >开源软件

热点推荐