MapReduce多用户任务调度器——容量调度器(Capacity Scheduler)原理和源码研究


前言:为了研究需要,将Capacity Scheduler和Fair Scheduler的原理和代码进行学习,用两篇文章作为记录。如有理解错误之处,欢迎批评指正。

容量调度器(Capacity Scheduler)是Yahoo公司开发的多用户调度器。多用户调度器的使用场景很多,根据资料1的说法,Hadoop集群的用户量越来越大,不同用户提交的应用程序具有不同的服务质量要求(QoS):

1. 批处理作业:耗时较长,对完成时间没有严格要求。如数据挖掘、机器学习等应用。

2. 交互式作业:期望及时返回结果。如Hive等应用。

3. 生产性作业:要求一定量的的资源保证。如统计值计算、垃圾数据分析等。

传统的FIFO调度器不能满足应用对响应时间和资源的多样化要求,多用户多队列调度器应运而生。容量调度器即是其中被广泛应用的一种。


一、基本思想

容量调度器以队列为单位划分资源,每个队列都有资源使用的下限和上限。每个用户也可以设定资源使用上限。一个队列的剩余资源可以共享给另一个队列,其他队列使用后还可以归还。管理员可以约束单个队列、用户或作业的资源使用。支持资源密集型作业,可以给某些作业分配多个slot(这是比较特殊的一点)。支持作业优先级,但不支持资源抢占。

这里明确一下用户、队列和作业之间的关系。Hadoop以队列为单位管理资源,每个队列分配到一定的资源,用户只能向一个或几个队列提交作业。队列管理体现为两方面:1. 用户权限管理:Hadoop用户管理模块建立在操作系统用户和用户组之间的映射之上,允许一个操作系统用户或者用户组对应一个或者多个队列。同时可以配置每个队列的管理员用户。队列信息配置在mapred-site.xml文件中,包括队列的名称,是否启用权限管理功能等信息,且不支持动态加载。队列权限选项配置在mapred-queue-acls.xml文件中,可以配置某个用户或用户组在某个队列中的某种权限。权限包括作业提交权限和作业管理权限。2. 系统资源管理:管理员可以配置每个队列和每个用户的可用资源量信息,为调度器提供调度依据。这些信息配置在调度器自己的配置文件(如Capacity-Scheduler.xml)中。关于每个配置文件的常见内容见附录。

二、整体架构

总体来说,容量调度器的工作流程分5个步骤:

1. 用户提交作业到JobTracker。

2. JobTracker将提交的作业交给Capacity Scheduler的监听器JobQueuesManager,并将作业加入等待队列,由JobInitializationPoller线程初始化。

3. TaskTracker通过心跳信息要求JobTracker为其分配任务。

4. JobTracker调用Capacity Scheduler的assignTasks方法为其分配任务。

5. JobTracker将分配到的任务返回给TaskTracker。

接下,我们结合源代码依次研究上述过程。

三、实现细节

1. 调度器的启动

回忆一下,前面谈到调度器启动是由JobTracker调用调度器的start方法实现的,首先来看start方法:

    // initialize our queues from the config settings
    if (null == schedConf) {
      schedConf = new CapacitySchedulerConf();
    }
首先生成配置对象,容量调度器定义了自己的配置对象,构造时会加载调度器自己的配置文件作为资源,并初始化一些默认的配置选项:

public CapacitySchedulerConf() {
    rmConf = new Configuration(false);
    rmConf.addResource(SCHEDULER_CONF_FILE);
    initializeDefaults();
  }

