调度器#

TensorRT-LLM PyTorch 后端采用在途批处理(inflight batching),这是一种在每个 LLM 步骤中动态进行批处理和调度的机制。调度器被调用来确定当前步骤中调度哪些请求。

调度器介绍#

调度器有两种类型

  • CapacityScheduler:这个调度器决定是否为每个活动请求分配资源。它考虑 KV 缓存容量以及其他适用资源。`CapacityScheduler` 的输入包括所有需要处理的活动请求。主要输出是 fitting_requests,表示在当前步骤已保留资源的请求。另一个输出是 paused_requests,用于支持 C++ 运行时的请求暂停功能。

  • MicroBatchScheduler:这个调度器从 CapacityScheduler 选择的 fitting_requests 中选取一些请求。另一个输入是 inflight_request_ids,它支持 C++ 运行时的流水线并行或重叠执行。由于 PyTorch Flow 不支持流水线并行,因此 inflight_request_ids 是一个空集。输出是 context_requestsgeneration_requests,它们是调度好的上下文和生成请求。不在这些列表中的请求不会被选中进行模型前向传递。

SimpleScheduler 结合了这两个调度器,首先使用 CapacityScheduler,然后使用 MicroBatchScheduler,以获得最终的调度结果。`SimpleScheduler` 的输入包括 active_requestsinflight_request_ids,输出是 context_requestsgeneration_requestspaused_requests

自定义您的调度器#

要自定义调度器或批处理机制,请通过继承相应的类来实现您自己的 CapacitySchedulerMicroBatchScheduler。如果两步调度不是必需的,则直接继承 RequestScheduler 并实现 schedule_request 方法。

CapacityScheduler 的一个实现示例是 GuaranteedNoEvictScheduler 类,位于 scheduler.py 中。在 CapacityScheduler 的 C++ 绑定之前使用该类,最初采用基于 Python 的调度器。它继承 CapacityScheduler 并实现自己的 schedule_request 方法。此方法处理所有 active_requests,并尝试调度更多可以容纳在 KV 缓存中的请求。资源估算应与 kv_cache_manager 中的资源分配和释放保持一致。

以下是代码片段

class GuaranteedNoEvictScheduler(CapacityScheduler):
    # only schedule requests has no_schedule_until_state <= state < no_schedule_after_state
    no_schedule_until_state = LlmRequestState.CONTEXT_INIT
    no_schedule_after_state = LlmRequestState.GENERATION_COMPLETE

    def __init__(self, max_num_requests: int, kv_cache_manager):
        super(GuaranteedNoEvictScheduler, self).__init__()
        self.max_num_requests = max_num_requests
        self.kv_cache_manager = kv_cache_manager

    def schedule_request(
        self, active_requests: RequestList
    ) -> tuple[list[LlmRequest], list[LlmRequest]]:
        scheduled_requests = []
        pending_requests = []
        reserved_blocks = 0
        max_blocks = self.kv_cache_manager.get_max_resource_count()
        for request in active_requests:
            req_state = request.state
            # if request cannot be scheduled yet or request should no longer be scheduled, skip
            if req_state.value < self.no_schedule_until_state.value or req_state.value >= self.no_schedule_after_state.value:
                continue

            if len(scheduled_requests
                   ) >= self.max_num_requests or reserved_blocks >= max_blocks:
                break
            elif req_state == LlmRequestState.GENERATION_IN_PROGRESS or req_state == LlmRequestState.GENERATION_TO_COMPLETE:
                scheduled_requests.append(request)
                reserved_blocks += self.kv_cache_manager.get_needed_resource_to_completion(
                    request)
            else:
                pending_requests.append(request)

        avaiable_blocks = max_blocks - reserved_blocks
        for request in pending_requests:
            req_state = request.state
            if len(scheduled_requests) >= self.max_num_requests:
                break
            elif req_state == LlmRequestState.CONTEXT_INIT:
                needed_blocks = self.kv_cache_manager.get_needed_resource_to_completion(
                    request)
                if needed_blocks <= avaiable_blocks:
                    scheduled_requests.append(request)
                    avaiable_blocks -= needed_blocks
                elif needed_blocks > avaiable_blocks:
                    # If one requests fails to be scheduled, break
                    break

        assert len(scheduled_requests) > 0, (
            "no pending request can get enough resource to complete, "
            "please increase KV cache pool size.")
        return scheduled_requests, []

实现您自己的调度器后,将其集成到 PyExecutor 中。对于 PyTorch 后端,代码位于 py_executor_creator.py 中。在 create_pytorch_model_based_executor 函数中,有两行用于创建 CapacityScheduler

    capacitor_scheduler = BindCapacityScheduler(max_num_requests,
                                                kv_cache_manager.impl)

类似调整也可用于 MicroBatchScheduler。这使得 PyExecutor 能够使用您自定义的调度逻辑执行。