读书人

MapReduce运作流程源码分析(二)

发布时间: 2012-12-22 12:05:06 作者: rapoo

MapReduce运行流程源码分析(二)


这篇博客是接着昨天分析MapReduce的流程继续进行分析的:

4.JobTracker接收Heartbeat并向TaskTracker分配任务

上一步中TaskTracker调用transmitHeartBeat方法发送Heartbeat给JobTracker,当JobTracker被RPC调用来接收heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用。

(1)我们看一下JobTracker类的heartbeat方法

?

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,                                                   boolean restarted,                                                  boolean initialContact,                                                  boolean acceptNewTasks,                                                   short responseId)     throws IOException {    if (LOG.isDebugEnabled()) {      LOG.debug("Got heartbeat from: " + status.getTrackerName() +                 " (restarted: " + restarted +                 " initialContact: " + initialContact +                 " acceptNewTasks: " + acceptNewTasks + ")" +                " with responseId: " + responseId);    }    // Make sure heartbeat is from a tasktracker allowed by the jobtracker.    if (!acceptTaskTracker(status)) {      throw new DisallowedTaskTrackerException(status);    }    ....    //初始化一个HeartbeatResponse对象    HeartbeatResponse prevHeartbeatResponse =      trackerToHeartbeatResponseMap.get(trackerName);    if (initialContact != true) {      // If this isn't the 'initial contact' from the tasktracker,      // there is something seriously wrong if the JobTracker has      // no record of the 'previous heartbeat'; if so, ask the       // tasktracker to re-initialize itself.      
if (prevHeartbeatResponse == null) {        // This is the first heartbeat from the old tracker to the newly         // started JobTracker                // Jobtracker might have restarted but no recovery is needed        // otherwise this code should not be reached        LOG.warn("Serious problem, cannot find record of 'previous' " +                 "heartbeat for '" + trackerName +                  "'; reinitializing the tasktracker");        return new HeartbeatResponse(responseId,             new TaskTrackerAction[] {new ReinitTrackerAction()});            } else {                        // It is completely safe to not process a 'duplicate' heartbeat from a         // {@link TaskTracker} since it resends the heartbeat when rpcs are         // lost see {@link TaskTracker.transmitHeartbeat()};        // acknowledge it by re-sending the previous response to let the         // {@link TaskTracker} go forward.         if (prevHeartbeatResponse.getResponseId() != responseId) {          LOG.info("Ignoring 'duplicate' heartbeat from '" +               trackerName + "'; resending the previous 'lost' response");          return prevHeartbeatResponse;        }      }    }          // Process this heartbeat     short newResponseId = (short)(responseId + 1);    status.setLastSeen(now);    if (!processHeartbeat(status, initialContact)) {      if (prevHeartbeatResponse != null) {        trackerToHeartbeatResponseMap.remove(trackerName);      }      return new HeartbeatResponse(newResponseId,                    new TaskTrackerAction[] {new ReinitTrackerAction()});    }          // Initialize the response to be sent for the heartbeat    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();    isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());    // Check for new tasks to be executed on the tasktracker  //如果TaskTracker向JobTracker请求一个task运行    if (acceptNewTasks && 
!isBlacklisted) {      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;      if (taskTrackerStatus == null) {        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);      } else { //setup和cleanup的task的优先次序最高        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);        if (tasks == null ) {//taskScheduler.assignTasks方法注册一个task          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));        }        if (tasks != null) {          for (Task task : tasks) {  //将任务放入actions列表,返回给TaskTracker            expireLaunchingTasks.addNewTask(task.getTaskID());            if (LOG.isDebugEnabled()) {              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());            }            actions.add(new LaunchTaskAction(task));          }        }      }    }    ....       return response;  }

?

?

通过调用上面的方法其中实现了task在taskScheduler的注册,JobQueueTaskScheduler是JobTracker默认的Task调度器,上面方法中taskScheduler.assignTasks(),注册一个Task。

(2)下一步我们看一下JobQueueTaskScheduler类的assignTasks方法

?

public synchronized List<Task> assignTasks(TaskTracker taskTracker)      throws IOException {    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();    final int numTaskTrackers = clusterStatus.getTaskTrackers();    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();    Collection<JobInProgress> jobQueue =      jobQueueJobInProgressListener.getJobQueue();    ....    //    // Compute (running + pending) map and reduce task numbers across pool    // 计算剩余的map和reduce的工作量:remaining    int remainingReduceLoad = 0;    int remainingMapLoad = 0;    synchronized (jobQueue) {      for (JobInProgress job : jobQueue) {        if (job.getStatus().getRunState() == JobStatus.RUNNING) {          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());          if (job.scheduleReduces()) {            remainingReduceLoad +=               (job.desiredReduces() - job.finishedReduces());          }        }      }    }    // Compute the 'load factor' for maps and reduces    double mapLoadFactor = 0.0;    if (clusterMapCapacity > 0) {      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;    }    double reduceLoadFactor = 0.0;    if (clusterReduceCapacity > 0) {      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;    }                final int trackerCurrentMapCapacity =       Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),                               trackerMapCapacity);    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;    boolean exceededMapPadding = false;    if (availableMapSlots > 0) {      exceededMapPadding =         exceededPadding(true, clusterStatus, trackerMapCapacity);    }    //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。    int numLocalMaps = 0;    int numNonLocalMaps = 0;    scheduleMaps:    for (int 
i=0; i < availableMapSlots; ++i) {      synchronized (jobQueue) {        for (JobInProgress job : jobQueue) {          if (job.getStatus().getRunState() != JobStatus.RUNNING) {            continue;          }          Task t = null;                    // Try to schedule a node-local or rack-local Map task          t =             job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,                                      taskTrackerManager.getNumberOfUniqueHosts());          if (t != null) {            assignedTasks.add(t);            ++numLocalMaps;                        // Don't assign map tasks to the hilt!            // Leave some free slots in the cluster for future task-failures,            // speculative tasks etc. beyond the highest priority job            if (exceededMapPadding) {              break scheduleMaps;            }                       // Try all jobs again for the next Map task             break;          }                    // Try to schedule a node-local or rack-local Map task          t =             job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,                                   taskTrackerManager.getNumberOfUniqueHosts());                    if (t != null) {            assignedTasks.add(t);            ++numNonLocalMaps;                        // We assign at most 1 off-switch or speculative task            // This is to prevent TaskTrackers from stealing local-tasks            // from other TaskTrackers.            break scheduleMaps;          }        }      }    }    int assignedMaps = assignedTasks.size();    ....    return assignedTasks;  }

?

?

以上的过程可能要经过一个复杂的计算,由JobTracker调度Task。从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配调度map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress。同样的道理,JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。然后由JobTracker进行任务的分配,这个步骤就结束了,由于这个步骤比较简单这里就不画流程图了。

5.TaskTracker接收HeartbeatResponse并执行任务

在向JobTracker发送heartbeat后,如果返回的HeartbeatResponse中含有分配好的任务LaunchTaskAction,TaskTracker则调用addToTaskQueue方法,将其加入TaskTracker类中MapLauncher或者ReduceLauncher对象的tasksToLaunch队列。具体怎么通过RPC接收到HeartbeatResponse这里不做分析,接收到分配的任务后,调用

addToTaskQueue方法。

(1)所以我们先看一下addToTaskQueue方法

?

/**
 * Routes a launch request to the launcher that matches the task's kind.
 *
 * <p>Reduce tasks go to {@code reduceLauncher}; map tasks go to
 * {@code mapLauncher}.
 *
 * @param action the launch request carrying the task to enqueue
 */
private void addToTaskQueue(LaunchTaskAction action) {
  final boolean mapTask = action.getTask().isMapTask();
  if (!mapTask) {
    reduceLauncher.addToTaskQueue(action);
    return;
  }
  mapLauncher.addToTaskQueue(action);
}

?

?

在此,MapLauncher和ReduceLauncher对象均为TaskLauncher类的实例。该类是TaskTracker类的一个内部类,具有一个数据成员,是TaskTracker.TaskInProgress类型的队列。在此特别注意,在TaskTracker类内部所提到的TaskInProgress类均为TaskTracker的内部类,我们用TaskTracker.TaskInProgress表示,一定要和MapRed包中的TaskInProgress类区分,后者我们直接用TaskInProgress表示。如果应答包中包含的任务是map task则放入mapLauncher的tasksToLaunch队列,如果是reduce task则放入reduceLauncher的tasksToLaunch队列。

(2)不管是map task还是reduce task都要调用TaskLauncher中的addToTaskQueue方法

?

/**
 * Registers the task carried by {@code action} and appends it to
 * {@code tasksToLaunch}.
 *
 * <p>Registration, the append and the {@code notifyAll()} all happen while
 * holding the {@code tasksToLaunch} monitor, so any thread waiting on that
 * monitor is woken only after the new entry is in the queue.
 *
 * @param action the launch request to enqueue
 */
public void addToTaskQueue(LaunchTaskAction action) {
  synchronized (tasksToLaunch) {
    // Register first, then enqueue the resulting task-in-progress.
    tasksToLaunch.add(registerTask(action, this));
    tasksToLaunch.notifyAll();
  }
}

?

?

(3)然后继续registerTask方法

?

 private TaskInProgress registerTask(LaunchTaskAction action,       TaskLauncher launcher) {  //从action中获取Task对象    Task t = action.getTask();    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +             " task's state:" + t.getState());    TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);    synchronized (this) {   //在相应的数据结构中增加所生成的TaskTracker.TaskInProgress对象,以通知程序其他部分该任务的建立      tasks.put(t.getTaskID(), tip);      runningTasks.put(t.getTaskID(), tip);      boolean isMap = t.isMapTask();      if (isMap) {        mapTotal++;      } else {        reduceTotal++;      }    }    return tip;  }

?

?

同时,TaskLauncher类继承了Thread类,所以在程序运行过程中,它们各自都以一个线程独立运行。它们的启动在TaskTracker初始化过程中已经完成。该类的run函数就是不断监测tasksToLaunch队列中是否有新的TaskTracker.TaskInProgress对象加入。如果有则从中取出一个对象,然后调用TaskTracker类的startNewTask(TaskInProgress tip)来启动一个task。

(4)我们看一下启动的run方法

?

 public void run() {      while (!Thread.interrupted()) {        try {          TaskInProgress tip;          Task task;          synchronized (tasksToLaunch) {            while (tasksToLaunch.isEmpty()) {              tasksToLaunch.wait();            }           .....          synchronized (tip) {            //to make sure that there is no kill task action for this            if (!tip.canBeLaunched()) {              //got killed externally while still in the launcher queue              LOG.info("Not launching task " + task.getTaskID() + " as it got"                + " killed externally. Task's state is " + tip.getRunState());              addFreeSlots(task.getNumSlotsRequired());              continue;            }            tip.slotTaken = true;          }          //got a free slot. launch the task          //启动一个新的Task          startNewTask(tip);        } catch (InterruptedException e) {           return; // ALL DONE        } catch (Throwable th) {          LOG.error("TaskLauncher error " +               StringUtils.stringifyException(th));        }      }

而startNewTask方法主要是调用了localizeJob(tip)方法实现本地化,即把job.split、job.xml、job.jar以及Task运行所必需的文件从HDFS拷贝到TaskTracker的本地文件系统中,这个过程属于非常重要的部分。

?

(5)看一下重要的localizeJob方法

?

 RunningJob localizeJob(TaskInProgress tip                           ) throws IOException, InterruptedException {    Task t = tip.getTask();    JobID jobId = t.getJobID();    RunningJob rjob = addTaskToJob(jobId, tip);    // Initialize the user directories if needed.    //初始化用户文件目录    getLocalizer().initializeUserDirs(t.getUser());    synchronized (rjob) {      if (!rjob.localized) {       //初始化本地配置文件        JobConf localJobConf = localizeJobFiles(t, rjob);        // 初始化日志目录        initializeJobLogDir(jobId, localJobConf);        // Now initialize the job via task-controller so as to set        // ownership/permissions of jars, job-work-dir. Note that initializeJob        // should be the last call after every other directory/file to be        // directly under the job directory is created.        JobInitializationContext context = new JobInitializationContext();        context.jobid = jobId;        context.user = t.getUser();        context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));        taskController.initializeJob(context);        rjob.jobConf = localJobConf;        rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||                             localJobConf.getKeepFailedTaskFiles());        rjob.localized = true;      }    }    return rjob;  }

?

?

(6)然后进行了一系列本地化的操作,这个步骤比较繁琐,我们简单看一下几个方法

?

PathlocalJobFile = lDirAlloc.getLocalPathForWrite(                  getLocalJobDir(jobId.toString())                 + Path.SEPARATOR + "job.xml",                  jobFileSize, fConf); RunningJob rjob = addTaskToJob(jobId, tip); synchronized (rjob) {    if(!rjob.localized) {     FileSystem localFs = FileSystem.getLocal(fConf);      PathjobDir = localJobFile.getParent();      ……      //将job.split拷贝到本地     systemFS.copyToLocalFile(jobFile, localJobFile);     JobConf localJobConf = new JobConf(localJobFile);      PathworkDir = lDirAlloc.getLocalPathForWrite(                      (getLocalJobDir(jobId.toString())                       + Path.SEPARATOR +"work"), fConf);      if(!localFs.mkdirs(workDir)) {       throw new IOException("Mkdirs failed to create "                    + workDir.toString());      }     System.setProperty("job.local.dir", workDir.toString());     localJobConf.set("job.local.dir", workDir.toString());      //copy Jar file to the local FS and unjar it.     String jarFile = localJobConf.getJar();      longjarFileSize = -1;      if(jarFile != null) {       Path jarFilePath = new Path(jarFile);       localJarFile = new Path(lDirAlloc.getLocalPathForWrite(                                  getLocalJobDir(jobId.toString())                                   +Path.SEPARATOR + "jars",                                   5 *jarFileSize, fConf), "job.jar");        if(!localFs.mkdirs(localJarFile.getParent())) {         throw new IOException("Mkdirs failed to create jars directory");        }        //将job.jar拷贝到本地       systemFS.copyToLocalFile(jarFilePath, localJarFile);       localJobConf.setJar(localJarFile.toString());       //将job得configuration写成job.xml       OutputStream out = localFs.create(localJobFile);        try{         localJobConf.writeXml(out);        }finally {         out.close();        }        // 解压缩job.jar       RunJar.unJar(new File(localJarFile.toString()),                     newFile(localJarFile.getParent().toString()));      }     
rjob.localized = true;     rjob.jobConf = localJobConf;    }  }  //真正的启动此Task launchTaskForJob(tip, new JobConf(rjob.jobConf));}

当所有的task运行所需要的资源都拷贝到本地后,则调用TaskTracker的launchTaskForJob方法,其又调用TaskTracker.TaskInProgress的launchTask函数

?

public synchronized void launchTask() throwsIOException {    ……    //创建task运行目录   localizeTask(task);    if(this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {     this.taskStatus.setRunState(TaskStatus.State.RUNNING);    }    //创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner   this.runner = task.createRunner(TaskTracker.this, this);   this.runner.start();   this.taskStatus.setStartTime(System.currentTimeMillis());}TaskRunner是抽象类,是Thread类的子类,其run函数如下:public final void run() {    ……   TaskAttemptID taskid = t.getTaskID();   LocalDirAllocator lDirAlloc = newLocalDirAllocator("mapred.local.dir");    FilejobCacheDir = null;    if(conf.getJar() != null) {     jobCacheDir = new File(                        newPath(conf.getJar()).getParent().toString());    }    File workDir = newFile(lDirAlloc.getLocalPathToRead(                             TaskTracker.getLocalTaskDir(                               t.getJobID().toString(),                               t.getTaskID().toString(),                                t.isTaskCleanupTask())           + Path.SEPARATOR + MRConstants.WORKDIR,                              conf).toString());   FileSystem fileSystem;    PathlocalPath;    ……    //拼写classpath    StringbaseDir;    Stringsep = System.getProperty("path.separator");   StringBuffer classPath = new StringBuffer();    //start with same classpath as parent process   classPath.append(System.getProperty("java.class.path"));   classPath.append(sep);    if(!workDir.mkdirs()) {      if(!workDir.isDirectory()) {       LOG.fatal("Mkdirs failed to create " + workDir.toString());      }    }    Stringjar = conf.getJar();    if (jar!= null) {           // ifjar exists, it into workDir     File[] libs = new File(jobCacheDir, "lib").listFiles();      if(libs != null) {        for(int i = 0; i < libs.length; i++) {         classPath.append(sep);         //add libs from jar to classpath         classPath.append(libs[i]);        }      }     
classPath.append(sep);     classPath.append(new File(jobCacheDir, "classes"));     classPath.append(sep);     classPath.append(jobCacheDir);    }    ……   classPath.append(sep);   classPath.append(workDir);    //拼写命令行java及其参数   Vector<String> vargs = new Vector<String>(8);    Filejvm =      newFile(new File(System.getProperty("java.home"), "bin"),"java");   vargs.add(jvm.toString());    StringjavaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");   javaOpts = javaOpts.replace("@taskid@", taskid.toString());    String[] javaOptsSplit = javaOpts.split(" ");    StringlibraryPath = System.getProperty("java.library.path");    if(libraryPath == null) {     libraryPath = workDir.getAbsolutePath();    } else{     libraryPath += sep + workDir;    }    booleanhasUserLDPath = false;    for(inti=0; i<javaOptsSplit.length ;i++) {     if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {       javaOptsSplit[i] += sep + libraryPath;        hasUserLDPath = true;       break;      }    }   if(!hasUserLDPath) {     vargs.add("-Djava.library.path=" + libraryPath);    }    for(int i = 0; i < javaOptsSplit.length; i++) {     vargs.add(javaOptsSplit[i]);    }    //添加Child进程的临时文件夹    Stringtmp = conf.get("mapred.child.tmp", "./tmp");    PathtmpDir = new Path(tmp);    if(!tmpDir.isAbsolute()) {     tmpDir = new Path(workDir.toString(), tmp);    }   FileSystem localFs = FileSystem.getLocal(conf);    if(!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {      thrownew IOException("Mkdirs failed to create " + tmpDir.toString());    }   vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());    // Addclasspath.   
vargs.add("-classpath");   vargs.add(classPath.toString());    //log文件夹    longlogSize = TaskLog.getTaskLogLength(conf);   vargs.add("-Dhadoop.log.dir=" +        newFile(System.getProperty("hadoop.log.dir")       ).getAbsolutePath());   vargs.add("-Dhadoop.root.logger=INFO,TLA");   vargs.add("-Dhadoop.tasklog.taskid=" + taskid);   vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);    // 运行map task和reduce task的子进程的main class是Child   vargs.add(Child.class.getName()); // main of Child

最后贴一张这个步骤的流程图,后续流程明天继续分析!!!

MapReduce运作流程源码分析(二)

?

读书人网 >编程

热点推荐