private void initializeDefaults() {
    defaultUlimitMinimum = 
      rmConf.getInt(
          "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
    defaultUserLimitFactor = 
      rmConf.getFloat("mapred.capacity-scheduler.default-user-limit-factor", 
          1.0f);
    defaultSupportPriority = rmConf.getBoolean(
        "mapred.capacity-scheduler.default-supports-priority", false);
    defaultMaxActiveTasksPerQueueToInitialize = 
      rmConf.getInt(
          "mapred.capacity-scheduler.default-maximum-active-tasks-per-queue", 
          200000);
    defaultMaxActiveTasksPerUserToInitialize = 
      rmConf.getInt(
          "mapred.capacity-scheduler.default-maximum-active-tasks-per-user", 
          100000);
    defaultInitToAcceptJobsFactor =
      rmConf.getInt("mapred.capacity-scheduler.default-init-accept-jobs-factor", 
          10);
  }
例如,第一个默认值表示每个用户的最低资源保障,默认为100%。第三个默认值表示是否考虑作业优先级,默认是不考虑。其他配置可以参考资料1中的讲解。接下来,初始化队列信息,队列信息由QueueManager对象获得,该对象的构造过程如下:

  public QueueManager(Configuration conf) {
    checkDeprecation(conf);
    conf.addResource(QUEUE_ACLS_FILE_NAME);
    
    // Get configured ACLs and state for each queue
    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);

    queues.putAll(parseQueues(conf)); 
  }
  
  synchronized private Map<String, Queue> parseQueues(Configuration conf) {
    Map<String, Queue> queues = new HashMap<String, Queue>();
    // First get the queue names
    String[] queueNameValues = conf.getStrings("mapred.queue.names",
        new String[]{JobConf.DEFAULT_QUEUE_NAME});
    for (String name : queueNameValues) {
      Map queueACLs = getQueueAcls(name, conf);
      if (queueACLs == null) {
        LOG.error("The queue, " + name + " does not have a configured ACL list");
      }
      queues.put(name, new Queue(name, getQueueAcls(name, conf),
          getQueueState(name, conf), QueueMetrics.create(name, conf)));
    }
    
    return queues;
  }
首先,获取用户权限配置文件mapred-queue-acls.xml。然后通过mapred-site.xml中的配置解析并生成队列的列表queues。解析的过程是,先获取每个队列的名字,再通过名字获取队列的权限配置,最后依据这些信息以及队列状态和队列度量对象构造一个队列并加入结果列表。如上面代码。在初始化队列之前还有构造出每个队列对应的CapacitySchedulerQueue对象:

  Map<String, CapacitySchedulerQueue> 
  parseQueues(Collection<String> queueNames, CapacitySchedulerConf schedConf) 
  throws IOException {
    Map<String, CapacitySchedulerQueue> queueInfoMap = 
      new HashMap<String, CapacitySchedulerQueue>();
    
    // Sanity check: there should be at least one queue. 
    if (0 == queueNames.size()) {
      throw new IllegalStateException("System has no queue configured");
    }

    float totalCapacityPercent = 0.0f;
    for (String queueName: queueNames) {
      float capacityPercent = schedConf.getCapacity(queueName);
      if (capacityPercent == -1.0) {
        throw new IOException("Queue '" + queueName + 
            "' doesn't have configured capacity!");
      } 
      
      totalCapacityPercent += capacityPercent;

      // create our Queue and add to our hashmap
      CapacitySchedulerQueue queue = 
        new CapacitySchedulerQueue(queueName, schedConf);
      queueInfoMap.put(queueName, queue);
    }
    
    if (Math.floor(totalCapacityPercent) != 100) {
      throw new IllegalArgumentException(
        "Sum of queue capacities not 100% at "
          + totalCapacityPercent);
    }    

    return queueInfoMap;
  }
容量调度器队列对象被装入一个以队列名为键的map中返回并用于初始化。获取队列后要进行初始化,由函数initialize完成:

    void initialize(QueueManager queueManager,
      Map<String, CapacitySchedulerQueue> newQueues,
      Configuration conf, CapacitySchedulerConf schedConf) {
    // Memory related configs
    initializeMemoryRelatedConf(conf);

    // Setup queues
    for (Map.Entry<String, CapacitySchedulerQueue> e : newQueues.entrySet()) {
      String newQueueName = e.getKey();
      CapacitySchedulerQueue newQueue = e.getValue();
      CapacitySchedulerQueue currentQueue = queueInfoMap.get(newQueueName);
      if (currentQueue != null) {
        currentQueue.initializeQueue(newQueue);
        LOG.info("Updated queue configs for " + newQueueName);
      } else {
        queueInfoMap.put(newQueueName, newQueue);
        LOG.info("Added new queue: " + newQueueName);
      }
    }

    // Set SchedulingDisplayInfo
    for (String queueName : queueInfoMap.keySet()) {
      SchedulingDisplayInfo schedulingInfo = 
        new SchedulingDisplayInfo(queueName, this);
      queueManager.setSchedulerInfo(queueName, schedulingInfo);
    }

    // Inform the queue manager 
    jobQueuesManager.setQueues(queueInfoMap);
    
    // let our mgr objects know about the queues
    mapScheduler.initialize(queueInfoMap);
    reduceScheduler.initialize(queueInfoMap);
    
    // scheduling tunables
    maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
    maxTasksToAssignAfterOffSwitch = 
      schedConf.getMaxTasksToAssignAfterOffSwitch();
  }
