hama孰Q??peers乿
天DFS?amaeer?信?信??/p>
[Source: http://blog.csdn.net/bhq2010/article/details/8741647]
件hdfs丨?http://blog.csdn.net/bhq2010/article/details/8740154
Hama 0.6.0
etup?aster taskeer
sp?趥趥繶?master
master亸中eerQ?
観庥֮spTask丰为丸迸毸?bsp使串spE使?夸bsp??
?/p>
import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.File;import java.io.FileReader;import java.io.FileWriter;import java.io.IOException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hama.HamaConfiguration;import org.apache.hama.bsp.BSP;import org.apache.hama.bsp.BSPJob;import org.apache.hama.bsp.BSPJobClient;import org.apache.hama.bsp.BSPPeer;import org.apache.hama.bsp.ClusterStatus;import org.apache.hama.bsp.FileOutputFormat;import org.apache.hama.bsp.NullInputFormat;import org.apache.hama.bsp.TextOutputFormat;import org.apache.hama.bsp.sync.SyncException;public class HamaTest{private static Path TMP_OUTPUT = new Path("/tmp/pi-"+ System.currentTimeMillis());public static class CommunicationTest extendsBSP<NullWritable, NullWritable, Text, Text, Text>{public static final Log LOG = LogFactory.getLog(CommunicationTest.class);private String masterTask;@Overridepublic void bsp(BSPPeer<NullWritable, NullWritable, Text, Text, Text> peer)throws IOException, SyncException, InterruptedException{File f = new File("/data/external_links_en.nt");if (f.exists()){int i = 0;FileReader fr = new FileReader("/data/external_links_en.nt");BufferedReader reader = new BufferedReader(fr);String line = null;while ((line = reader.readLine()) != null){i++;if (i > 661700){break;}peer.send(masterTask, new Text(line));}reader.close();}peer.sync();if (peer.getPeerName().equals(masterTask)){Text received;FileWriter fw = new FileWriter("/data/tmpres");BufferedWriter writer = new BufferedWriter(fw);while ((received = peer.getCurrentMessage()) != null){writer.write(received.toString() + "\n");}writer.close();}peer.sync();}@Overridepublic void setup(BSPPeer<NullWritable, NullWritable, Text, Text, Text> peer)throws IOException{// Choose one as a masterString[] allPeerNames = peer.getAllPeerNames();int port = 0;for 
(String peerName : allPeerNames){if (peerName.split(":")[0].equals("iir455-200")){if (port == 0|| Integer.parseInt(peerName.split(":")[1]) < port){port = Integer.parseInt(peerName.split(":")[1]);masterTask = peerName;}}}try{peer.sync();} catch (SyncException e){e.printStackTrace();} catch (InterruptedException e){e.printStackTrace();}}@Overridepublic void cleanup(BSPPeer<NullWritable, NullWritable, Text, Text, Text> peer)throws IOException{}}public static void main(String[] args) throws InterruptedException,IOException, ClassNotFoundException{HamaConfiguration conf = new HamaConfiguration();BSPJob bsp = new BSPJob(conf, HamaTest.class);bsp.setJobName("Connection Speed Test");bsp.setBspClass(CommunicationTest.class);bsp.setInputFormat(NullInputFormat.class);bsp.setOutputKeyClass(Text.class);bsp.setOutputValueClass(Text.class);bsp.setOutputFormat(TextOutputFormat.class);FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);BSPJobClient jobClient = new BSPJobClient(conf);ClusterStatus cluster = jobClient.getClusterStatus(true);if (args.length > 0){bsp.setNumBspTask(Integer.parseInt(args[0]));} else{bsp.setNumBspTask(cluster.getMaxTasks());}long startTime = System.currentTimeMillis();if (bsp.waitForCompletion(true)){System.out.println("Job Finished in "+ (System.currentTimeMillis() - startTime) / 1000.0+ " seconds");}}}??丰?61700大约70MB?10MB???ava.io.IOException: java.lang.OutOfMemoryError: Java heap space件?17.733?127.06?94.117?孳@大?买?a href="http://blog.csdn.net/bhq2010/article/details/8548070">http://blog.csdn.net/bhq2010/article/details/8548070认ama尶彾~學3GBama大??
?6170衶20?主?信?/p>?/h1>
Hamaeer乿??/p>
1410MBh?10d?0??MB/s?/p>
2常?GB丿Job追Hama颡觥档
?haman步?信䤧??/p>