0. Introduction

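Complex programs often need to create a number of threads, but those threads are rarely busy all the time, and some of them sit idle. A thread pool distributes work across a fixed set of threads instead. When a task arrives and the pool has an idle thread, that thread takes the task and returns to the pool once the task is done; when no thread is idle, the task is placed in a task queue until a thread becomes free. Because threads are reused, the cost of creating and destroying a thread for every task is avoided.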
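The idea can be sketched with standard-library primitives only. This is a minimal illustration, not cartographer's ThreadPool: the names SimplePool, Submit and RunCountingJobs are hypothetical, and std::condition_variable stands in for the absl::Mutex used later in this post.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical minimal pool (an assumption for illustration only).
class SimplePool {
 public:
  explicit SimplePool(int num_threads) {
    for (int i = 0; i != num_threads; ++i) {
      workers_.emplace_back([this]() { DoWork(); });
    }
  }

  // Clears running_, lets the workers drain the queue, then joins them.
  ~SimplePool() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      running_ = false;
    }
    cv_.notify_all();
    for (std::thread& worker : workers_) worker.join();
  }

  void Submit(std::function<void()> job) {
    assert(static_cast<bool>(job));  // Empty jobs are not accepted.
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(job));
    }
    cv_.notify_one();
  }

 private:
  void DoWork() {
    for (;;) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this]() { return !queue_.empty() || !running_; });
        if (queue_.empty()) return;  // Only reached once running_ is false.
        job = std::move(queue_.front());
        queue_.pop_front();
      }
      job();  // Run outside the lock so other workers can dequeue.
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  bool running_ = true;
  std::vector<std::thread> workers_;
};

// Runs num_jobs trivial jobs on num_threads workers; returns how many ran.
int RunCountingJobs(int num_threads, int num_jobs) {
  std::atomic<int> counter{0};
  {
    SimplePool pool(num_threads);
    for (int i = 0; i != num_jobs; ++i) {
      pool.Submit([&counter]() { ++counter; });
    }
  }  // The destructor returns only after the queue has been drained.
  return counter.load();
}
```

The worker threads are created once and reused for every job, which is the saving described above. The wait predicate has the same shape as the one in DoWork() further down: a worker wakes up when there is work or when the pool is shutting down.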

1. Basics

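  1. Task queue (task_queue_): whenever the DoWork() loop of a thread is idle, it takes its next task from this queue. A mutex prevents the threads from accessing the queue at the same time.
  2. Waiting queue (tasks_not_ready_): holds tasks that are not ready yet because the tasks they depend on have not finished. Once all of its dependencies have completed, a task is moved from tasks_not_ready_ to task_queue_.
  3. Task dependencies (dependent_tasks_): each Task records in dependent_tasks_ the tasks that depend on it and notifies them when it completes. A task can only run after all the tasks it depends on have completed.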

Core functions

Creating the thread pool:

ThreadPool::ThreadPool(int num_threads) {
  absl::MutexLock locker(&mutex_);
  for (int i = 0; i != num_threads; ++i) {
    pool_.emplace_back([this]() { ThreadPool::DoWork(); });
  }
}

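ThreadPool::DoWork():

Every thread starts running the infinite loop ThreadPool::DoWork() when the pool is constructed and does not return until the pool is destroyed. Whenever task_queue_ contains a task, a thread takes it and executes it.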

void ThreadPool::DoWork() {
#ifdef __linux__
  // This changes the per-thread nice level of the current thread on Linux. We
  // do this so that the background work done by the thread pool is not taking
  // away CPU resources from more important foreground threads.
  CHECK_NE(nice(10), -1);
#endif
  const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
    return !task_queue_.empty() || !running_;
  };
  for (;;) {
    std::shared_ptr<Task> task;
    {
      absl::MutexLock locker(&mutex_);
      mutex_.Await(absl::Condition(&predicate));
      if (!task_queue_.empty()) {
        task = std::move(task_queue_.front());
        task_queue_.pop_front();
      } else if (!running_) {
        return;
      }
    }
    CHECK(task);
    CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
    Execute(task.get());
  }
}

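Over its lifetime, from creation until a thread has taken it from task_queue_ and run it, a Task passes through the following states in order: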

  enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
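  • NEW (the task has been created but not yet scheduled on the thread pool)
  • DISPATCHED (the task has been scheduled on the thread pool)
  • DEPENDENCIES_COMPLETED (all dependencies of the task have finished)
  • RUNNING (the task is executing)
  • COMPLETED (the task has finished)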
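Because the states are only ever entered in this order, a legal transition is always a step to the next enumerator. The helper below is a small sketch of that rule; IsNextState is a hypothetical name and is not part of cartographer.

```cpp
#include <cassert>

enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };

// Returns true if `to` is the state that directly follows `from`.
// COMPLETED is terminal, so nothing follows it.
bool IsNextState(State from, State to) {
  if (from == COMPLETED) return false;
  return static_cast<int>(to) == static_cast<int>(from) + 1;
}
```

For example, IsNextState(NEW, DISPATCHED) holds, while IsNextState(NEW, RUNNING) does not, since a task cannot run before it has been dispatched and its dependencies have completed.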

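task->SetWorkItem(work_item)

Create a Task instance, whose state defaults to NEW, and then set the work it should run with task->SetWorkItem (in the example below, the work item is a lambda that constructs a FastCorrelativeScanMatcher2D).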

  auto scan_matcher_task = absl::make_unique<common::Task>();
  scan_matcher_task->SetWorkItem(
      [&submap_scan_matcher, &scan_matcher_options]() {
        submap_scan_matcher.fast_correlative_scan_matcher =
            absl::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
                *submap_scan_matcher.grid, scan_matcher_options);
      });
  submap_scan_matcher.creation_task_handle =
      thread_pool_->Schedule(std::move(scan_matcher_task));

task2->AddDependency(task1)

This call is optional and only needed for tasks that have dependencies. It means that task2 depends on task1: task2 can run only after task1 has finished.

  auto constraint_task = absl::make_unique<common::Task>();
  constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
    ComputeConstraint(submap_id, submap, node_id, true, /* match_full_submap */
                      constant_data, transform::Rigid2d::Identity(),
                      *scan_matcher, constraint);
  });
  constraint_task->AddDependency(scan_matcher->creation_task_handle);
  auto constraint_task_handle =
      thread_pool_->Schedule(std::move(constraint_task));

thread_pool->Schedule(task)

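Schedule stores the task in tasks_not_ready_ and sets its state to DISPATCHED (in the code below, the state change happens inside SetThreadPool()). It then checks whether the tasks it depends on have finished. If they have, the state becomes DEPENDENCIES_COMPLETED and the task is moved from tasks_not_ready_ to task_queue_, where it waits for a thread to execute it; otherwise it stays in tasks_not_ready_ until its dependencies have run. In cartographer, different pieces of data are loaded dynamically and may depend on one another, so Schedule is what enforces the ordering between them.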

std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task) {
  std::shared_ptr<Task> shared_task;
  {
    absl::MutexLock locker(&mutex_);
    auto insert_result =
        tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
    if (!insert_result.second) {
      return insert_result.first->second;
      // throw std::runtime_error("a task has been scheduled twice");
    }
    shared_task = insert_result.first->second;
  }
  SetThreadPool(shared_task.get());
  return shared_task;
}
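The ownership hand-off in Schedule can be looked at in isolation: the pool takes the unique_ptr, stores it as a shared_ptr keyed by the raw pointer, and returns only a weak_ptr to the caller. The sketch below reproduces that pattern with hypothetical names (Job, NotReadyTable, Release, HandleTracksOwnership) and a std::map; it is an illustration under these assumptions, not cartographer's code.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <utility>

struct Job {  // Hypothetical stand-in for common::Task.
  int id = 0;
};

class NotReadyTable {
 public:
  // Takes ownership of `job` and hands back a non-owning handle.
  std::weak_ptr<Job> Schedule(std::unique_ptr<Job> job) {
    assert(job != nullptr);
    Job* key = job.get();  // Read the raw pointer before moving from `job`.
    auto insert_result = jobs_.emplace(key, std::move(job));
    return insert_result.first->second;
  }

  // Drops the table's ownership of the job stored under `key`.
  void Release(Job* key) { jobs_.erase(key); }

 private:
  std::map<Job*, std::shared_ptr<Job>> jobs_;
};

// Returns true if the handle is alive while the job is stored and
// expired once the table has released it.
bool HandleTracksOwnership() {
  NotReadyTable table;
  std::weak_ptr<Job> handle = table.Schedule(std::make_unique<Job>());
  Job* raw = handle.lock().get();  // The temporary shared_ptr dies here.
  const bool alive_while_stored = !handle.expired();
  table.Release(raw);
  return alive_while_stored && handle.expired();
}
```

Returning a weak_ptr means the caller never extends the lifetime of the task. A handle such as creation_task_handle can then be passed to AddDependency later, and an expired handle simply indicates that the task object no longer exists.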

task->Execute()

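Execute sets the state to RUNNING before running the work item and to COMPLETED afterwards. When a task finishes, it goes through the tasks that depend on it (dependent_tasks_) and decrements the uncompleted-dependency count of each by 1. When that count reaches 0 for a task that has already been dispatched, the task is moved from tasks_not_ready_ to task_queue_, where it waits for a thread to execute it.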

void Task::Execute() {
  {
    absl::MutexLock locker(&mutex_);
    CHECK_EQ(state_, DEPENDENCIES_COMPLETED);
    state_ = RUNNING;
  }

  // Execute the work item.
  if (work_item_) {
    work_item_();
  }

  absl::MutexLock locker(&mutex_);
  state_ = COMPLETED;
  for (Task* dependent_task : dependent_tasks_) {
    dependent_task->OnDependenyCompleted();
  }
}

void Task::OnDependenyCompleted() {
  absl::MutexLock locker(&mutex_);
  CHECK(state_ == NEW || state_ == DISPATCHED);
  --uncompleted_dependencies_;
  if (uncompleted_dependencies_ == 0 && state_ == DISPATCHED) {
    state_ = DEPENDENCIES_COMPLETED;
    CHECK(thread_pool_to_notify_);
    thread_pool_to_notify_->NotifyDependenciesCompleted(this);
  }
}

void ThreadPool::NotifyDependenciesCompleted(Task* task) {
  absl::MutexLock locker(&mutex_);
  auto it = tasks_not_ready_.find(task);
  CHECK(it != tasks_not_ready_.end());
  task_queue_.push_back(it->second);
  tasks_not_ready_.erase(it);
}
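The interplay of the dependency counter and the two queues can be followed more easily without threads or locks. The following single-threaded sketch uses hypothetical names (MiniTask, MiniScheduler, RunOrderDemo) and leaves out details of the real implementation, such as adding a dependency on a task that has already completed.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <set>
#include <string>
#include <vector>

// Hypothetical single-threaded stand-in for Task.
struct MiniTask {
  std::function<void()> work;
  int uncompleted_dependencies = 0;
  std::vector<MiniTask*> dependents;  // Tasks waiting on this one.
  bool dispatched = false;
};

class MiniScheduler {
 public:
  // `task` may not run until `dependency` has run.
  static void AddDependency(MiniTask* task, MiniTask* dependency) {
    ++task->uncompleted_dependencies;
    dependency->dependents.push_back(task);
  }

  void Schedule(MiniTask* task) {
    task->dispatched = true;
    if (task->uncompleted_dependencies == 0) {
      ready_.push_back(task);
    } else {
      not_ready_.insert(task);
    }
  }

  // Runs ready tasks until none are left; returns how many tasks ran.
  int RunAll() {
    int executed = 0;
    while (!ready_.empty()) {
      MiniTask* task = ready_.front();
      ready_.pop_front();
      if (task->work) task->work();
      ++executed;
      for (MiniTask* dependent : task->dependents) {
        assert(dependent->uncompleted_dependencies > 0);
        --dependent->uncompleted_dependencies;
        if (dependent->uncompleted_dependencies == 0 &&
            dependent->dispatched) {
          not_ready_.erase(dependent);
          ready_.push_back(dependent);
        }
      }
    }
    return executed;
  }

 private:
  std::deque<MiniTask*> ready_;    // Plays the role of task_queue_.
  std::set<MiniTask*> not_ready_;  // Plays the role of tasks_not_ready_.
};

// Schedules b (which depends on a) before a; returns the observed run order.
std::string RunOrderDemo() {
  std::string order;
  MiniTask a, b;
  a.work = [&order]() { order += 'a'; };
  b.work = [&order]() { order += 'b'; };
  MiniScheduler::AddDependency(&b, &a);
  MiniScheduler scheduler;
  scheduler.Schedule(&b);  // Parked: one dependency is still open.
  scheduler.Schedule(&a);  // Ready immediately.
  scheduler.RunAll();
  return order;
}
```

Even though b is scheduled first, it stays in the not-ready set until a has run and decremented its counter, so the work items execute in the order a, then b.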

References

https://www.cnblogs.com/yangang92/p/5485868.html
https://blog.csdn.net/weixin_41245988/article/details/114441545
https://blog.csdn.net/windxf/article/details/109380520
https://www.cnblogs.com/heimazaifei/p/12435875.html