diff --git a/benchmarks/benchmark_long_document_qa.py b/benchmarks/benchmark_long_document_qa.py new file mode 100644 index 0000000000000..8d4425b6fb8a5 --- /dev/null +++ b/benchmarks/benchmark_long_document_qa.py @@ -0,0 +1,258 @@ +""" +Benchmark the efficiency of prefix caching. + +This script allows you to benchmark the performance of +a model with prefix-caching or cpu-offloading using fixed prompts + +Fixed example usage: + # This command run the vllm with 50GB CPU memory for offloading + # The workload samples 8 different prompts with a default input + # length of 20010 tokens, then replicates each prompt 2 times. + python benchmark_long_document_qa.py \ + --model meta-llama/Llama-2-7b-chat-hf \ + --enable-prefix-caching \ + --block-allocator CpuOffloadingBlockAllocator \ + --num-documents 8 \ + --repeat-count 2 \ + --cpu-memory-gb 50 + +Commandline arguments: + + # Basic arguments + --model: The model to use for the benchmark. + + --enable-prefix-caching: Enable prefix caching or not. + + --block-allocator: The block allocator that vLLM uses. + - CpuGpuBlockAllocator: The default block allocator. + - CpuOffloadingBlockAllocator: The block allocator that supports + cpu offloading + + --gpu-memory-utilization: GPU memory utilization for vLLM. + + --cpu-memory-gb: The amount of CPU memory (GB) that is used by vLLM. + NOTE: CPU memory should be larger than GPU KV cache size when + using CpuOffloadingBlockAllocator. + + # Workload-related arguments + --num-documents: The number of documents to sample prompts from. + + --repeat-count: The number of times to repeat each prompt. + + # Other functionality + --seed: Random seed for reproducibility. + + --profile-swap-blocks: Profile the swap_blocks function in the custom ops. +""" + +import random +import time + +import torch + +from vllm import LLM, SamplingParams +from vllm.utils import FlexibleArgumentParser + +execution_times = {} + + +def build_result_dict(start_time, end_time, *args): + total_time = end_time - start_time + length = -1 + if len(args) > 1 and isinstance(args[1], torch.Tensor): + length = len(args[1]) + + return { + "start_time": start_time, + "total_time": total_time, + "swap_len": length + } + + +def timing_decorator(func): + + def wrapper(*args, **kwargs): + global execution_times + torch.cuda.synchronize() + start_time = time.time() # Record the start time + result = func(*args, **kwargs) # Call the wrapped function + torch.cuda.synchronize() + end_time = time.time() # Record the end time + if func.__name__ not in execution_times: + execution_times[func.__name__] = [] + + res = build_result_dict(start_time, end_time, *args) + execution_times[func.__name__].append(res) + return result # Return the result of the original function + + return wrapper + + +def process_timing_results(): + global execution_times + for key in execution_times: + len_to_time = {} + len_to_count = {} + for item in execution_times[key]: + swap_len = item["swap_len"] + if swap_len not in len_to_time: + len_to_time[swap_len] = 0 + len_to_time[swap_len] += item["total_time"] + + if swap_len not in len_to_count: + len_to_count[swap_len] = 0 + len_to_count[swap_len] += 1 + + for swap_len in len_to_time: + total_time = len_to_time[swap_len] + count = len_to_count[swap_len] + print(f"{key} on {swap_len} pages: " + f"{(count * swap_len) / total_time} pages per second") + + +def test_long_document_qa(llm=None, sampling_params=None, prompts=None): + + start_time = time.time() + llm.generate(prompts, sampling_params=sampling_params) + end_time = time.time() 
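Reviewer note: `--profile-swap-blocks` above monkey-patches `CacheEngine.swap_out`/`swap_in` with `timing_decorator` and then reports pages-per-second per swap length. A minimal, self-contained sketch of that aggregation pattern (the `fake_swap` function and its timings are invented for illustration; the real script additionally brackets the call with `torch.cuda.synchronize()`):

```python
import time
from collections import defaultdict

execution_times = defaultdict(list)  # func name -> list of (swap_len, seconds)

def timing_decorator(func):
    def wrapper(block_mapping, *args, **kwargs):
        start = time.time()
        result = func(block_mapping, *args, **kwargs)
        execution_times[func.__name__].append(
            (len(block_mapping), time.time() - start))
        return result
    return wrapper

@timing_decorator
def fake_swap(block_mapping):        # stand-in for CacheEngine.swap_out
    time.sleep(0.001 * len(block_mapping))

for n in (4, 4, 8):
    fake_swap(list(range(n)))

# Aggregate pages/second per swap length, mirroring process_timing_results().
for name, records in execution_times.items():
    per_len = defaultdict(lambda: [0.0, 0])  # swap_len -> [total_seconds, calls]
    for swap_len, seconds in records:
        per_len[swap_len][0] += seconds
        per_len[swap_len][1] += 1
    for swap_len, (total, count) in sorted(per_len.items()):
        print(f"{name} on {swap_len} pages: {count * swap_len / total:.1f} pages/s")
```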
+ print(f"cost time {end_time - start_time}") + + +def repeat_prompts(prompts, repeat_count): + repeated_prompts = prompts * repeat_count + random.shuffle(repeated_prompts) + return repeated_prompts + + +def main(args): + if args.profile_swap_blocks: + from vllm.worker.cache_engine import CacheEngine + CacheEngine.swap_out = timing_decorator(CacheEngine.swap_out) + CacheEngine.swap_in = timing_decorator(CacheEngine.swap_in) + + random.seed(args.seed) + + # append the document id at the beginning to avoid any of the document + # being the prefix of other documents + prompts = [ + str(i) + ' '.join(['hi'] * args.document_length) + for i in range(args.num_documents) + ] + + preemption_mode = "" + if args.block_allocator == "CpuOffloadingBlockAllocator": + preemption_mode = "recompute" + else: + preemption_mode = "swap" + + llm = LLM(model=args.model, + tokenizer_mode='auto', + trust_remote_code=True, + enforce_eager=True, + tensor_parallel_size=args.tensor_parallel_size, + enable_prefix_caching=args.enable_prefix_caching, + block_allocator=args.block_allocator, + preemption_mode=preemption_mode, + swap_space=args.cpu_memory_gb, + enable_chunked_prefill=False, + gpu_memory_utilization=args.gpu_memory_utilization, + max_model_len=30000) + + sampling_params = SamplingParams(temperature=0, max_tokens=args.output_len) + + prompts = repeat_prompts(prompts, args.repeat_count) + + print("------warm up------") + test_long_document_qa( + llm=llm, + prompts=prompts, + sampling_params=sampling_params, + ) + + random.shuffle(prompts) + + print("------start generating------") + test_long_document_qa( + llm=llm, + prompts=prompts, + sampling_params=sampling_params, + ) + + if args.profile_swap_blocks: + process_timing_results() + + +if __name__ == "__main__": + parser = FlexibleArgumentParser( + description= + 'Benchmark the performance with or without automatic prefix caching.') + parser.add_argument( + '--model', + type=str, + # this test aims to test long document QA capability, + # so we use llama 3.1 8B as it can process long context + default='meta-llama/Llama-3.1-8B') + parser.add_argument("--dataset-path", + type=str, + default=None, + help="Path to the dataset.") + parser.add_argument('--tensor-parallel-size', '-tp', type=int, default=1) + parser.add_argument('--output-len', type=int, default=10) + parser.add_argument('--enable-prefix-caching', + action='store_true', + help='enable prefix caching') + parser.add_argument('--repeat-count', + type=int, + default=2, + help='Number of times to repeat each prompt') + parser.add_argument( + '--document-length', + type=int, + # Roughly the number of tokens for a system paper, + # excluding images + default=20010, + help='Range of input lengths for sampling prompts,' + 'specified as "min:max" (e.g., "128:256").') + parser.add_argument('--num-documents', + type=int, + default=8, + help='Range of input lengths for sampling prompts,' + 'specified as "min:max" (e.g., "128:256").') + parser.add_argument("--seed", + type=int, + default=0, + help='Random seed for reproducibility') + parser.add_argument('--gpu-memory-utilization', + type=float, + default=0.9, + help='GPU memory utilization for vLLM. Should be a ' + 'float point number ranging from 0 to 1. For this ' + 'test please use a small value so that the GPU ' + 'cannot hold all KV caches of all documents, ' + 'and the effect of CPU offloading can be tested.') + parser.add_argument( + '--cpu-memory-gb', + type=float, + default=1, + help="The amount of CPU memory (GB) that is used by vLLM. 
Not very " + "useful for CpuGpuBlockAllocator, but useful for " + "CpuOffloadingBlockAllocator to have more CPU KV cache space") + parser.add_argument( + '--block-allocator', + type=str, + default='CpuGpuBlockAllocator', + choices=['CpuGpuBlockAllocator', 'CpuOffloadingBlockAllocator'], + help='The block allocator that vLLM uses. Currently' + ' can be CpuGpuBlockAllocator (the default) and ' + 'CpuOffloadingBlockAllocator (experimental) that ' + 'supports offloading the KV cache to CPU . ' + 'When using CpuOffloadingBlockAllocator, the ' + 'preemption mode must be recompute.') + + parser.add_argument( + '--profile-swap-blocks', + action='store_true', + help='Profile the swap_blocks function in the custom ops') + + args = parser.parse_args() + main(args) diff --git a/benchmarks/benchmark_prefix_caching.py b/benchmarks/benchmark_prefix_caching.py index 5e9381f712e10..9a8ecae7b65df 100644 --- a/benchmarks/benchmark_prefix_caching.py +++ b/benchmarks/benchmark_prefix_caching.py @@ -244,4 +244,4 @@ def main(args): parser = EngineArgs.add_cli_args(parser) args = parser.parse_args() - main(args) + main(args) \ No newline at end of file diff --git a/csrc/cache_kernels.cu b/csrc/cache_kernels.cu index 8a95279f9a25a..dea40c1904a26 100644 --- a/csrc/cache_kernels.cu +++ b/csrc/cache_kernels.cu @@ -11,6 +11,7 @@ #include "quantization/fp8/nvidia/quant_utils.cuh" #endif +#include #include #include #include @@ -21,8 +22,64 @@ typedef __hip_bfloat16 __nv_bfloat16; #endif -void swap_blocks(torch::Tensor& src, torch::Tensor& dst, - const torch::Tensor& block_mapping) { +namespace vllm { + +template +__global__ void paged_copy(T* __restrict__ dst, const T* __restrict__ src, + ACC_T src_to_dst, const int num_pages, + const int num_elements_per_page) { + const int64_t srcPageIdx = src_to_dst[blockIdx.x][0]; + const int64_t dstPageIdx = src_to_dst[blockIdx.x][1]; + + const int64_t srcPageOffset = srcPageIdx * num_elements_per_page; + const int64_t dstPageOffset = dstPageIdx * num_elements_per_page; + + for (int i = threadIdx.x; i < num_elements_per_page; i += blockDim.x) { + dst[dstPageOffset + i] = src[srcPageOffset + i]; + } +} + +} // namespace vllm + +template +void launch_swap_block_kernel(DTYPE* dst, const DTYPE* src, + const torch::Tensor& block_mapping, + const int num_blocks, + const int block_size_in_bytes) { + c10::cuda::CUDAGuard device_guard(block_mapping.device()); + auto block_mapping_accessor = + block_mapping.packed_accessor32(); + + int num_threads = 1024; + int grid_size = num_blocks; + + const cudaStream_t stream = at::cuda::getCurrentCUDAStream(); + vllm::paged_copy<<>>( + dst, src, block_mapping_accessor, num_blocks, + block_size_in_bytes / DTYPE_LEN); +} + +template +T* get_kernel_ptr(torch::Tensor& tensor) { + // Get the kernel-accessible pointer of the given type T + // Returns NULL if the tensor is on CPU and non-pinned + torch::Device device = tensor.device(); + if (device.is_cuda()) { + return static_cast(tensor.data_ptr()); + } else if (device.is_cpu() && tensor.is_pinned()) { + T* ptr; + cudaHostGetDevicePointer((void**)&ptr, static_cast(tensor.data_ptr()), + 0); + return ptr; + } else if (device.is_cpu()) { + return NULL; + } else { + TORCH_CHECK(false, "Invalid device"); + } +} + +void swap_blocks_slow(torch::Tensor& src, torch::Tensor& dst, + const torch::Tensor& block_mapping) { torch::Device src_device = src.device(); torch::Device dst_device = dst.device(); cudaMemcpyKind memcpy_type; @@ -62,6 +119,41 @@ void swap_blocks(torch::Tensor& src, torch::Tensor& dst, } } +void 
swap_blocks(torch::Tensor& src, torch::Tensor& dst, + const torch::Tensor& block_mapping) { + int64_t* src_ptr = get_kernel_ptr(src); + int64_t* dst_ptr = get_kernel_ptr(dst); + if (src_ptr == NULL || dst_ptr == NULL) { + // fall back to the slow implementation + swap_blocks_slow(src, dst, block_mapping.cpu()); + } else { + // Check the device + torch::Device src_device = src.device(); + torch::Device dst_device = dst.device(); + torch::Device block_mapping_device = block_mapping.device(); + TORCH_CHECK(block_mapping_device.is_cuda(), "block_mapping must be on GPU"); + if (src_device.is_cuda() && dst_device.is_cuda()) { + TORCH_CHECK(src_device.index() == dst_device.index(), + "src and dst must be on the same GPU"); + } + if (src_device.is_cuda()) { + TORCH_CHECK(src_device.index() == block_mapping_device.index(), + "src and block_mapping must be on the same GPU"); + } + if (dst_device.is_cuda()) { + TORCH_CHECK(dst_device.index() == block_mapping_device.index(), + "src and block_mapping must be on the same GPU"); + } + + const int64_t num_blocks = block_mapping.size(0); + const int64_t block_size_in_bytes = src.element_size() * src[0].numel(); + + launch_swap_block_kernel<8, int64_t>(dst_ptr, (const int64_t*)src_ptr, + block_mapping, num_blocks, + block_size_in_bytes); + } +} + namespace vllm { // Grid: (num_layers, num_pairs) diff --git a/tests/core/block/test_cpu_offloading_block_allocator.py b/tests/core/block/test_cpu_offloading_block_allocator.py new file mode 100644 index 0000000000000..d4c8b1e37ff12 --- /dev/null +++ b/tests/core/block/test_cpu_offloading_block_allocator.py @@ -0,0 +1,146 @@ +import pytest + +from vllm.core.block.cpu_offloading_block_allocator import ( + CpuOffloadingBlockAllocator) +from vllm.utils import Device, chunk_list + + +@pytest.mark.parametrize("num_cpu_blocks", [1024]) +@pytest.mark.parametrize("num_gpu_blocks", [256]) +@pytest.mark.parametrize("block_size", [16]) +@pytest.mark.parametrize("allocator_type", ["prefix_caching"]) +def test_allocate_mutable_block(num_cpu_blocks: int, num_gpu_blocks: int, + block_size: int, allocator_type: str): + allocator = CpuOffloadingBlockAllocator.create( + allocator_type=allocator_type, + num_gpu_blocks=num_gpu_blocks, + num_cpu_blocks=num_cpu_blocks, + block_size=block_size, + ) + + assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks + assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks + + gpu_blocks = [ + allocator.allocate_mutable_block(prev_block=None, device=Device.GPU) + for _ in range(num_gpu_blocks) + ] + assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks + assert allocator.get_num_free_blocks(Device.GPU) == 0 + assert len(allocator._uncached_blocks) == num_gpu_blocks + + blocks_to_swap_out, blocks_to_swap_in = allocator.get_and_reset_swaps() + allocator.access_cpu_hit_blocks(0.0) + assert len(blocks_to_swap_out) == 0 + assert len(blocks_to_swap_in) == 0 + assert len(allocator._uncached_blocks) == num_gpu_blocks + + _ = [allocator.free(block) for block in gpu_blocks] + assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks + assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks + + blocks_to_swap_out, blocks_to_swap_in = allocator.get_and_reset_swaps() + allocator.access_cpu_hit_blocks(1.0) + assert len(blocks_to_swap_out) == 0 + assert len(blocks_to_swap_in) == 0 + assert len(allocator._uncached_blocks) == 0 + + +@pytest.mark.parametrize("num_cpu_blocks", [1024]) +@pytest.mark.parametrize("num_gpu_blocks", [256]) 
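Reviewer note: the fast path above launches `paged_copy` with one thread block per row of `block_mapping`, each block copying one page of `block_size_in_bytes / 8` int64 elements; pinned CPU tensors are reached through `cudaHostGetDevicePointer`. A rough PyTorch-level reference of the copy semantics only (shapes are invented; the pointer and stream handling of the kernel is not modelled):

```python
import torch

def paged_copy_reference(dst: torch.Tensor, src: torch.Tensor,
                         block_mapping: torch.Tensor) -> None:
    """Copy whole pages from src to dst.

    dst/src are viewed as (num_pages, elements_per_page); block_mapping is a
    (num_pairs, 2) int64 tensor of (src_page, dst_page) pairs, matching the
    layout the CUDA kernel indexes with blockIdx.x.
    """
    for src_page, dst_page in block_mapping.tolist():
        dst[dst_page].copy_(src[src_page])

# Illustrative shapes: 16 pages of 1024 elements each.
src = torch.arange(16 * 1024, dtype=torch.int64).reshape(16, 1024)
dst = torch.zeros_like(src)
mapping = torch.tensor([[0, 3], [5, 7]], dtype=torch.int64)
paged_copy_reference(dst, src, mapping)
assert torch.equal(dst[3], src[0]) and torch.equal(dst[7], src[5])
```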
+@pytest.mark.parametrize("block_size", [2]) +@pytest.mark.parametrize("allocator_type", ["prefix_caching"]) +def test_allocate_immutable_block(num_cpu_blocks: int, num_gpu_blocks: int, + block_size: int, allocator_type: str): + allocator = CpuOffloadingBlockAllocator.create( + allocator_type=allocator_type, + num_gpu_blocks=num_gpu_blocks, + num_cpu_blocks=num_cpu_blocks, + block_size=block_size, + ) + + unique_token_ids = list( + range((num_cpu_blocks + num_gpu_blocks) * block_size)) + gpu_token_ids = list( + chunk_list(unique_token_ids[:num_gpu_blocks * block_size], block_size)) + gpu_token_ids2 = list( + chunk_list( + unique_token_ids[num_gpu_blocks * block_size:2 * num_gpu_blocks * + block_size], block_size)) + + gpu_blocks = [ + allocator.allocate_immutable_block(prev_block=None, + token_ids=token_ids, + device=Device.GPU) + for token_ids in gpu_token_ids + ] + + assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks + assert allocator.get_num_free_blocks(Device.GPU) == 0 + assert len(allocator._uncached_blocks) == num_gpu_blocks + + blocks_to_swap_out, blocks_to_swap_in = allocator.get_and_reset_swaps() + allocator.access_cpu_hit_blocks(0.0) + assert len(blocks_to_swap_out) == num_gpu_blocks + assert len(blocks_to_swap_in) == 0 + assert len(allocator._uncached_blocks) == 0 + + allocator.mark_blocks_as_computed([block.block_id for block in gpu_blocks]) + blocks_to_swap_out, blocks_to_swap_in = allocator.get_and_reset_swaps() + allocator.access_cpu_hit_blocks(1.0) + assert len(blocks_to_swap_out) + len(blocks_to_swap_in) == 0 + assert len(allocator._uncached_blocks) == 0 + + _ = [allocator.free(block) for block in gpu_blocks] + assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks + assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks + + blocks_to_swap_out, blocks_to_swap_in = allocator.get_and_reset_swaps() + allocator.access_cpu_hit_blocks(1.0) + assert len(blocks_to_swap_out) == 0 + assert len(blocks_to_swap_in) == 0 + assert len(allocator._uncached_blocks) == 0 + + # allocate another gpu sequence to flush out the GPU cache + gpu_blocks = [ + allocator.allocate_immutable_block(prev_block=None, + token_ids=token_ids, + device=Device.GPU) + for token_ids in gpu_token_ids2 + ] + + assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks + assert allocator.get_num_free_blocks(Device.GPU) == 0 + assert all([ + not allocator._allocators[Device.GPU].block_is_computed(block.block_id) + for block in gpu_blocks + ]) + + _ = [allocator.free(block) for block in gpu_blocks] + assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks + + blocks_to_swap_out, blocks_to_swap_in = allocator.get_and_reset_swaps() + allocator.access_cpu_hit_blocks(2.0) + assert len(blocks_to_swap_out) == 0 + assert len(blocks_to_swap_in) == 0 + assert len(allocator._uncached_blocks) == 0 + + # allocate original gpu sequence. It should hit CPU cache. 
+ gpu_blocks = [ + allocator.allocate_immutable_block(prev_block=None, + token_ids=token_ids, + device=Device.GPU) + for token_ids in gpu_token_ids + ] + + delta = num_cpu_blocks - num_gpu_blocks + assert allocator.get_num_free_blocks(Device.CPU) == delta + assert allocator.get_num_free_blocks(Device.GPU) == 0 + assert all([ + allocator._allocators[Device.GPU].block_is_computed(block.block_id) + for block in gpu_blocks + ]) + + blocks_to_swap_out, blocks_to_swap_in = allocator.get_and_reset_swaps() + allocator.access_cpu_hit_blocks(3.0) + assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks diff --git a/tests/kernels/test_cache.py b/tests/kernels/test_cache.py index 40550ed51e2c7..ef90c36dd81ab 100644 --- a/tests/kernels/test_cache.py +++ b/tests/kernels/test_cache.py @@ -362,7 +362,7 @@ def test_swap_blocks( block_mapping = list(zip(src_blocks, dst_blocks)) block_mapping_tensor = torch.tensor(block_mapping, dtype=torch.int64, - device="cpu").view(-1, 2) + device=device).view(-1, 2) # Create the KV caches on the first device. src_key_caches, src_value_caches = kv_cache_factory( diff --git a/tests/kernels/test_encoder_decoder_attn.py b/tests/kernels/test_encoder_decoder_attn.py index d943b048b7934..c46b02194653a 100644 --- a/tests/kernels/test_encoder_decoder_attn.py +++ b/tests/kernels/test_encoder_decoder_attn.py @@ -625,7 +625,7 @@ def _run_encoder_attention_test( attn_type = AttentionType.ENCODER packed_qkv = encoder_test_params.packed_qkvo.packed_qkv assert packed_qkv is not None - with set_forward_context(attn_metadata, vllm_config): + with set_forward_context({"attn_metadata": attn_metadata}, vllm_config): # In the test setup the shape of the query is # [batch_size, seq_len, num_heads, head_size]. However # the attention backend expect the shape to be @@ -680,7 +680,7 @@ def _run_decoder_self_attention_test( kv_cache = test_rsrcs.kv_cache packed_qkv = decoder_test_params.packed_qkvo.packed_qkv assert packed_qkv is not None - with set_forward_context(attn_metadata, vllm_config): + with set_forward_context({"attn_metadata": attn_metadata}, vllm_config): # In the test setup the shape of the query is # [batch_size, seq_len, num_heads, head_size]. However # the attention backend expect the shape to be @@ -752,7 +752,7 @@ def _run_encoder_decoder_cross_attention_test( cross_pckd_qkv = cross_test_params.packed_qkvo.packed_qkv key = (None if cross_pckd_qkv is None else cross_pckd_qkv.key) value = (None if cross_pckd_qkv is None else cross_pckd_qkv.value) - with set_forward_context(attn_metadata, vllm_config): + with set_forward_context({"attn_metadata": attn_metadata}, vllm_config): # In the test setup the shape of the query is # [batch_size, seq_len, num_heads, head_size]. 
However # the attention backend expect the shape to be diff --git a/vllm/attention/layer.py b/vllm/attention/layer.py index 05d997279893b..70910393fdb2d 100644 --- a/vllm/attention/layer.py +++ b/vllm/attention/layer.py @@ -130,15 +130,31 @@ def forward( attn_type: str = AttentionType.DECODER, ) -> torch.Tensor: + forward_context: ForwardContext = get_forward_context() + cache_engine = None + worker_input = None + if forward_context.dynamic_forward_context is not None: + cache_engine = \ + forward_context.dynamic_forward_context.get("cache_engine") + worker_input= \ + forward_context.dynamic_forward_context.get("worker_input") + if self.use_direct_call: - return self.impl.forward(query, - key, - value, - kv_cache, - attn_metadata, - self._k_scale, - self._v_scale, - attn_type=attn_type) + if (cache_engine is not None): + assert (worker_input is not None) + cache_engine.swap_in_sync(worker_input.running_sequence_ids) + ret = self.impl.forward(query, + key, + value, + kv_cache, + attn_metadata, + self._k_scale, + self._v_scale, + attn_type=attn_type) + if (cache_engine is not None): + assert (worker_input is not None) + cache_engine.swap_out(worker_input.blocks_to_swap_out) + return ret elif self.use_output: output = torch.empty_like(query) hidden_size = query.size(-1) @@ -151,14 +167,26 @@ def forward( key = key.view(-1, self.num_kv_heads, self.head_size) if value is not None: value = value.view(-1, self.num_kv_heads, self.head_size) + if (cache_engine is not None): + assert (worker_input is not None) + cache_engine.swap_in_sync(worker_input.running_sequence_ids) torch.ops.vllm.unified_attention_with_output( query, key, value, output, kv_cache, attn_type, self.layer_name) + if (cache_engine is not None): + assert (worker_input is not None) + cache_engine.swap_out(worker_input.blocks_to_swap_out) return output.view(-1, hidden_size) else: - return torch.ops.vllm.unified_attention(query, key, value, - kv_cache, attn_type, - self.layer_name) + if (cache_engine is not None): + assert (worker_input is not None) + cache_engine.swap_in_sync(worker_input.running_sequence_ids) + ret = torch.ops.vllm.unified_attention(query, key, value, kv_cache, + attn_type, self.layer_name) + if (cache_engine is not None): + assert (worker_input is not None) + cache_engine.swap_out(worker_input.blocks_to_swap_out) + return ret def extra_repr(self) -> str: s = f"head_size={self.impl.head_size}" # type: ignore @@ -240,7 +268,10 @@ def unified_attention( layer_name: str, ) -> torch.Tensor: forward_context: ForwardContext = get_forward_context() - attn_metadata = forward_context.dynamic_forward_context + attn_metadata = None + if forward_context.dynamic_forward_context is not None: + attn_metadata = \ + forward_context.dynamic_forward_context.get("attn_metadata") self = forward_context.static_forward_context[layer_name] return self.impl.forward(query, key, @@ -282,7 +313,10 @@ def unified_attention_with_output( layer_name: str, ) -> None: forward_context: ForwardContext = get_forward_context() - attn_metadata = forward_context.dynamic_forward_context + attn_metadata = None + if forward_context.dynamic_forward_context is not None: + attn_metadata = \ + forward_context.dynamic_forward_context.get("attn_metadata") self = forward_context.static_forward_context[layer_name] self.impl.forward(query, key, diff --git a/vllm/config.py b/vllm/config.py index 2d9a76fe7ddb1..029bbbc7cc32b 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -833,6 +833,7 @@ def __init__( sliding_window: Optional[int] = None, enable_prefix_caching: 
bool = False, cpu_offload_gb: float = 0, + block_allocator: str = "CpuGpuBlockAllocator", ) -> None: self.block_size = block_size self.gpu_memory_utilization = gpu_memory_utilization @@ -843,6 +844,7 @@ def __init__( self.sliding_window = sliding_window self.enable_prefix_caching = enable_prefix_caching self.cpu_offload_gb = cpu_offload_gb + self.block_allocator = block_allocator self._verify_args() self._verify_cache_dtype() @@ -863,6 +865,13 @@ def _verify_args(self) -> None: "GPU memory utilization must be less than 1.0. Got " f"{self.gpu_memory_utilization}.") + if self.block_allocator not in [ + "CpuGpuBlockAllocator", "CpuOffloadingBlockAllocator" + ]: + raise ValueError( + "Only CpuGpuBlockAllocator and CpuOffloadingBlockAllocator is " + f"supported. Got {self.block_allocator}.") + def _verify_cache_dtype(self) -> None: if self.cache_dtype == "auto": pass diff --git a/vllm/core/block/cpu_gpu_block_allocator.py b/vllm/core/block/cpu_gpu_block_allocator.py index 3197af3c2b7a4..337899a272fb1 100644 --- a/vllm/core/block/cpu_gpu_block_allocator.py +++ b/vllm/core/block/cpu_gpu_block_allocator.py @@ -322,17 +322,26 @@ def get_prefix_cache_hit_rate(self, device: Device) -> float: assert device in self._allocators return self._allocators[device].get_prefix_cache_hit_rate() - def get_and_reset_swaps(self) -> List[Tuple[int, int]]: + def get_and_reset_swaps(self) -> Tuple[List[Tuple[int, int]], ...]: """Returns and clears the mapping of source to destination block IDs. Will be called after every swapping operations for now, and after every schedule when BlockManagerV2 become default. Currently not useful. + + Args: + now (float): The time stamp. Returns: - List[Tuple[int, int]]: A mapping of source to destination block IDs. + A tuple of two lists: (blocks_to_swap_out, blocks_to_swap_in). + Each list is a List[Tuple[int, int]], containing the mapping of + source to destination block IDs. The block IDs are physical block + IDs and it's expected to be used by the cache engine directly. """ - mapping = self._swap_mapping.copy() self._swap_mapping.clear() - return list(mapping.items()) + # return an empty list, to keep compatibility with previous behavior + return [], [] + + def access_cpu_hit_blocks(self, now: float) -> None: + pass def find_cached_blocks_prefix( self, diff --git a/vllm/core/block/cpu_offloading_block_allocator.py b/vllm/core/block/cpu_offloading_block_allocator.py new file mode 100644 index 0000000000000..5ecec5114222f --- /dev/null +++ b/vllm/core/block/cpu_offloading_block_allocator.py @@ -0,0 +1,394 @@ +"""This file implement a block allocator that supports CPU KV cache offloading + +The key idea of this implementation is to maintain those allocated blocks +that didn't hit the cache, and constantly copy them into CPU after each +scheduler step. + +This idea is borrowed from ConServe +(paper link: https://arxiv.org/abs/2410.01228), based on the assumption +that the CPU-GPU bandwidth is much higher than GPU KV cache generation +throughput. Thanks Yifan for this idea. + +This implementation also allows vLLM to gracefully handle preemption by +recomputation. 
+""" +from collections import deque +from typing import Deque, Dict, List, Optional, Tuple + +from vllm.core.block.cpu_gpu_block_allocator import CpuGpuBlockAllocator +from vllm.core.block.interfaces import Block, DeviceAwareBlockAllocator +from vllm.core.block.prefix_caching_block import PrefixCachingBlockAllocator +from vllm.utils import Device + + +class CpuOffloadingBlockAllocator(CpuGpuBlockAllocator): + """A block allocator that supports CPU KV cache offloading + + This class extends the `CpuGpuBlockAllocator` so that the CPU can be used + for prefix caching. + + It will internally maintain uncached blocks, and trying to copy uncached + blocks into CPU upon the end of scheduler step (i.e. calling + `get_and_reset_swaps`). + + This implementation also allows vLLM to gracefully handle preemption by + recomputation. + """ + + allocators: Dict[Device, PrefixCachingBlockAllocator] + + @staticmethod + def create( + allocator_type: str, + num_gpu_blocks: int, + num_cpu_blocks: int, + block_size: int, + ) -> DeviceAwareBlockAllocator: + """Initiate CpuOffloadingBlockAllocator. Similar to + CpuGpuBlockAllocator.create() but only support prefix caching + + Args: + allocator_type (str): The type of block allocator to use for CPU + and GPU blocks. Currently supported values are "naive" and + "prefix_caching". + num_gpu_blocks (int): The number of blocks to allocate for GPU + memory. + num_cpu_blocks (int): The number of blocks to allocate for CPU + memory. + block_size (int): The size of each block in number of tokens. + + Returns: + DeviceAwareBlockAllocator: A CpuOffloadingBlockAllocator instance + with the specified configuration. + + Notes: + - The block IDs are assigned contiguously, with GPU block IDs coming + before CPU block IDs. + """ + assert num_gpu_blocks < num_cpu_blocks, "CPU offloading block "\ + "allocator requires the allocated CPU memory capacity to be larger"\ + " than GPU memory capacity." + block_ids = list(range(num_gpu_blocks + num_cpu_blocks)) + gpu_block_ids = block_ids[:num_gpu_blocks] + cpu_block_ids = block_ids[num_gpu_blocks:] + + assert allocator_type == "prefix_caching", "CpuOffloadingBlock"\ + "Allocator should be only used together with prefix caching." + + # prefix caching block is now the default. 
+ gpu_allocator = PrefixCachingBlockAllocator( + num_blocks=num_gpu_blocks, + block_size=block_size, + block_ids=gpu_block_ids, + ) + + cpu_allocator = PrefixCachingBlockAllocator( + num_blocks=num_cpu_blocks, + block_size=block_size, + block_ids=cpu_block_ids, + ) + + return CpuOffloadingBlockAllocator( + cpu_block_allocator=cpu_allocator, + gpu_block_allocator=gpu_allocator, + ) + + def __init__(self, cpu_block_allocator: PrefixCachingBlockAllocator, + gpu_block_allocator: PrefixCachingBlockAllocator): + assert not ( + cpu_block_allocator.all_block_ids + & gpu_block_allocator.all_block_ids + ), "cpu and gpu block allocators can't have intersection of block ids" + + super().__init__(cpu_block_allocator, gpu_block_allocator) + self._allocators: Dict[Device, + PrefixCachingBlockAllocator] = { # type: ignore + Device.CPU: cpu_block_allocator, + Device.GPU: gpu_block_allocator + } + """ + GPU block should only be in one of the following three status: + uncached: allocated blocks that didn't hit any cache + cached: allocated blocks that are cached, either in GPU or in CPU + free: the blocks are not allocated by block allocator + This implementation aims to transform uncacherd blocks to cached blocks + by performing GPU to CPU copy when calling `get_and_reset_swaps` + + As block allocator will automatically track free blocks, and we don't + need to specially handle cached blocks. So we only track uncached blocks + """ + self._uncached_blocks: Deque[Block] = deque() + """ + We probe CPU cache hit by trying to allocate a CPU + block and see if it is computed. + If we hit the CPU cache, we cannot free this CPU block until the end + of scheduler step, in order to avoid the CPU cache being overwritten. + so we track the cpu blocks we allocated, and free it after scheduler + step (i.e. calling `get_and_reset_swaps`). + """ + self._allocated_cpu_blocks: Deque[Block] = deque() + + self.num_gpu_blocks = gpu_block_allocator.get_num_total_blocks() + self.num_cpu_blocks = cpu_block_allocator.get_num_total_blocks() + + def allocate_mutable_block(self, prev_block: Optional[Block], + device: Device) -> Block: + """Allocates a new mutable block on the specified device. + + Args: + prev_block (Optional[Block]): The previous block to in the sequence. + Used for prefix hashing. + device (Device): The device on which to allocate the new block. + + Returns: + Block: The newly allocated mutable block. + """ + assert device == Device.GPU, "Calls to CPU offloading block allocator "\ + "should always use Device.GPU --- CPU offloading block allocator "\ + "handles CPU offloading internally."\ + # mark this block as uncached + + block = self._allocators[device].allocate_mutable_block(prev_block) + self._uncached_blocks.append(block) + return block + + def allocate_immutable_blocks(self, prev_block: Optional[Block], + block_token_ids: List[List[int]], + device: Device) -> List[Block]: + """Allocates a new group of immutable blocks with the provided block + token IDs on the specified device. + + Args: + prev_block (Optional[Block]): The previous block in the sequence. + Used for prefix hashing. + block_token_ids (List[int]): The list of block token IDs to be + stored in the new blocks. + device (Device): The device on which to allocate the new block. + + Returns: + List[Block]: The newly allocated list of immutable blocks + containing the provided block token IDs. 
+ """ + assert device == Device.GPU, "Calls to CPU offloading block allocator "\ + "should always use Device.GPU --- CPU offloading block allocator"\ + "handles CPU offloading internally." + + # repeatedly call allocate_immutable_block + # because it handles CPU-GPU offloading related logics. + blocks = [] + for token_ids in block_token_ids: + prev_block = self.allocate_immutable_block(prev_block=prev_block, + token_ids=token_ids, + device=device) + blocks.append(prev_block) + return blocks + + def allocate_immutable_block(self, prev_block: Optional[Block], + token_ids: List[int], + device: Device) -> Block: + """Allocates a new immutable block with the provided token IDs on the + specified device. + + Args: + prev_block (Optional[Block]): The previous block in the sequence. + Used for prefix hashing. + token_ids (List[int]): The list of token IDs to be stored in the new + block. + device (Device): The device on which to allocate the new block. + + Returns: + Block: The newly allocated immutable block containing the provided + token IDs. + """ + + assert device == Device.GPU, "Calls to CPU offloading block allocator"\ + " should always use Device.GPU --- CPU offloading block allocator"\ + " handles CPU offloading internally." + + # allocate a GPU block + block = self._allocators[device].allocate_immutable_block( + prev_block, token_ids) + block_id = block.block_id + assert block_id is not None + block_computed = self._allocators[device].block_is_computed(block_id) + + # deal with prefix caching, three cases in total: + # 1. cache hit on GPU + # 2. no cache hit on GPU but cache hit on CPU + # 3. no cache hit + if block_computed: + # cache hit on GPU, no need to put it into uncached blocks + pass + else: + # check if we can hit cache on CPU by trying to allocate CPU block + cpu_block = self._allocators[Device.CPU].allocate_immutable_block( + prev_block, token_ids) + cpu_block_id = cpu_block.block_id + assert cpu_block_id is not None + cpu_block_computed = self._allocators[ + Device.CPU].block_is_computed(cpu_block_id) + if cpu_block_computed: + # CPU cache hit + # mark the GPU block as computed + self._allocators[Device.GPU].mark_blocks_as_computed( + [block_id]) + # copy the CPU cache to GPU + self._swap_mapping[cpu_block_id] = block_id + # and don't free this block until `get_and_reset_swap` is called + self._allocated_cpu_blocks.append(cpu_block) + else: + # No cache hit + # mark the GPU block as uncached + self._uncached_blocks.append(block) + # and free cpu block + self._allocators[Device.CPU].free(cpu_block) + + return block + + def swap(self, blocks: List[Block], src_device: Device, + dst_device: Device) -> Dict[int, int]: + + raise NotImplementedError("CPU offloading block allocator only " + "support preemption by recomputation.") + + def _is_gpu_block(self, block_id: int) -> bool: + return block_id in self._allocators[Device.GPU].all_block_ids + + def _is_gpu_block_unsafe(self, block_id: int) -> bool: + """Faster version of `_is_gpu_block` that doesn't check the block ID. + But assumes the that the block IDs are assigned contiguously, with GPU + block IDs coming before the CPU block IDs. + """ + return block_id < self.num_gpu_blocks + + def _get_physical_block_id_unsafe(self, block_id: int) -> int: + """Returns the physical block ID of the given block ID. + + This function avoids using the `allocator.get_physical_block_id()` + which is slow (O(NlogN)). 
Instead, this is based on the assumption + that the block IDs are assigned contiguously, with GPU block IDs coming + before CPU block IDs. + + Args: + block_id (int): The block ID to get the physical block ID of. + + Returns: + int: The physical block ID of the given block ID. + + Note: + Please see the implementation of + `CpuOffloadingBlockAllocator.create` for how the block IDs are + assigned. + """ + if self._is_gpu_block_unsafe(block_id): + return block_id + else: + return block_id - self.num_gpu_blocks + + def get_and_reset_swaps(self) -> Tuple[List[Tuple[int, int]], ...]: + """Returns and clears the mapping of source to destination block IDs. + Will be called right before scheduler step finishes. + + This function will do the following things: + 1. Iterate over uncached blocks and see if we can copy it to CPU + 2. Update all allocated CPU block time stamp + 3. Free CPU blocks + 4. Return and clear all swapping status + + Returns: + A tuple of two lists: (blocks_to_swap_out, blocks_to_swap_in). + Each list is a List[Tuple[int, int]], containing the mapping of + source to destination block IDs. The block IDs are physical block + IDs and it's expected to be used by the cache engine directly. + """ + + allocator = self._allocators[Device.GPU] + cpu_allocator = self._allocators[Device.CPU] + + new_uncached_blocks: Deque[Block] = deque() + + # XXX(lixiaobai09): may slow for each request to iterate over all? + while self._uncached_blocks: + block = self._uncached_blocks.pop() + block_id = block.block_id + + # check if this block is freed + if block_id is None: + # this block is already freed, no longer need to copy it to CPU + continue + + refcount = allocator._refcounter.get(block_id) + assert refcount > 0, "A freed block should have block_id None" + + # check if this block is computed + computed = allocator.block_is_computed(block_id) + # This block is computed or immutable, copy it to CPU + if computed or \ + (block.content_hash is not None): + # allocate a block on CPU + cpu_block = cpu_allocator.allocate_immutable_block( + prev_block=block.prev_block, token_ids=block.token_ids) + assert cpu_block.block_id is not None + self._allocated_cpu_blocks.append(cpu_block) + + # mark CPU block as computed + cpu_allocator.mark_blocks_as_computed([cpu_block.block_id]) + + # copy the GPU block to CPU + assert cpu_block.block_id is not None + self._swap_mapping[block_id] = cpu_block.block_id + + continue + + # this block is neither freed nor computed + # keep marking it as uncached + new_uncached_blocks.append(block) + + # update uncached blocks + self._uncached_blocks = new_uncached_blocks + + # populate the swap_out list and swap_in list + blocks_to_swap_out = [] + blocks_to_swap_in = [] + for src, dst in self._swap_mapping.items(): + # only two possible cases: CPU -> GPU, or GPU -> CPU + #if src in self._allocators[Device.GPU].all_block_ids: + if self._is_gpu_block_unsafe(src): + # swap out + src = self._get_physical_block_id_unsafe(src) + dst = self._get_physical_block_id_unsafe(dst) + blocks_to_swap_out.append((src, dst)) + else: + # swap in + src = self._get_physical_block_id_unsafe(src) + dst = self._get_physical_block_id_unsafe(dst) + blocks_to_swap_in.append((src, dst)) + self._swap_mapping.clear() + return blocks_to_swap_out, blocks_to_swap_in + + def access_cpu_hit_blocks(self, now: float) -> None: + ''' + Args: + now (float): The time stamp used to update CPU access time, so + that CPU evictor can work. 
+ ''' + + # iterate over allocated CPU blocks, update access time and free them + # need to update access time so that CPU evictor can work + cpu_allocator = self._allocators[Device.CPU] + while self._allocated_cpu_blocks: + cpu_block = self._allocated_cpu_blocks.pop() + assert cpu_block.block_id is not None + # update the access time + cpu_allocator.mark_blocks_as_accessed([cpu_block.block_id], now) + # free the block + cpu_allocator.free(cpu_block) + + def will_swap_in_cpu_blocks(self): + """Check if there are CPU blocks that will be swapped in + + Returns: + bool: True if there are CPU blocks that will be swapped in, False + otherwise. + """ + return bool(self._swap_mapping) diff --git a/vllm/core/block/interfaces.py b/vllm/core/block/interfaces.py index 06f4851af3466..61a17f7f11063 100644 --- a/vllm/core/block/interfaces.py +++ b/vllm/core/block/interfaces.py @@ -290,3 +290,22 @@ def find_cached_blocks_prefix( device: Device = Device.GPU, ) -> List[int]: pass + + @abstractmethod + def get_and_reset_swaps(self) -> Tuple[List[Tuple[int, int]], ...]: + """Returns and clears the mapping of source to destination block IDs. + Will be called after every swapping operations for now, and after every + schedule when BlockManagerV2 become default. Currently not useful. + + Returns: + A tuple of two lists: (blocks_to_swap_out, blocks_to_swap_in). + Each list is a List[Tuple[int, int]], containing the mapping of + source to destination block IDs. The block IDs are physical block + IDs and it's expected to be used by the cache engine directly. + """ + pass + + @abstractmethod + def access_cpu_hit_blocks(self, now: float) -> None: + """Access cache hit blocks on CPU to update last accessed time.""" + pass diff --git a/vllm/core/block/prefix_caching_block.py b/vllm/core/block/prefix_caching_block.py index b736167f6ceb4..2a4b55585e7a9 100644 --- a/vllm/core/block/prefix_caching_block.py +++ b/vllm/core/block/prefix_caching_block.py @@ -907,6 +907,8 @@ def __init__( # `get_num_cached_tokens` for more details. self._seq_id_to_num_tokens_computed: Dict[int, int] = {} + self._seq_id_has_cpu_blocks: Set[int] = set() + def _update_seq_hashes(self, seq: Sequence) -> None: """Incrementally update the sequence's block hashes and record them.""" assert self._enable_caching @@ -962,7 +964,8 @@ def get_num_cached_tokens(self, seq: Sequence) -> int: # TODO(rickyx): This hack could be removed once we mark blocks as # computed correctly with chunked prefills. - if num_computed_tokens_prev is not None and seq.is_prefill(): + if num_computed_tokens_prev is not None and seq.is_prefill() \ + and seq.seq_id not in self._seq_id_has_cpu_blocks: # For a sequence that is still in prefill, we don't # recompute the number of cached tokens. # This also handles correctly chunked prefill since currently @@ -980,6 +983,14 @@ def get_num_cached_tokens(self, seq: Sequence) -> int: self._seq_id_to_num_tokens_computed[seq.seq_id] = num_cached_tokens return num_cached_tokens + def on_swap_in_cpu_blocks(self, seq_id: int) -> None: + """Mark the sequence as having CPU blocks swapped in.""" + # NOTE(Yihua): This is a temporary solution to handle the case where + # the CPU offloading is enabled and the sequence has CPU blocks swapped + # in. In this case, the number in self._seq_id_to_num_tokens_computed + # should be invalidated and we need to re-compute it. 
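Reviewer note: the tracker change above means a CPU-cache hit has to invalidate the per-sequence cached-token count. A simplified, hypothetical sketch of that bookkeeping (this is not the real `ComputedBlocksTracker`; `recompute` stands in for the hash-based recount):

```python
class CachedTokenTracker:
    """Toy model of the _seq_id_has_cpu_blocks bookkeeping added above."""

    def __init__(self) -> None:
        self._cached: dict[int, int] = {}   # seq_id -> num cached tokens
        self._has_cpu_blocks: set[int] = set()

    def on_swap_in_cpu_blocks(self, seq_id: int) -> None:
        # A CPU block was swapped in for this sequence: old counts are stale.
        self._has_cpu_blocks.add(seq_id)

    def get_num_cached_tokens(self, seq_id: int, is_prefill: bool,
                              recompute) -> int:
        prev = self._cached.get(seq_id)
        if (prev is not None and is_prefill
                and seq_id not in self._has_cpu_blocks):
            return prev            # still-prefilling sequence: reuse old count
        num = recompute(seq_id)    # CPU swap-in invalidated the cached value
        self._cached[seq_id] = num
        return num
```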
+ self._seq_id_has_cpu_blocks.add(seq_id) + def remove_seq(self, seq_id: int) -> None: """Stop tracking the sequence.""" if not self._enable_caching: @@ -990,6 +1001,8 @@ def remove_seq(self, seq_id: int) -> None: assert seq_id in self._seq_id_to_num_tokens_computed del self._seq_id_to_num_tokens_computed[seq_id] + self._seq_id_has_cpu_blocks.discard(seq_id) + class LastAccessBlocksTracker: """Manages the last access time of the tracked sequences, in order to allow diff --git a/vllm/core/block_manager.py b/vllm/core/block_manager.py index 209487c6b4f9e..9f34131bfb79a 100644 --- a/vllm/core/block_manager.py +++ b/vllm/core/block_manager.py @@ -5,6 +5,8 @@ from vllm.core.block.block_table import BlockTable from vllm.core.block.cpu_gpu_block_allocator import CpuGpuBlockAllocator +from vllm.core.block.cpu_offloading_block_allocator import ( + CpuOffloadingBlockAllocator) from vllm.core.block.interfaces import Block from vllm.core.block.prefix_caching_block import (ComputedBlocksTracker, LastAccessBlocksTracker) @@ -16,6 +18,11 @@ SeqId = int EncoderSeqId = str +block_allocator_creator = { + "CpuGpuBlockAllocator": CpuGpuBlockAllocator.create, + "CpuOffloadingBlockAllocator": CpuOffloadingBlockAllocator.create, +} + class SelfAttnBlockSpaceManager(BlockSpaceManager): """BlockSpaceManager which manages the allocation of KV cache. @@ -65,6 +72,7 @@ def __init__( watermark: float = 0.01, sliding_window: Optional[int] = None, enable_caching: bool = False, + block_allocator: str = "CpuGpuBlockAllocator", ) -> None: self.block_size = block_size self.num_total_gpu_blocks = num_gpu_blocks @@ -90,7 +98,7 @@ def __init__( self.watermark_blocks = int(watermark * num_gpu_blocks) - self.block_allocator = CpuGpuBlockAllocator.create( + self.block_allocator = block_allocator_creator[block_allocator]( allocator_type="prefix_caching" if enable_caching else "naive", num_gpu_blocks=num_gpu_blocks, num_cpu_blocks=num_cpu_blocks, @@ -105,6 +113,11 @@ def __init__( self._last_access_blocks_tracker = LastAccessBlocksTracker( self.block_allocator) + # request_id -> (blocks_to_swap_out, blocks_to_swap_in) + self.blocks_to_swap_of_sequence_id: List[ + Tuple[int, Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]]] = \ + [] + def can_allocate(self, seq_group: SequenceGroup, num_lookahead_slots: int = 0) -> AllocStatus: @@ -154,6 +167,18 @@ def _allocate_sequence(self, seq: Sequence) -> BlockTable: # Add blocks to the block table only if the sequence is non empty. block_table.allocate(seq.get_token_ids()) + # If the block allocator is CpuOffloadingBlockAllocator, we need to + # tell the computed_blocks_tracker to invalidate the previous computed + # num cached tokens + if isinstance(self.block_allocator, CpuOffloadingBlockAllocator) and \ + self.block_allocator.will_swap_in_cpu_blocks(): + self._computed_blocks_tracker.on_swap_in_cpu_blocks(seq.seq_id) + + blocks_to_swap_out, blocks_to_swap_in = \ + self.block_allocator.get_and_reset_swaps() + if (len(blocks_to_swap_out) + len(blocks_to_swap_in) > 0): + self.blocks_to_swap_of_sequence_id.append( + (seq.seq_id, (blocks_to_swap_out, blocks_to_swap_in))) return block_table def allocate(self, seq_group: SequenceGroup) -> None: @@ -241,6 +266,11 @@ def append_slots( ) # Return any new copy-on-writes. 
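Reviewer note: the block manager above stashes swaps per sequence as `(seq_id, (blocks_to_swap_out, blocks_to_swap_in))` and drains them once per step. A hypothetical, simplified sketch of that bookkeeping (the class and names below are illustrative, not the real `SelfAttnBlockSpaceManager`):

```python
from typing import List, Tuple

SwapPair = Tuple[int, int]
PendingSwaps = List[Tuple[int, Tuple[List[SwapPair], List[SwapPair]]]]

class SwapBookkeeping:
    def __init__(self) -> None:
        self.pending: PendingSwaps = []

    def record(self, seq_id: int, swap_out: List[SwapPair],
               swap_in: List[SwapPair]) -> None:
        # Mirrors the checks added in _allocate_sequence / append_slots.
        if swap_out or swap_in:
            self.pending.append((seq_id, (swap_out, swap_in)))

    def drain(self) -> PendingSwaps:
        # Mirrors get_and_reset_swaps(now): hand everything to the scheduler
        # exactly once per step and start over.
        ret, self.pending = self.pending, []
        return ret
```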
new_cows = self.block_allocator.clear_copy_on_writes() + blocks_to_swap_out, blocks_to_swap_in = \ + self.block_allocator.get_and_reset_swaps() + if (len(blocks_to_swap_out) + len(blocks_to_swap_in) > 0): + self.blocks_to_swap_of_sequence_id.append( + (seq.seq_id, (blocks_to_swap_out, blocks_to_swap_in))) return new_cows def free(self, seq: Sequence) -> None: @@ -508,3 +538,25 @@ def get_num_cached_tokens(self, seq: Sequence) -> int: cached in the block manager for the sequence. """ return self._computed_blocks_tracker.get_num_cached_tokens(seq) + + def get_and_reset_swaps(self, + now: float) -> \ + List[Tuple[int, + Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]]]: + """Returns and clears the mapping of source to destination block IDs. + Will be called after every swapping operations for now, and after every + schedule when BlockManagerV2 become default. + + Args: + now (float): The time stamp. + + Returns: + A tuple of two lists: (blocks_to_swap_out, blocks_to_swap_in). + Each list is a List[Tuple[int, int]], containing the mapping of + source to destination block IDs. The block IDs are physical block + IDs and it's expected to be used by the cache engine directly. + """ + ret = self.blocks_to_swap_of_sequence_id + self.block_allocator.access_cpu_hit_blocks(now) + self.blocks_to_swap_of_sequence_id = [] + return ret diff --git a/vllm/core/interfaces.py b/vllm/core/interfaces.py index b10b8d3f4a5bf..fb3308508d1bb 100644 --- a/vllm/core/interfaces.py +++ b/vllm/core/interfaces.py @@ -125,3 +125,22 @@ def get_prefix_cache_hit_rate(self, device: Device) -> float: @abstractmethod def get_num_cached_tokens(self, seq: Sequence) -> int: pass + + @abstractmethod + def get_and_reset_swaps(self, + now: float) -> \ + List[Tuple[int, + Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]]]: + """Returns and clears the mapping of source to destination block IDs. + Will be called after every swapping operations for now, and after every + schedule when BlockManagerV2 become default. + + Args: + now (float): The time stamp. + + Returns: + A tuple of two lists: (blocks_to_swap_out, blocks_to_swap_in). + Each list is a List[Tuple[int, int]], containing the mapping of + source to destination block IDs. + """ + pass diff --git a/vllm/core/placeholder_block_space_manager.py b/vllm/core/placeholder_block_space_manager.py index 26d42b7f1790e..a7166aa5ae04b 100644 --- a/vllm/core/placeholder_block_space_manager.py +++ b/vllm/core/placeholder_block_space_manager.py @@ -92,3 +92,8 @@ def get_prefix_cache_hit_rate(self, device: Device) -> float: def get_num_cached_tokens(self, seq: Sequence) -> int: return 0 + + def get_and_reset_swaps(self, now: float) -> \ + List[Tuple[int, + Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]]]: + return [(-1, ([], []))] diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index c3bc6becf0995..328e2abb7684a 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -138,6 +138,10 @@ class SchedulerOutputs: num_batched_tokens: int # Blocks to swap in. List of CPU -> GPU block number. blocks_to_swap_in: List[Tuple[int, int]] + # swap in requests offsets + swap_in_offsets: List[int] + # swap in sequence IDs + swap_in_sequence_ids: List[int] # Blocks to swap out. List of GPU -> CPU block number. blocks_to_swap_out: List[Tuple[int, int]] # Blocks to copy. Source to dest block. @@ -152,7 +156,9 @@ class SchedulerOutputs: def __post_init__(self): # Swap in and swap out should never happen at the same time. 
- assert not (self.blocks_to_swap_in and self.blocks_to_swap_out) + # NOTE(Kuntai): in CpuOffloadingBlockAllocator swap in and swap out + # will happen at the same time. So we comment out the following line. + # assert not (self.blocks_to_swap_in and self.blocks_to_swap_out) self.num_loras: int = len(self.lora_requests) if self.num_loras > 0: @@ -358,7 +364,8 @@ def __init__( num_gpu_blocks=num_gpu_blocks, num_cpu_blocks=num_cpu_blocks, sliding_window=self.cache_config.sliding_window, - enable_caching=self.cache_config.enable_prefix_caching) + enable_caching=self.cache_config.enable_prefix_caching, + block_allocator=self.cache_config.block_allocator) # Sequence groups in the WAITING state. # Contain new prefill or preempted requests. @@ -1131,6 +1138,25 @@ def _schedule_default(self) -> SchedulerOutputs: blocks_to_copy = running_scheduled.blocks_to_copy blocks_to_copy.extend(swapped_in.blocks_to_copy) + blocks_to_swap_in = swapped_in.blocks_to_swap_in + blocks_to_swap_out = running_scheduled.blocks_to_swap_out + + swap_in_cnt = 0 + swap_in_offsets = [0] + swap_in_sequence_ids = [] + if (len(blocks_to_swap_in) > 0): + swap_in_cnt += len(blocks_to_swap_in) + swap_in_offsets.append(swap_in_cnt) + swap_in_sequence_ids.append(-1) + for seq_id, (new_swap_out, new_swap_in) in \ + self.block_manager.get_and_reset_swaps(time.time()): + blocks_to_swap_out.extend(new_swap_out) + if (len(new_swap_in) > 0): + blocks_to_swap_in.extend(new_swap_in) + swap_in_cnt += len(new_swap_in) + swap_in_offsets.append(swap_in_cnt) + swap_in_sequence_ids.append(seq_id) + ignored_seq_groups = prefills.ignored_seq_groups ignored_seq_groups.extend(swapped_in.infeasible_seq_groups) @@ -1139,8 +1165,10 @@ def _schedule_default(self) -> SchedulerOutputs: num_prefill_groups=num_prefill_groups, num_batched_tokens=budget.num_batched_tokens + budget.num_cached_tokens, - blocks_to_swap_in=swapped_in.blocks_to_swap_in, - blocks_to_swap_out=running_scheduled.blocks_to_swap_out, + blocks_to_swap_in=blocks_to_swap_in, + swap_in_offsets=swap_in_offsets, + swap_in_sequence_ids=swap_in_sequence_ids, + blocks_to_swap_out=blocks_to_swap_out, blocks_to_copy=blocks_to_copy, ignored_seq_groups=ignored_seq_groups, num_lookahead_slots=running_scheduled.num_lookahead_slots, @@ -1209,6 +1237,29 @@ def _schedule_chunked_prefill(self) -> SchedulerOutputs: # Update swapped requests. self.swapped.extend(running_scheduled.swapped_out) + + blocks_to_copy = running_scheduled.blocks_to_copy + blocks_to_copy.extend(swapped_in.blocks_to_copy) + + blocks_to_swap_in = swapped_in.blocks_to_swap_in + blocks_to_swap_out = running_scheduled.blocks_to_swap_out + + swap_in_cnt = 0 + swap_in_offsets = [0] + swap_in_sequence_ids = [] + if (len(blocks_to_swap_in) > 0): + swap_in_cnt += len(blocks_to_swap_in) + swap_in_offsets.append(swap_in_cnt) + swap_in_sequence_ids.append(-1) + for seq_id, (new_swap_out, new_swap_in) in \ + self.block_manager.get_and_reset_swaps(time.time()): + blocks_to_swap_out.extend(new_swap_out) + if (len(new_swap_in) > 0): + blocks_to_swap_in.extend(new_swap_in) + swap_in_cnt += len(new_swap_in) + swap_in_offsets.append(swap_in_cnt) + swap_in_sequence_ids.append(seq_id) + # Put prefills first due to Attention backend ordering assumption. 
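Reviewer note: the scheduler above flattens the per-sequence swap-in lists into one `blocks_to_swap_in` list plus CSR-style `swap_in_offsets` and owning `swap_in_sequence_ids` (`-1` marks the ordinary swapped-in group). A small sketch of how the encoding is built and decoded (block numbers are invented):

```python
regular_swap_in = [(10, 0), (11, 1)]               # pre-existing swapped_in blocks
per_seq = [(7, [(40, 2)]), (9, [(41, 3), (42, 4)])]  # from the block manager

blocks_to_swap_in: list[tuple[int, int]] = []
swap_in_offsets = [0]
swap_in_sequence_ids: list[int] = []

if regular_swap_in:
    blocks_to_swap_in.extend(regular_swap_in)
    swap_in_offsets.append(len(blocks_to_swap_in))
    swap_in_sequence_ids.append(-1)

for seq_id, new_swap_in in per_seq:
    if new_swap_in:
        blocks_to_swap_in.extend(new_swap_in)
        swap_in_offsets.append(len(blocks_to_swap_in))
        swap_in_sequence_ids.append(seq_id)

# Segment i belongs to sequence swap_in_sequence_ids[i] and covers
# blocks_to_swap_in[swap_in_offsets[i]:swap_in_offsets[i + 1]].
assert blocks_to_swap_in[swap_in_offsets[1]:swap_in_offsets[2]] == [(40, 2)]
```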
scheduled_seq_groups = (prefills.seq_groups + running_scheduled.prefill_seq_groups + @@ -1231,10 +1282,11 @@ def _schedule_chunked_prefill(self) -> SchedulerOutputs: num_prefill_groups=num_prefill_groups, num_batched_tokens=budget.num_batched_tokens + budget.num_cached_tokens, - blocks_to_swap_in=swapped_in.blocks_to_swap_in, - blocks_to_swap_out=running_scheduled.blocks_to_swap_out, - blocks_to_copy=running_scheduled.blocks_to_copy + - swapped_in.blocks_to_copy, + blocks_to_swap_in=blocks_to_swap_in, + swap_in_offsets=swap_in_offsets, + swap_in_sequence_ids=swap_in_sequence_ids, + blocks_to_swap_out=blocks_to_swap_out, + blocks_to_copy=blocks_to_copy, ignored_seq_groups=prefills.ignored_seq_groups + swapped_in.infeasible_seq_groups, num_lookahead_slots=num_lookahead_slots, diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index d485c2a9e7208..fb93657fa2dd3 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -112,6 +112,7 @@ class EngineArgs: pipeline_parallel_size: int = 1 tensor_parallel_size: int = 1 max_parallel_loading_workers: Optional[int] = None + block_allocator: str = "CpuGpuBlockAllocator" # NOTE(kzawora): default block size for Gaudi should be 128 # smaller sizes still work, but very inefficiently block_size: int = 16 if not current_platform.is_hpu() else 128 @@ -412,6 +413,17 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: action='store_true', help='If specified, use nsight to profile Ray workers.') # KV cache arguments + parser.add_argument( + '--block-allocator', + type=str, + default='CpuGpuBlockAllocator', + choices=['CpuGpuBlockAllocator', 'CpuOffloadingBlockAllocator'], + help='The block allocator that vLLM uses. Currently' + ' can be CpuGpuBlockAllocator (the default) and ' + 'CpuOffloadingBlockAllocator (experimental) that ' + 'supports offloading the KV cache to CPU . ' + 'When using CpuOffloadingBlockAllocator, the ' + 'preemption mode must be recompute.') parser.add_argument('--block-size', type=int, default=EngineArgs.block_size, @@ -1007,6 +1019,14 @@ def create_engine_config(self, "CPU offload space must be non-negative" f", but got {self.cpu_offload_gb}") + if self.block_allocator == "CpuOffloadingBlockAllocator" and \ + self.preemption_mode == "swap": + raise ValueError( + "CpuOffloadingBlockAllocator only supports preemption by " + "recomputation as it internally offloads the request KV cache " + "to CPU. 
Please add `--preemption-mode recomputation` to vLLM " + "engine args") + device_config = DeviceConfig(device=self.device) model_config = self.create_model_config() @@ -1029,6 +1049,7 @@ def create_engine_config(self, sliding_window=model_config.get_sliding_window(), enable_prefix_caching=self.enable_prefix_caching, cpu_offload_gb=self.cpu_offload_gb, + block_allocator=self.block_allocator, ) parallel_config = ParallelConfig( pipeline_parallel_size=self.pipeline_parallel_size, diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 9be30c635cb2c..215bfda868c33 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -1390,6 +1390,8 @@ def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]: execute_model_req = ExecuteModelRequest( seq_group_metadata_list=seq_group_metadata_list, blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in, + swap_in_offsets=scheduler_outputs.swap_in_offsets, + swap_in_sequence_ids=scheduler_outputs.swap_in_sequence_ids, blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out, blocks_to_copy=scheduler_outputs.blocks_to_copy, num_lookahead_slots=scheduler_outputs.num_lookahead_slots, diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index 0bec978c4869c..0a3b258fa7a5a 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -162,6 +162,7 @@ def __init__( gpu_memory_utilization: float = 0.9, swap_space: float = 4, cpu_offload_gb: float = 0, + block_allocator: str = "CpuGpuBlockAllocator", enforce_eager: Optional[bool] = None, max_seq_len_to_capture: int = 8192, disable_custom_all_reduce: bool = False, @@ -210,6 +211,7 @@ def __init__( gpu_memory_utilization=gpu_memory_utilization, swap_space=swap_space, cpu_offload_gb=cpu_offload_gb, + block_allocator=block_allocator, enforce_eager=enforce_eager, max_seq_len_to_capture=max_seq_len_to_capture, disable_custom_all_reduce=disable_custom_all_reduce, diff --git a/vllm/sequence.py b/vllm/sequence.py index 669124319c4f4..8c1a1327991ff 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -1244,6 +1244,10 @@ class ExecuteModelRequest( # Blocks to swap in. List of CPU -> GPU block number. blocks_to_swap_in: List[Tuple[int, int]] = msgspec.field(default_factory=list) + # swap in requests offsets + swap_in_offsets: List[int] = msgspec.field(default_factory=list) + # swap in sequence IDs + swap_in_sequence_ids: List[int] = msgspec.field(default_factory=list) # Blocks to swap out. List of GPU -> CPU block number. blocks_to_swap_out: List[Tuple[int, int]] = msgspec.field(default_factory=list) diff --git a/vllm/spec_decode/draft_model_runner.py b/vllm/spec_decode/draft_model_runner.py index fe5fd39f42ac9..5dd51c57bbb9d 100644 --- a/vllm/spec_decode/draft_model_runner.py +++ b/vllm/spec_decode/draft_model_runner.py @@ -273,8 +273,9 @@ def execute_model( if previous_hidden_states is not None else {} # Run model - with set_forward_context(model_input.attn_metadata, - self.vllm_config): + with set_forward_context( + {"attn_metadata": model_input.attn_metadata}, + self.vllm_config): hidden_states = model_executable( input_ids=model_input.input_tokens, positions=model_input.input_positions, diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index 8d9976ded7c5e..2b4cc93ab0869 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -460,7 +460,8 @@ def execute_model( # Run the decoder. # Use persistent buffers for CUDA graphs. 
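Reviewer note: with this change the dynamic forward context is a dict, so call sites pass `{"attn_metadata": ...}` and, when CPU offloading is enabled, can add `cache_engine`/`worker_input` for the attention layer to pick up. A hedged sketch of the calling convention (the `vllm.forward_context` import path and the `run_model_step` helper are assumptions for illustration, not code from this PR):

```python
from vllm.forward_context import set_forward_context

def run_model_step(model, model_input, vllm_config,
                   cache_engine=None, worker_input=None):
    # Pack everything the layers may need into the dynamic forward context.
    dynamic_ctx = {"attn_metadata": model_input.attn_metadata}
    if cache_engine is not None:
        dynamic_ctx["cache_engine"] = cache_engine
        dynamic_ctx["worker_input"] = worker_input
    with set_forward_context(dynamic_ctx, vllm_config):
        return model(input_ids=model_input.input_tokens,
                     positions=model_input.input_positions)

# Inside an attention layer the values are read back defensively, e.g.:
#   ctx = get_forward_context()
#   attn_metadata = (ctx.dynamic_forward_context or {}).get("attn_metadata")
```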
- with set_forward_context(attn_metadata, self.vllm_config): + with set_forward_context({"attn_metadata": attn_metadata}, + self.vllm_config): hidden_states = self.model( input_ids=None, positions=self.positions[:num_input_tokens], diff --git a/vllm/worker/cache_engine.py b/vllm/worker/cache_engine.py index 7ccd4571b19df..6e3c84091b0b3 100644 --- a/vllm/worker/cache_engine.py +++ b/vllm/worker/cache_engine.py @@ -1,11 +1,13 @@ """CacheEngine class for managing the KV cache.""" -from typing import List +from collections import deque +from typing import Any, Dict, List import torch from vllm.attention import get_attn_backend from vllm.config import CacheConfig, DeviceConfig, ModelConfig, ParallelConfig from vllm.logger import init_logger +from vllm.platforms import current_platform from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, LayerBlockType, get_dtype_size, is_pin_memory_available) @@ -84,7 +86,10 @@ def _allocate_kv_cache( device=device)) return kv_cache - def swap_in(self, src_to_dst: torch.Tensor) -> None: + def swap_in(self, + src_to_dst: torch.Tensor, + offsets: torch.Tensor = None, + sequence_ids: torch.Tensor = None) -> None: for i in range(self.num_attention_layers): self.attn_backend.swap_blocks(self.cpu_cache[i], self.gpu_cache[i], src_to_dst) @@ -94,6 +99,12 @@ def swap_out(self, src_to_dst: torch.Tensor) -> None: self.attn_backend.swap_blocks(self.gpu_cache[i], self.cpu_cache[i], src_to_dst) + def swap_in_sync(self, sequence_ids: torch.Tensor) -> None: + pass + + def swap_out_sync(self) -> None: + pass + def copy(self, src_to_dsts: torch.Tensor) -> None: self.attn_backend.copy_blocks(self.gpu_cache, src_to_dsts) @@ -117,3 +128,183 @@ def get_cache_block_size( dtype = STR_DTYPE_TO_TORCH_DTYPE[cache_config.cache_dtype] dtype_size = get_dtype_size(dtype) return dtype_size * total + + +class EventPool: + + def __init__(self, reserve_num_events: int, device: torch.device): + self.reserve_num_events = reserve_num_events + self.event_queue: deque[torch.cuda.Event] = deque() + self.device = device + with torch.cuda.device(device): + for i in range(reserve_num_events): + event = torch.cuda.Event() + # record and synchronize once so the event starts out completed + event.record() + event.synchronize() + self.event_queue.append(event) + + def get_event(self) -> torch.cuda.Event: + if (len(self.event_queue) == 0): + with torch.cuda.device(self.device): + event = torch.cuda.Event() + # record and synchronize once so the event starts out completed + event.record() + event.synchronize() + self.event_queue.append(event) + return self.event_queue.popleft() + + def put_event(self, event: torch.cuda.Event): + self.event_queue.append(event) + + def get_events(self, num_events: int) -> list[torch.cuda.Event]: + ret = [] + for i in range(num_events): + ret.append(self.get_event()) + return ret + + def put_events(self, events: list[torch.cuda.Event]): + for event in events: + self.event_queue.append(event) + + +class GPUCacheEngine(CacheEngine): + + def __init__( + self, + cache_config: CacheConfig, + model_config: ModelConfig, + parallel_config: ParallelConfig, + device_config: DeviceConfig, + ) -> None: + super().__init__(cache_config, model_config, parallel_config, + device_config) + self.use_fast_path = False + # only CUDA and ROCm GPUs support the fast path + if (current_platform.is_cuda()) or \ + (current_platform.is_rocm()): + self.use_fast_path = True + self.swap_in_stream = None + self.swap_in_event_pool = None + self.swap_in_event_map: Dict[int, Any] = {} + self.swap_out_stream = None + self.swap_out_event = None + self.device = None + self._cur_swap_in_sync_layer = 
0 + self._cur_swap_out_layer = 0 + if (not self.use_fast_path): + return + # create device streams and events + self.device = torch.device(torch.cuda.current_device()) + with torch.cuda.device(self.device): + self.swap_in_stream = torch.cuda.Stream() + self.swap_in_event_pool = EventPool(64 * self.num_attention_layers, + self.device) + self.swap_out_stream = torch.cuda.Stream() + self.swap_out_event = torch.cuda.Event() + + def swap_in(self, + src_to_dst: torch.Tensor, + offsets: torch.Tensor = None, + sequence_ids: torch.Tensor = None) -> None: + if (not self.use_fast_path) or \ + (sequence_ids is None) or (sequence_ids.numel() == 0): + super().swap_in(src_to_dst) + return + sequence_ids_numpy = sequence_ids.numpy() + for seq_id in sequence_ids_numpy: + # the first one + if (seq_id == -1): + continue + assert (self.swap_in_event_map.get(seq_id) is None) + assert (self.swap_in_event_pool is not None) + tmp_event_list = self.swap_in_event_pool.get_events( + self.num_attention_layers) + self.swap_in_event_map[seq_id] = tmp_event_list + offsets_numpy = offsets.numpy() + forward_stream = torch.cuda.current_stream() + for idx, seq_id in enumerate(sequence_ids_numpy): + start_idx = offsets_numpy[idx] + last_idx = offsets_numpy[idx + 1] + num_blocks = last_idx - start_idx + swap_in_blocks = src_to_dst.narrow(0, start_idx, num_blocks) + for layer_idx in range(self.num_attention_layers): + if (seq_id == -1): + with torch.cuda.stream(forward_stream): + self.attn_backend.swap_blocks( + self.cpu_cache[layer_idx], + self.gpu_cache[layer_idx], swap_in_blocks) + else: + with torch.cuda.stream(self.swap_in_stream): + self.attn_backend.swap_blocks( + self.cpu_cache[layer_idx], + self.gpu_cache[layer_idx], swap_in_blocks) + self.swap_in_event_map[seq_id][layer_idx].record( + self.swap_in_stream) + + def swap_out( + self, + src_to_dst: torch.Tensor, + ) -> None: + if (src_to_dst.numel() == 0): + return + if (not self.use_fast_path): + cur_layer = self._cur_swap_out_layer + self.attn_backend.swap_blocks(self.gpu_cache[cur_layer], + self.cpu_cache[cur_layer], + src_to_dst) + else: + forward_stream = torch.cuda.current_stream() + assert (self.swap_out_event is not None) + self.swap_out_event.record(forward_stream) + self.swap_out_event.wait(self.swap_out_stream) + with torch.cuda.stream(self.swap_out_stream): + cur_layer = self._cur_swap_out_layer + self.attn_backend.swap_blocks(self.gpu_cache[cur_layer], + self.cpu_cache[cur_layer], + src_to_dst) + self._cur_swap_out_layer = \ + (self._cur_swap_out_layer + 1) % self.num_attention_layers + + def _swap_in_layer_sync_with_seq_ids(self, layer_id: int, + seq_ids: torch.Tensor) -> None: + for seq_id in seq_ids.numpy(): + if (self.swap_in_event_map.get(seq_id) is None): + continue + self.swap_in_event_map[seq_id][layer_id].synchronize() + if (layer_id == self.num_attention_layers - 1): + # recycle the events + for seq_id in seq_ids.numpy(): + if (self.swap_in_event_map.get(seq_id) is None): + continue + event_list = self.swap_in_event_map[seq_id] + assert (self.swap_in_event_pool is not None) + self.swap_in_event_pool.put_events(event_list) + del self.swap_in_event_map[seq_id] + + def _swap_in_layer_all_sync(self, layer_id: int) -> None: + for event_list in self.swap_in_event_map.values(): + event_list[layer_id].synchronize() + # recycle the events + if (layer_id == self.num_attention_layers - 1): + for event_list in self.swap_in_event_map.values(): + assert (self.swap_in_event_pool is not None) + self.swap_in_event_pool.put_events(event_list) + 
self.swap_in_event_map.clear() + + def swap_in_sync(self, sequence_ids: torch.Tensor) -> None: + if (not self.use_fast_path): + return + if (sequence_ids.numel() == 0): + self._swap_in_layer_all_sync(self._cur_swap_in_sync_layer) + else: + self._swap_in_layer_sync_with_seq_ids(self._cur_swap_in_sync_layer, + sequence_ids) + self._cur_swap_in_sync_layer = \ + (self._cur_swap_in_sync_layer + 1) % self.num_attention_layers + + def swap_out_sync(self) -> None: + if (not self.use_fast_path): + return + assert (self.swap_out_stream is not None) + self.swap_out_stream.synchronize() diff --git a/vllm/worker/cpu_enc_dec_model_runner.py b/vllm/worker/cpu_enc_dec_model_runner.py index cc24cfe04d2ba..44b5cb0c6356e 100644 --- a/vllm/worker/cpu_enc_dec_model_runner.py +++ b/vllm/worker/cpu_enc_dec_model_runner.py @@ -280,6 +280,7 @@ def execute_model( kv_caches: List[torch.Tensor], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> Optional[List[SamplerOutput]]: if num_steps > 1: raise ValueError( @@ -305,7 +306,8 @@ def execute_model( intermediate_tensors, } - with set_forward_context(model_input.attn_metadata, self.vllm_config): + with set_forward_context({"attn_metadata": model_input.attn_metadata}, + self.vllm_config): hidden_states = model_executable(**execute_model_kwargs) # Compute the logits. diff --git a/vllm/worker/cpu_model_runner.py b/vllm/worker/cpu_model_runner.py index 420aaf8a1b4cd..9fe2b6fec711c 100644 --- a/vllm/worker/cpu_model_runner.py +++ b/vllm/worker/cpu_model_runner.py @@ -512,6 +512,7 @@ def execute_model( intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, previous_hidden_states: Optional[torch.Tensor] = None, + **kwargs, ) -> Optional[List[SamplerOutput]]: if num_steps > 1: raise ValueError( @@ -528,7 +529,8 @@ def execute_model( execute_model_kwargs.update( {"previous_hidden_states": previous_hidden_states}) - with set_forward_context(model_input.attn_metadata, self.vllm_config): + with set_forward_context({"attn_metadata": model_input.attn_metadata}, + self.vllm_config): hidden_states = model_executable( input_ids=model_input.input_tokens, positions=model_input.input_positions, diff --git a/vllm/worker/cpu_pooling_model_runner.py b/vllm/worker/cpu_pooling_model_runner.py index 17b2fd2564a04..14c3b7c8319bb 100644 --- a/vllm/worker/cpu_pooling_model_runner.py +++ b/vllm/worker/cpu_pooling_model_runner.py @@ -34,6 +34,7 @@ def execute_model( kv_caches: List[torch.Tensor], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> Optional[Union[List[PoolerOutput], IntermediateTensors]]: if num_steps > 1: raise ValueError( @@ -69,7 +70,8 @@ def execute_model( intermediate_tensors, } - with set_forward_context(model_input.attn_metadata, self.vllm_config): + with set_forward_context({"attn_metadata": model_input.attn_metadata}, + self.vllm_config): hidden_states = model_executable(**execute_model_kwargs) # Only perform pooling in the driver worker. 
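The GPUCacheEngine fast path added in cache_engine.py above is a standard CUDA stream/event overlap pattern: host-to-device copies are issued on a dedicated stream, one event is recorded per layer (and per sequence), and the forward pass waits only on the event for the layer it is about to execute, so copies for later layers overlap with compute for earlier ones. The standalone sketch below illustrates that pattern only; it assumes a CUDA device, and its names (copy_stream, layer_ready, gpu_blocks) are illustrative rather than part of this patch, which additionally recycles events through EventPool and keys them by sequence id.

import torch

# Minimal sketch of per-layer host-to-device overlap (assumes a CUDA device).
num_layers = 4
numel = 1 << 20
cpu_blocks = [torch.empty(numel, dtype=torch.uint8, pin_memory=True)
              for _ in range(num_layers)]
gpu_blocks = [torch.empty(numel, dtype=torch.uint8, device="cuda")
              for _ in range(num_layers)]

copy_stream = torch.cuda.Stream()  # side stream dedicated to swap-in copies
layer_ready = [torch.cuda.Event() for _ in range(num_layers)]

# Producer: enqueue one copy per layer on the side stream and record an event.
with torch.cuda.stream(copy_stream):
    for i in range(num_layers):
        gpu_blocks[i].copy_(cpu_blocks[i], non_blocking=True)
        layer_ready[i].record(copy_stream)

# Consumer: before layer i runs, block only on that layer's event.
# This mirrors swap_in_sync advancing one layer per call.
for i in range(num_layers):
    layer_ready[i].synchronize()
    # ... attention for layer i would read gpu_blocks[i] here ...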
diff --git a/vllm/worker/enc_dec_model_runner.py b/vllm/worker/enc_dec_model_runner.py index 5697fbbaa2041..1ae051b6f38ea 100644 --- a/vllm/worker/enc_dec_model_runner.py +++ b/vllm/worker/enc_dec_model_runner.py @@ -154,6 +154,7 @@ def execute_model( kv_caches: List[torch.Tensor], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> Optional[List[PoolerOutput]]: if num_steps > 1: raise ValueError("num_steps > 1 is not supported in " @@ -175,7 +176,8 @@ def execute_model( } if self.has_inner_state else {} multi_modal_kwargs = model_input.multi_modal_kwargs or {} - with set_forward_context(model_input.attn_metadata, self.vllm_config): + with set_forward_context({"attn_metadata": model_input.attn_metadata}, + self.vllm_config): hidden_or_intermediate_states = model_executable( input_ids=model_input.input_tokens, positions=model_input.input_positions, diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 99cf9a7e67256..901699e8c61b0 100644 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -1893,6 +1893,7 @@ def execute_model( intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, warmup_mode=False, + **kwargs: Any, ) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]: if num_steps > 1: raise ValueError( diff --git a/vllm/worker/model_runner.py b/vllm/worker/model_runner.py index 551b84435fdc0..ec6ef65ab23d3 100644 --- a/vllm/worker/model_runner.py +++ b/vllm/worker/model_runner.py @@ -58,6 +58,8 @@ if TYPE_CHECKING: from vllm.attention.backends.abstract import AttentionBackend + from vllm.worker.cache_engine import CacheEngine + from vllm.worker.worker_base import WorkerInput logger = init_logger(__name__) @@ -1511,7 +1513,8 @@ def capture_model(self, kv_caches: List[List[torch.Tensor]]) -> None: self._update_inputs_to_capture_for_enc_dec_model( capture_inputs) - with set_forward_context(attn_metadata, self.vllm_config): + with set_forward_context({"attn_metadata": attn_metadata}, + self.vllm_config): graph_runner.capture(**capture_inputs) self.graph_memory_pool = graph_runner.graph.pool() self.graph_runners[virtual_engine][batch_size] = ( @@ -1612,6 +1615,9 @@ def execute_model( kv_caches: List[torch.Tensor], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + cache_engine: Optional["CacheEngine"] = None, + worker_input: Optional["WorkerInput"] = None, + **kwargs: Any, ) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]: if num_steps > 1: raise ValueError("num_steps > 1 is not supported in ModelRunner") @@ -1676,8 +1682,12 @@ def execute_model( model_forward_start.record() if not bypass_model_exec: - with set_forward_context(model_input.attn_metadata, - self.vllm_config): + with set_forward_context( + { + "attn_metadata": model_input.attn_metadata, + "cache_engine": cache_engine, + "worker_input": worker_input, + }, self.vllm_config): hidden_or_intermediate_states = model_executable( input_ids=model_input.input_tokens, positions=model_input.input_positions, diff --git a/vllm/worker/model_runner_base.py b/vllm/worker/model_runner_base.py index cd4770202a186..a086f988bc2f4 100644 --- a/vllm/worker/model_runner_base.py +++ b/vllm/worker/model_runner_base.py @@ -272,6 +272,7 @@ def execute_model( kv_caches: Optional[List[torch.Tensor]], intermediate_tensors: Optional[IntermediateTensors], num_steps: int = 1, + **kwargs: Any, ) -> Optional[List[SamplerOutput]]: """ Execute the model on the given input. 
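The remaining runner changes are mostly plumbing for the swap machinery above: set_forward_context now receives a dict, so the forward pass can see cache_engine and worker_input in addition to attn_metadata, and every execute_model gains a **kwargs catch-all so runners that do not support offloading simply ignore the extra arguments. The sketch below shows only the shape of that dict-valued forward-context idea; it is not the real vllm.forward_context module (which also takes vllm_config), and the hook in the trailing comment is hypothetical.

from contextlib import contextmanager
from typing import Any, Dict, Iterator, Optional

_forward_context: Optional[Dict[str, Any]] = None


@contextmanager
def set_forward_context(context: Dict[str, Any]) -> Iterator[None]:
    # Stash an arbitrary dict for the duration of one forward pass.
    global _forward_context
    prev, _forward_context = _forward_context, context
    try:
        yield
    finally:
        _forward_context = prev


def get_forward_context() -> Dict[str, Any]:
    assert _forward_context is not None, "no forward pass in progress"
    return _forward_context


# A per-layer hook inside the model could then do something like:
#   ctx = get_forward_context()
#   cache_engine = ctx.get("cache_engine")
#   worker_input = ctx.get("worker_input")
#   if cache_engine is not None and worker_input is not None:
#       cache_engine.swap_in_sync(worker_input.swap_in_sequence_ids)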
diff --git a/vllm/worker/multi_step_model_runner.py b/vllm/worker/multi_step_model_runner.py index e08a61e31fe42..c2f4892eff787 100644 --- a/vllm/worker/multi_step_model_runner.py +++ b/vllm/worker/multi_step_model_runner.py @@ -461,6 +461,7 @@ def execute_model( kv_caches: List[torch.Tensor], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]: """ Execute the model for a single step and update multi-step diff --git a/vllm/worker/neuron_model_runner.py b/vllm/worker/neuron_model_runner.py index ae4eb6ba6eaec..aa424022111c3 100644 --- a/vllm/worker/neuron_model_runner.py +++ b/vllm/worker/neuron_model_runner.py @@ -309,6 +309,7 @@ def execute_model( kv_caches: Optional[List[torch.Tensor]] = None, intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> Optional[List[SamplerOutput]]: if num_steps > 1: raise ValueError( diff --git a/vllm/worker/openvino_model_runner.py b/vllm/worker/openvino_model_runner.py index 6000e5dfe4e30..7ffdc7a78e524 100644 --- a/vllm/worker/openvino_model_runner.py +++ b/vllm/worker/openvino_model_runner.py @@ -1,5 +1,5 @@ from collections import defaultdict -from typing import Dict, List, NamedTuple, Optional, Tuple +from typing import Any, Dict, List, NamedTuple, Optional, Tuple import openvino as ov import torch @@ -327,6 +327,7 @@ def execute_model( self, seq_group_metadata_list: List[SequenceGroupMetadata], kv_caches: List[Tuple["ov.Tensor", "ov.Tensor"]], + **kwargs: Any, ) -> Optional[SamplerOutput]: ( input_tokens, diff --git a/vllm/worker/pooling_model_runner.py b/vllm/worker/pooling_model_runner.py index 1beae1e3884c5..4ebab6bd9d3c8 100644 --- a/vllm/worker/pooling_model_runner.py +++ b/vllm/worker/pooling_model_runner.py @@ -49,6 +49,7 @@ def execute_model( kv_caches: List[torch.Tensor], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> Optional[Union[List[PoolerOutput], IntermediateTensors]]: if num_steps > 1: raise ValueError( @@ -101,7 +102,8 @@ def execute_model( if model_input.token_types is not None: cross_enc_kwargs["token_type_ids"] = model_input.token_types - with set_forward_context(model_input.attn_metadata, self.vllm_config): + with set_forward_context({"attn_metadata": model_input.attn_metadata}, + self.vllm_config): hidden_or_intermediate_states = model_executable( input_ids=model_input.input_tokens, positions=model_input.input_positions, diff --git a/vllm/worker/tpu_model_runner.py b/vllm/worker/tpu_model_runner.py index 9a054eb8a4cf7..b010274b401bc 100644 --- a/vllm/worker/tpu_model_runner.py +++ b/vllm/worker/tpu_model_runner.py @@ -13,6 +13,7 @@ from vllm.attention import AttentionMetadata, get_attn_backend from vllm.config import VllmConfig +from vllm.forward_context import set_forward_context from vllm.logger import init_logger from vllm.model_executor.layers.sampler import SamplerOutput from vllm.model_executor.model_loader import get_model @@ -263,8 +264,10 @@ def _dummy_run( torch._dynamo.mark_dynamic(t, 0) torch._dynamo.mark_dynamic(p, 0) # Dummy run. 
- self.model(token_ids, position_ids, attn_metadata, input_lens, t, p, - num_samples, kv_caches) + with set_forward_context({"attn_metadata": attn_metadata}, + self.vllm_config): + self.model(token_ids, position_ids, attn_metadata, input_lens, t, + p, num_samples, kv_caches) def warmup_model( self, @@ -584,6 +587,7 @@ def execute_model( kv_caches: Optional[List[Any]], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> List[SamplerOutput]: assert intermediate_tensors is None if not model_input.is_first_multi_step: @@ -661,10 +665,12 @@ def execute_model( input_lens = model_input.input_lens[i:i + 1].to(self.device) t = model_input.t[i:i + 1].to(self.device) p = model_input.p[i:i + 1].to(self.device) - output_token_ids = self.model(token_ids, position_ids, - attn_metadata, input_lens, t, p, - model_input.num_samples, - kv_caches) + with set_forward_context({"attn_metadata": attn_metadata}, + self.vllm_config): + output_token_ids = self.model(token_ids, position_ids, + attn_metadata, input_lens, t, + p, model_input.num_samples, + kv_caches) next_token_ids.append(output_token_ids[0]) start_idx = end_idx @@ -709,10 +715,12 @@ def execute_model( input_lens = model_input.input_lens.to(self.device) for i in range(num_steps): slot_mapping = attn_metadata.slot_mapping - output_token_ids = self.model(token_ids, position_ids, - attn_metadata, input_lens, t, p, - model_input.num_samples, - kv_caches) + with set_forward_context({"attn_metadata": attn_metadata}, + self.vllm_config): + output_token_ids = self.model(token_ids, position_ids, + attn_metadata, input_lens, t, + p, model_input.num_samples, + kv_caches) self.cached_step_outputs.append(output_token_ids) if i < num_steps - 1: diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py index 832b9903b7abc..adca0fc301391 100644 --- a/vllm/worker/worker.py +++ b/vllm/worker/worker.py @@ -22,7 +22,7 @@ from vllm.prompt_adapter.request import PromptAdapterRequest from vllm.sequence import (ExecuteModelRequest, IntermediateTensors, SequenceGroupMetadata, SequenceGroupMetadataDelta) -from vllm.worker.cache_engine import CacheEngine +from vllm.worker.cache_engine import CacheEngine, GPUCacheEngine from vllm.worker.enc_dec_model_runner import EncoderDecoderModelRunner from vllm.worker.model_runner import GPUModelRunnerBase, ModelRunner from vllm.worker.pooling_model_runner import PoolingModelRunner @@ -289,8 +289,8 @@ def initialize_cache(self, num_gpu_blocks: int, def _init_cache_engine(self): assert self.cache_config.num_gpu_blocks is not None self.cache_engine = [ - CacheEngine(self.cache_config, self.model_config, - self.parallel_config, self.device_config) + GPUCacheEngine(self.cache_config, self.model_config, + self.parallel_config, self.device_config) for _ in range(self.parallel_config.pipeline_parallel_size) ] self.gpu_cache = [ @@ -319,14 +319,32 @@ def prepare_worker_input( virtual_engine = execute_model_req.virtual_engine num_steps = execute_model_req.num_steps num_seq_groups = len(execute_model_req.seq_group_metadata_list) + seq_ids = [] + for metadata_or_delta in execute_model_req.seq_group_metadata_list: + if isinstance(metadata_or_delta, SequenceGroupMetadata): + seq_ids.extend(metadata_or_delta.seq_data.keys()) + if (len(seq_ids) == 0): + swap_in_offsets = [] + swap_in_sequence_ids = [] + # `blocks_to_swap_in` and `blocks_to_swap_out` are cpu tensors. # they contain parameters to launch cudamemcpyasync. 
blocks_to_swap_in = torch.tensor(execute_model_req.blocks_to_swap_in, - device="cpu", + device="cuda", dtype=torch.int64).view(-1, 2) + swap_in_offsets = torch.tensor(execute_model_req.swap_in_offsets, + device="cpu", + dtype=torch.int64).view(-1) + swap_in_sequence_ids = \ + torch.tensor(execute_model_req.swap_in_sequence_ids, + device="cpu", + dtype=torch.int64).view(-1) blocks_to_swap_out = torch.tensor(execute_model_req.blocks_to_swap_out, - device="cpu", + device="cuda", dtype=torch.int64).view(-1, 2) + running_sequence_ids = torch.tensor(seq_ids, + device="cpu", + dtype=torch.int64).view(-1) # `blocks_to_copy` is a gpu tensor. The src and tgt of # blocks to copy are in the same device, and `blocks_to_copy` # can be used directly within cuda kernels. @@ -337,27 +355,33 @@ def prepare_worker_input( return WorkerInput( num_seq_groups=num_seq_groups, blocks_to_swap_in=blocks_to_swap_in, + swap_in_offsets=swap_in_offsets, + swap_in_sequence_ids=swap_in_sequence_ids, blocks_to_swap_out=blocks_to_swap_out, blocks_to_copy=blocks_to_copy, virtual_engine=virtual_engine, num_steps=num_steps, + running_sequence_ids=running_sequence_ids, ) + @torch.inference_mode() + def get_cache_engine(self, worker_input: WorkerInput) -> CacheEngine: + if (self.cache_engine is None): + return None + return self.cache_engine[worker_input.virtual_engine] + @torch.inference_mode() def execute_worker(self, worker_input: WorkerInput) -> None: virtual_engine = worker_input.virtual_engine # Issue cache operations. - if (worker_input.blocks_to_swap_in is not None - and worker_input.blocks_to_swap_in.numel() > 0): - self.cache_engine[virtual_engine].swap_in( - worker_input.blocks_to_swap_in) - if (worker_input.blocks_to_swap_out is not None - and worker_input.blocks_to_swap_out.numel() > 0): - self.cache_engine[virtual_engine].swap_out( - worker_input.blocks_to_swap_out) if (worker_input.blocks_to_copy is not None and worker_input.blocks_to_copy.numel() > 0): self.cache_engine[virtual_engine].copy(worker_input.blocks_to_copy) + if (worker_input.blocks_to_swap_in is not None + and worker_input.blocks_to_swap_in.numel() > 0): + self.cache_engine[virtual_engine].swap_in( + worker_input.blocks_to_swap_in, worker_input.swap_in_offsets, + worker_input.swap_in_sequence_ids) def _get_cached_seq_group_metadata( self, diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 6d00102e0a324..0ba326628bef6 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -2,7 +2,8 @@ import os import time from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union +from typing import (TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Type, + Union) import torch @@ -19,6 +20,9 @@ ModelRunnerBase, ModelRunnerInputBase) +if TYPE_CHECKING: + from vllm.worker.cache_engine import CacheEngine + logger = init_logger(__name__) @@ -144,8 +148,11 @@ class WorkerInput: num_seq_groups: Optional[int] = None blocks_to_swap_in: Optional[torch.Tensor] = None + swap_in_offsets: Optional[torch.Tensor] = None + swap_in_sequence_ids: Optional[torch.Tensor] = None blocks_to_swap_out: Optional[torch.Tensor] = None blocks_to_copy: Optional[torch.Tensor] = None + running_sequence_ids: Optional[torch.Tensor] = None virtual_engine: int = 0 num_steps: int = 1 @@ -161,10 +168,13 @@ def from_broadcasted_tensor_dict( return cls( num_seq_groups=tensor_dict.pop("num_seq_groups"), blocks_to_swap_in=tensor_dict.pop("blocks_to_swap_in"), + swap_in_offsets=tensor_dict.pop("swap_in_offsets"), 
+ swap_in_sequence_ids=tensor_dict.pop("swap_in_sequence_ids"), blocks_to_swap_out=tensor_dict.pop("blocks_to_swap_out"), blocks_to_copy=tensor_dict.pop("blocks_to_copy"), virtual_engine=tensor_dict["virtual_engine"], num_steps=tensor_dict.pop("num_steps"), + running_sequence_ids=tensor_dict.pop("running_sequence_ids"), ) def as_broadcastable_tensor_dict( @@ -175,10 +185,13 @@ def as_broadcastable_tensor_dict( tensor_dict = { "num_seq_groups": self.num_seq_groups, "blocks_to_swap_in": self.blocks_to_swap_in, + "swap_in_offsets": self.swap_in_offsets, + "swap_in_sequence_ids": self.swap_in_sequence_ids, "blocks_to_swap_out": self.blocks_to_swap_out, "blocks_to_copy": self.blocks_to_copy, "virtual_engine": self.virtual_engine, "num_steps": self.num_steps, + "running_sequence_ids": self.running_sequence_ids, } return tensor_dict @@ -237,6 +250,13 @@ def execute_worker(self, worker_input: WorkerInput) -> None: """ raise NotImplementedError + def get_cache_engine(self, worker_input: WorkerInput) \ + -> Optional["CacheEngine"]: + """ + Get the cache engine for the worker. + """ + return None + def _get_worker_input_from_broadcast( self ) -> Optional[Tuple[BroadcastableModelInput, WorkerInput, Dict[ @@ -346,8 +366,12 @@ def execute_model( if self.kv_cache is not None else None, intermediate_tensors=intermediate_tensors, num_steps=num_steps, + cache_engine=self.get_cache_engine(worker_input), + worker_input=worker_input, **kwargs, ) + if (self.get_cache_engine(worker_input) is not None): + self.get_cache_engine(worker_input).swap_out_sync() # type: ignore model_execute_time = time.perf_counter() - start_time if not get_pp_group().is_last_rank: diff --git a/vllm/worker/xpu_model_runner.py b/vllm/worker/xpu_model_runner.py index e6322e095bbb9..8c871ffb8189a 100644 --- a/vllm/worker/xpu_model_runner.py +++ b/vllm/worker/xpu_model_runner.py @@ -557,6 +557,7 @@ def execute_model( kv_caches: List[torch.Tensor], intermediate_tensors: Optional[IntermediateTensors] = None, num_steps: int = 1, + **kwargs: Any, ) -> Optional[List[SamplerOutput]]: if num_steps > 1: raise ValueError(