MapReduce运行流程源码分析(二)
这篇博客是接着昨天分析MapReduce的流程继续进行分析的:
4.JobTracker接收Heartbeat并向TaskTracker分配任务
上一步中TaskTracker调用transmitHeartBeat方法发送Heartbeat给JobTracker,当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean ,initialContact, booleanacceptNewTasks, short responseId)函数被调用。
(1)我们看一下JobTracker类的heartbeat方法
?
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Got heartbeat from: " + status.getTrackerName() + " (restarted: " + restarted + " initialContact: " + initialContact + " acceptNewTasks: " + acceptNewTasks + ")" + " with responseId: " + responseId); } // Make sure heartbeat is from a tasktracker allowed by the jobtracker. if (!acceptTaskTracker(status)) { throw new DisallowedTaskTrackerException(status); } .... //初始化一个HeartbeatResponse对象 HeartbeatResponse prevHeartbeatResponse = trackerToHeartbeatResponseMap.get(trackerName); if (initialContact != true) { // If this isn't the 'initial contact' from the tasktracker, // there is something seriously wrong if the JobTracker has // no record of the 'previous heartbeat'; if so, ask the // tasktracker to re-initialize itself. if (prevHeartbeatResponse == null) { // This is the first heartbeat from the old tracker to the newly // started JobTracker // Jobtracker might have restarted but no recovery is needed // otherwise this code should not be reached LOG.warn("Serious problem, cannot find record of 'previous' " + "heartbeat for '" + trackerName + "'; reinitializing the tasktracker"); return new HeartbeatResponse(responseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); } else { // It is completely safe to not process a 'duplicate' heartbeat from a // {@link TaskTracker} since it resends the heartbeat when rpcs are // lost see {@link TaskTracker.transmitHeartbeat()}; // acknowledge it by re-sending the previous response to let the // {@link TaskTracker} go forward. if (prevHeartbeatResponse.getResponseId() != responseId) { LOG.info("Ignoring 'duplicate' heartbeat from '" + trackerName + "'; resending the previous 'lost' response"); return prevHeartbeatResponse; } } } // Process this heartbeat short newResponseId = (short)(responseId + 1); status.setLastSeen(now); if (!processHeartbeat(status, initialContact)) { if (prevHeartbeatResponse != null) { trackerToHeartbeatResponseMap.remove(trackerName); } return new HeartbeatResponse(newResponseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); } // Initialize the response to be sent for the heartbeat HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>(); isBlacklisted = faultyTrackers.isBlacklisted(status.getHost()); // Check for new tasks to be executed on the tasktracker //如果TaskTracker向JobTracker请求一个task运行 if (acceptNewTasks && !isBlacklisted) { TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ; if (taskTrackerStatus == null) { LOG.warn("Unknown task tracker polling; ignoring: " + trackerName); } else { //setup和cleanup的task的优先次序最高 List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); if (tasks == null ) {//taskScheduler.assignTasks方法注册一个task tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); } if (tasks != null) { for (Task task : tasks) { //将任务放入actions列表,返回给TaskTracker expireLaunchingTasks.addNewTask(task.getTaskID()); if (LOG.isDebugEnabled()) { LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID()); } actions.add(new LaunchTaskAction(task)); } } } } .... return response; }?
?
通过调用上面的方法其中实现了task在taskScheduler的注册,JobQueueTaskScheduler是JobTracker默认的Task调度器,上面方法中taskScheduler.assignTasks(),注册一个Task。
(2)下一步我们看一下JobQueueTaskScheduler类的assignTasks方法
?
public synchronized List<Task> assignTasks(TaskTracker taskTracker) throws IOException { TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); final int numTaskTrackers = clusterStatus.getTaskTrackers(); final int clusterMapCapacity = clusterStatus.getMaxMapTasks(); final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks(); Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue(); .... // // Compute (running + pending) map and reduce task numbers across pool // 计算剩余的map和reduce的工作量:remaining int remainingReduceLoad = 0; int remainingMapLoad = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() == JobStatus.RUNNING) { remainingMapLoad += (job.desiredMaps() - job.finishedMaps()); if (job.scheduleReduces()) { remainingReduceLoad += (job.desiredReduces() - job.finishedReduces()); } } } } // Compute the 'load factor' for maps and reduces double mapLoadFactor = 0.0; if (clusterMapCapacity > 0) { mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity; } double reduceLoadFactor = 0.0; if (clusterReduceCapacity > 0) { reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity; } final int trackerCurrentMapCapacity = Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity); int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; boolean exceededMapPadding = false; if (availableMapSlots > 0) { exceededMapPadding = exceededPadding(true, clusterStatus, trackerMapCapacity); } //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。 int numLocalMaps = 0; int numNonLocalMaps = 0; scheduleMaps: for (int i=0; i < availableMapSlots; ++i) { synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = null; // Try to schedule a node-local or rack-local Map task t = job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numLocalMaps; // Don't assign map tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededMapPadding) { break scheduleMaps; } // Try all jobs again for the next Map task break; } // Try to schedule a node-local or rack-local Map task t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numNonLocalMaps; // We assign at most 1 off-switch or speculative task // This is to prevent TaskTrackers from stealing local-tasks // from other TaskTrackers. break scheduleMaps; } } } } int assignedMaps = assignedTasks.size(); .... return assignedTasks; }?
?
以上的过程可能要经过一个复杂的计算,由jobTracker调度Task,从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配调度map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress.同样的道理JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。然后又JobTracker进行任务的分配,这个步骤就结束了,由于这个步骤比较简单这里就不画流程图了。
5.TaskTracker接收HeartbeatResponse并执行任务
在向JobTracker发送heartbeat后,如果返回的heartbeatreponse中含有分配好的任务LaunchTaskAction,TaskTracker则调用addToTaskQueue方法,将其加入TaskTracker类中MapLauncher或者ReduceLauncher对象的taskToLaunch队列。具体的怎么通过RPC接收到heartbeatreponse这里不做分析,接收到分配的任务后,调用
addToTaskQueue方法。
(1)所以我们先看一下addToTaskQueue方法
?
private void addToTaskQueue(LaunchTaskAction action) { if (action.getTask().isMapTask()) { mapLauncher.addToTaskQueue(action); } else { reduceLauncher.addToTaskQueue(action); } }?
?
在此,MapLauncher和ReduceLauncher对象均为TaskLauncher类的实例。该类是TaskTracker类的一个内部类,具有一个数据成员,是TaskTracker.TaskInProgress类型的队列。在此特别注意,在TaskTracker类内部所提到的TaskInProgress类均为TaskTracker的内部类,我们用TaskTracker.TaskInProgress表示,一定要和MapRed包中的TaskInProgress类区分,后者我们直接用TaskInProgress表示。如果应答包中包含的任务是map task则放入mapLancher的taskToLaunch队列,如果是reduce task则放入reduceLancher的taskToLaunch队列。
(2)不管是map task还是reduce task都要调用TaskLauncher中的addToTaskQueue方法
?
public void addToTaskQueue(LaunchTaskAction action) { synchronized (tasksToLaunch) { TaskInProgress tip = registerTask(action, this); tasksToLaunch.add(tip); tasksToLaunch.notifyAll(); } }?
?
(3)然后继续registerTask方法
?
private TaskInProgress registerTask(LaunchTaskAction action, TaskLauncher launcher) { //从action中获取Task对象 Task t = action.getTask(); LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() + " task's state:" + t.getState()); TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher); synchronized (this) { //在相应的数据结构中增加所生成的TaskTracker.TaskInProgress对象,以通知程序其他部分该任务的建立 tasks.put(t.getTaskID(), tip); runningTasks.put(t.getTaskID(), tip); boolean isMap = t.isMapTask(); if (isMap) { mapTotal++; } else { reduceTotal++; } } return tip; }?
?
同时,TaskLauncher类继承了Thread类,所以在程序运行过程中,它们各自都以一个线程独立运行。它们的启动在TaskTracker初始化过程中已经完成。该类的run函数就是不断监测taskToLaunch队列中是否有新的TaskTracker.TaskInProgress对象加入。如果有则从中取出一个对象,然后调用TaskTracker类的startNewTask(TaskInProgress tip)来启动一个task。
(4)我们看一下启动的run方法
?
public void run() { while (!Thread.interrupted()) { try { TaskInProgress tip; Task task; synchronized (tasksToLaunch) { while (tasksToLaunch.isEmpty()) { tasksToLaunch.wait(); } ..... synchronized (tip) { //to make sure that there is no kill task action for this if (!tip.canBeLaunched()) { //got killed externally while still in the launcher queue LOG.info("Not launching task " + task.getTaskID() + " as it got" + " killed externally. Task's state is " + tip.getRunState()); addFreeSlots(task.getNumSlotsRequired()); continue; } tip.slotTaken = true; } //got a free slot. launch the task //启动一个新的Task startNewTask(tip); } catch (InterruptedException e) { return; // ALL DONE } catch (Throwable th) { LOG.error("TaskLauncher error " + StringUtils.stringifyException(th)); } }?而startNewTask方法主要是调用了localizeJob(tip)方法实现本地化,完成从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar以及Task运行的必须的文件,这个过程属于非常重要的部分
?
(5)看一下重要的localizeJob方法
?
RunningJob localizeJob(TaskInProgress tip ) throws IOException, InterruptedException { Task t = tip.getTask(); JobID jobId = t.getJobID(); RunningJob rjob = addTaskToJob(jobId, tip); // Initialize the user directories if needed. //初始化用户文件目录 getLocalizer().initializeUserDirs(t.getUser()); synchronized (rjob) { if (!rjob.localized) { //初始化本地配置文件 JobConf localJobConf = localizeJobFiles(t, rjob); // 初始化日志目录 initializeJobLogDir(jobId, localJobConf); // Now initialize the job via task-controller so as to set // ownership/permissions of jars, job-work-dir. Note that initializeJob // should be the last call after every other directory/file to be // directly under the job directory is created. JobInitializationContext context = new JobInitializationContext(); context.jobid = jobId; context.user = t.getUser(); context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR)); taskController.initializeJob(context); rjob.jobConf = localJobConf; rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) || localJobConf.getKeepFailedTaskFiles()); rjob.localized = true; } } return rjob; }?
?
(6)然后进行了一系列本地化的操作,这个步骤比较繁琐,我们简单看一下几个方法
?
PathlocalJobFile = lDirAlloc.getLocalPathForWrite( getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "job.xml", jobFileSize, fConf); RunningJob rjob = addTaskToJob(jobId, tip); synchronized (rjob) { if(!rjob.localized) { FileSystem localFs = FileSystem.getLocal(fConf); PathjobDir = localJobFile.getParent(); …… //将job.split拷贝到本地 systemFS.copyToLocalFile(jobFile, localJobFile); JobConf localJobConf = new JobConf(localJobFile); PathworkDir = lDirAlloc.getLocalPathForWrite( (getLocalJobDir(jobId.toString()) + Path.SEPARATOR +"work"), fConf); if(!localFs.mkdirs(workDir)) { throw new IOException("Mkdirs failed to create " + workDir.toString()); } System.setProperty("job.local.dir", workDir.toString()); localJobConf.set("job.local.dir", workDir.toString()); //copy Jar file to the local FS and unjar it. String jarFile = localJobConf.getJar(); longjarFileSize = -1; if(jarFile != null) { Path jarFilePath = new Path(jarFile); localJarFile = new Path(lDirAlloc.getLocalPathForWrite( getLocalJobDir(jobId.toString()) +Path.SEPARATOR + "jars", 5 *jarFileSize, fConf), "job.jar"); if(!localFs.mkdirs(localJarFile.getParent())) { throw new IOException("Mkdirs failed to create jars directory"); } //将job.jar拷贝到本地 systemFS.copyToLocalFile(jarFilePath, localJarFile); localJobConf.setJar(localJarFile.toString()); //将job得configuration写成job.xml OutputStream out = localFs.create(localJobFile); try{ localJobConf.writeXml(out); }finally { out.close(); } // 解压缩job.jar RunJar.unJar(new File(localJarFile.toString()), newFile(localJarFile.getParent().toString())); } rjob.localized = true; rjob.jobConf = localJobConf; } } //真正的启动此Task launchTaskForJob(tip, new JobConf(rjob.jobConf));}当所有的task运行所需要的资源都拷贝到本地后,则调用TaskTracker的launchTaskForJob方法,其又调用TaskTracker.TaskInProgress的launchTask函数
?
public synchronized void launchTask() throwsIOException { …… //创建task运行目录 localizeTask(task); if(this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { this.taskStatus.setRunState(TaskStatus.State.RUNNING); } //创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner this.runner = task.createRunner(TaskTracker.this, this); this.runner.start(); this.taskStatus.setStartTime(System.currentTimeMillis());}TaskRunner是抽象类,是Thread类的子类,其run函数如下:public final void run() { …… TaskAttemptID taskid = t.getTaskID(); LocalDirAllocator lDirAlloc = newLocalDirAllocator("mapred.local.dir"); FilejobCacheDir = null; if(conf.getJar() != null) { jobCacheDir = new File( newPath(conf.getJar()).getParent().toString()); } File workDir = newFile(lDirAlloc.getLocalPathToRead( TaskTracker.getLocalTaskDir( t.getJobID().toString(), t.getTaskID().toString(), t.isTaskCleanupTask()) + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString()); FileSystem fileSystem; PathlocalPath; …… //拼写classpath StringbaseDir; Stringsep = System.getProperty("path.separator"); StringBuffer classPath = new StringBuffer(); //start with same classpath as parent process classPath.append(System.getProperty("java.class.path")); classPath.append(sep); if(!workDir.mkdirs()) { if(!workDir.isDirectory()) { LOG.fatal("Mkdirs failed to create " + workDir.toString()); } } Stringjar = conf.getJar(); if (jar!= null) { // ifjar exists, it into workDir File[] libs = new File(jobCacheDir, "lib").listFiles(); if(libs != null) { for(int i = 0; i < libs.length; i++) { classPath.append(sep); //add libs from jar to classpath classPath.append(libs[i]); } } classPath.append(sep); classPath.append(new File(jobCacheDir, "classes")); classPath.append(sep); classPath.append(jobCacheDir); } …… classPath.append(sep); classPath.append(workDir); //拼写命令行java及其参数 Vector<String> vargs = new Vector<String>(8); Filejvm = newFile(new File(System.getProperty("java.home"), "bin"),"java"); vargs.add(jvm.toString()); StringjavaOpts = conf.get("mapred.child.java.opts", "-Xmx200m"); javaOpts = javaOpts.replace("@taskid@", taskid.toString()); String[] javaOptsSplit = javaOpts.split(" "); StringlibraryPath = System.getProperty("java.library.path"); if(libraryPath == null) { libraryPath = workDir.getAbsolutePath(); } else{ libraryPath += sep + workDir; } booleanhasUserLDPath = false; for(inti=0; i<javaOptsSplit.length ;i++) { if(javaOptsSplit[i].startsWith("-Djava.library.path=")) { javaOptsSplit[i] += sep + libraryPath; hasUserLDPath = true; break; } } if(!hasUserLDPath) { vargs.add("-Djava.library.path=" + libraryPath); } for(int i = 0; i < javaOptsSplit.length; i++) { vargs.add(javaOptsSplit[i]); } //添加Child进程的临时文件夹 Stringtmp = conf.get("mapred.child.tmp", "./tmp"); PathtmpDir = new Path(tmp); if(!tmpDir.isAbsolute()) { tmpDir = new Path(workDir.toString(), tmp); } FileSystem localFs = FileSystem.getLocal(conf); if(!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) { thrownew IOException("Mkdirs failed to create " + tmpDir.toString()); } vargs.add("-Djava.io.tmpdir=" + tmpDir.toString()); // Addclasspath. vargs.add("-classpath"); vargs.add(classPath.toString()); //log文件夹 longlogSize = TaskLog.getTaskLogLength(conf); vargs.add("-Dhadoop.log.dir=" + newFile(System.getProperty("hadoop.log.dir") ).getAbsolutePath()); vargs.add("-Dhadoop.root.logger=INFO,TLA"); vargs.add("-Dhadoop.tasklog.taskid=" + taskid); vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize); // 运行map task和reduce task的子进程的main class是Child vargs.add(Child.class.getName()); // main of Child?最后贴一张这个步骤的流程图,后续流程明天继续分析!!!

?