具体过程如下:首先根据配置对象初始化跟内存相关的一些变量;然后检查某个队列是否在queueInfoMap数据结构中,若在,就更新队列信息,若不在,则加入其中,该数据结构提供了一个快速通过队列名访问队列的途径;接下来设置每个队列的调度信息用于展示或日志;然后将队列map交给监听器对象JobQueuesMananger;接着,将队列信息再交给map和reduce调度器对象,每个调度器对象维护了可以获取任务的队列列表,用于调度时的队列选择;最后设置批量任务分配的最大数量。

上述过程中,用于不同任务调度的mapScheduler和reduceScheduler值得进一步研究。队列会被加入到map或reduce调度器的优先级队列中:

     queuesForAssigningTasks.clear();
     queuesForAssigningTasks.addAll(queues.values());
     Collections.sort(queuesForAssigningTasks, queueComparator);
队列的优先级由queueComparator定义,map和reduce的比较器实现基本相同,只是任务类型不同:

     public int compare(CapacitySchedulerQueue q1, CapacitySchedulerQueue q2) {
        // look at how much capacity they've filled. Treat a queue with
        // capacity=0 equivalent to a queue running at capacity
        TaskType taskType = getTaskType();
        double r1 = (0 == q1.getCapacity(taskType))? 1.0f:
          (double)q1.getNumSlotsOccupied(taskType)/(double) q1.getCapacity(taskType);
        double r2 = (0 == q2.getCapacity(taskType))? 1.0f:
          (double)q2.getNumSlotsOccupied(taskType)/(double) q2.getCapacity(taskType);
        if (r1<r2) return -1;
        else if (r1>r2) return 1;
        else return 0;
      }
上述compare方法的实现表明,队列的资源使用率越高,在队列列表中的顺序越靠后,优先级越低。也就是说,Capacity Scheduler总是选择资源利用率最低的队列。至此,队列初始化分析完毕。

接下来,调度器将监听器对象注册到JobTracker:

    // listen to job changes
    taskTrackerManager.addJobInProgressListener(jobQueuesManager);
然后启动初始化线程:

    //Start thread for initialization
    if (initializationPoller == null) {
      this.initializationPoller = new JobInitializationPoller(
          jobQueuesManager, schedConf, queueNames, taskTrackerManager);
    }
    initializationPoller.init(queueNames.size(), schedConf);
    initializationPoller.setDaemon(true);
    initializationPoller.start();
初始化线程initializationPoller是个后台线程。init方法为每个队列指定一个初始化线程,线程总数总是小于或等于队列的数量。然后启动每个初始化线程。
最后设置用于显示调度器信息的Servlet:

    if (taskTrackerManager instanceof JobTracker) {
      JobTracker jobTracker = (JobTracker) taskTrackerManager;
      HttpServer infoServer = jobTracker.infoServer;
      infoServer.setAttribute("scheduler", this);
      infoServer.addServlet("scheduler", "/scheduler",
          CapacitySchedulerServlet.class);
    }
至此,调度器启动完毕。

2. 作业初始化

由于初始化的作业不能得到调度会占用过多内存,容量调度器通过两种策略初始化作业:1. 优先初始化最可能被调度器调度的作业;2. 限制用户初始化作业数目。详细过程如下:作业被提交到JobTracker后,调度器的监听器调用jobAdded方法:

    // add job to the right queue
    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
这条语句将作业加入对应的队列中。接下来调用队列的addWaitingJob方法:

    synchronized void addWaitingJob(JobInProgress job) throws IOException {
    JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job);
    
    String user = job.getProfile().getUser();

    // Check acceptance limits
    checkJobSubmissionLimits(job, user);
    
    waitingJobs.put(jobSchedInfo, job);
    
    // Update user stats
    UserInfo userInfo = users.get(user);
    if (userInfo == null) {
      userInfo = new UserInfo(comparator);
      users.put(user, userInfo);
    }
    userInfo.jobAdded(jobSchedInfo, job);
  }
在该方法中,首先生成调度信息对象,此对象与默认的FIFO调度器的调度信息对象一样。然后检查三个约束:

1. 作业的任务数是否超过每个用户最大任务数

    if (job.desiredTasks() > maxActiveTasksPerUser) {
      throw new IOException(
          "Job '" + job.getJobID() + "' from user '" + user  +
          "' rejected since it has " + job.desiredTasks() + " tasks which" +
          " exceeds the limit of " + maxActiveTasksPerUser + 
          " tasks per-user which can be initialized for queue '" + 
          queueName + "'"
          );
    }
