Hadoop Study Notes: An Anatomy of the Map-Reduce Process

1. The Client

The Map-Reduce process starts with a client submitting a job.
Submission is done mainly through the static method JobClient.runJob(JobConf):
public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ……
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ……
  while (true) {
    // The loop keeps polling the job's status and printing it to the client console
  }
  return running;
}
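For context, a minimal driver that reaches this code path might look like the following sketch (old mapred API; MyMapper and MyReducer are hypothetical user classes, sketched in sections 6.1 and 6.2):

// A word-count style driver for the old mapred API (class names are hypothetical).
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MyJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MyJobDriver.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(MyMapper.class);    // hypothetical Mapper, see section 6.1
    conf.setReducerClass(MyReducer.class);  // hypothetical Reducer, see section 6.2
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // Blocks until the job finishes, printing progress to the console
    JobClient.runJob(conf);
  }
}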
JobClient's submitJob function is implemented as follows:
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Get an id for the new job from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare to write everything the job needs to run into HDFS:
  //   the jar containing the job's code is packaged as job.jar
  //   the information about the input splits to process is written to job.split
  //   the job's configuration settings are written to job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload the jars specified on the command line via -libjars to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ……
  // Compute the input splits according to the input format; the default type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Open an output stream and write the input split information to the job.split file
  FSDataOutputStream out = FileSystem.create(fs,
      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: a split file header, the split file version, the number of
    // splits, and then the information of each input split in turn.
    // For each input split it writes: the split class name (FileSplit by default),
    // the split length, the split contents (for a FileSplit: the file name and the
    // split's start offset within that file), and the split's locations (i.e. which
    // DataNodes it resides on).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // Set the number of map tasks to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job's configuration to the job.xml file
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Finally call the JobTracker to actually submit the job
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ……
}

2. JobTracker

The JobTracker runs as a separate JVM. Its main function mainly does two things:
- calls the static method startTracker(new JobConf()) to create a JobTracker object
- calls JobTracker.offerService() to start serving requests

The JobTracker constructor creates a taskScheduler member variable that schedules jobs; the default is JobQueueTaskScheduler, which schedules jobs in FIFO order.
offerService then calls taskScheduler.start(), which registers two listeners with the JobTracker (the taskScheduler's taskTrackerManager):
- jobQueueJobInProgressListener, a JobQueueJobInProgressListener that monitors the running state of jobs
- eagerTaskInitializationListener, an EagerTaskInitializationListener that initializes jobs

EagerTaskInitializationListener contains a thread, JobInitThread, which keeps taking JobInProgress objects out of jobInitQueue and calls each object's initTasks function to initialize the job.
In the previous section, the client called JobTracker.submitJob. That function first creates a JobInProgress object and then calls addJob, which contains the following logic:
synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Call jobAdded on every listener registered with the JobTracker
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}

EagerTaskInitializationListener's jobAdded simply adds the JobInProgress object to jobInitQueue, which naturally triggers the initialization of this job.
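A minimal sketch of this listener, reconstructed from the description above (illustrative, not the exact source):

class EagerTaskInitializationListener extends JobInProgressListener {
  private final List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();

  public void jobAdded(JobInProgress job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);     // queue the newly submitted job
      jobInitQueue.notifyAll();  // wake up the JobInitThread
    }
  }

  class JobInitThread implements Runnable {
    public void run() {
      while (true) {
        JobInProgress job = null;
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            try {
              jobInitQueue.wait();
            } catch (InterruptedException e) {
              return;
            }
          }
          job = jobInitQueue.remove(0);  // take the oldest queued job
        }
        try {
          job.initTasks();  // initialize the job's tasks
        } catch (Exception e) {
          // log the failure and keep initializing the remaining jobs
        }
      }
    }
  }
}

The initialization itself is done by JobInProgress's initTasks function: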
public synchronized void initTasks() throws IOException {
  ……
  // Read the job.split file from HDFS to recreate the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task, each handling one input split
  maps = new TaskInProgress[numMapTasks];
  for(int i=0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  // Put the map tasks into nonRunningMapCache, a Map<Node, List<TaskInProgress>>:
  // a map task will preferably be assigned to the Node holding its input split.
  // nonRunningMapCache is consulted later, when the JobTracker assigns map tasks
  // to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    // Put each reduce task into nonRunningReduces, which is consulted later,
    // when the JobTracker assigns reduce tasks to TaskTrackers.
    nonRunningReduces.add(reduces[i]);
  }

  // Create two cleanup tasks, one for the map side and one for the reduce side.
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks, one for the map side and one for the reduce side.
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks + 1 );
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true);  // initialization finished
  ……
}

3. TaskTracker

The TaskTracker also runs as a separate JVM. Its main function essentially calls new TaskTracker(conf).run(), and run in turn mainly calls:

State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker process stays in this loop for its whole lifetime
  while (running && !shuttingDown) {
      ……
      long now = System.currentTimeMillis();
      // Send a heartbeat to the JobTracker once per heartbeat interval
      long waitTime = heartbeatInterval - (now - lastHeartbeat);
      if (waitTime > 0) {
        synchronized(finishedCount) {
          if (finishedCount[0] == 0) {
            finishedCount.wait(waitTime);
          }
          finishedCount[0] = 0;
        }
      }
      ……
      // Send the heartbeat to the JobTracker and obtain the response
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
      ……
      // The response tells this TaskTracker what actions to take
      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      ……
      if (actions != null){
        for(TaskTrackerAction action: actions) {
          if (action instanceof LaunchTaskAction) {
            // A new task to launch: add the action to the task queue
            addToTaskQueue((LaunchTaskAction)action);
          } else if (action instanceof CommitTaskAction) {
            CommitTaskAction commitAction = (CommitTaskAction)action;
            if (!commitResponses.contains(commitAction.getTaskID())) {
              commitResponses.add(commitAction.getTaskID());
            }
          } else {
            tasksToCleanup.put(action);
          }
        }
      }
  }
  return State.NORMAL;
}
The main logic of transmitHeartBeat is as follows:

private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Every so often the heartbeat also carries counter statistics back to the JobTracker
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  }
  else {
    sendCounters = false;
  }
  ……
  // Report this TaskTracker's current status to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  ……
  // The TaskTracker asks the JobTracker to assign it a new task when:
  //   the number of map tasks it is currently running is below its map task limit, or
  //   the number of reduce tasks it is currently running is below its reduce task limit
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  ……
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted, askForNewTask,
                                                            heartbeatResponseId);
  ……
  return heartbeatResponse;
}

4. JobTracker

When the heartbeat RPC reaches the JobTracker, its heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) function is invoked:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks, short responseId)
  throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short)(responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker has asked the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null ) {
        // Otherwise let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Put the tasks into the actions list to return to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
                      actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
}
The default task scheduler is JobQueueTaskScheduler; its assignTasks is as follows:

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce workload across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute the average load each TaskTracker should carry:
  // the remaining workload divided by the number of TaskTrackers.
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }
  ……

  // Map tasks take priority over reduce tasks: if this TaskTracker is running
  // fewer map tasks than the average load, assign it a map task.
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  // Reduce tasks are assigned only after map tasks
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  return null;
}
From the code above we can see that JobInProgress's obtainNewMapTask is what assigns a map task. It mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache based on the Node on which the TaskTracker runs. JobInProgress's obtainNewReduceTask assigns a reduce task; it mainly calls findNewReduceTask, which takes a TaskInProgress from nonRunningReduces.
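A minimal sketch of the locality-aware map-task lookup, reconstructed from this description (illustrative only; the helper names are hypothetical, and the real method also handles failed and speculative tasks and removes chosen tasks from the cache):

private TaskInProgress findNewMapTask(TaskTrackerStatus tts) {
  // Resolve the TaskTracker's host to its Node in the cluster topology
  Node node = jobtracker.getNode(tts.getHost());
  // Walk up the topology: level 0 yields data-local tasks, level 1 rack-local
  // tasks, and so on, matching how createCache(splits, maxLevel) filled the map
  for (Node n = node; n != null; n = n.getParent()) {
    List<TaskInProgress> cached = nonRunningMapCache.get(n);
    if (cached != null) {
      for (TaskInProgress tip : cached) {
        if (!tip.isRunning()) {
          return tip;  // prefer the most local runnable task
        }
      }
    }
  }
  // No local candidate: fall back to any remaining non-running map task
  return findAnyNonLocalMapTask();  // hypothetical helper
}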
5. TaskTracker

After the heartbeat is sent to the JobTracker, the returned response carries the assigned tasks as LaunchTaskActions. Each action is queued by calling addToTaskQueue: a map task goes into mapLauncher (of type TaskLauncher), a reduce task into reduceLauncher (also of type TaskLauncher):
private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}
TaskLauncher is a thread. Its run function takes a TaskInProgress off the queue filled above and calls startNewTask(TaskInProgress tip) to start the task, which in turn mainly calls localizeJob(TaskInProgress tip).
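A minimal sketch of the launcher loop, with slot bookkeeping omitted (illustrative, not the exact source):

class TaskLauncher extends Thread {
  private final BlockingQueue<TaskInProgress> tasksToLaunch =
      new LinkedBlockingQueue<TaskInProgress>();

  public void addToTaskQueue(LaunchTaskAction action) {
    // registerTask (a hypothetical helper here) turns the action into a TaskInProgress
    tasksToLaunch.add(registerTask(action));
  }

  public void run() {
    while (!shuttingDown) {
      try {
        // Block until a task is queued, then start it
        TaskInProgress tip = tasksToLaunch.take();
        startNewTask(tip);
      } catch (InterruptedException e) {
        return;
      } catch (Throwable th) {
        // log the error and keep launching the remaining tasks
      }
    }
  }
}

localizeJob stages the job's files on the local machine: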
private void localizeJob(TaskInProgress tip) throws IOException {
  // The first thing to do is copy the job's files from HDFS to the TaskTracker's
  // local file system: job.split, job.xml and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ……
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
                                  getLocalJobDir(jobId.toString())
                                  + Path.SEPARATOR + "job.xml",
                                  jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      ……
      // Copy the job configuration file (job.xml) to the local file system
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
                       (getLocalJobDir(jobId.toString())
                       + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create "
                    + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // copy Jar file to the local FS and unjar it.
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
                                   getLocalJobDir(jobId.toString())
                                   + Path.SEPARATOR + "jars",
                                   5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory ");
        }
        // Copy job.jar to the local file system
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration back out as a local job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
Once all the resources the task needs have been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress's launchTask function:
public synchronized void launchTask() throws IOException {
    ……
    // Set up the task's working directory
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    // Create and start a TaskRunner: a MapTaskRunner for a MapTask,
    // a ReduceTaskRunner for a ReduceTask
    this.runner = task.createRunner(TaskTracker.this, this);
    this.runner.start();
    this.taskStatus.setStartTime(System.currentTimeMillis());
}
TaskRunner is a thread; its run function is as follows:

public final void run() {
    ……
    TaskAttemptID taskid = t.getTaskID();
    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    File jobCacheDir = null;
    if (conf.getJar() != null) {
      jobCacheDir = new File(
                        new Path(conf.getJar()).getParent().toString());
    }
    File workDir = new File(lDirAlloc.getLocalPathToRead(
                              TaskTracker.getLocalTaskDir(
                                t.getJobID().toString(),
                                t.getTaskID().toString(),
                                t.isTaskCleanupTask())
                              + Path.SEPARATOR + MRConstants.WORKDIR,
                              conf).toString());
    FileSystem fileSystem;
    Path localPath;
    ……
    // Assemble the classpath
    String baseDir;
    String sep = System.getProperty("path.separator");
    StringBuffer classPath = new StringBuffer();
    // start with same classpath as parent process
    classPath.append(System.getProperty("java.class.path"));
    classPath.append(sep);
    if (!workDir.mkdirs()) {
      if (!workDir.isDirectory()) {
        LOG.fatal("Mkdirs failed to create " + workDir.toString());
      }
    }
    String jar = conf.getJar();
    if (jar != null) {
      // if a job jar exists, add its lib jars, classes directory and the jar
      // directory itself to the classpath
      File[] libs = new File(jobCacheDir, "lib").listFiles();
      if (libs != null) {
        for (int i = 0; i < libs.length; i++) {
          classPath.append(sep);            // add libs from jar to classpath
          classPath.append(libs[i]);
        }
      }
      classPath.append(sep);
      classPath.append(new File(jobCacheDir, "classes"));
      classPath.append(sep);
      classPath.append(jobCacheDir);
    }
    ……
    classPath.append(sep);
    classPath.append(workDir);
    // Assemble the java command line and its arguments
    Vector<String> vargs = new Vector<String>(8);
    File jvm =
      new File(new File(System.getProperty("java.home"), "bin"), "java");
    vargs.add(jvm.toString());
    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
    String [] javaOptsSplit = javaOpts.split(" ");
    String libraryPath = System.getProperty("java.library.path");
    if (libraryPath == null) {
      libraryPath = workDir.getAbsolutePath();
    } else {
      libraryPath += sep + workDir;
    }
    boolean hasUserLDPath = false;
    for(int i=0; i<javaOptsSplit.length ;i++) {
      if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
        javaOptsSplit[i] += sep + libraryPath;
        hasUserLDPath = true;
        break;
      }
    }
    if(!hasUserLDPath) {
      vargs.add("-Djava.library.path=" + libraryPath);
    }
    for (int i = 0; i < javaOptsSplit.length; i++) {
      vargs.add(javaOptsSplit[i]);
    }
    // Set up the child process's temporary directory
    String tmp = conf.get("mapred.child.tmp", "./tmp");
    Path tmpDir = new Path(tmp);
    if (!tmpDir.isAbsolute()) {
      tmpDir = new Path(workDir.toString(), tmp);
    }
    FileSystem localFs = FileSystem.getLocal(conf);
    if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
      throw new IOException("Mkdirs failed to create " + tmpDir.toString());
    }
    vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
    // Add classpath.
    vargs.add("-classpath");
    vargs.add(classPath.toString());
    // Logging configuration
    long logSize = TaskLog.getTaskLogLength(conf);
    vargs.add("-Dhadoop.log.dir=" +
        new File(System.getProperty("hadoop.log.dir")
        ).getAbsolutePath());
    vargs.add("-Dhadoop.root.logger=INFO,TLA");
    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
    // The main class of the child process that runs map and reduce tasks is Child
    vargs.add(Child.class.getName());  // main of Child
    ……
    // Launch the child JVM
    jvmManager.launchJvm(this,
        jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
            workDir, env, pidFile, conf));
}

6. Child

The actual map and reduce tasks run inside the Child process. The main logic of Child's main function is as follows:

while (true) {
  // Fetch a JvmTask object from the TaskTracker over the network
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}
6.1 MapTask

If the task is a MapTask, its run function is as follows:

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  // The reporter communicates with the TaskTracker to report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // Choose where the map task's output goes
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Recreate the input split and, from its information, build a RecordReader
  // to read the data
  InputSplit instantiatedSplit = (InputSplit)
      ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                  // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
      new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // Create a MapRunnable for the map task; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner's run function reads records from the RecordReader one by one
    // and calls the Mapper's map function on each of them.
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();                               // close input
    collector.close();
  }
  done(umbilical);
}
MapRunner's run function reads records from the RecordReader one by one and passes each to the Mapper's map function:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if(incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
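For reference, a user Mapper driven by this loop might look like the following word-count sketch (this is the hypothetical MyMapper referenced in the driver example of section 1, old mapred API):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MyMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Split each input line into words and emit a (word, 1) pair for each
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, one);
    }
  }
}

Each output.collect call lands in the MapOutputCollector chosen in MapTask.run above.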
The whole result set is collected into the MapOutputBuffer, whose collect function is as follows:

public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  ……
  // As this shows, the buffer is a ring data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // The ring is full when the next free slot would collide with the start position
      kvfull = kvnext == kvstart;
      // Check whether the ring has reached the soft limit at which the buffer
      // should be written to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // Once the soft limit is reached, start writing the buffer to disk as a
      // spill file. startSpill notifies the background SpillThread, whose run()
      // calls sortAndSpill() to sort, combine and write the data to disk.
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is completely full, wait until the spill finishes
      if (kvfull) {
          while (kvstart != kvend) {
            reporter.progress();
            spillDone.await();
          }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // There is room in the buffer: serialize key and value into it
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Ask the configured partitioner for the partition id of this key/value pair
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Record the partition id and the key/value offsets in the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
The layout of the in-memory buffer is as follows:
(see the analyses by several Hadoop experts at http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.javaeye.com/)
(figure not reproduced: layout of kvoffsets, kvindices and kvbuffer)
kvoffsets is used to sort the records before the buffer is written out to disk.
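Since the figure is not reproduced, here is a hedged sketch of how the three arrays relate and how the sort uses them; the names come from the collect code above, and the compare method is a simplification rather than the exact source:

// kvbuffer  : byte[], the serialized key/value bytes
// kvindices : int[], one (PARTITION, KEYSTART, VALSTART) triple per record
// kvoffsets : int[], one entry per record pointing at its triple in kvindices;
//             sorting permutes these small offsets instead of moving the raw bytes
public int compare(int i, int j) {
  final int ii = kvoffsets[i % kvoffsets.length];
  final int ij = kvoffsets[j % kvoffsets.length];
  // Order first by partition, so each spill file is laid out partition by partition
  if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
    return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
  }
  // Within a partition, order by key, comparing the serialized key bytes in kvbuffer
  return comparator.compare(
      kvbuffer, kvindices[ii + KEYSTART],
      kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
      kvbuffer, kvindices[ij + KEYSTART],
      kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}

This is why the sort below can run over the index arrays alone and still produce output grouped by partition and sorted by key within each partition.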
As noted above, sortAndSpill is the function that writes the in-memory buffer to a spill file on disk:

private void sortAndSpill() throws IOException {
  ……
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                  numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
    ? kvend
    : kvoffsets.length + kvend;
  // Sort the buffered data in partition order
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write the data to the file one partition at a time
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    // If there is no combiner, write the records directly to the file
    if (null == combinerClass) {
        ……
        writer.append(key, value);
        ++spindex;
     }
     else {
        ……
        // If there is a combiner, combine first (calling combiner.reduce(…))
        // and only then write to the file
        combineAndSpill(kvIter, combineInputCounter);
     }
  }
  ……
}
When the map phase ends, MapOutputBuffer's flush function is called. It too calls sortAndSpill to write what remains in the buffer to a file, and then calls mergeParts to merge the multiple spill files on disk:

private void mergeParts() throws IOException {
    ……
    // For each partition
    for (int parts = 0; parts < partitions; parts++){
      //create the segments to be merged
      List<Segment<K, V>> segmentList =
        new ArrayList<Segment<K, V>>(numSpills);
      TaskAttemptID mapId = getTaskID();
      // Collect the current partition's segment from each spill file in turn
      for(int i = 0; i < numSpills; i++) {
        final IndexRecord indexRecord =
          getIndexInformation(mapId, i, parts);
        long segmentOffset = indexRecord.startOffset;
        long segmentLength = indexRecord.partLength;
        Segment<K, V> s =
          new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                            segmentLength, codec, true);
        segmentList.add(i, s);
      }
      // Merge the segments that belong to the same partition
      RawKeyValueIterator kvIter =
        Merger.merge(job, rfs,
                     keyClass, valClass,
                     segmentList, job.getInt("io.sort.factor", 100),
                     new Path(getTaskID().toString()),
                     job.getOutputKeyComparator(), reporter);
      // Write the merged segment to the final output file
      long segmentStart = finalOut.getPos();
      Writer<K, V> writer =
          new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
      if (null == combinerClass || numSpills < minSpillsForCombine) {
        Merger.writeFile(kvIter, writer, reporter, job);
      } else {
        combineCollector.setWriter(writer);
        combineAndSpill(kvIter, combineInputCounter);
      }
      ……
    }
}
6.2 ReduceTask

ReduceTask's run function is as follows:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task consists of three phases: copy, sort, reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase  = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier's fetchOutputs obtains the map outputs. It creates
  // several MapOutputCopier threads, whose copyOutput does the actual copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
        ……
    }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until the number of files drops
  // below io.sort.factor, and return an Iterator over the key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
     new SkippingReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter, umbilical) :
      new ReduceValuesIterator(rIter,
      job.getOutputValueGroupingComparator(), keyClass, valClass,
      job, reporter);
  // Read each key together with its list of values and call the Reducer's
  // reduce function
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
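For reference, a user Reducer driven by this loop might look like the following word-count sketch (this is the hypothetical MyReducer referenced in the driver example of section 1, old mapred API):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MyReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Sum the counts gathered for this key across all map outputs
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}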

7. Summary

The whole Map-Reduce process is summarized in the figure below:
(figure not reproduced: the end-to-end flow from client job submission, through JobTracker scheduling and TaskTracker heartbeats, to the Child process running the map phase with sort/spill/merge and the reduce phase with copy/sort/reduce)