调度器#
TensorRT-LLM PyTorch 后端采用在途批处理(inflight batching),这是一种在每个 LLM 步骤中动态进行批处理和调度的机制。调度器被调用来确定当前步骤中调度哪些请求。
调度器介绍#
调度器有两种类型
CapacityScheduler
:这个调度器决定是否为每个活动请求分配资源。它考虑 KV 缓存容量以及其他适用资源。`CapacityScheduler` 的输入包括所有需要处理的活动请求。主要输出是fitting_requests
,表示在当前步骤已保留资源的请求。另一个输出是paused_requests
,用于支持 C++ 运行时的请求暂停功能。MicroBatchScheduler
:这个调度器从CapacityScheduler
选择的fitting_requests
中选取一些请求。另一个输入是inflight_request_ids
,它支持 C++ 运行时的流水线并行或重叠执行。由于 PyTorch Flow 不支持流水线并行,因此inflight_request_ids
是一个空集。输出是context_requests
和generation_requests
,它们是调度好的上下文和生成请求。不在这些列表中的请求不会被选中进行模型前向传递。
SimpleScheduler
结合了这两个调度器,首先使用 CapacityScheduler
,然后使用 MicroBatchScheduler
,以获得最终的调度结果。`SimpleScheduler` 的输入包括 active_requests
和 inflight_request_ids
,输出是 context_requests
、generation_requests
和 paused_requests
。
自定义您的调度器#
要自定义调度器或批处理机制,请通过继承相应的类来实现您自己的 CapacityScheduler
和 MicroBatchScheduler
。如果两步调度不是必需的,则直接继承 RequestScheduler
并实现 schedule_request
方法。
CapacityScheduler
的一个实现示例是 GuaranteedNoEvictScheduler
类,位于 scheduler.py 中。在 CapacityScheduler
的 C++ 绑定之前使用该类,最初采用基于 Python 的调度器。它继承 CapacityScheduler
并实现自己的 schedule_request
方法。此方法处理所有 active_requests
,并尝试调度更多可以容纳在 KV 缓存中的请求。资源估算应与 kv_cache_manager
中的资源分配和释放保持一致。
以下是代码片段
class GuaranteedNoEvictScheduler(CapacityScheduler):
# only schedule requests has no_schedule_until_state <= state < no_schedule_after_state
no_schedule_until_state = LlmRequestState.CONTEXT_INIT
no_schedule_after_state = LlmRequestState.GENERATION_COMPLETE
def __init__(self, max_num_requests: int, kv_cache_manager):
super(GuaranteedNoEvictScheduler, self).__init__()
self.max_num_requests = max_num_requests
self.kv_cache_manager = kv_cache_manager
def schedule_request(
self, active_requests: RequestList
) -> tuple[list[LlmRequest], list[LlmRequest]]:
scheduled_requests = []
pending_requests = []
reserved_blocks = 0
max_blocks = self.kv_cache_manager.get_max_resource_count()
for request in active_requests:
req_state = request.state
# if request cannot be scheduled yet or request should no longer be scheduled, skip
if req_state.value < self.no_schedule_until_state.value or req_state.value >= self.no_schedule_after_state.value:
continue
if len(scheduled_requests
) >= self.max_num_requests or reserved_blocks >= max_blocks:
break
elif req_state == LlmRequestState.GENERATION_IN_PROGRESS or req_state == LlmRequestState.GENERATION_TO_COMPLETE:
scheduled_requests.append(request)
reserved_blocks += self.kv_cache_manager.get_needed_resource_to_completion(
request)
else:
pending_requests.append(request)
avaiable_blocks = max_blocks - reserved_blocks
for request in pending_requests:
req_state = request.state
if len(scheduled_requests) >= self.max_num_requests:
break
elif req_state == LlmRequestState.CONTEXT_INIT:
needed_blocks = self.kv_cache_manager.get_needed_resource_to_completion(
request)
if needed_blocks <= avaiable_blocks:
scheduled_requests.append(request)
avaiable_blocks -= needed_blocks
elif needed_blocks > avaiable_blocks:
# If one requests fails to be scheduled, break
break
assert len(scheduled_requests) > 0, (
"no pending request can get enough resource to complete, "
"please increase KV cache pool size.")
return scheduled_requests, []
实现您自己的调度器后,将其集成到 PyExecutor 中。对于 PyTorch 后端,代码位于 py_executor_creator.py 中。在 create_pytorch_model_based_executor
函数中,有两行用于创建 CapacityScheduler
capacitor_scheduler = BindCapacityScheduler(max_num_requests,
kv_cache_manager.impl)
类似调整也可用于 MicroBatchScheduler
。这使得 PyExecutor
能够使用您自定义的调度逻辑执行。