2. 队列中等待初始化、已经初始化的作业数目和在运行的作业数不能超过可接受值

// Across all jobs in queue
    int queueWaitingJobs = getNumWaitingJobs();
    int queueInitializingJobs = getNumInitializingJobs();
    int queueRunningJobs = getNumRunningJobs();
    if ((queueWaitingJobs + queueInitializingJobs + queueRunningJobs) >= 
      maxJobsToAccept) {
      throw new IOException(
          "Job '" + job.getJobID() + "' from user '" + user  + 
          "' rejected since queue '" + queueName + 
          "' already has " + queueWaitingJobs + " waiting jobs, " + 
          queueInitializingJobs + " initializing jobs and " + 
          queueRunningJobs + " running jobs - Exceeds limit of " +
          maxJobsToAccept + " jobs to accept");
    }
3. 用户等待初始化、已经初始化和在运行的作业数不能超过可接受值

    // Across all jobs of the user
    int userWaitingJobs = getNumWaitingJobsByUser(user);
    int userInitializingJobs = getNumInitializingJobsByUser(user);
    int userRunningJobs = getNumRunningJobsByUser(user);
    if ((userWaitingJobs + userInitializingJobs + userRunningJobs) >= 
        maxJobsPerUserToAccept) {
      throw new IOException(
          "Job '" + job.getJobID() + "' rejected since user '" + user +  
          "' already has " + userWaitingJobs + " waiting jobs, " +
          userInitializingJobs + " initializing jobs and " +
          userRunningJobs + " running jobs - " +
          " Exceeds limit of " + maxJobsPerUserToAccept + " jobs to accept" +
          " in queue '" + queueName + "' per user");
    }

若有一个约束不满足,则抛出异常。否则将作业加入等待初始化队列。最后调用调度器的jobAdded方法通知调度器:

  // called when a job is added
  synchronized void jobAdded(JobInProgress job) throws IOException {
    CapacitySchedulerQueue queue = 
      queueInfoMap.get(job.getProfile().getQueueName());
    
    // Inform the queue
    queue.jobAdded(job);
    
    // setup scheduler specific job information
    preInitializeJob(job);
  }
首先获取队列,然后告诉队列有作业加入,并将队列中提交该作业的用户的作业数更新。最后依据配置计算每个任务需要的slot数目(容量调度器支持大内存作业)。

作业初始化线程的入口在JobInitializationPoller(以下简称poller)的run方法。

  public void run() {
    while (running) {
      try {
        cleanUpInitializedJobsList();
        selectJobsToInitialize();
        if (!this.isInterrupted()) {
          Thread.sleep(sleepInterval);
        }
      } catch (InterruptedException e) {
        LOG.error("Job Initialization poller interrupted"
            + StringUtils.stringifyException(e));
      }
    }
  }

在该方法中,首先从initializedJobs数据结构中清除一些作业,这些作业是正在运行且获得调度的作业或者是运行完成的作业。接着,调用selectJobsToInitialize方法来选择等待初始化的作业。具体过程如下,对于每个队列,首先选择该队列中处于waitingJobs列表中的作业:

ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
选择的原则是:一看该作业是否已经初始化;若不是,二检查队列中作业总数(正在运行和正在初始化)和允许的活动任务数是否超过上限;若没有,检查提交该作业的用户是不是有过多的作业(正在运行和正在初始化)或活动的任务;若仍不是,则进一步检查作业是否处于PREP状态(没有被kill掉),然后放入筛选结果列表,并通知所在队列,将其放入initializingJobs列表。以上过程详见getJobsToInitialized方法的实现,这里不赘述。

下面获取一个分配给该队列的初始化线程,并将选择初始化的作业加入属于相应队列的调度列表中:

    JobInitializationThread t = threadsToQueueMap.get(queue);
      for (JobInProgress job : jobsToInitialize) {
        t.addJobsToQueue(queue, job);
    }
每个初始化线程维护了一个map(jobsPerQueue),通过队列名字可以找到由该线程初始化的队列的作业调度列表。
最后,我们来看初始化线程JobInitializationThread的run方法,该方法中不停地调用initializeJobs方法:

     void initializeJobs() {
      // while there are more jobs to initialize...
      while (currentJobCount.get() > 0) {
        Set<String> queues = jobsPerQueue.keySet();
        for (String queue : queues) {
          JobInProgress job = getFirstJobInQueue(queue);
          if (job == null) {
            continue;
          }
          LOG.info("Initializing job : " + job.getJobID() + " in Queue "
              + job.getProfile().getQueueName() + " For user : "
              + job.getProfile().getUser());
          if (startIniting) {
            setInitializingJob(job);
            ttm.initJob(job);
            setInitializingJob(null);
          } else {
            break;
          }
        }
      }
    }

从代码可见,获取队列中第一个作业,将其交给JobTracker的initJob初始化,初始化详细过程见前面的一系列文章。至此,容量调度器的作业初始化过程分析完毕。

作为该小节的结束,这里说一下每个队列中维护的几个数据结构:

    this.waitingJobs = 
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
    this.initializingJobs =
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
    this.runningJobs = 
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
正如名称暗示的那样,三个列表分别持有等待初始化的作业、正在初始化的作业和正在运行的作业。它们共有的参数为一个Comparator对象,用于定义列表中作业的顺序。
它的初始化如下:

    if (supportsPriorities) {
      // use the default priority-aware comparator
      comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
    }
    else {
      comparator = STARTTIME_JOB_COMPARATOR;
    }
如果调度器支持优先级,则比较器对象初始化为FIFO调度器中的FIFO比较器,原则是首先比较优先级,再比较开始时间,最后比较作业ID。如果调度器不支持优先级,则初始化为开始时间比较器,即先来先服务。

初始化线程会从waitingJobs列表中选择要初始化的作业,被选择的作业会放入initializingJobs列表,初始化后得到调度的作业会进入runningJobs列表。有关作业的调度见下一小节。

容量调度器采用三层调度模型:首先选择一个队列,其次选择一个作业,最后选择作业的任务。任务选择由调度器的assignTasks方法完成,下面详述该方法。

首先调用下面方法更新各个队列的资源使用信息:

updateAllQueues(mapClusterCapacity, reduceClusterCapacity);
具体到每个队列中,调用队列的updateAll方法。

首先更新队列最新的map和reduce资源量:

    // Compute new capacities for maps and reduces
    mapSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
        mapClusterCapacity);
    reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
        reduceClusterCapacity);
接下来将以下信息更新到每个作业的调度信息对象中:

    j.setSchedulingInfo(
          CapacityTaskScheduler.getJobQueueSchedInfo(numMapsRunningForThisJob, 
              numRunningMapSlots,
              numReservedMapSlotsForThisJob,
              numReducesRunningForThisJob, 
              numRunningReduceSlots,
              numReservedReduceSlotsForThisJob));
包括:作业正在运行的map和reduce作业数,作业正在使用的map和reduce资源数和为这个作业保留的map和reduce资源数。

然后将每个作业的资源使用信息反映到该作业所在队列的相关信息中:

    update(TaskType.MAP, j, j.getProfile().getUser(), 
          numMapsRunningForThisJob, numMapSlotsForThisJob);
    update(TaskType.REDUCE, j, j.getProfile().getUser(), 
          numReducesRunningForThisJob, numReduceSlotsForThisJob);
包括队列中正在运行的任务数,正在使用的资源量和用户使用的资源量等信息。

更新后,通过addMapTasks和addReduceTask两个方法调度任务:

    // schedule tasks
    List<Task> result = new ArrayList<Task>();
    addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
    addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
关于队列和作业的优先级前面已经提到,这里关注任务的优先顺序。在addMapTasks方法中,调用CapacityScheduler的assignTasks方法:

    JobInProgress job = taskTracker.getJobForFallowSlot(type);
      if (job != null) {
        if (availableSlots >= job.getNumSlotsPerTask(type)) {
          // Unreserve 
          taskTracker.unreserveSlots(type, job);
          
          // We found a suitable job. Get task from it.
          if (type == TaskType.MAP) {
            // Don't care about locality!
            job.overrideSchedulingOpportunities();
          }
          return obtainNewTask(taskTrackerStatus, job, true);
        } else {
          // Re-reserve the current tasktracker
          taskTracker.reserveSlots(type, job, availableSlots);

          return TaskLookupResult.getMemFailedResult(); 
        }
      }
首先,判断TaskTracker是否正为某个作业预留资源(该作业为内存密集型,一个任务可能需要多个slot,上次调度没有足够的slot分配,故将其预留给该作业用于下次调度。这是容量调度器的大内存任务调度机制),若有预留,则判断当前可用的资源是否能满足该作业,若能则不再预留资源,并调用obtainNewTask方法将资源分配给该作业执行;若不能,继续将当前资源预留给该作业,并返回内存失败的结果。

如果TaskTracker没有为某个作业预留资源,对于队列集合中的每个队里,从中选择一个作业,并调用obtainNewTask方法获得一个任务。当遇到当前可用资源不能满足一个任务时,也要预留资源。注意,每次获取一个任务都会返回获取的状态,代码如下:

    for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
        //This call is for optimization if we are already over the
        //maximum-capacity we avoid traversing the queues.
        if (!queue.assignSlotsToQueue(type, 1)) {
          continue;
        }
        
        TaskLookupResult tlr = 
          getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch);
        TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();

        if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
          continue; // Look in other queues.
        }

        // if we find a task, return
        if (lookUpStatus == TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||
            lookUpStatus == TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
          return tlr;
        }
        // if there was a memory mismatch, return
        else if (lookUpStatus == 
          TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
            return tlr;
        }
      }
最后来分析一下,获取任务的核心方法obtainNewTask:

    TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
                                   JobInProgress job, boolean assignOffSwitch) 
    throws IOException {
      ClusterStatus clusterStatus = 
        scheduler.taskTrackerManager.getClusterStatus();
      int numTaskTrackers = clusterStatus.getTaskTrackers();
      int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();
      
      // Inform the job it is about to get a scheduling opportunity
      job.schedulingOpportunity();
      
      // First, try to get a 'local' task
      Task t = job.obtainNewNodeOrRackLocalMapTask(taskTracker,
                                                   numTaskTrackers,
                                                   numUniqueHosts);
      
      if (t != null) {
        return TaskLookupResult.getTaskFoundResult(t, job); 
      }
      
      // Next, try to get an 'off-switch' task if appropriate
      // Do not bother as much about locality for High-RAM jobs
      if (job.getNumSlotsPerMap() > 1 || 
          (assignOffSwitch && 
              job.scheduleOffSwitch(numTaskTrackers))) {
        t = 
          job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
      }
      
      return (t != null) ? 
          TaskLookupResult.getOffSwitchTaskFoundResult(t, job) :
          TaskLookupResult.getNoTaskFoundResult();
    }
与FIFO调度器的实现类似,首先也要试图找到一个具有数据本地新的任务。若没找到,则分配一个内存密集型任务或off-switch的任务。具体的分配过程参见前面的文章对FIFO任务调度的分析(MapReduce任务调度与执行原理之任务调度)。若仍然没有找到,则返回没有找到结果。

如果获取到的任务数达到一次心跳返回的任务最大数量,则返回:

    if (tasks.size() >= maxTasksPerHeartbeat) {
        return;
      }

为了尽量提高任务的数据本地性,容量调度器采用了作业延迟调度机制:如果一个作业中未找到满足数据本地性的任务,则会让该作业跳过一定数目的机会,直到找到一个满足数据本地性的任务或到达跳过次数上限。

    if (job.getNumSlotsPerMap() > 1 || 
          (assignOffSwitch && 
              job.scheduleOffSwitch(numTaskTrackers))) {
        t = 
          job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
      }
assignOffSwitch为true表示还未分配过不具有数据本地性的任务,scheduleOffSwitch用于判断作业是否到达跳过次数上限:

  public boolean scheduleOffSwitch(int numTaskTrackers) {
    long missedTaskTrackers = getNumSchedulingOpportunities();
    long requiredSlots = 
      Math.min((desiredMaps() - finishedMaps()), numTaskTrackers);
    
    return (requiredSlots  * localityWaitFactor) < missedTaskTrackers;
  }
localityWaitFactor表示作业输入数据所在结点数占结点总数的比例,requiredSlots表示作业还需要的资源数,二者的乘积来衡量跳过次数的上限,而missedTaskTrackers即为跳过次数。missedTaskTrackers每次分配任务时都会增加,如果分配到本地任务,则返回任务,该变量会重置为0;若没有分配到,则表示跳过一次。在分配到非本地性任务后跳过次数也会重置为0。

reduce任务的分配机制相对简单,只采用了大内存任务调度策略,调度器只要找到一个合适的reduce任务即返回,且没有延迟调度。至此,容量调度器任务调度分析结束。

下一篇文章计划学习Fair Scheduler。如有错误和问题,欢迎批评指正。

【1】《Hadoop技术内幕--深入解析MapReduce架构设计与实现原理》董西成 【2】  Hadoop 1.0.0 源码

2013年10月7日















相关内容