diff --git a/.github/workflows/causal_lm_cpp.yml b/.github/workflows/causal_lm_cpp.yml
index 4b7913bafd..03aed917ae 100644
--- a/.github/workflows/causal_lm_cpp.yml
+++ b/.github/workflows/causal_lm_cpp.yml
@@ -422,16 +422,8 @@ jobs:
      - name: run and compare
        run: |
          source ./ov/setupvars.sh
-          ./build/samples/cpp/speculative_decoding_lm/speculative_decoding_lm ./dolly-v2-3b/ ./dolly-v2-7b/ "Alan Turing was a" > predictions_speculative.txt
-          ./build/samples/cpp/greedy_causal_lm/greedy_causal_lm ./dolly-v2-7b/ "Alan Turing was a" > predictions_greedy.txt
-          python -c "
-          with open('predictions_greedy.txt', 'r') as f:
-              predicted_greedy = f.readline()
-          with open('predictions_speculative.txt', 'r') as f:
-              predicted_speculative = f.readline()
-          assert predicted_greedy == predicted_speculative
-          "
-          echo "Alan Turing was a" passed
+          ./build/samples/cpp/speculative_decoding_lm/speculative_decoding_lm -a ./dolly-v2-3b/ -m ./dolly-v2-7b/ -n 5
+          ./build/samples/cpp/speculative_decoding_lm/continuous_batching_speculative_decoding -a ./dolly-v2-3b/ -m ./dolly-v2-7b/ -n 5

  cpp-prompt_lookup_decoding_lm-ubuntu:
    runs-on: ubuntu-20.04-16-cores
diff --git a/samples/cpp/speculative_decoding_lm/CMakeLists.txt b/samples/cpp/speculative_decoding_lm/CMakeLists.txt
index 078ac8bb52..7633dda403 100644
--- a/samples/cpp/speculative_decoding_lm/CMakeLists.txt
+++ b/samples/cpp/speculative_decoding_lm/CMakeLists.txt
@@ -1,30 +1,34 @@
-# Copyright (C) 2023-2024 Intel Corporation
+# Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0

-find_package(OpenVINO REQUIRED COMPONENTS Runtime Threading)
-
-find_package(OpenVINOGenAI REQUIRED
-    PATHS
-        "${CMAKE_BINARY_DIR}"  # Reuse the package from the build.
-        ${OpenVINO_DIR}  # GenAI may be installed alongside OpenVINO.
- NO_CMAKE_FIND_ROOT_PATH -) - -add_executable(speculative_decoding_lm speculative_decoding_lm.cpp) -target_link_libraries(speculative_decoding_lm PRIVATE openvino::runtime openvino::threading) -set_target_properties(speculative_decoding_lm PROPERTIES - COMPILE_PDB_NAME speculative_decoding_lm - # Ensure out of box LC_RPATH on macOS with SIP - INSTALL_RPATH_USE_LINK_PATH ON) -target_compile_features(speculative_decoding_lm PRIVATE cxx_std_17) - -get_target_property(genai_imported openvino::genai IMPORTED_LOCATION) -set(OPENVINO_TOKENIZERS_PATH $,${genai_imported},$>) -set(OPENVINO_TOKENIZERS_FILENAME "${CMAKE_SHARED_LIBRARY_PREFIX}openvino_tokenizers${CMAKE_SHARED_LIBRARY_SUFFIX}") -target_compile_definitions(speculative_decoding_lm PRIVATE - OPENVINO_TOKENIZERS_PATH="${OPENVINO_TOKENIZERS_PATH}/${OPENVINO_TOKENIZERS_FILENAME}") - -install(TARGETS speculative_decoding_lm - RUNTIME DESTINATION samples_bin/ - COMPONENT samples_bin - EXCLUDE_FROM_ALL) +# start of dependencies + +include(FetchContent) + +if(POLICY CMP0135) + cmake_policy(SET CMP0135 NEW) +endif() + +FetchContent_Declare(cxxopts + URL https://github.com/jarro2783/cxxopts/archive/refs/tags/v3.1.1.tar.gz + URL_HASH SHA256=523175f792eb0ff04f9e653c90746c12655f10cb70f1d5e6d6d9491420298a08) +FetchContent_MakeAvailable(cxxopts) + +if(NOT TARGET nlohmann_json) + FetchContent_Declare(nlohmann_json + URL https://github.com/nlohmann/json/archive/refs/tags/v3.11.3.tar.gz + URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406) + FetchContent_MakeAvailable(nlohmann_json) +endif() + +find_package(OpenVINO REQUIRED COMPONENTS Runtime) + +# end of dependencies + +set(TARGET_NAME speculative_decoding_lm) +add_executable(${TARGET_NAME} ${TARGET_NAME}.cpp) +target_link_libraries(${TARGET_NAME} PRIVATE openvino::genai cxxopts::cxxopts) + +set(TARGET_NAME_CB continuous_batching_speculative_decoding) +add_executable(${TARGET_NAME_CB} ${TARGET_NAME_CB}.cpp) +target_link_libraries(${TARGET_NAME_CB} PRIVATE openvino::genai cxxopts::cxxopts) diff --git a/samples/cpp/speculative_decoding_lm/continuous_batching_speculative_decoding.cpp b/samples/cpp/speculative_decoding_lm/continuous_batching_speculative_decoding.cpp new file mode 100644 index 0000000000..ad533be2bb --- /dev/null +++ b/samples/cpp/speculative_decoding_lm/continuous_batching_speculative_decoding.cpp @@ -0,0 +1,170 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include "openvino/genai/continuous_batching_pipeline.hpp" + +void print_cb_generation_result(const ov::genai::GenerationResult& generation_result) { + for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) { + std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl; + } +} + +std::vector get_spec_decoding_generation_config_examples() { + + // sampling param for speulative decoding + ov::genai::GenerationConfig generation_config_greedy_constant = ov::genai::greedy(); + { + generation_config_greedy_constant.num_assistant_tokens = 5; + } + + ov::genai::GenerationConfig generation_config_multinomial_constant = ov::genai::multinomial(); + { + generation_config_multinomial_constant.num_assistant_tokens = 5; + generation_config_multinomial_constant.num_return_sequences = 1; + } + + ov::genai::GenerationConfig generation_config_greedy_dynamic = ov::genai::greedy(); + { + 
generation_config_greedy_dynamic.assistant_confidence_threshold = 0.8f; + } + + ov::genai::GenerationConfig generation_config_multinomial_dynamic = ov::genai::multinomial(); + { + generation_config_multinomial_dynamic.assistant_confidence_threshold = 0.8f; + } + + return { + generation_config_greedy_constant, + generation_config_multinomial_constant, + generation_config_greedy_dynamic, + generation_config_multinomial_dynamic, + }; +} + +int main(int argc, char* argv[]) try { + // Command line options + + cxxopts::Options options("accuracy_sample", "Help command"); + + options.add_options() + ("n,num_prompts", "A number of prompts", cxxopts::value()->default_value("1")) + ("dynamic_split_fuse", "Whether to use dynamic split-fuse or vLLM scheduling", cxxopts::value()->default_value("false")) + ("m,model", "Path to model and tokenizers base directory", cxxopts::value()->default_value(".")) + ("a,draft_model", "Path to assisting model base directory", cxxopts::value()->default_value(".")) + ("d,device", "Target device to run the model", cxxopts::value()->default_value("CPU")) + ("use_prefix", "Whether to use a prefix or not", cxxopts::value()->default_value("false")) + ("h,help", "Print usage"); + + cxxopts::ParseResult result; + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cout << e.what() << "\n\n"; + std::cout << options.help() << std::endl; + return EXIT_FAILURE; + } + + if (result.count("help")) { + std::cout << options.help() << std::endl; + return EXIT_SUCCESS; + } + + const size_t num_prompts = result["num_prompts"].as(); + const bool dynamic_split_fuse = result["dynamic_split_fuse"].as(); + const std::string model_path = result["model"].as(); + const std::string draft_model_path = result["draft_model"].as(); + const std::string device = result["device"].as(); + const bool use_prefix = result["use_prefix"].as(); + + std::string prefix_str = + "You are an advanced language model designed to assist users by providing accurate, " + "relevant, and helpful information. Your responses should be accurate, concise, contextual, " + "respectful, and helpful. The request is: "; + + // create dataset + + std::vector prompt_examples = { + "What is OpenVINO?", + "How are you?", + "What is your name?", + "Tell me something about Canada", + "What is OpenVINO?", + }; + + auto generation_config = get_spec_decoding_generation_config_examples(); + auto default_config_size = generation_config.size(); + for (size_t i = default_config_size; i < num_prompts; ++i) { + generation_config.push_back(generation_config[i % default_config_size]); + } + + std::vector prompts(num_prompts); + for (size_t i = 0; i < num_prompts; ++i) { + prompts[i] = prompt_examples[i % prompt_examples.size()]; + } + + // Perform the inference + auto get_default_block_size = [](const std::string& device) { + const size_t cpu_block_size = 32; + const size_t gpu_block_size = 16; + + bool is_gpu = device.find("GPU") != std::string::npos; + + return is_gpu ? gpu_block_size : cpu_block_size; + }; + + ov::genai::SchedulerConfig scheduler_config; + // batch size + scheduler_config.max_num_batched_tokens = use_prefix ? 
256 : 32; + // cache params + scheduler_config.num_kv_blocks = 364; + scheduler_config.block_size = get_default_block_size(device); + // mode - vLLM or dynamic_split_fuse + scheduler_config.dynamic_split_fuse = dynamic_split_fuse; + // vLLM specific params + scheduler_config.max_num_seqs = 2; + scheduler_config.enable_prefix_caching = use_prefix; + + ov::genai::ContinuousBatchingPipeline pipe(model_path, scheduler_config, device, {ov::genai::draft_model(draft_model_path, device)}); + std::vector generation_results = pipe.generate(prompts, generation_config); + + for (size_t request_id = 0; request_id < generation_results.size(); ++request_id) { + const ov::genai::GenerationResult & generation_result = generation_results[request_id]; + std::cout << "Question: " << prompts[request_id] << std::endl; + switch (generation_result.m_status) + { + case ov::genai::GenerationStatus::FINISHED: + print_cb_generation_result(generation_result); + break; + case ov::genai::GenerationStatus::IGNORED: + std::cout << "Request was ignored due to lack of memory." < 0) { + std::cout << "Partial result:" << std::endl; + print_cb_generation_result(generation_result); + } + break; + case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE: + std::cout << "Request was aborted." < 0) { + std::cout << "Partial result:" << std::endl; + print_cb_generation_result(generation_result); + } + break; + default: + break; + } + std::cout << std::endl; + } +} catch (const std::exception& error) { + try { + std::cerr << error.what() << '\n'; + } catch (const std::ios_base::failure&) {} + return EXIT_FAILURE; +} catch (...) { + try { + std::cerr << "Non-exception object thrown\n"; + } catch (const std::ios_base::failure&) {} + return EXIT_FAILURE; +} diff --git a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp index f26cb6c7c4..2754c0109a 100644 --- a/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp +++ b/samples/cpp/speculative_decoding_lm/speculative_decoding_lm.cpp @@ -1,403 +1,149 @@ // Copyright (C) 2023-2024 Intel Corporation // SPDX-License-Identifier: Apache-2.0 -#include -#include #include -#include +#include -namespace { +#include "openvino/genai/llm_pipeline.hpp" -constexpr size_t BATCH_SIZE = 1; - -size_t get_seq_len_axis(std::shared_ptr model) { - // sequence length axis in key/values tensors, for most cases [BATCH_SIZE, num_kv_heads, seq_len, head_size], - // threfore usually seq_length_axis = 2 - size_t seq_length_axis = 2; - - // "ReadValue" node is KV cache representation in stateful model - std::string kv_node_type_name = std::string(ov::op::v6::ReadValue::get_type_info_static().name); - - for (const auto op : model->get_ops()) { - if (op->get_type_name() != kv_node_type_name) { - continue; - } - - // Shape example: [-1,4,0,64] - auto shape = op->get_input_partial_shape(0); - - for (size_t i = 0; i < shape.rank().get_length(); i++) { - // Find axis = 0. This would be sequence length axis. 
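
For reference, a condensed, self-contained sketch of how the new continuous-batching sample drives speculative decoding. It only uses calls that appear in the sample above; the model paths, the restored headers and template arguments are illustrative assumptions rather than verbatim sample code, and command-line parsing and error handling are omitted.

#include <iostream>
#include <string>
#include <vector>

#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    const std::string main_model_path  = "./dolly-v2-7b";   // assumed path
    const std::string draft_model_path = "./dolly-v2-3b";   // assumed path

    // Scheduler settings mirror the sample: small batch, CPU block size.
    ov::genai::SchedulerConfig scheduler_config;
    scheduler_config.max_num_batched_tokens = 32;
    scheduler_config.num_kv_blocks = 364;
    scheduler_config.block_size = 32;
    scheduler_config.dynamic_split_fuse = false;
    scheduler_config.max_num_seqs = 2;

    // Speculative decoding with a constant number of draft candidates per step.
    ov::genai::GenerationConfig config = ov::genai::greedy();
    config.num_assistant_tokens = 5;

    // The draft model is passed as a property alongside the main model.
    ov::genai::ContinuousBatchingPipeline pipe(
        main_model_path, scheduler_config, "CPU",
        {ov::genai::draft_model(draft_model_path, "CPU")});

    std::vector<std::string> prompts = {"What is OpenVINO?"};
    std::vector<ov::genai::GenerationConfig> configs = {config};
    std::vector<ov::genai::GenerationResult> results = pipe.generate(prompts, configs);

    for (const auto& result : results)
        for (const auto& text : result.m_generation_ids)
            std::cout << text << std::endl;
    return 0;
}
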
- if (shape[i] == 0) { - seq_length_axis = i; - } - } - break; +void print_generation_result(const std::vector& texts, const std::vector& log_probs) { + for (size_t output_id = 0; output_id < texts.size(); ++output_id) { + std::cout << "Answer " << output_id << " (" << log_probs[output_id] << ") : " << texts[output_id] << std::endl; } - - return seq_length_axis; -} - -std::pair tokenize(ov::InferRequest& tokenizer, std::string&& prompt) { - tokenizer.set_input_tensor(ov::Tensor{ov::element::string, {BATCH_SIZE}, &prompt}); - tokenizer.infer(); - return {tokenizer.get_tensor("input_ids"), tokenizer.get_tensor("attention_mask")}; -} - -std::string detokenize(ov::InferRequest& detokenizer, std::vector& tokens) { - detokenizer.set_input_tensor(ov::Tensor{ov::element::i64, {BATCH_SIZE, tokens.size()}, tokens.data()}); - detokenizer.infer(); - return detokenizer.get_output_tensor().data()[0]; } -// The following reasons require TextStreamer to keep a cache of previous tokens: -// detokenizer removes starting ' '. For example detokenize(tokenize(" a")) == "a", -// but detokenize(tokenize("prefix a")) == "prefix a" -// 1 printable token may consist of 2 token ids: detokenize(incomplete_token_idx) == "�" -struct TextStreamer { - ov::InferRequest detokenizer; - std::vector token_cache; - size_t print_len = 0; - - void put(int64_t token) { - token_cache.push_back(token); - std::string text = detokenize(detokenizer, token_cache); - if (!text.empty() && '\n' == text.back() && text.size() > print_len) { - // Flush the cache after the new line symbol - std::cout << std::string_view{text.data() + print_len, text.size() - print_len}; - token_cache.clear(); - print_len = 0; - return; - } - if (text.size() >= 3 && text.compare(text.size() - 3, 3, "�") == 0) { - // Don't print incomplete text - return; - } else if (text.size() > print_len) { - // It is possible to have a shorter text after adding new token. - // Print to output only if text lengh is increaesed. - std::cout << std::string_view{text.data() + print_len, text.size() - print_len} << std::flush; - print_len = text.size(); - } +std::vector get_spec_decoding_generation_config_examples() { + + // sampling param for speulative decoding + ov::genai::GenerationConfig generation_config_greedy_constant = ov::genai::greedy(); + { + generation_config_greedy_constant.num_assistant_tokens = 5; } - void end() { - std::string text = detokenize(detokenizer, token_cache); - if (text.size() <= print_len) - return; - std::cout << std::string_view{text.data() + print_len, text.size() - print_len} << '\n'; - token_cache.clear(); - print_len = 0; + ov::genai::GenerationConfig generation_config_multinomial_constant = ov::genai::multinomial(); + { + generation_config_multinomial_constant.num_assistant_tokens = 5; + generation_config_multinomial_constant.num_return_sequences = 1; } -}; - -ov::Tensor trimm_tensor(ov::Tensor& tensor, uint64_t seq_len_axis, uint64_t new_seq_len) { - // Copy elements from the old to a new tensor and return it. 
- // Trim kv tensor on sequence length axis - // key/values tensor shape example: [BATCH_SIZE, num_kv_heads, seq_len, head_size] - // Sequense length axis position may vary from one model to another - - auto shape = tensor.get_shape(); - - OPENVINO_ASSERT(seq_len_axis < shape.size(), - "Sequence length axis: ", - seq_len_axis, - " should be less than shape size: ", - shape.size()); - - size_t old_seq_len = shape[seq_len_axis]; - OPENVINO_ASSERT(new_seq_len <= old_seq_len); - - // if new_seq_len equal to old one no need to copy tensor, return as is - if (old_seq_len == new_seq_len) - return tensor; - - shape[seq_len_axis] = new_seq_len; - - if (seq_len_axis == 0) { - tensor.set_shape(shape); - return tensor; + ov::genai::GenerationConfig generation_config_greedy_dynamic = ov::genai::greedy(); + { + generation_config_greedy_dynamic.assistant_confidence_threshold = 0.8f; } - ov::Coordinate new_shape_begin{0, 0, 0, 0}; - ov::Coordinate new_shape_end{shape}; - - auto new_tensor = ov::Tensor(tensor, new_shape_begin, new_shape_end); - - return new_tensor; -} + ov::genai::GenerationConfig generation_config_multinomial_dynamic = ov::genai::multinomial(); + { + generation_config_multinomial_dynamic.assistant_confidence_threshold = 0.8f; + } -void update_kv_cache(ov::InferRequest request, uint64_t seq_len_axis, uint64_t new_seq_len) { - // trim kv_cache values up to the new_seq_len - auto states = request.query_state(); - ov::parallel_for(states.size(), [&](size_t i) { - ov::Tensor old_tensor = states.at(i).get_state(); - states.at(i).set_state(trimm_tensor(old_tensor, seq_len_axis, new_seq_len)); - }); + return { + generation_config_greedy_constant, + generation_config_multinomial_constant, + generation_config_greedy_dynamic, + generation_config_multinomial_dynamic, + }; } -class AssistedCandidateGenerator { -private: - ov::InferRequest draft_model; - size_t max_seq_length; - size_t num_pred_tokens = 5; - size_t seq_len_axis; - const size_t max_pred_tokens = 10; - int64_t out_of_kv_cache_token = -1; - size_t draft_model_seq_length = 0; - -public: - AssistedCandidateGenerator(ov::InferRequest draft_model, - const size_t max_seq_length, - const size_t num_pred_tokens, - const size_t seq_len_axis) - : draft_model{draft_model}, - max_seq_length{max_seq_length}, - num_pred_tokens{num_pred_tokens}, - seq_len_axis{seq_len_axis} {}; - - int64_t generate_next_token(const std::vector tokens) { - size_t tokens_size = tokens.size(); - auto input_ids = draft_model.get_tensor("input_ids"); - input_ids.set_shape({BATCH_SIZE, tokens_size}); - std::copy_n(tokens.begin(), tokens_size, input_ids.data()); - - auto attention_mask = draft_model.get_tensor("attention_mask"); - attention_mask.set_shape({BATCH_SIZE, draft_model_seq_length + tokens_size}); - std::fill_n(attention_mask.data(), attention_mask.get_size(), 1); - - auto position_ids = draft_model.get_tensor("position_ids"); - position_ids.set_shape({BATCH_SIZE, tokens_size}); - std::iota(position_ids.data(), - position_ids.data() + position_ids.get_size(), - draft_model_seq_length); - - draft_model.get_tensor("beam_idx").set_shape({BATCH_SIZE}); - draft_model.get_tensor("beam_idx").data()[0] = 0; - - draft_model.infer(); +int main(int argc, char* argv[]) try { + // Command line options - auto logits = draft_model.get_tensor("logits"); - size_t vocab_size = logits.get_shape().back(); - auto sequence_logits = logits.data() + (tokens_size - 1) * vocab_size; + cxxopts::Options options("accuracy_sample", "Help command"); - draft_model_seq_length += tokens_size; + 
options.add_options() + ("n,num_prompts", "A number of prompts", cxxopts::value()->default_value("1")) + ("dynamic_split_fuse", "Whether to use dynamic split-fuse or vLLM scheduling", cxxopts::value()->default_value("false")) + ("m,model", "Path to model and tokenizers base directory", cxxopts::value()->default_value(".")) + ("a,draft_model", "Path to assisting model base directory", cxxopts::value()->default_value(".")) + ("d,device", "Target device to run the model", cxxopts::value()->default_value("CPU")) + ("use_prefix", "Whether to use a prefix or not", cxxopts::value()->default_value("false")) + ("h,help", "Print usage"); - return std::max_element(sequence_logits, sequence_logits + vocab_size) - sequence_logits; + cxxopts::ParseResult result; + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cout << e.what() << "\n\n"; + std::cout << options.help() << std::endl; + return EXIT_FAILURE; } - std::vector generate_candidates(int64_t out_token) { - std::vector candidates; - - // limit candidates size by num_pred_tokens or by max_seq_length - size_t candidates_to_generate = std::min(num_pred_tokens, max_seq_length - draft_model_seq_length - 1); - - candidates.reserve(candidates_to_generate); - - // generate cadidates - for (size_t i = 0; i < candidates_to_generate; i++) { - // if out_of_kv_cache_token is present, prepend it to out_token in order to collect kv cache for it - if (out_of_kv_cache_token != -1) { - out_token = generate_next_token(std::vector{out_of_kv_cache_token, out_token}); - out_of_kv_cache_token = -1; - } else { - out_token = generate_next_token(std::vector{out_token}); - } - - candidates.push_back(out_token); - } - - out_of_kv_cache_token = candidates.back(); - return candidates; + if (result.count("help")) { + std::cout << options.help() << std::endl; + return EXIT_SUCCESS; } - void update_candidate_strategy(const size_t num_matches) { - // dynamically adjust number of generated candidates based on number of matches - // we want to balance the benefits of getting candidates tokens correct with the - // cost of forecasting incorrect candidates tokens. - if (num_matches == num_pred_tokens) { - num_pred_tokens = std::min(num_pred_tokens + 2, max_pred_tokens); - } else { - num_pred_tokens = std::max(int64_t(num_pred_tokens) - 1, int64_t(1)); - } + const size_t num_prompts = result["num_prompts"].as(); + const bool dynamic_split_fuse = result["dynamic_split_fuse"].as(); + const std::string model_path = result["model"].as(); + const std::string draft_model_path = result["draft_model"].as(); + const std::string device = result["device"].as(); + const bool use_prefix = result["use_prefix"].as(); + + std::string prefix_str = + "You are an advanced language model designed to assist users by providing accurate, " + "relevant, and helpful information. Your responses should be accurate, concise, contextual, " + "respectful, and helpful. 
The request is: "; + + // create dataset + + std::vector prompt_examples = { + "What is OpenVINO?", + "How are you?", + "What is your name?", + "Tell me something about Canada", + "What is OpenVINO?", + }; + + auto generation_config = get_spec_decoding_generation_config_examples(); + auto default_config_size = generation_config.size(); + for (size_t i = default_config_size; i < num_prompts; ++i) { + generation_config.push_back(generation_config[i % default_config_size]); } - void update_kv_cache(const size_t seq_length) { - // this is the case when main model accepted all candidates from draft model - // we need to collect kv cache for out_of_kv_cache_token by infering it - // on next candidates generation cycle out_of_kv_cache_token will be prefixed - // to main models's latest out token - if (draft_model_seq_length < seq_length) { - return; - } - - out_of_kv_cache_token = -1; - ::update_kv_cache(draft_model, seq_len_axis, seq_length); - draft_model_seq_length = seq_length; + std::vector prompts(num_prompts); + for (size_t i = 0; i < num_prompts; ++i) { + prompts[i] = prompt_examples[i % prompt_examples.size()]; } -}; - -int64_t get_eos_token(const std::shared_ptr tokenizer) { - auto rt_info = tokenizer->get_rt_info(); // Get the runtime info for the model - auto it = rt_info.find("eos_token_id"); - if (it == rt_info.end()) { - throw std::runtime_error("EOS token ID not found in model's runtime information."); + // Perform the inference + auto get_default_block_size = [](const std::string& device) { + const size_t cpu_block_size = 32; + const size_t gpu_block_size = 16; + + bool is_gpu = device.find("GPU") != std::string::npos; + + return is_gpu ? gpu_block_size : cpu_block_size; + }; + + ov::genai::SchedulerConfig scheduler_config; + // batch size + scheduler_config.max_num_batched_tokens = use_prefix ? 256 : 32; + // cache params + scheduler_config.num_kv_blocks = 364; + scheduler_config.block_size = get_default_block_size(device); + // mode - vLLM or dynamic_split_fuse + scheduler_config.dynamic_split_fuse = dynamic_split_fuse; + // vLLM specific params + scheduler_config.max_num_seqs = 2; + scheduler_config.enable_prefix_caching = use_prefix; + + // It's possible to construct a Tokenizer from a different path. + // If the Tokenizer isn't specified, it's loaded from the same folder. + ov::genai::LLMPipeline pipe(model_path, device, ov::genai::draft_model(draft_model_path, device), ov::genai::scheduler_config(scheduler_config)); + + if (use_prefix) { + std::cout << "Running inference for prefix to compute the shared prompt's KV cache..." << std::endl; + auto generation_results = pipe.generate(prefix_str, ov::genai::greedy()); } - return it->second.as(); -} - -} // namespace - -int main(int argc, char* argv[]) try { - if (argc != 4) { - throw std::runtime_error(std::string{"Usage: "} + argv[0] + "
''"); - } - - // tokenizer model - ov::Core core; - core.add_extension(OPENVINO_TOKENIZERS_PATH); // OPENVINO_TOKENIZERS_PATH is defined in CMakeLists.txt - auto tokenizer_model = core.read_model(std::string{argv[1]} + "/openvino_tokenizer.xml"); - // tokenizer and detokenizer work on CPU only - ov::InferRequest tokenizer = core.compile_model(tokenizer_model, "CPU").create_infer_request(); - auto [input_ids, attention_mask] = tokenize(tokenizer, argv[3]); - ov::InferRequest detokenizer = - core.compile_model(std::string{argv[1]} + "/openvino_detokenizer.xml", "CPU").create_infer_request(); - TextStreamer text_streamer{std::move(detokenizer)}; - - // draft model (which is smaller, less accurate but faster) - std::shared_ptr ov_draft_model = core.read_model(std::string{argv[1]} + "/openvino_model.xml"); - - size_t draft_model_seq_len_axis = get_seq_len_axis(ov_draft_model); - - ov::InferRequest draft_model = core.compile_model(ov_draft_model, "CPU").create_infer_request(); - - size_t seq_len = input_ids.get_shape()[1]; - - // main model (which is bigger, more accurate but slower) - std::shared_ptr ov_main_model = core.read_model(std::string{argv[2]} + "/openvino_model.xml"); - - size_t main_model_seq_len_axis = get_seq_len_axis(ov_main_model); - - ov::InferRequest main_model = core.compile_model(ov_main_model, "CPU").create_infer_request(); - - size_t max_sequence_length = 100; - - AssistedCandidateGenerator candidateGenerator{draft_model, max_sequence_length, 5, draft_model_seq_len_axis}; - - main_model.set_tensor("input_ids", input_ids); - main_model.set_tensor("attention_mask", attention_mask); - - auto position_ids = main_model.get_tensor("position_ids"); - position_ids.set_shape(input_ids.get_shape()); - std::iota(position_ids.data(), position_ids.data() + position_ids.get_size(), 0); - - // set beam_idx for stateful model: no beam search is used and BATCH_SIZE = 1 - main_model.get_tensor("beam_idx").set_shape({BATCH_SIZE}); - main_model.get_tensor("beam_idx").data()[0] = 0; - - // To coollect kv-cache for the and to get the next token run the very first infer request - candidateGenerator.generate_next_token( - std::vector(input_ids.data(), input_ids.data() + input_ids.get_size())); - - main_model.infer(); - - size_t vocab_size = draft_model.get_tensor("logits").get_shape().back(); - OPENVINO_ASSERT(vocab_size == main_model.get_tensor("logits").get_shape().back(), - "vocab size should be the same for the both models"); - - // logits shape is [BATCH_SIZE, seq_len, vocab_size] - auto logits = main_model.get_tensor("logits"); - auto data_logits = logits.data() + (seq_len - 1) * vocab_size; - int64_t out_token = std::max_element(data_logits, data_logits + vocab_size) - data_logits; - - text_streamer.put(out_token); - - const int64_t EOS_TOKEN = get_eos_token(tokenizer_model); - - /* Speculative decoding works the following way. The draft model predicts the next K - tokens one by one in an autoregressive manner, while the main model validates these - predictions and corrects them if necessary. We go through each predicted token, and - if a difference is detected between the draft and main model, we stop and keep the - last token predicted by the main model. Then the draft model gets the latest main - prediction and again tries to predict the next K tokens, repeating the cycle. - - This approach reduces the need for multiple infer requests to the main model, - enhancing performance. 
For instance, in more predictable parts of text generation, - the draft model can, in best-case scenarios, generate the next K tokens that exactly - match the target. In that case they are validated in a single inference call to - the main model instead of running K subsequent requests. - */ - - while (out_token != EOS_TOKEN && seq_len < max_sequence_length) { - // generate candidates from the draft model - std::vector candidates = candidateGenerator.generate_candidates(out_token); - size_t candidates_size = candidates.size(); - - // For the main network, candidates_size + 1 tokens will be fed at once in a single infer request. - input_ids.set_shape({BATCH_SIZE, candidates_size + 1}); - - input_ids.data()[0] = out_token; - if (candidates_size > 0) { - std::copy_n(candidates.begin(), candidates_size, input_ids.data() + 1); - } - - attention_mask.set_shape({BATCH_SIZE, seq_len + candidates_size + 1}); - std::fill_n(attention_mask.data(), attention_mask.get_size(), 1); - - position_ids.set_shape({BATCH_SIZE, candidates_size + 1}); - std::iota(position_ids.data(), position_ids.data() + position_ids.get_size(), seq_len); - - main_model.infer(); - - data_logits = logits.data(); // [BATCH_SIZE, K, vocab_size] - - // match model tokens with candidate tokens - // 1. accept current out token (if not eos) - // 2. check if it matches apropriate candidate - // 2.1 if it's match, continue - accept next token - // 2.2 it it's mismatch, stop iteration but still accept current token as it was last token generated by - // model from a valid sequence. - size_t accepted_tokens_number = 0; - for (size_t i = 0; i < candidates_size + 1; i++) { - auto start = data_logits + vocab_size * i; - auto stop = data_logits + vocab_size * (i + 1); - out_token = std::max_element(start, stop) - start; - - if (out_token == EOS_TOKEN) { - break; - } - - text_streamer.put(out_token); - accepted_tokens_number++; - - if (i == candidates_size || out_token != candidates[i]) { - break; - } - } - - // After the inference request, key/values have shape [BATCH_SIZE, seq_len + K, vocab_size]. - // Increment the sequence length by the number of matched tokens, and - // trim the KV cache to match the new sequence length. - seq_len += accepted_tokens_number; - - if (accepted_tokens_number > 0) { - candidateGenerator.update_candidate_strategy(accepted_tokens_number - 1); - } - - candidateGenerator.update_kv_cache(seq_len); - update_kv_cache(main_model, main_model_seq_len_axis, seq_len); - candidates.clear(); + for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { + ov::genai::DecodedResults generation_results = pipe.generate(prompts[request_id], generation_config[request_id]); + std::cout << "Question: " << prompts[request_id] << std::endl; + const std::vector& text_results = generation_results.texts; + const std::vector& log_prob_results = generation_results.scores; + print_generation_result(text_results, log_prob_results); + std::cout << std::endl; } - text_streamer.end(); - // Model is stateful which means that context (kv-cache) which belongs to a particular - // text sequence is accumulated inside the model during the generation loop above. - // This context should be reset before processing the next text sequence. 
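
The rewritten speculative_decoding_lm sample above replaces the removed hand-written draft/main loop with a single LLMPipeline call. A minimal sketch of that flow, assuming illustrative model paths and using only the calls shown in the sample (option parsing and the prefix branch are omitted):

#include <iostream>
#include <string>

#include "openvino/genai/llm_pipeline.hpp"

int main() {
    const std::string main_model_path  = "./dolly-v2-7b";   // assumed path
    const std::string draft_model_path = "./dolly-v2-3b";   // assumed path

    // The full sample tunes the scheduler further; only cache-related fields are set here.
    ov::genai::SchedulerConfig scheduler_config;
    scheduler_config.num_kv_blocks = 364;
    scheduler_config.block_size = 32;

    // draft_model() and scheduler_config() are passed as properties; the draft
    // model is what turns the pipeline into a speculative-decoding pipeline.
    ov::genai::LLMPipeline pipe(main_model_path, "CPU",
                                ov::genai::draft_model(draft_model_path, "CPU"),
                                ov::genai::scheduler_config(scheduler_config));

    ov::genai::GenerationConfig config = ov::genai::greedy();
    config.num_assistant_tokens = 5;  // constant draft-candidate window

    ov::genai::DecodedResults results = pipe.generate("What is OpenVINO?", config);
    for (size_t i = 0; i < results.texts.size(); ++i)
        std::cout << results.texts[i] << " (" << results.scores[i] << ")" << std::endl;
    return 0;
}
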
- // While it is not required to reset context in this sample as only one sequence is processed, - // it is called for education purposes: - draft_model.reset_state(); - main_model.reset_state(); } catch (const std::exception& error) { try { std::cerr << error.what() << '\n'; diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index efe4bc5e90..d1da15aa29 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -49,10 +49,19 @@ struct PipelineMetrics { }; class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { +protected: class ImplInterface; class ContinuousBatchingImpl; + class ContinuousBatchingForSpeculativeDecodingImpl; + class SpeculativeDecodingImpl; + + friend class ContinuousBatchingForSpeculativeDecodingImpl; + friend class SpeculativeDecodingImpl; + std::shared_ptr m_impl; + ContinuousBatchingPipeline() = default; + public: ContinuousBatchingPipeline(const std::string& models_path, const SchedulerConfig& scheduler_config, diff --git a/src/cpp/include/openvino/genai/generation_config.hpp b/src/cpp/include/openvino/genai/generation_config.hpp index a1244d3d75..ee6997fc2b 100644 --- a/src/cpp/include/openvino/genai/generation_config.hpp +++ b/src/cpp/include/openvino/genai/generation_config.hpp @@ -10,6 +10,7 @@ #include "openvino/runtime/compiled_model.hpp" #include "openvino/runtime/infer_request.hpp" #include "openvino/genai/tokenizer.hpp" +#include "openvino/genai/scheduler_config.hpp" #include "lora_adapter.hpp" namespace ov { @@ -65,6 +66,10 @@ enum class StopCriteria { EARLY, HEURISTIC, NEVER }; * @param presence_penalty reduces absolute log prob if the token was generated at least once. Ignored for non continuous batching. * @param frequency_penalty reduces absolute log prob as many times as the token was generated. Ignored for non continuous batching. * @param rng_seed initializes random generator. Ignored for non continuous batching. + * + * Speculative decoding parameters: + * @param assistant_confidence_threshold the lower token probability of candidate to be validated by main model in case of static strategy candidates number update. + * @param num_assistant_tokens the defined candidates number to be generated by draft model in case of dynamic strategy candidates number update. 
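+ *
+ * Only one of the two parameters may be set for a given request (GenerationConfig::validate() enforces this):
+ * num_assistant_tokens fixes the number of candidates the draft model produces per step
+ * (the "constant" strategy in the samples), while assistant_confidence_threshold makes the draft model
+ * adjust the number of candidates dynamically based on its own token confidence (the "dynamic" strategy).
+ * An illustrative helper:
+ * @code
+ * ov::genai::GenerationConfig make_speculative_config(bool dynamic_strategy) {
+ *     ov::genai::GenerationConfig config = ov::genai::greedy();
+ *     if (dynamic_strategy)
+ *         config.assistant_confidence_threshold = 0.8f;  // draft decides how many candidates to emit
+ *     else
+ *         config.num_assistant_tokens = 5;               // fixed candidate window per step
+ *     return config;
+ * }
+ * @endcode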
*/ class OPENVINO_GENAI_EXPORTS GenerationConfig { @@ -103,6 +108,10 @@ class OPENVINO_GENAI_EXPORTS GenerationConfig { float frequency_penalty = 0.0f; size_t rng_seed = 0; + // Speculative decoding + float assistant_confidence_threshold = 0.f; + size_t num_assistant_tokens = 0; + // EOS special token int64_t eos_token_id = -1; @@ -118,6 +127,7 @@ class OPENVINO_GENAI_EXPORTS GenerationConfig { bool is_greedy_decoding() const; bool is_beam_search() const; bool is_multinomial() const; + bool is_speculative_decoding() const; void update_generation_config(const ov::AnyMap& config_map); template @@ -161,6 +171,9 @@ static constexpr ov::Property presence_penalty{"presence_penalty"}; static constexpr ov::Property frequency_penalty{"frequency_penalty"}; static constexpr ov::Property rng_seed{"rng_seed"}; +static constexpr ov::Property assistant_confidence_threshold{"assistant_confidence_threshold"}; +static constexpr ov::Property num_assistant_tokens{"num_assistant_tokens"}; + // Predefined Configs OPENVINO_GENAI_EXPORTS GenerationConfig beam_search(); OPENVINO_GENAI_EXPORTS GenerationConfig greedy(); diff --git a/src/cpp/include/openvino/genai/llm_pipeline.hpp b/src/cpp/include/openvino/genai/llm_pipeline.hpp index b21fb43bdb..f38f52f045 100644 --- a/src/cpp/include/openvino/genai/llm_pipeline.hpp +++ b/src/cpp/include/openvino/genai/llm_pipeline.hpp @@ -272,5 +272,12 @@ class OPENVINO_GENAI_EXPORTS LLMPipeline { OPENVINO_GENAI_EXPORTS std::pair streamer(StreamerVariant func); OPENVINO_GENAI_EXPORTS std::pair generation_config(const GenerationConfig& config); +OPENVINO_GENAI_EXPORTS +std::pair draft_model( + const std::string& model_path, + const std::string& device = "", + const ov::AnyMap& plugin_config = {}, + const ov::genai::SchedulerConfig& scheduler_config = {}); + } // namespace genai } // namespace ov diff --git a/src/cpp/src/block_manager.hpp b/src/cpp/src/block_manager.hpp index 555abbdbcb..f300e463c8 100644 --- a/src/cpp/src/block_manager.hpp +++ b/src/cpp/src/block_manager.hpp @@ -907,6 +907,26 @@ class BlockManager { return blocks_count; } + /** + * Clean up not busy physical KV cache blocks in a sequence group. + * @param seq_group Pointer to a sequence group. + */ + void free_empty_physical_blocks(SequenceGroup::Ptr seq_group) { + size_t num_logical_blocks = seq_group->get_num_logical_blocks(); + if (num_logical_blocks == 0) { + return; + } + for (const auto& sequence : seq_group->get_running_sequences()) { + auto seq_id = sequence->get_id(); + auto& block_table = m_block_table[seq_id]; + size_t num_physical_blocks = block_table[0].size(); + if (num_physical_blocks > num_logical_blocks) { + free_sequence_partially(seq_id, num_physical_blocks - num_logical_blocks); + } + } + } + + /** * Allocates just enough physical KV cache blocks to a sequence group to be enough for the sequences in it. If the sequences * in the group were forked before and their last block is a copy-on-write, then the block contents will have to be copied separately diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index dc5a74a475..9844354271 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -3,8 +3,8 @@ #include "text_callback_streamer.hpp" #include "continuous_batching_impl.hpp" -#include "paged_attention_transformations.hpp" #include "utils.hpp" +#include "utils/paged_attention_transformations.hpp" namespace ov::genai { template struct overloaded : Ts... 
{using Ts::operator()...;}; @@ -17,6 +17,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl( const std::string& device, const ov::AnyMap& plugin_config) { m_tokenizer = tokenizer; + ov::Core core; auto [core_plugin_config, compile_plugin_config] = ov::genai::utils::split_core_complile_config(plugin_config); @@ -28,9 +29,24 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl( DeviceConfig device_config(core, scheduler_config, device, compile_plugin_config); bool is_need_per_layer_cache_control = scheduler_config.use_cache_eviction; - apply_paged_attention_transformations(model, device_config, is_need_per_layer_cache_control); + utils::apply_paged_attention_transformations(model, device_config, is_need_per_layer_cache_control); + + init(model, scheduler_config, compile_plugin_config, device_config, core); +} - ov::InferRequest infer_request = core.compile_model(model, device_config.get_device(), compile_plugin_config).create_infer_request(); +void ContinuousBatchingPipeline::ContinuousBatchingImpl::_pull_awaiting_requests() { + std::lock_guard lock{m_awaiting_requests_mutex}; + m_requests.insert(m_requests.end(), m_awaiting_requests.begin(), m_awaiting_requests.end()); + m_awaiting_requests.clear(); +} + +void ContinuousBatchingPipeline::ContinuousBatchingImpl::init( + std::shared_ptr model, + const SchedulerConfig& scheduler_config, + const ov::AnyMap& plugin_config, + const DeviceConfig& device_config, + ov::Core& core) { + ov::InferRequest infer_request = core.compile_model(model, device_config.get_device(), plugin_config).create_infer_request(); // setup KV caches m_cache_manager = std::make_shared(device_config, core); @@ -62,9 +78,8 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl( } m_sampler = std::make_shared(m_tokenizer); m_sampler->set_seed(m_generation_config.rng_seed); +}; - // read default generation config -} GenerationHandle ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request_id, @@ -109,17 +124,15 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { step_timer.start(); // Pull awaiting requests - { - std::lock_guard lock{m_awaiting_requests_mutex}; - m_requests.insert(m_requests.end(), m_awaiting_requests.begin(), m_awaiting_requests.end()); - m_awaiting_requests.clear(); - } + _pull_awaiting_requests(); m_pipeline_metrics.requests = m_requests.size(); + Scheduler::Output scheduler_output; { static ManualTimer timer("scheduling"); timer.start(); + m_scheduler->clean_empty_blocks(m_requests); scheduler_output = m_scheduler->schedule(m_requests); m_pipeline_metrics.scheduled_requests = scheduler_output.m_scheduled_sequence_groups_ids.size(); m_pipeline_metrics.cache_usage = scheduler_output.m_cache_usage; @@ -190,7 +203,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { { static ManualTimer timer("sample"); timer.start(); - sampler_output = m_sampler->sample(m_requests, logits); + sampler_output = m_sampler->sample(m_requests, logits, m_is_validation_mode_enabled); timer.end(); } @@ -266,7 +279,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorfree_sequence(sequence->get_id()); } } - m_sampler->clear_beam_search_info(request->get_request_id()); + m_sampler->clear_request_info(request->get_request_id()); } m_requests.clear(); }; @@ -320,51 +333,6 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector -ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const 
std::vector& prompts, - std::vector sampling_params, - const StreamerVariant& streamer) { - std::vector input_ids; - static ManualTimer timer("tokenize"); - if (m_is_chat_conversation) { - OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts"); - m_history.push_back({{"role", "user"}, {"content", prompts.at(0)}}); - constexpr bool add_generation_prompt = true; - std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt); - timer.start(); - // ov::genai::add_special_tokens(false) is aligned with stateful pipeline - input_ids.push_back(m_tokenizer.encode(history, ov::genai::add_special_tokens(false)).input_ids); - timer.end(); - } else { - input_ids.reserve(prompts.size()); - for (const std::string& prompt : prompts) { - timer.start(); - input_ids.push_back(m_tokenizer.encode(prompt).input_ids); - timer.end(); - } - } - std::vector encoded = generate(input_ids, sampling_params, streamer); - std::vector decoded; - decoded.reserve(encoded.size()); - for (EncodedGenerationResult& res : encoded) { - std::vector generated; - generated.reserve(res.m_generation_ids.size()); - for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) { - generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); - if (m_is_chat_conversation && 0 == idx) { - m_history.push_back({{"role", "assistant"}, {"content", generated.back()}}); - } - } - decoded.push_back(GenerationResult{ - res.m_request_id, - std::move(generated), - std::move(res.m_scores), - res.m_status - }); - } - return decoded; -} - void ContinuousBatchingPipeline::ContinuousBatchingImpl::_free_non_running_requests() { std::vector::iterator requests_iterator = m_requests.begin(); while (requests_iterator != m_requests.end()) { @@ -375,7 +343,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::_free_non_running_reque m_scheduler->free_sequence(sequence->get_id()); } } - m_sampler->clear_beam_search_info(request->get_request_id()); + m_sampler->clear_request_info(request->get_request_id()); requests_iterator = m_requests.erase(requests_iterator); } else { requests_iterator++; @@ -440,4 +408,5 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::maybe_evict_cache_block seq_group_ptr->register_token_eviction(num_blocks_evicted * sched_config.block_size); } } + } diff --git a/src/cpp/src/continuous_batching_impl.hpp b/src/cpp/src/continuous_batching_impl.hpp index 0d170e07ed..c9b606fb42 100644 --- a/src/cpp/src/continuous_batching_impl.hpp +++ b/src/cpp/src/continuous_batching_impl.hpp @@ -26,11 +26,17 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc static const size_t AVG_CACHE_USAGE_WINDOW_SIZE_IN_STEPS = 1000; std::deque m_previous_step_cache_usages; + + // flag to enable validation mode for sampler + bool m_is_validation_mode_enabled = false; #ifdef DEBUG_CACHE_STATE_DUMP size_t step_count = 0; #endif + // used by tests only + ContinuousBatchingImpl() = default; + void _free_non_running_requests(); void _notify_requests_dropped_by_handle(); void _register_step_cache_usage(float step_cache_usage); @@ -38,6 +44,15 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc float _get_current_running_average_cache_usage() const; void maybe_evict_cache_blocks(const SchedulerConfig& sched_config); + + void init(std::shared_ptr model, + const SchedulerConfig& scheduler_config, + const ov::AnyMap& plugin_config, + const DeviceConfig& device_config, + ov::Core& core); + + void _pull_awaiting_requests(); + public: 
ContinuousBatchingImpl(const std::string& models_path, const Tokenizer& tokenizer, @@ -72,9 +87,5 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) override; - std::vector - generate(const std::vector& prompts, - std::vector sampling_params, - const StreamerVariant& streamer) override; }; } \ No newline at end of file diff --git a/src/cpp/src/continuous_batching_impl_interface.cpp b/src/cpp/src/continuous_batching_impl_interface.cpp index 7f7db465fc..c5a2fb087b 100644 --- a/src/cpp/src/continuous_batching_impl_interface.cpp +++ b/src/cpp/src/continuous_batching_impl_interface.cpp @@ -27,4 +27,49 @@ void ContinuousBatchingPipeline::ImplInterface::finish_chat() { m_is_chat_conversation = false; m_history.clear(); }; +std::vector +ContinuousBatchingPipeline::ImplInterface::generate( + const std::vector& prompts, + std::vector sampling_params, + const StreamerVariant& streamer) { + std::vector input_ids; + static ManualTimer timer("tokenize"); + if (m_is_chat_conversation) { + OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts"); + m_history.push_back({{"role", "user"}, {"content", prompts.at(0)}}); + constexpr bool add_generation_prompt = true; + std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt); + timer.start(); + // ov::genai::add_special_tokens(false) is aligned with stateful pipeline + input_ids.push_back(m_tokenizer.encode(history, ov::genai::add_special_tokens(false)).input_ids); + timer.end(); + } else { + input_ids.reserve(prompts.size()); + for (const std::string& prompt : prompts) { + timer.start(); + input_ids.push_back(m_tokenizer.encode(prompt).input_ids); + timer.end(); + } + } + std::vector encoded = generate(input_ids, sampling_params, streamer); + std::vector decoded; + decoded.reserve(encoded.size()); + for (EncodedGenerationResult& res : encoded) { + std::vector generated; + generated.reserve(res.m_generation_ids.size()); + for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) { + generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); + if (m_is_chat_conversation && 0 == idx) { + m_history.push_back({{"role", "assistant"}, {"content", generated.back()}}); + } + } + decoded.push_back(GenerationResult{ + res.m_request_id, + std::move(generated), + std::move(res.m_scores), + res.m_status + }); + } + return decoded; +} } \ No newline at end of file diff --git a/src/cpp/src/continuous_batching_impl_interface.hpp b/src/cpp/src/continuous_batching_impl_interface.hpp index a3615b5828..eddb07a8b4 100644 --- a/src/cpp/src/continuous_batching_impl_interface.hpp +++ b/src/cpp/src/continuous_batching_impl_interface.hpp @@ -58,10 +58,10 @@ class ContinuousBatchingPipeline::ImplInterface { generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) = 0; - virtual std::vector + std::vector generate(const std::vector& prompts, std::vector sampling_params, - const StreamerVariant& streamer) = 0; + const StreamerVariant& streamer); void start_chat(const std::string& system_message); void finish_chat(); diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index 1cfee51652..b0ce404fa2 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -10,18 +10,36 @@ #include "openvino/genai/generation_handle.hpp" #include 
"openvino/genai/tokenizer.hpp" #include "continuous_batching_impl.hpp" +#include "speculative_decoding/speculative_decoding_impl.hpp" #include "timer.hpp" +#include "utils.hpp" #include "debug_utils.hpp" #include "cache_state_dumper.hpp" using namespace ov::genai; +inline ov::genai::ModelDesc +extract_draft_model_from_config(ov::AnyMap& config) { + ov::genai::ModelDesc draft_model(""); + if (config.find(utils::DRAFT_MODEL_ARG_NAME) != config.end()) { + draft_model = config.at(utils::DRAFT_MODEL_ARG_NAME).as(); + config.erase(utils::DRAFT_MODEL_ARG_NAME); + } + return draft_model; +} + ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::string& models_path, const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& llm_plugin_config, const ov::AnyMap& tokenizer_plugin_config) { - m_impl = std::make_shared(models_path, scheduler_config, device, llm_plugin_config, tokenizer_plugin_config); + auto llm_plugin_config_without_draft_model = llm_plugin_config; + auto draft_model = extract_draft_model_from_config(llm_plugin_config_without_draft_model); + if (draft_model.model_path.empty()) { + m_impl = std::make_shared(models_path, scheduler_config, device, llm_plugin_config, tokenizer_plugin_config); + } else { + m_impl = std::make_shared(models_path, scheduler_config, device, llm_plugin_config_without_draft_model, draft_model, tokenizer_plugin_config); + } } ContinuousBatchingPipeline::ContinuousBatchingPipeline( @@ -30,7 +48,13 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const SchedulerConfig& scheduler_config, const std::string& device, const ov::AnyMap& plugin_config) { - m_impl = std::make_shared(model_path, tokenizer, scheduler_config, device, plugin_config); + auto plugin_config_without_draft_model = plugin_config; + auto draft_model = extract_draft_model_from_config(plugin_config_without_draft_model); + if (draft_model.model_path.empty()) { + m_impl = std::make_shared(model_path, tokenizer, scheduler_config, device, plugin_config); + } else { + m_impl = std::make_shared(model_path, scheduler_config, device, plugin_config_without_draft_model, draft_model); + } } ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() { diff --git a/src/cpp/src/generation_config.cpp b/src/cpp/src/generation_config.cpp index 384a50385f..f4d0c60a7e 100644 --- a/src/cpp/src/generation_config.cpp +++ b/src/cpp/src/generation_config.cpp @@ -116,6 +116,10 @@ bool GenerationConfig::is_multinomial() const { return do_sample; } +bool GenerationConfig::is_speculative_decoding() const { + return assistant_confidence_threshold > 0 || num_assistant_tokens > 0; +} + void GenerationConfig::validate() const { OPENVINO_ASSERT(!do_sample || num_beams == 1, "Beam search with sampling is not supported yet. 
" @@ -159,6 +163,13 @@ void GenerationConfig::validate() const { OPENVINO_ASSERT(frequency_penalty >= -2.0f && frequency_penalty <= 2.0f, "frequence_penalty penalty must be a [-2; +2]"); OPENVINO_ASSERT(presence_penalty >= -2.0f && presence_penalty <= 2.0f, "presence_penalty penalty must be a [-2; +2]"); } + if (is_speculative_decoding()) { + if (assistant_confidence_threshold != 0.f) { + OPENVINO_ASSERT(num_assistant_tokens == 0); + } else { + OPENVINO_ASSERT(num_assistant_tokens > 0); + }; + } } GenerationConfig beam_search() { @@ -190,5 +201,6 @@ GenerationConfig multinomial() { multinomial_config.max_new_tokens = 30; return multinomial_config; } + } // namespace genai } // namespace ov diff --git a/src/cpp/src/generation_handle.cpp b/src/cpp/src/generation_handle.cpp index 8bf838ef9e..31c110d961 100644 --- a/src/cpp/src/generation_handle.cpp +++ b/src/cpp/src/generation_handle.cpp @@ -44,8 +44,12 @@ void add_partial_result(std::unordered_map& partial_ if (partial_result_iter == partial_results.end()) { partial_results.emplace(iteration_result.first, iteration_result.second); } else { - partial_result_iter->second.generated_ids.push_back(iteration_result.second.generated_ids[0]); - partial_result_iter->second.generated_log_probs.push_back(iteration_result.second.generated_log_probs[0]); + auto generated_len = iteration_result.second.generated_ids.size(); + OPENVINO_ASSERT(generated_len == iteration_result.second.generated_log_probs.size()); + for (size_t i = 0; i < generated_len; ++i) { + partial_result_iter->second.generated_ids.push_back(iteration_result.second.generated_ids[i]); + partial_result_iter->second.generated_log_probs.push_back(iteration_result.second.generated_log_probs[i]); + } partial_result_iter->second.score = iteration_result.second.score; partial_result_iter->second.finish_reason = iteration_result.second.finish_reason; } diff --git a/src/cpp/src/llm_pipeline.cpp b/src/cpp/src/llm_pipeline.cpp index e3815e5944..b954f7dac4 100644 --- a/src/cpp/src/llm_pipeline.cpp +++ b/src/cpp/src/llm_pipeline.cpp @@ -17,6 +17,7 @@ #include "text_callback_streamer.hpp" #include "openvino/genai/lora_adapter.hpp" #include "lora_helper.hpp" +#include "speculative_decoding/speculative_decoding_impl.hpp" namespace ov { namespace genai { @@ -367,6 +368,14 @@ std::pair generation_config(const GenerationConfig& config) { return {utils::CONFIG_ARG_NAME, Any::make(config)}; } +std::pair draft_model( + const std::string& model_path, + const std::string& device, + const ov::AnyMap& plugin_config, + const ov::genai::SchedulerConfig& scheduler_config) { + return { utils::DRAFT_MODEL_ARG_NAME, Any::make(model_path, device, plugin_config, scheduler_config) }; +} + } // namespace genai } // namespace ov diff --git a/src/cpp/src/logit_processor.hpp b/src/cpp/src/logit_processor.hpp index 2e904cb023..f945fe4cc3 100644 --- a/src/cpp/src/logit_processor.hpp +++ b/src/cpp/src/logit_processor.hpp @@ -310,6 +310,10 @@ class LogitProcessor { std::shared_ptr> m_unique_prompt_token_ids = std::shared_ptr>(new std::set); size_t m_generated_tokens = 0; + // speculative decoding parameters + float m_assistant_confidence_threshold = 0.f; + + public: LogitProcessor(const ov::genai::GenerationConfig& sampling_params, const LogitTransformers::TokenIds& input_ids) { @@ -354,9 +358,16 @@ class LogitProcessor { m_logit_transformers.emplace_back(new LogitTransformers::TopKFilter(sampling_params.top_k)); } } + if (sampling_params.assistant_confidence_threshold > 0) { + m_assistant_confidence_threshold = 
sampling_params.assistant_confidence_threshold; + } } } + float get_assistant_confidence_threshold() { + return m_assistant_confidence_threshold; + } + void apply(Logits& logits) { for (const auto& transformer : m_logit_transformers) { if (transformer->is_applicable(m_generated_tokens)) { diff --git a/src/cpp/src/paged_attention_transformations.hpp b/src/cpp/src/paged_attention_transformations.hpp deleted file mode 100644 index a7bce23757..0000000000 --- a/src/cpp/src/paged_attention_transformations.hpp +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright (C) 2023-2024 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 - -#pragma once - -#include "openvino/core/model.hpp" -#include "device_config.hpp" - -namespace ov::genai { -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config, bool per_layer_cache_control = false); -} \ No newline at end of file diff --git a/src/cpp/src/sampler.cpp b/src/cpp/src/sampler.cpp index 4885a21b8f..a0490816e9 100644 --- a/src/cpp/src/sampler.cpp +++ b/src/cpp/src/sampler.cpp @@ -548,7 +548,7 @@ std::vector Sampler::_try_finish_generation(SequenceGroup::Ptr & sequen std::vector dropped_seq_ids; for (auto& running_sequence : sequence_group->get_running_sequences()) { const auto generated_len = running_sequence->get_generated_len(); - if (sampling_params.max_new_tokens == generated_len || + if (sampling_params.max_new_tokens <= generated_len || is_stop_token_id_hit(running_sequence->get_generated_ids().back(), sampling_params.stop_token_ids) && !sampling_params.ignore_eos) { // stop sequence by max_new_tokens or stop token (eos included) running_sequence->set_status(SequenceStatus::FINISHED); @@ -581,19 +581,15 @@ void register_new_token(const Token& sampled_token_id, Sequence::Ptr running_sequence, LogitProcessor& logit_processor, bool is_extend_sequence, - bool is_update_len_logit_processor) { + bool is_validation_mode_enabled) { logit_processor.register_new_generated_token(sampled_token_id.m_index); - size_t generated_len = logit_processor.get_generated_len(); if (is_extend_sequence) { running_sequence->append_token(sampled_token_id.m_index, sampled_token_id.m_log_prob); - } else { - // just update the token log prob in case of successfully validated token - OPENVINO_ASSERT(generated_len < running_sequence->get_generated_len()); - running_sequence->update_generated_log_prob(generated_len, sampled_token_id.m_log_prob); } - // increment seq len only for one sequence in sequence group to sync them - if (is_update_len_logit_processor) { - logit_processor.update_generated_len(++generated_len); + if (!is_validation_mode_enabled && + std::fabs(sampled_token_id.m_log_prob) < logit_processor.get_assistant_confidence_threshold()) { + auto sequence_group = running_sequence->get_sequence_group_ptr(); + sequence_group->pause_generation(true); } }; @@ -612,7 +608,11 @@ create_n_forked_sequences(SequenceGroup::Ptr sequence_group, const std::vector& sampled_tokens) { const auto& running_sequences = sequence_group->get_running_sequences(); OPENVINO_ASSERT(running_sequences.size() == 1); - Sequence::Ptr sequence_to_fork = running_sequences.front(); + Sequence::Ptr sequence_to_fork = running_sequences[0]; + if (sequence_to_fork->get_generated_len() > 0) { + logit_processor.update_generated_len(0); + sequence_to_fork->remove_last_tokens(sequence_to_fork->get_generated_len()); + } std::list forked_seq_ids; for (size_t i = 1; i < sampled_tokens.size(); ++i) { const auto forked_sequence = sequence_group->fork_sequence(sequence_to_fork); @@ 
-623,17 +623,33 @@ create_n_forked_sequences(SequenceGroup::Ptr sequence_group, return forked_seq_ids; } -bool -is_continue_to_sample_tokens(Sequence::Ptr running_sequence, - size_t token_idx, - size_t max_gen_len, - size_t& decrease_context_len_per_seq_group) { - if (max_gen_len == 0) { - running_sequence->remove_last_tokens(token_idx); - decrease_context_len_per_seq_group = std::max(decrease_context_len_per_seq_group, token_idx); - return false; +void +stop_sample_tokens(Sequence::Ptr running_sequence, + size_t token_idx, + size_t max_gen_len, + size_t& max_removed_tokens_per_request) { + running_sequence->remove_last_tokens(token_idx); + max_removed_tokens_per_request = std::max(max_removed_tokens_per_request, token_idx); + running_sequence->set_status(SequenceStatus::FINISHED); + running_sequence->set_finish_reason(GenerationFinishReason::STOP); +} + +void +align_all_sequence_len(SequenceGroup::Ptr& sequence_group, + size_t min_generated_tokens, + LogitProcessor& logit_processor) { + for (auto& sequence : sequence_group->get_running_sequences()) { + const auto generated_token_ids = sequence->get_generated_ids(); + auto generated_len = sequence->get_generated_len(); + if (generated_len > min_generated_tokens) { + auto removed_token_cnt = generated_len - min_generated_tokens; + for (size_t i = min_generated_tokens + 1; i < generated_len; ++i) { + logit_processor.decrease_generated_token_occurance(generated_token_ids[i]); + } + sequence->remove_last_tokens(removed_token_cnt); + } } - return true; + logit_processor.update_generated_len(min_generated_tokens); } bool @@ -641,7 +657,7 @@ validate_candidate(Sequence::Ptr running_sequence, size_t& token_idx, Token& sampled_token, bool& is_extend_sequence, - size_t& decrease_context_len_per_seq_group) { + size_t& max_removed_tokens) { if (token_idx > 0) { const auto& generated_tokens = running_sequence->get_generated_ids(); auto it = generated_tokens.rbegin(); @@ -649,7 +665,7 @@ validate_candidate(Sequence::Ptr running_sequence, // to validate candidates from assisting model and remove incorrect ones from generated sequence if (*it != sampled_token.m_index) { running_sequence->remove_last_tokens(token_idx); - decrease_context_len_per_seq_group = std::max(decrease_context_len_per_seq_group, token_idx); + max_removed_tokens = std::max(max_removed_tokens, token_idx); is_extend_sequence = true; return false; } else { @@ -687,7 +703,7 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, const void * sequence_group_logits_data = logits_data + vocab_size * currently_processed_tokens; ov::Tensor sequence_group_logits(ov::element::f32, ov::Shape{num_running_sequences, actual_seq_len, vocab_size}, (void *)sequence_group_logits_data); - size_t decrease_context_len_per_seq_group = 0; + size_t max_removed_tokens_per_request = 0, min_generated_len = std::numeric_limits::max(); if (sequence_group->requires_sampling()) { // get number of token to be validated auto num_tokens_to_process = sequence_group->get_num_tokens_to_validate(); @@ -703,8 +719,12 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, // calculate token offset from the end of logit size_t token_offset = num_tokens_to_process - i; // max counter of needed to be sampled tokens - size_t max_num_sampled_token = sampling_params.max_new_tokens + token_offset - running_sequence->get_generated_len(); - if (!is_continue_to_sample_tokens(running_sequence, token_offset, max_num_sampled_token, decrease_context_len_per_seq_group)) { + 
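
For reference, the dynamic stopping rule applied in register_new_token() above can be restated as plain arithmetic. The following is a minimal standalone sketch, not part of the patch; it assumes natural-log token probabilities, and the helper name draft_should_pause is hypothetical:

#include <cassert>
#include <cmath>

// Mirrors the check in register_new_token(): with validation mode disabled (the
// draft pipeline), generation for the request is paused once |log p| of the
// sampled token falls below assistant_confidence_threshold, i.e. once p > exp(-threshold).
bool draft_should_pause(float token_log_prob, float assistant_confidence_threshold) {
    return std::fabs(token_log_prob) < assistant_confidence_threshold;
}

int main() {
    const float threshold = 0.4f;                              // e.g. assistant_confidence_threshold = 0.4f
    assert(draft_should_pause(std::log(0.9f), threshold));     // |log 0.9| ~= 0.105 < 0.4 -> pause
    assert(!draft_should_pause(std::log(0.3f), threshold));    // |log 0.3| ~= 1.204 >= 0.4 -> keep drafting
    return 0;
}
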
OPENVINO_ASSERT(running_sequence->get_generated_len() >= token_offset); + size_t generated_and_verified_len = running_sequence->get_generated_len() - token_offset; + OPENVINO_ASSERT(sampling_params.max_new_tokens >= generated_and_verified_len); + size_t max_num_sampled_token = sampling_params.max_new_tokens - generated_and_verified_len; + if (max_num_sampled_token == 0) { + stop_sample_tokens(running_sequence, token_offset, max_num_sampled_token, max_removed_tokens_per_request); break; } @@ -719,14 +739,17 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, logit_processor.apply(logit_vector); Token sampled_token_id; + bool is_generate_n_tokens = false; if (sampling_params.is_greedy_decoding()) { - sampled_token_id = _greedy_sample(logit_vector); + sampled_token_id = { _greedy_sample(logit_vector) }; } else { // is_multinomial() - const bool is_generate_n_tokens = sequence_group->num_total_seqs() == 1; + is_generate_n_tokens = sequence_group->num_total_seqs() == 1; const size_t num_tokens_per_sequence = is_generate_n_tokens ? sampling_params.num_return_sequences : 1; + is_generate_n_tokens &= (num_tokens_per_sequence > 1); auto sampled_token_ids = _multinomial_sample(logit_vector, num_tokens_per_sequence); OPENVINO_ASSERT(sampled_token_ids.size(), num_tokens_per_sequence); + // to create n sequence just in case of `sequence_group->num_total_seqs() == 1` and `sampling_params.num_return_sequences > 1` if (is_generate_n_tokens) { const auto forked_seq_ids = create_n_forked_sequences(sequence_group, logit_processor, sampled_token_ids); sampler_output.m_forked_sequences.insert({running_sequences[0]->get_id(), forked_seq_ids}); @@ -734,19 +757,23 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, sampled_token_id = sampled_token_ids.front(); } // flag to add sampled token to generated sequence or extend logit processors only - bool is_extend_sequence = token_offset == 0, - // flag to update generated length of sequence group in logit processor - is_update_len_logit_processor = running_sequence_id == num_running_sequences - 1, + bool is_extend_sequence = token_offset == 0 || is_generate_n_tokens, is_validation_passed = true; - if (is_validation_mode_enabled) { - is_validation_passed = validate_candidate(running_sequences[running_sequence_id], token_offset, sampled_token_id, is_extend_sequence, decrease_context_len_per_seq_group); + if (is_validation_mode_enabled && !is_generate_n_tokens) { + is_validation_passed = validate_candidate(running_sequences[running_sequence_id], token_offset, sampled_token_id, is_extend_sequence, max_removed_tokens_per_request); + // update log prob just while validation process + if (!is_extend_sequence) { + OPENVINO_ASSERT(generated_and_verified_len < running_sequences[running_sequence_id]->get_generated_len()); + running_sequence->update_generated_log_prob(generated_and_verified_len, sampled_token_id.m_log_prob); + } } - register_new_token(sampled_token_id, running_sequences[running_sequence_id], logit_processor, is_extend_sequence, is_update_len_logit_processor); + register_new_token(sampled_token_id, running_sequences[running_sequence_id], logit_processor, is_extend_sequence, is_validation_mode_enabled); // to exit from sampling in case of failed token validation if (!is_validation_passed) { break; } } + min_generated_len = std::min(min_generated_len, running_sequence->get_generated_len()); } for (const auto& dropped_seq_id : _try_finish_generation(sequence_group)) { sampler_output.m_dropped_sequences.push_back(dropped_seq_id); @@ -779,32 
+806,36 @@ SamplerOutput Sampler::sample(std::vector & sequence_groups, // NOTE: it should be before 'get_num_scheduled_tokens' is used // update internal state of sequence group to reset scheduler tokens and update currently processed ones + auto min_validated_tokens = sequence_group->get_num_tokens_to_validate() - max_removed_tokens_per_request; sequence_group->finish_iteration(); // decrease sequence_group context in case of candidates generated by draft_model were not accepted by main_model - if (decrease_context_len_per_seq_group) { - const auto num_processed_tokens = sequence_group->get_num_processed_tokens(); - OPENVINO_ASSERT(num_processed_tokens >= decrease_context_len_per_seq_group); - OPENVINO_ASSERT(sequence_group->get_context_len() >= decrease_context_len_per_seq_group); - sequence_group->update_processed_tokens_num(num_processed_tokens - decrease_context_len_per_seq_group); + if (max_removed_tokens_per_request) { + align_all_sequence_len(sequence_group, min_generated_len, logit_processor); + auto min_processed_tokens = sequence_group->get_prompt_len() + min_generated_len - 1; + sequence_group->update_processed_tokens_num(min_processed_tokens); + logit_processor.update_generated_len(min_processed_tokens); } // accumulate a number of processed tokens - currently_processed_tokens += (padded_amount_of_processed_tokens - decrease_context_len_per_seq_group) * num_running_sequences; + currently_processed_tokens += padded_amount_of_processed_tokens * num_running_sequences; } return sampler_output; } -void Sampler::update_logit_processor(uint64_t request_id, uint64_t token_id) { +LogitProcessor& Sampler::get_logit_processor(uint64_t request_id) { OPENVINO_ASSERT(m_logit_processors.count(request_id)); - auto& logit_processor = m_logit_processors.at(request_id); - logit_processor.decrease_generated_token_occurance(token_id); - auto gen_size = logit_processor.get_generated_len(); - logit_processor.update_generated_len(gen_size - 1); + return m_logit_processors.at(request_id); +} + + +void Sampler::create_logit_processor(uint64_t request_id, const GenerationConfig& sampling_params, const TokenIds& prompt) { + m_logit_processors.insert({request_id, LogitProcessor(sampling_params, prompt)}); } -void Sampler::clear_beam_search_info(uint64_t request_id) { +void Sampler::clear_request_info(uint64_t request_id) { m_beam_search_info.erase(request_id); + m_logit_processors.erase(request_id); } int64_t Sampler::GroupBeamSearcher::Group::finish(Beam beam, const ov::genai::GenerationConfig& sampling_params) { diff --git a/src/cpp/src/sampler.hpp b/src/cpp/src/sampler.hpp index 13933e0b75..0f4ef93927 100644 --- a/src/cpp/src/sampler.hpp +++ b/src/cpp/src/sampler.hpp @@ -47,7 +47,6 @@ class Sampler { Token _greedy_sample(const Logits& logits) const; std::vector _multinomial_sample(const Logits& logits, size_t num_tokens_per_sequence); std::vector _try_finish_generation(SequenceGroup::Ptr & sequence_group); - void update_logit_processor(uint64_t request_id, uint64_t token_id); // request ID => beam search tracking information std::map m_beam_search_info; @@ -64,7 +63,11 @@ class Sampler { SamplerOutput sample(std::vector & sequence_groups, ov::Tensor logits, bool is_validation_mode_enabled = false); void set_seed(size_t seed) { rng_engine.seed(seed); } - void clear_beam_search_info(uint64_t request_id); + void clear_request_info(uint64_t request_id); + + LogitProcessor& get_logit_processor(uint64_t request_id); + void create_logit_processor(uint64_t request_id, const GenerationConfig& 
sampling_parameters, const TokenIds& prompt); + std::vector get_beam_idxs(SequenceGroup::CPtr sequence_group); }; diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp index e53d4c14bc..c8bec64e48 100644 --- a/src/cpp/src/scheduler.hpp +++ b/src/cpp/src/scheduler.hpp @@ -70,6 +70,11 @@ class Scheduler { return scheduler_output; } + void clean_empty_blocks(std::vector& seq_groups) { + for (const auto& seq_group : seq_groups) + m_block_manager.free_empty_physical_blocks(seq_group); + } + const std::vector& get_block_tables(const Sequence& seq) const { return m_block_manager.get_block_tables(seq.get_id()); } @@ -342,7 +347,6 @@ class Scheduler { // here we also assume that sequence must be scheduler in a single shot and has no already generated context if (!m_config.enable_prefix_caching) OPENVINO_ASSERT(sequence_group->get_context_len() == 0); - size_t num_available_tokens_in_megabatch = m_config.max_num_batched_tokens - scheduler_output.m_total_num_scheduled_tokens; size_t sequence_len = sequence_group->get_num_available_tokens_for_batching(); @@ -367,7 +371,6 @@ class Scheduler { { Sequence::Ptr sequence = (*sequence_group)[0]; uint64_t seq_id = sequence->get_id(); - // and schedule tokens sequence_group->schedule_tokens(sequence_len); @@ -377,6 +380,7 @@ class Scheduler { // add information to scheduler_output { scheduler_output.m_scheduled_sequence_groups_ids.push_back(sequence_group_id); + uint64_t seq_id = sequence_group->get_running_sequences()[0]->get_id(); scheduler_output.m_block_tables[seq_id] = m_block_manager.get_block_tables(seq_id); scheduler_output.m_total_num_scheduled_tokens += sequence_len; } diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index 5c87e8ebfa..c86f3b5a3c 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -126,12 +126,22 @@ class Sequence { } } - GenerationOutput get_last_generation_output() { + GenerationOutput get_last_generation_output(size_t token_cnt = 1) { GenerationOutput output; OPENVINO_ASSERT(m_generated_ids.size()); output.score = get_cumulative_log_probs(); - output.generated_ids = std::vector {m_generated_ids.back()}; - output.generated_log_probs = std::vector {m_generated_log_probs.back()}; + + auto generated_token_id = get_generated_ids(); + auto generated_log_probs = get_generated_log_probs(); + + OPENVINO_ASSERT(get_generated_len() >= token_cnt); + auto offset = get_generated_len() - token_cnt; + + std::vector token_id(generated_token_id.begin() + offset, generated_token_id.end()); + std::vector log_probs(generated_log_probs.begin() + offset, generated_log_probs.end()); + + output.generated_ids = token_id; + output.generated_log_probs = log_probs; output.finish_reason = get_finish_reason(); return output; } @@ -205,7 +215,9 @@ class SequenceGroup { // context length of longest sequence within a group size_t m_max_content_len = 0; // max validation length within a group to check generated tokens - size_t m_num_validated_tokens = 0; + size_t m_num_validation_tokens = 0; + // flag to enable/disable token generation, e.g. 
in speculative decoding scenario + bool m_is_gen_paused = false; SequenceGroup(uint64_t request_id, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size, bool enable_prefix_caching) @@ -248,9 +260,13 @@ class SequenceGroup { return m_prompt_ids.size(); } + void pause_generation(bool status) { + m_is_gen_paused = status; + } + // a sequence group can generate new tokens if it already proccessed m_max_content_len before bool can_generate_tokens() const { - return m_max_content_len >= get_prompt_len(); + return m_max_content_len + m_num_validation_tokens >= get_prompt_len() && !m_is_gen_paused; } Sequence::Ptr operator[] (size_t index) { @@ -420,7 +436,7 @@ class SequenceGroup { void clear_scheduled_tokens() { m_num_scheduled_tokens = 0; - m_num_validated_tokens = 0; + m_num_validation_tokens = 0; } bool is_scheduled() const { @@ -429,12 +445,12 @@ class SequenceGroup { void set_num_validated_tokens(size_t k) { // in case of non-prompt we need to take prev tokens + token to validate - // m_num_validated_tokens = get_num_processed_tokens() ? k + 1 : k; - m_num_validated_tokens = k; + // m_num_validation_tokens = get_num_processed_tokens() ? k + 1 : k; + m_num_validation_tokens = k; } size_t get_num_tokens_to_validate() { - return m_num_validated_tokens; + return m_num_validation_tokens; } size_t get_num_available_tokens_for_batching() const { @@ -442,7 +458,7 @@ class SequenceGroup { OPENVINO_ASSERT(get_num_scheduled_tokens() == 0, "Internal error: this function cannot be called when we are already in scheduling phase"); // if sequence group has not finished, it has at least one token to process size_t num_available_tokens = std::max(get_prompt_len(), m_max_content_len); - return std::max(num_available_tokens - m_num_processed_tokens, 1u) + m_num_validated_tokens; + return std::max(num_available_tokens - m_num_processed_tokens, 1u) + m_num_validation_tokens; } // mark current schedule phase as finished and updates internal counters @@ -529,7 +545,7 @@ class SequenceGroup { return true; } } - return false; + return m_is_gen_paused; } void set_sequence_group_ptr(std::shared_ptr sequence_group) { @@ -567,13 +583,15 @@ class SequenceGroup { m_generation_stream->push(std::move(outputs)); } - void push_partial_outputs() { + void push_partial_outputs(size_t token_cnt = 1) { GenerationOutputs outputs; for (auto& sequence : m_sequences) { // todo: check seq.is_finished() to generate without several // or is it ok to use padding? - const auto last_gen_token = sequence->get_last_generation_output(); - outputs.emplace(sequence->get_grouped_id(), last_gen_token); + for (size_t i = 0; i < token_cnt; ++i) { + auto last_gen_token = sequence->get_last_generation_output(token_cnt); + outputs.emplace(sequence->get_grouped_id(), last_gen_token); + } } m_generation_stream->push(std::move(outputs)); } @@ -592,8 +610,14 @@ class SequenceGroup { } else if (m_sampling_params.is_greedy_decoding() || m_sampling_params.is_multinomial()) { // We can stream only when one sequence is returned and we don't use stop strings that would be excluded from the output // (after stop string is detected its tokens are already sent) - if (num_total_seqs() == 1&& (m_sampling_params.stop_strings.empty() || m_sampling_params.include_stop_str_in_output)) { - push_partial_outputs(); + if (num_total_seqs() == 1 && + (m_sampling_params.stop_strings.empty() || m_sampling_params.include_stop_str_in_output)) { + auto previous_step_gen_len = get_num_processed_tokens() > 0 ? 
get_num_processed_tokens() - get_prompt_len() + 1 : 0; + auto generation_len = m_sequences.front()->get_generated_len(); + if (previous_step_gen_len < generation_len) { + auto token_to_print = generation_len - previous_step_gen_len; + push_partial_outputs(token_to_print); + } } else if (has_finished() || out_of_memory()) { push_outputs(); } diff --git a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp new file mode 100644 index 0000000000..d4e95fcdae --- /dev/null +++ b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp @@ -0,0 +1,318 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "continuous_batching_for_speculative_decoding_impl.hpp" + +namespace ov::genai { +ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::ContinuousBatchingForSpeculativeDecodingImpl( + ov::Core& core, + const std::shared_ptr& model, + const Tokenizer& tokenizer, + const DeviceConfig& device_config, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config, + bool is_validation_mode_enabled) { + m_tokenizer = tokenizer; + m_is_validation_mode_enabled = is_validation_mode_enabled; + init(model, scheduler_config, plugin_config, device_config, core); +} + +void +ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::finish_request(SequenceGroup::Ptr request) { + + for (const auto& sequence : request->get_sequences()) { + m_scheduler->free_sequence(sequence->get_id()); + } + m_sampler->clear_request_info(request->get_request_id()); +} + +void ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::finish_request(int64_t request_id) { + // finish all request s in case of -1 + if (request_id == -1) { + while (!m_requests.empty()) { + const auto& request = *m_requests.rbegin(); + finish_request(request); + m_requests.pop_back(); + } + return; + } + for (size_t i = 0; i < m_requests.size(); ++i) { + auto& request = m_requests[i]; + if (request->get_request_id() != request_id) { + continue; + } + finish_request(request); + m_requests.erase(m_requests.begin() + i); + break; + } +} + +GeneratedRequests +ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::get_generated_requests() { + _pull_awaiting_requests(); + + GeneratedRequests result; + for (const auto& request : m_requests) { + const auto& request_id = request->get_request_id(); + if (!result.count(request_id)) { + result.insert({request_id, {{}} }); + } + auto& generated_request = result[request_id]; + for (const auto& sequence : request->get_running_sequences()) { + const auto& sequence_id = sequence->get_grouped_id(); + OPENVINO_ASSERT(!generated_request.count(sequence_id)); + generated_request.insert({{sequence_id, { sequence->get_generated_ids(), sequence->get_generated_log_probs() } }}); + } + } + return result; +} + +// { min_len_of_prefix, min_length_of_candidate } +std::pair +get_prefix_len( + const std::vector& running_sequences, + const GeneratedSequences& candidates) { + size_t min_generated_tokens = std::numeric_limits::max(), + min_candidate_len = std::numeric_limits::max(); + for (const auto& running_sequence : running_sequences) { + const auto& sequence_id = running_sequence->get_grouped_id(); + if (!candidates.count(sequence_id)) { + continue; + } + + const auto& candidate_sequence = candidates.at(sequence_id); + + 
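
The per-sequence computation that get_prefix_len() performs (continued just below) reduces to the length of the common prefix between the tokens already accepted in the running sequence and the candidate tokens proposed for it. A standalone restatement, not part of the patch:

#include <algorithm>
#include <cstdint>
#include <vector>

// Length of the matching prefix between the running sequence's generated tokens
// and the candidate tokens; the first mismatch (or the shorter length) bounds it.
size_t matching_prefix_len(const std::vector<int64_t>& running_token_ids,
                           const std::vector<int64_t>& candidate_token_ids) {
    size_t prefix_len = std::min(running_token_ids.size(), candidate_token_ids.size());
    for (size_t i = 0; i < prefix_len; ++i) {
        if (running_token_ids[i] != candidate_token_ids[i]) {
            return i;
        }
    }
    return prefix_len;
}

// matching_prefix_len({5, 7, 9}, {5, 7, 2, 4}) == 2: the first two candidate tokens
// are kept; update_request() rolls back and replaces everything after them.
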
const std::vector& candidate_token_ids = candidate_sequence.token_ids, + running_token_ids = running_sequence->get_generated_ids(); + + const size_t candidate_sequence_gen_len = candidate_token_ids.size(), + running_sequence_gen_len = running_sequence->get_generated_len(); + + // to find the len of prefix + size_t sequence_prefix_len = std::min(candidate_sequence_gen_len, running_sequence_gen_len); + for (size_t i = 0; i < sequence_prefix_len; ++i) { + if (candidate_token_ids[i] != running_token_ids[i]) { + sequence_prefix_len = i; + break; + } + } + + min_generated_tokens = std::min(sequence_prefix_len, min_generated_tokens); + min_candidate_len = std::min(candidate_sequence_gen_len, min_candidate_len); + } + return { min_generated_tokens, min_candidate_len }; +} + +size_t +remove_tokens_from_sequence(Sequence::Ptr& sequence, + size_t min_generated_tokens, + LogitProcessor& logit_proccessor) { + const auto generated_token_ids = sequence->get_generated_ids(); + const auto sequence_generated_len = generated_token_ids.size(); + OPENVINO_ASSERT(sequence_generated_len >= min_generated_tokens); + + size_t removed_token_cnt = sequence_generated_len - min_generated_tokens; + for (size_t i = min_generated_tokens; i < sequence_generated_len; ++i) { + logit_proccessor.decrease_generated_token_occurance(generated_token_ids[i]); + } + sequence->remove_last_tokens(removed_token_cnt); + return (sequence_generated_len - min_generated_tokens); +} + +size_t +insert_tokens_to_sequence(Sequence::Ptr& sequence, + const std::vector& token_ids, + const std::vector& token_log_probs, + LogitProcessor& logit_proccessor, + bool is_update_sampler) { + size_t generated_len = sequence->get_generated_len(), candidate_len = token_ids.size(); + OPENVINO_ASSERT(generated_len <= candidate_len); + for (size_t i = generated_len; i < candidate_len; ++i) { + sequence->append_token(token_ids[i], token_log_probs[i]); + if (is_update_sampler) { + logit_proccessor.register_new_generated_token(token_ids[i]); + } + } + return (candidate_len - generated_len); +} + + +// `is_init_all_sequences_in_request` is flag to enable initialization of all sequences in case of `num_return_sequences > 1`. +// Only first sequence from all will be initialized if flag is set to `false` state. +// This approach helped to process prompt once in speculative decoding multisequence case. +size_t +init_request( + SequenceGroup::Ptr request, + const GeneratedSequences& candidates, + LogitProcessor& logit_processor, + bool is_update_logit_processor, + bool is_init_all_sequences_in_request = false) { + if (candidates.begin()->second.token_ids.empty() && !is_init_all_sequences_in_request) { + return 0; + } + size_t min_candidate_len = std::numeric_limits::max(); + if (is_init_all_sequences_in_request) { + for (const auto& candidate_sequence : candidates) { + min_candidate_len = std::min(candidate_sequence.second.token_ids.size(), min_candidate_len); + } + } else { + // place only one token to first sequence in case of multisequence generation. + // Left sequences in request will be initialized in sampler and validated after (only one token). + min_candidate_len = request->get_sampling_parameters().num_return_sequences == 1 ? 
candidates.begin()->second.token_ids.size() : 1; + } + for (const auto& candidate_sequence : candidates) { + Sequence::Ptr sequence; + if (is_init_all_sequences_in_request && candidate_sequence.first > 0) { + sequence = Sequence::Ptr(new Sequence(candidate_sequence.first)); + sequence->set_status(ov::genai::SequenceStatus::RUNNING); + request->add_sequence(sequence); + } else { + auto running_sequences = request->get_running_sequences(); + OPENVINO_ASSERT(!running_sequences.empty()); + sequence = request->get_running_sequences()[0]; + } + auto token_ids = candidate_sequence.second.token_ids; + auto log_probs = candidate_sequence.second.log_probs; + token_ids.resize(min_candidate_len); + log_probs.resize(min_candidate_len); + + for (size_t i = 0; i < min_candidate_len; ++i) { + sequence->append_token(token_ids[i], log_probs[i]); + if (is_update_logit_processor) { + logit_processor.register_new_generated_token(token_ids[i]); + } + } + + if (!is_init_all_sequences_in_request) { + break; + } + } + return min_candidate_len; +} + +UpdateRequestResult +ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::init_request_by_candidate( + uint64_t request_id, + const GeneratedSequences& candidates) { + _pull_awaiting_requests(); + + for (auto& request : m_requests) { + if (request->get_request_id() != request_id) { + continue; + } + + UpdateRequestResult result; + m_sampler->create_logit_processor(request_id, request->get_sampling_parameters(), request->get_prompt_ids()); + auto& logit_processor = m_sampler->get_logit_processor(request_id); + result.inserted_tokens_cnt = init_request(request, candidates, logit_processor, true, true); + request->set_num_validated_tokens(result.inserted_tokens_cnt); + return result; + } + return {0, 0}; +} + +UpdateRequestResult +ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::update_request(uint64_t request_id, + const GeneratedSequences& candidates, + bool is_update_logit_processor) { + _pull_awaiting_requests(); + + UpdateRequestResult result{0, 0}; + for (auto& request : m_requests) { + if (request_id != request->get_request_id()) { + continue; + } + + std::vector running_sequences = request->get_running_sequences(); + size_t min_generated_tokens, min_candidate_len; + if (request->get_context_len() == 0 && !request->get_num_tokens_to_validate()) { + if (candidates.begin()->second.log_probs.empty()) { + // lock generation in case on empty generation + request->pause_generation(true); + return result; + } + // init request by sequences in case the pipeline was not started + m_sampler->create_logit_processor(request_id, request->get_sampling_parameters(), request->get_prompt_ids()); + auto& logit_processor = m_sampler->get_logit_processor(request_id); + result.inserted_tokens_cnt = init_request(request, candidates, logit_processor, is_update_logit_processor); + min_generated_tokens = result.inserted_tokens_cnt; + running_sequences = request->get_running_sequences(); + min_candidate_len = result.inserted_tokens_cnt; + } else { + // update existing sequences by the candidates + auto& logit_processor = m_sampler->get_logit_processor(request_id); + std::tie(min_generated_tokens, min_candidate_len) = get_prefix_len(running_sequences, candidates); + + for (auto& running_sequence : running_sequences) { + if (!candidates.count(running_sequence->get_grouped_id())) { + continue; + } + + result.removed_tokens_cnt = remove_tokens_from_sequence(running_sequence, min_generated_tokens, logit_processor); + + auto candidate_sequence = 
candidates.at(running_sequence->get_grouped_id()); + std::vector candidate_token_ids = candidate_sequence.token_ids; + std::vector candidate_token_log_probs = candidate_sequence.log_probs; + candidate_token_ids.resize(min_candidate_len); + candidate_token_log_probs.resize(min_candidate_len); + result.inserted_tokens_cnt = insert_tokens_to_sequence(running_sequence, candidate_token_ids, candidate_token_log_probs, logit_processor, is_update_logit_processor); + } + // we should update a logit proccessor just for draft model to generate the same tokens + // logit processors of main model will be updated in sampler while validation mode + if (is_update_logit_processor) { + logit_processor.update_generated_len(min_candidate_len); + } + } + + // update request context information to provide correct scheduling phase + const size_t num_processed_tokens = request->get_num_processed_tokens(), + prompt_len = request->get_prompt_len(), + updated_context_len = min_candidate_len + prompt_len; + if (num_processed_tokens > 0) + request->update_processed_tokens_num(num_processed_tokens - result.removed_tokens_cnt); + request->set_num_validated_tokens(result.inserted_tokens_cnt); + request->pause_generation(false); + break; + } + + return result; +} + +void +ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::unlock_next_request_generation() { + for (auto& request : m_requests) { + if (!request->has_finished() && !request->can_generate_tokens()) { + request->pause_generation(false); + return; + } + } +} + +void ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::multistep() { + size_t generated_tokens_cnt = 0; + // cycle to generate several tokens per one iteration for speculative decoding case + bool to_generate = true; + while (to_generate) { + generated_tokens_cnt++; + + step(); + + to_generate = false; + for (auto& request : m_requests) { + const auto& sampling_params = request->get_sampling_parameters(); + if (!sampling_params.is_speculative_decoding()) { + // generate only one token in case of non speculative decoding + request->pause_generation(true); + } else if (request->get_num_processed_tokens() == 0 && sampling_params.num_return_sequences > 1) { + request->pause_generation(true); + } else if (sampling_params.num_assistant_tokens <= generated_tokens_cnt) { + request->pause_generation(true); + } else if (request->get_num_processed_tokens() - request->get_prompt_len() + 1 >= sampling_params.max_new_tokens - 1) { + request->pause_generation(true); + } + to_generate |= request->can_generate_tokens(); + } + } +} +} \ No newline at end of file diff --git a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp new file mode 100644 index 0000000000..a75a160f14 --- /dev/null +++ b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp @@ -0,0 +1,37 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "openvino/genai/continuous_batching_pipeline.hpp" + +#include "continuous_batching_impl.hpp" +#include "speculative_decoding/update_request_structs.hpp" + +namespace ov::genai { +class ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl : public ContinuousBatchingPipeline::ContinuousBatchingImpl { +public: + ContinuousBatchingForSpeculativeDecodingImpl() = default; + + ContinuousBatchingForSpeculativeDecodingImpl(ov::Core& core, + 
const std::shared_ptr& model, + const Tokenizer& tokenizer, + const DeviceConfig& device_config, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config, + bool is_validation_mode_enabled); + + void multistep(); + void finish_request(int64_t request_id = -1); + void unlock_next_request_generation(); + + GeneratedRequests get_generated_requests(); + UpdateRequestResult update_request(uint64_t request_id, const GeneratedSequences& candidates, bool is_update_logit_processor); + + UpdateRequestResult init_request_by_candidate(uint64_t request_id, const GeneratedSequences& candidates); + +protected: + void finish_request(SequenceGroup::Ptr request); +}; +} \ No newline at end of file diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp new file mode 100644 index 0000000000..ebd5c73d9a --- /dev/null +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp @@ -0,0 +1,239 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "text_callback_streamer.hpp" +#include "speculative_decoding_impl.hpp" +#include "utils.hpp" +#include "utils/paged_attention_transformations.hpp" + + +namespace ov::genai { +template struct overloaded : Ts... {using Ts::operator()...;}; +template overloaded(Ts...) -> overloaded; + +ContinuousBatchingPipeline::SpeculativeDecodingImpl::SpeculativeDecodingImpl( + const std::string& main_models_path, + const SchedulerConfig& main_scheduler_config, + const std::string& main_device, + const ov::AnyMap& main_plugin_config, + const ov::genai::ModelDesc draft_model_desc, + const ov::AnyMap& tokenizer_plugin_config) { + ov::Core core; + auto [core_plugin_config, compile_plugin_config] = ov::genai::utils::split_core_complile_config(main_plugin_config); + core.set_property(core_plugin_config); + + std::string openvino_model_name = "/openvino_model.xml", + draft_model_path = draft_model_desc.model_path; + + std::shared_ptr main_model = core.read_model(main_models_path + openvino_model_name), + draft_model = core.read_model(draft_model_path + openvino_model_name); + + utils::apply_paged_attention_transformations(main_model, main_scheduler_config.use_cache_eviction); + utils::apply_paged_attention_transformations(draft_model, main_scheduler_config.use_cache_eviction); + + std::string draft_device = draft_model_desc.device; + bool is_draft_device_undefined = false; + if (draft_device.empty() || draft_device == main_device) { + draft_device = main_device; + is_draft_device_undefined = true; + } + + ov::genai::SchedulerConfig main_scheduler_config_updated = main_scheduler_config, + draft_scheduler_config = is_draft_device_undefined ? main_scheduler_config : draft_model_desc.scheduler_config; + if (is_draft_device_undefined) { + // split KV cache to 2 caches for main and draft models + size_t main_model_cache_size = utils::get_kv_cache_size(main_model), + draft_model_cache_size = utils::get_kv_cache_size(draft_model); + auto k = static_cast(draft_model_cache_size) / (main_model_cache_size + draft_model_cache_size); + + size_t main_cache_size = main_scheduler_config.cache_size * (1 - k), + draft_cache_size = main_scheduler_config.cache_size * k; + if (draft_cache_size == 0) { + main_cache_size -= main_cache_size > 1 ? 
1 : 0; + draft_cache_size = 1; + } + + main_scheduler_config_updated.cache_size = main_cache_size; + draft_scheduler_config.cache_size = draft_cache_size; + } + + ov::AnyMap draft_plugin_config = is_draft_device_undefined ? compile_plugin_config : draft_model_desc.plugin_config; + + DeviceConfig main_device_config(core, main_scheduler_config, main_device, compile_plugin_config), + draft_device_config(core, draft_scheduler_config, draft_device, draft_plugin_config); + + utils::set_kv_cache_type_and_shape(main_model, main_device_config); + utils::set_kv_cache_type_and_shape(draft_model, draft_device_config); + + // main and draft model can have different tokenizers + // to do: support retokenization: 154103 + Tokenizer main_model_tokenizer(main_models_path, tokenizer_plugin_config), + draft_model_tokenizer(draft_model_path, tokenizer_plugin_config); + + m_tokenizer = main_model_tokenizer; + + // to create `main_pipeline` with enabled validation_mode and `draft_pipeline` with disabled validation mode + m_main_pipeline = std::make_shared(core, main_model, main_model_tokenizer, main_device_config, main_scheduler_config, main_device, compile_plugin_config, true); + m_draft_pipeline = std::make_shared(core, draft_model, draft_model_tokenizer, draft_device_config, draft_scheduler_config, draft_device, draft_plugin_config, false); +} + +GenerationHandle +ContinuousBatchingPipeline::SpeculativeDecodingImpl::add_request(uint64_t request_id, + const ov::Tensor& input_ids, + ov::genai::GenerationConfig sampling_params) { + m_draft_pipeline->add_request(request_id, input_ids, sampling_params); + return m_main_pipeline->add_request(request_id, input_ids, sampling_params); +}; + +GenerationHandle +ContinuousBatchingPipeline::SpeculativeDecodingImpl::add_request(uint64_t request_id, + const std::string& prompt, + ov::genai::GenerationConfig sampling_params) { + m_draft_pipeline->add_request(request_id, prompt, sampling_params); + return m_main_pipeline->add_request(request_id, prompt, sampling_params); +} + +bool ContinuousBatchingPipeline::SpeculativeDecodingImpl::has_non_finished_requests() { + return m_main_pipeline->has_non_finished_requests(); +} + +void print_generated_request(const ov::genai::GeneratedRequests& requests) { + for (const auto& request : requests) { + for (const auto& sequence : request.second) { + std::cout << "request_id: " << request.first << " | sequence_id: " << sequence.first << " | "; + for (const auto& token_id : sequence.second.token_ids) { + std::cout << token_id << " "; + } + std::cout << std::endl; + } + std::cout << std::endl; + } +} + +void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() { + // generate candidates by draft model + static ManualTimer draft_timer("speculative_decoding: draft_model: multistep()"); + draft_timer.start(); + m_draft_pipeline->multistep(); + draft_timer.end(); + m_sd_metrics.draft_duration += draft_timer.get_duration_ms(); + + // to generate num_matches statistic + std::map update_sequence_info; + // put candidates to model KV cache + auto draft_generated_requests = m_draft_pipeline->get_generated_requests(); + for (const auto& candidate : m_draft_pipeline->get_generated_requests()) { + auto update_result = m_main_pipeline->update_request(candidate.first, candidate.second, false); + update_sequence_info.insert({{candidate.first, update_result}}); + } + + static ManualTimer main_timer("speculative_decoding: main_model: step()"); + main_timer.start(); + m_main_pipeline->step(); + main_timer.end(); + m_sd_metrics.main_duration += 
main_timer.get_duration_ms(); + + auto main_generated_requests = m_main_pipeline->get_generated_requests(); + for (const auto& checked_sequence : main_generated_requests) { + auto update_result = m_draft_pipeline->update_request(checked_sequence.first, checked_sequence.second, true); + update_sequence_info[checked_sequence.first].removed_tokens_cnt = update_result.removed_tokens_cnt; + } + + // finish draft request if the generation was complited + for (const auto& draft_request : draft_generated_requests) { + auto request_id = draft_request.first; + if (!main_generated_requests.count(request_id)) { + m_draft_pipeline->finish_request(request_id); + // in case of some requests not to started, unlock generation of next request + m_draft_pipeline->unlock_next_request_generation(); + } + auto updated_seq_info = update_sequence_info[request_id]; + float acceptance_rate = 1 - static_cast(updated_seq_info.removed_tokens_cnt) / updated_seq_info.inserted_tokens_cnt; + m_sd_metrics.update_acceptance_rate(request_id, acceptance_rate); + m_sd_metrics.update_draft_accepted_tokens(request_id, (updated_seq_info.inserted_tokens_cnt - updated_seq_info.removed_tokens_cnt)); + } +} + +std::vector +ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector& input_ids, + const std::vector& sampling_params, + const StreamerVariant& streamer) { + static ManualTimer timer("speculative_decoding: generate()"); + timer.start(); + OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request"); + OPENVINO_ASSERT(input_ids.size() == sampling_params.size()); + const std::shared_ptr& streamer_ptr = std::visit(overloaded{ + [](std::monostate) -> std::shared_ptr { + return nullptr; + }, + [](const std::shared_ptr& streamer) { + return streamer; + }, + [this](const std::function& streamer) -> std::shared_ptr { + return std::make_unique(m_tokenizer, streamer); + } + }, streamer); + + std::vector main_generations, draft_generations; + for (size_t request_id = 0; request_id < input_ids.size(); ++request_id) { + OPENVINO_ASSERT(1 == input_ids[request_id].get_shape().at(0), "Use multiple tensors to pass a batch."); + main_generations.push_back(m_main_pipeline->add_request(request_id, input_ids[request_id], sampling_params[request_id])); + + auto draft_sampling_params = sampling_params[request_id]; + // set the parameters do not stop draft generation without stopping of the same request for main pipeline + draft_sampling_params.max_new_tokens = draft_sampling_params.max_new_tokens + 1; + draft_sampling_params.min_new_tokens = draft_sampling_params.min_new_tokens + 1; + draft_sampling_params.ignore_eos = true; + draft_generations.push_back(m_draft_pipeline->add_request(request_id, input_ids[request_id], draft_sampling_params)); + // decrease generation len to generate last token by main model + } + + std::vector results; + results.reserve(input_ids.size()); + + bool continue_generation = true; + while (has_non_finished_requests() && continue_generation) { + step(); + if (streamer_ptr) { + std::unordered_map token = main_generations.at(0).get()->back(); + OPENVINO_ASSERT(1 == token.size()); + OPENVINO_ASSERT(1 == token.begin()->second.generated_ids.size()); + continue_generation = !streamer_ptr->put(token.begin()->second.generated_ids.at(0)); + } + } + if (streamer_ptr) { + streamer_ptr->end(); + } + draft_generations.clear(); + + for (size_t generation_idx = 0; generation_idx < 
main_generations.size(); ++generation_idx) { + const auto& generation = main_generations[generation_idx]; + EncodedGenerationResult result; + result.m_request_id = 1; + std::vector generation_outputs = generation->read_all(); + std::sort(generation_outputs.begin(), generation_outputs.end(), [=] (GenerationOutput& r1, GenerationOutput& r2) { + return r1.score > r2.score; + }); + + auto num_outputs = std::min(sampling_params[generation_idx].num_return_sequences, generation_outputs.size()); + for (size_t generation_output_idx = 0; generation_output_idx < num_outputs; ++generation_output_idx) { + const auto& generation_output = generation_outputs[generation_output_idx]; + m_sd_metrics.set_generated_len(generation_idx, generation_outputs[generation_output_idx].generated_ids.size()); + result.m_generation_ids.push_back(std::move(generation_output.generated_ids)); + result.m_scores.push_back(generation_output.score); + } + result.m_status = generation->get_status(); + results.push_back(std::move(result)); + } + + OPENVINO_ASSERT(results.size() == input_ids.size()); + timer.end(); + m_sd_metrics.total_duration = timer.get_duration_ms(); + return results; +} + +SpeculativeDecodingMetrics +ContinuousBatchingPipeline::SpeculativeDecodingImpl::get_speculative_decoding_metrics() { + return m_sd_metrics; +}; +} diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp new file mode 100644 index 0000000000..3b6e43544d --- /dev/null +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp @@ -0,0 +1,61 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "openvino/genai/continuous_batching_pipeline.hpp" +#include "continuous_batching_impl.hpp" +#include "continuous_batching_for_speculative_decoding_impl.hpp" +#include "speculative_decoding/speculative_decoding_metrics.hpp" + +namespace ov::genai { + +struct ModelDesc { + std::string model_path; + std::string device; + ov::genai::SchedulerConfig scheduler_config; + ov::AnyMap plugin_config; + + ModelDesc(const std::string& model_path, + const std::string& device = "", + const ov::AnyMap& plugin_config = {}, + const ov::genai::SchedulerConfig& scheduler_config = {}) : + model_path(model_path), + device(device), + plugin_config(plugin_config), + scheduler_config(scheduler_config) {} +}; + +class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBatchingPipeline::ImplInterface { +protected: + std::shared_ptr m_main_pipeline, m_draft_pipeline; + SpeculativeDecodingMetrics m_sd_metrics; + +public: + SpeculativeDecodingImpl(const std::string& main_models_path, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& plugin_config, + const ov::genai::ModelDesc draft_model_desc, + const ov::AnyMap& tokenizer_config = {}); + + GenerationHandle add_request(uint64_t request_id, + const ov::Tensor& input_ids, + ov::genai::GenerationConfig sampling_params) override; + GenerationHandle add_request(uint64_t request_id, + const std::string& prompt, + ov::genai::GenerationConfig sampling_params) override; + + bool has_non_finished_requests() override; + + void step() override; + + std::vector + generate(const std::vector& input_ids, + const std::vector& sampling_params, + const StreamerVariant& streamer) override; + + SpeculativeDecodingMetrics get_speculative_decoding_metrics(); +}; + +} \ No newline at end of file diff --git 
a/src/cpp/src/speculative_decoding/speculative_decoding_metrics.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_metrics.cpp new file mode 100644 index 0000000000..d15bab48b0 --- /dev/null +++ b/src/cpp/src/speculative_decoding/speculative_decoding_metrics.cpp @@ -0,0 +1,66 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include + +#include "speculative_decoding/speculative_decoding_metrics.hpp" +#include "openvino/runtime/exception.hpp" + +namespace ov::genai { + +float SpeculativeDecodingMetrics::get_avg_acceptance_rate(int64_t request_id) { + float avg_acceptance_rate = 0.f; + if (request_id != -1) { + size_t total_iteration_cnt = 0; + for (const auto& acceptance_rate : m_acceptance_rate) { + avg_acceptance_rate += std::accumulate(acceptance_rate.second.begin(), acceptance_rate.second.end(), 0); + total_iteration_cnt += acceptance_rate.second.size(); + } + avg_acceptance_rate /= total_iteration_cnt; + } else { + OPENVINO_ASSERT(m_acceptance_rate.count(request_id)); + const auto& acceptance_rate = m_acceptance_rate[request_id]; + avg_acceptance_rate = std::accumulate(acceptance_rate.begin(), acceptance_rate.end(), 0); + avg_acceptance_rate /= acceptance_rate.size(); + } + return avg_acceptance_rate; +} + +void SpeculativeDecodingMetrics::update_acceptance_rate(int64_t request_id, float acceptance_rate) { + if (m_acceptance_rate.count(request_id)) { + m_acceptance_rate[request_id].push_back(acceptance_rate); + } else { + m_acceptance_rate.insert({{ request_id, std::vector{acceptance_rate} }}); + } +} + +size_t SpeculativeDecodingMetrics::get_iteration_number(int64_t request_id) { + OPENVINO_ASSERT(m_acceptance_rate.count(request_id)); + return m_acceptance_rate[request_id].size(); +} + +float SpeculativeDecodingMetrics::get_draft_duration_percentage() { + return (draft_duration / total_duration); +} + +float SpeculativeDecodingMetrics::get_main_duration_percentage() { + return (main_duration / total_duration); +} + +float SpeculativeDecodingMetrics::get_inference_duration_percentage() { + return ((draft_duration + main_duration) / total_duration); +} + +float SpeculativeDecodingMetrics::get_draft_accepted_tokens_percentage(int64_t request_id) { + return 0.f; +} + +void SpeculativeDecodingMetrics::update_draft_accepted_tokens(int64_t request_id, size_t num_matches) { + m_draft_accepted_tokens[request_id] += num_matches; +} + +void SpeculativeDecodingMetrics::set_generated_len(int64_t request_id, size_t generated_len) { + m_generated_len[request_id] = std::max(generated_len, m_generated_len[request_id]); +} + +} diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_metrics.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_metrics.hpp new file mode 100644 index 0000000000..bb840f500f --- /dev/null +++ b/src/cpp/src/speculative_decoding/speculative_decoding_metrics.hpp @@ -0,0 +1,38 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include +#include + +namespace ov::genai { +class SpeculativeDecodingMetrics { + // percent of draft model using time + draft model gen tokens + using AcceptanceRate = std::vector; + // { request_id, acceptance_rate } + std::map m_acceptance_rate; + + std::map m_draft_accepted_tokens; + std::map m_generated_len; + +public: + float draft_duration = 0, main_duration = 0, total_duration = 0; + + float get_avg_acceptance_rate(int64_t request_id); + void update_acceptance_rate(int64_t request_id, float acceptance_rate); + + 
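
The acceptance-rate samples that these metrics aggregate are produced once per SpeculativeDecodingImpl::step() from the inserted/removed token counts. A hedged sketch of that per-iteration formula, not part of the patch; the zero-guard is an assumption added here for illustration:

#include <cstddef>

// Per-iteration acceptance rate as computed in SpeculativeDecodingImpl::step():
// of the candidate tokens inserted into the main pipeline, the removed ones were rejected.
float iteration_acceptance_rate(size_t inserted_tokens_cnt, size_t removed_tokens_cnt) {
    if (inserted_tokens_cnt == 0) {
        return 0.f;  // guard for the empty-candidate case (assumption, not in the patch)
    }
    return 1.f - static_cast<float>(removed_tokens_cnt) / inserted_tokens_cnt;
}

// Example: 5 drafted tokens inserted, 2 rejected by the main model -> 0.6,
// which update_acceptance_rate() then appends to the request's history.
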
float get_draft_accepted_tokens_percentage(int64_t request_id); + void update_draft_accepted_tokens(int64_t request_id, size_t num_matches); + + void set_generated_len(int64_t request_id, size_t generated_len); + + size_t get_iteration_number(int64_t request_id); + + float get_draft_duration_percentage(); + float get_main_duration_percentage(); + float get_inference_duration_percentage(); + +}; +} \ No newline at end of file diff --git a/src/cpp/src/speculative_decoding/update_request_structs.hpp b/src/cpp/src/speculative_decoding/update_request_structs.hpp new file mode 100644 index 0000000000..bd9bb3b67e --- /dev/null +++ b/src/cpp/src/speculative_decoding/update_request_structs.hpp @@ -0,0 +1,33 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include + +namespace ov::genai { +struct GeneratedSequence { + std::vector token_ids; + std::vector log_probs; + + GeneratedSequence(const std::vector& generated_token_ids, + const std::vector& generated_log_probs) : + token_ids(generated_token_ids), + log_probs(generated_log_probs) {}; +}; + +struct UpdateRequestResult { + size_t inserted_tokens_cnt, removed_tokens_cnt; + + UpdateRequestResult(size_t to_insert = 0, size_t to_remove = 0) : + inserted_tokens_cnt(to_insert), + removed_tokens_cnt(to_remove) {}; +}; + +// { sequence_id : generated_tokens_and_log_probs } +using GeneratedSequences = std::map; + +// { request_id : generated_sequence } +using GeneratedRequests = std::map; +} diff --git a/src/cpp/src/timer.hpp b/src/cpp/src/timer.hpp index c4893acd1c..e3d2d3f006 100644 --- a/src/cpp/src/timer.hpp +++ b/src/cpp/src/timer.hpp @@ -26,6 +26,10 @@ class ManualTimer { m_total += std::chrono::duration(m_end - m_start).count(); } + float get_duration_ms() const { + return m_total / 1000.; + } + ~ManualTimer() { std::cout << m_title << ": " << m_total / 1000. 
<< " secs" << std::endl; } diff --git a/src/cpp/src/utils.hpp b/src/cpp/src/utils.hpp index 5279520a29..531b50e163 100644 --- a/src/cpp/src/utils.hpp +++ b/src/cpp/src/utils.hpp @@ -34,6 +34,7 @@ void read_anymap_param(const ov::AnyMap& config_map, const std::string& name, T& const std::string STREAMER_ARG_NAME = "streamer"; const std::string CONFIG_ARG_NAME = "generation_config"; +const std::string DRAFT_MODEL_ARG_NAME = "draft_model"; template Config from_config_json_if_exists(const std::filesystem::path& model_path, const char config_name[]="generation_config.json") { diff --git a/src/cpp/src/paged_attention_transformations.cpp b/src/cpp/src/utils/paged_attention_transformations.cpp similarity index 71% rename from src/cpp/src/paged_attention_transformations.cpp rename to src/cpp/src/utils/paged_attention_transformations.cpp index 28dda4dea3..53690f770c 100644 --- a/src/cpp/src/paged_attention_transformations.cpp +++ b/src/cpp/src/utils/paged_attention_transformations.cpp @@ -1,38 +1,44 @@ // Copyright (C) 2023-2024 Intel Corporation // SPDX-License-Identifier: Apache-2.0 +#include "utils/paged_attention_transformations.hpp" + #include "openvino/pass/manager.hpp" #include "openvino/pass/sdpa_to_paged_attention.hpp" -#include "paged_attention_transformations.hpp" -#include "cache_manager.hpp" +namespace ov { +namespace genai { +namespace utils { -namespace ov::genai { inline ov::PartialShape to_partial_with_dyn_0_dim(const ov::Shape& static_shape) { ov::PartialShape partial_shape = static_shape; partial_shape[0] = ov::Dimension::dynamic(); return partial_shape; } -/** Applies transformations to the ov::Model to enable paged attention inference. - * @param model Pointer to the ov::Model representing one of the supported LLM architectures. - * @param device_config Configuration struct for inferencing device specifics. - * @param per_layer_cache_control If true, then the transformations will enable per-layer control of KV cache blocks, allowing to specify - * different sets of KV cache blocks for different attention layers. If false, then the KV cache block structure will be identical across all - * decoder layers. 
- */ -void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config, bool per_layer_cache_control) { +size_t get_kv_cache_size(const std::shared_ptr model) { + const auto& parameters = model->get_parameters(); + // extract num_kv_heads and head_size + size_t kv_caches_inputs_offset = 2; + ov::PartialShape k_shape = parameters[kv_caches_inputs_offset]->get_partial_shape(); + OPENVINO_ASSERT(k_shape.rank().get_length() == 3, "KV cache shape is expected to have rank 3, while shape is ", k_shape); + size_t num_kv_heads = k_shape[1].get_length(), head_size = k_shape[2].get_length(); + return num_kv_heads * head_size; +} + +void apply_paged_attention_transformations(std::shared_ptr model, bool per_layer_cache_control) { const ov::op::util::VariableVector& variables = model->get_variables(); OPENVINO_ASSERT(!variables.empty(), "Model is supposed to be stateful"); bool use_block_indices_inputs = per_layer_cache_control; bool use_score_outputs = per_layer_cache_control; ov::pass::SDPAToPagedAttention(use_block_indices_inputs, use_score_outputs).run_on_model(model); +} +void set_kv_cache_type_and_shape(std::shared_ptr model, DeviceConfig& device_config) { const ov::ParameterVector& parameters = model->get_parameters(); - std::map> key_cache_params; - std::map> value_cache_params; + std::map> key_cache_params, value_cache_params; for (const auto& param_ptr : parameters) { const auto& name = param_ptr->get_friendly_name(); if (name.find("key_cache.") == 0) { @@ -43,8 +49,8 @@ void apply_paged_attention_transformations(std::shared_ptr model, Dev } } - OPENVINO_ASSERT(key_cache_params.size() == value_cache_params.size()); OPENVINO_ASSERT(key_cache_params.size() > 0); + OPENVINO_ASSERT(key_cache_params.size() == value_cache_params.size()); size_t num_layers = key_cache_params.size(); // extract num_kv_heads and head_size @@ -66,4 +72,12 @@ void apply_paged_attention_transformations(std::shared_ptr model, Dev model->validate_nodes_and_infer_types(); } + +void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config, bool per_layer_cache_control) { + apply_paged_attention_transformations(model, per_layer_cache_control); + set_kv_cache_type_and_shape(model, device_config); } + +} // namespace utils +} // namespace genai +} // namespace ov \ No newline at end of file diff --git a/src/cpp/src/utils/paged_attention_transformations.hpp b/src/cpp/src/utils/paged_attention_transformations.hpp new file mode 100644 index 0000000000..3bc423d7bc --- /dev/null +++ b/src/cpp/src/utils/paged_attention_transformations.hpp @@ -0,0 +1,32 @@ +// Copyright (C) 2023-2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "openvino/core/any.hpp" +#include "openvino/core/model.hpp" +#include "device_config.hpp" + +namespace ov { +namespace genai { +namespace utils { + + +/** Applies transformations to the ov::Model to enable paged attention inference. + * @param model Pointer to the ov::Model representing one of the supported LLM architectures. + * @param device_config Configuration struct for inferencing device specifics. + * @param per_layer_cache_control If true, then the transformations will enable per-layer control of KV cache blocks, allowing to specify + * different sets of KV cache blocks for different attention layers. If false, then the KV cache block structure will be identical across all + * decoder layers. 
+ */ +void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config, bool per_layer_cache_control = false); + +void apply_paged_attention_transformations(std::shared_ptr model, bool per_layer_cache_control = false); + +size_t get_kv_cache_size(const std::shared_ptr model); + +void set_kv_cache_type_and_shape(std::shared_ptr model, DeviceConfig& device_config); + +} // namespace utils +} // namespace genai +} // namespace ov diff --git a/tests/cpp/CMakeLists.txt b/tests/cpp/CMakeLists.txt index b3526c873c..f404e63cff 100644 --- a/tests/cpp/CMakeLists.txt +++ b/tests/cpp/CMakeLists.txt @@ -17,7 +17,12 @@ set(TEST_TARGET_NAME "tests_continuous_batching") file(GLOB tests_src "*.cpp") file(GLOB src_files "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/sequence_group.cpp" "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/cache_eviction.cpp" - "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/sampler.cpp") + "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/sampler.cpp" + "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/speculative_decoding/*.cpp" + "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/utils/*.cpp" + "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/utils.cpp" + "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/continuous_batching*.cpp" + "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/text_callback_streamer.cpp") add_executable(${TEST_TARGET_NAME} ${tests_src} block_allocator.cpp) diff --git a/tests/cpp/generate_config.cpp b/tests/cpp/generate_config.cpp index bf11b33e67..974fd499f8 100644 --- a/tests/cpp/generate_config.cpp +++ b/tests/cpp/generate_config.cpp @@ -101,3 +101,43 @@ TEST(GenerationConfigTest, valid_frequency_penalty) { config.frequency_penalty = -2.0; EXPECT_NO_THROW(config.validate()); } + +ov::genai::GenerationConfig speculative_decoding_multinomial() { + auto speculative_decoding_multinomial_config = ov::genai::multinomial(); + speculative_decoding_multinomial_config.num_assistant_tokens = 5; + return speculative_decoding_multinomial_config; +} + +ov::genai::GenerationConfig speculative_decoding_greedy() { + auto speculative_decoding_greedy_config = ov::genai::greedy(); + speculative_decoding_greedy_config.assistant_confidence_threshold = 0.4f; + return speculative_decoding_greedy_config; +} + +TEST(GenerationConfigTest, invalid_static_spec_decoding) { + GenerationConfig config = speculative_decoding_greedy(); + config.num_assistant_tokens = 5; + config.assistant_confidence_threshold = 0.2; + EXPECT_THROW(config.validate(), ov::Exception); +} + +TEST(GenerationConfigTest, valid_static_spec_decoding) { + GenerationConfig config = speculative_decoding_greedy(); + config.num_assistant_tokens = 5; + config.assistant_confidence_threshold = 0; + EXPECT_NO_THROW(config.validate()); +} + +TEST(GenerationConfigTest, invalid_dynamic_spec_decoding) { + GenerationConfig config = speculative_decoding_greedy(); + config.num_assistant_tokens = 5; + config.assistant_confidence_threshold = 0.5; + EXPECT_THROW(config.validate(), ov::Exception); +} + +TEST(GenerationConfigTest, valid_dynamic_spec_decoding) { + GenerationConfig config = speculative_decoding_greedy(); + config.assistant_confidence_threshold = 0.5; + config.num_assistant_tokens = 0; + EXPECT_NO_THROW(config.validate()); +} diff --git a/tests/cpp/speculative_decoding.cpp b/tests/cpp/speculative_decoding.cpp new file mode 100644 index 0000000000..d4e48d3ebb --- /dev/null +++ b/tests/cpp/speculative_decoding.cpp @@ -0,0 +1,429 @@ +// Copyright (C) 2024 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#include "gtest/gtest.h" + +#include 
"speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp" + +class CBForSDTest : public testing::Test, public ov::genai::ContinuousBatchingPipeline { +protected: + class PipelineTestInstance : public ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl { + public: + PipelineTestInstance() { + m_sampler = std::make_shared(); + }; + + ov::genai::GenerationHandle + add_request(uint64_t request_id, const ov::Tensor& input_ids) { + auto sampling_params = ov::genai::greedy(); + + ov::genai::SequenceGroup::Ptr sequence_group = std::make_shared(request_id, input_ids, + sampling_params, + 32, + true); + sequence_group->set_sequence_group_ptr(sequence_group); + + { + std::lock_guard lock{m_awaiting_requests_mutex}; + m_awaiting_requests.push_back(sequence_group); + } + return std::make_shared(sequence_group->get_generation_stream(), sampling_params); + }; + + }; + + PipelineTestInstance m_pipeline = PipelineTestInstance(); +}; + +TEST_F(CBForSDTest, init_sequence_by_not_empty__one_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto before = m_pipeline.get_generated_requests(); + auto update_result = m_pipeline.update_request(0, candidate, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); +} + +TEST_F(CBForSDTest, init_sequence_by_empty__one_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = {}; + std::vector log_probs = {}; + ov::genai::GeneratedSequences candidate{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto before = m_pipeline.get_generated_requests(); + auto update_result = m_pipeline.update_request(0, candidate, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 0); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_EQ(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_EQ(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); +} + +TEST_F(CBForSDTest, no_updated_tokens__one_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto update_result = m_pipeline.update_request(0, candidate, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + ov::genai::GeneratedSequences candidate_1{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto before = 
m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 0); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_EQ(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_EQ(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); +} + +TEST_F(CBForSDTest, remove_tokens__one_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto update_result = m_pipeline.update_request(0, candidate, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + tokens = { 0, 1 }; + log_probs = { 0.1f, 0.2f }; + ov::genai::GeneratedSequences candidate_1{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto before = m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 1); + ASSERT_EQ(update_result.inserted_tokens_cnt, 0); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); +} + +TEST_F(CBForSDTest, remove_and_replace_tokens__one_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto update_result = m_pipeline.update_request(0, candidate, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + tokens = { 0, 1, 4 }; + log_probs = { 0.1f, 0.2f, 0.4f }; + ov::genai::GeneratedSequences candidate_1{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto before = m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 1); + ASSERT_EQ(update_result.inserted_tokens_cnt, 1); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); +} + +TEST_F(CBForSDTest, add_tokens__one_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto update_result = m_pipeline.update_request(0, candidate, true); + 
ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + tokens = { 0, 1, 2, 3, 4 }; + log_probs = { 0.1f, 0.2f, 0.3f, 0.4f, 0.5f }; + ov::genai::GeneratedSequences candidate_1{{ 0, ov::genai::GeneratedSequence(tokens, log_probs) }}; + + auto before = m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 2); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); +} + +TEST_F(CBForSDTest, update_empty_sequence_by_not_empty__two_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens_0 = { 0, 1, 2 }, + tokens_1 = { 0, 1 }; + std::vector log_probs_0 = { 0.1f, 0.2f, 0.3f }, + log_probs_1 = { 0.1f, 0.2f }; + ov::genai::GeneratedSequences candidate{ + { 0, ov::genai::GeneratedSequence(tokens_0, log_probs_0) }, + { 1, ov::genai::GeneratedSequence(tokens_1, log_probs_1) } + }; + + auto before = m_pipeline.get_generated_requests(); + auto update_result = m_pipeline.update_request(0, candidate, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens_0); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs_0); + + ASSERT_EQ(after.at(0).size(), 1); + ASSERT_EQ(after.at(0).size(), 1); +} + +TEST_F(CBForSDTest, init_sequence_by_not_empty__two_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens_0 = { 0, 1, 2 }, + tokens_1 = { 0, 1 }; + std::vector log_probs_0 = { 0.1f, 0.2f, 0.3f }, + log_probs_1 = { 0.1f, 0.2f }; + ov::genai::GeneratedSequences candidate{ + { 0, ov::genai::GeneratedSequence(tokens_0, log_probs_0) }, + { 1, ov::genai::GeneratedSequence(tokens_1, log_probs_1) } + }; + + auto before = m_pipeline.get_generated_requests(); + auto update_result = m_pipeline.init_request_by_candidate(0, candidate); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 2); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens_1); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs_1); + + ASSERT_EQ(after.at(0).at(1).token_ids, tokens_1); + ASSERT_EQ(after.at(0).at(1).log_probs, log_probs_1); +} + +TEST_F(CBForSDTest, init_sequence_by_empty__two_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = {}; + std::vector log_probs = {}; + ov::genai::GeneratedSequences candidate{ + { 0, ov::genai::GeneratedSequence(tokens, 
log_probs) }, + { 1, ov::genai::GeneratedSequence(tokens, log_probs) }, + }; + + auto before = m_pipeline.get_generated_requests(); + auto update_result = m_pipeline.init_request_by_candidate(0, candidate); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 0); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_EQ(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_EQ(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); + ASSERT_EQ(after.at(0).at(1).token_ids, tokens); + ASSERT_EQ(after.at(0).at(1).log_probs, log_probs); +} + +TEST_F(CBForSDTest, no_updated_tokens__two_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens_0 = { 0, 1, 2 }, tokens_1 = { 0, 1 }; + std::vector log_probs_0 = { 0.1f, 0.2f, 0.3f }, log_probs_1 = { 0.1f, 0.2f }; + ov::genai::GeneratedSequences candidate{ + { 0, ov::genai::GeneratedSequence(tokens_0, log_probs_0) }, + { 1, ov::genai::GeneratedSequence(tokens_1, log_probs_1) }, + }; + + auto update_result = m_pipeline.init_request_by_candidate(0, candidate); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 2); + + ov::genai::GeneratedSequences candidate_1{ + { 0, ov::genai::GeneratedSequence(tokens_1, log_probs_1) }, + { 1, ov::genai::GeneratedSequence(tokens_1, log_probs_1) }, + }; + + auto before = m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 0); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens_1); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs_1); + ASSERT_EQ(after.at(0).at(1).token_ids, tokens_1); + ASSERT_EQ(after.at(0).at(1).log_probs, log_probs_1); +} + +TEST_F(CBForSDTest, remove_tokens__two_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{ + { 0, ov::genai::GeneratedSequence(tokens, log_probs) }, + { 1, ov::genai::GeneratedSequence(tokens, log_probs) }, + }; + + auto update_result = m_pipeline.init_request_by_candidate(0, candidate); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + std::vector tokens_new = { 0, 1 }; + std::vector log_probs_new = { 0.1f, 0.2f }; + ov::genai::GeneratedSequences candidate_1{ + { 0, ov::genai::GeneratedSequence(tokens, log_probs) }, + { 1, ov::genai::GeneratedSequence(tokens_new, log_probs_new) }, + }; + + auto before = m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 1); + ASSERT_EQ(update_result.inserted_tokens_cnt, 0); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_NE(after.at(0).at(1).token_ids, before.at(0).at(1).token_ids); + ASSERT_NE(after.at(0).at(1).log_probs, 
before.at(0).at(1).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens_new); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs_new); + ASSERT_EQ(after.at(0).at(1).token_ids, tokens_new); + ASSERT_EQ(after.at(0).at(1).log_probs, log_probs_new); +} + +TEST_F(CBForSDTest, remove_and_replace_tokens__two_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{ + { 0, ov::genai::GeneratedSequence(tokens, log_probs) }, + { 1, ov::genai::GeneratedSequence(tokens, log_probs) }, + }; + + auto update_result = m_pipeline.init_request_by_candidate(0, candidate); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + std::vector new_tokens = { 0, 1, 4 }; + std::vector new_log_probs = { 0.1f, 0.2f, 0.4f }; + ov::genai::GeneratedSequences candidate_1{ + { 0, ov::genai::GeneratedSequence(tokens, log_probs) }, + { 1, ov::genai::GeneratedSequence(new_tokens, new_log_probs) }, + }; + + auto before = m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 1); + ASSERT_EQ(update_result.inserted_tokens_cnt, 1); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_EQ(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_EQ(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); + ASSERT_NE(after.at(0).at(1).token_ids, before.at(0).at(1).token_ids); + ASSERT_NE(after.at(0).at(1).log_probs, before.at(0).at(1).log_probs); + ASSERT_EQ(after.at(0).at(1).token_ids, new_tokens); + ASSERT_EQ(after.at(0).at(1).log_probs, new_log_probs); +} + +TEST_F(CBForSDTest, add_tokens__two_sequence) { + std::vector input_vector{0, 1, 2, 3, 4}; + ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); + m_pipeline.add_request(0, input_tensor); + + std::vector tokens = { 0, 1, 2 }; + std::vector log_probs = { 0.1f, 0.2f, 0.3f }; + ov::genai::GeneratedSequences candidate{ + { 0, ov::genai::GeneratedSequence(tokens, log_probs) }, + { 1, ov::genai::GeneratedSequence(tokens, log_probs) }, + }; + + auto update_result = m_pipeline.init_request_by_candidate(0, candidate); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 3); + + tokens = { 0, 1, 2, 3, 4 }; + log_probs = { 0.1f, 0.2f, 0.3f, 0.4f, 0.5f }; + std::vector new_tokens = { 0, 1, 2, 3, 4, 5 }; + std::vector new_log_probs = { 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f }; + ov::genai::GeneratedSequences candidate_1{ + { 0, ov::genai::GeneratedSequence(tokens, log_probs) }, + { 1, ov::genai::GeneratedSequence(new_tokens, new_log_probs) }, + }; + + auto before = m_pipeline.get_generated_requests(); + update_result = m_pipeline.update_request(0, candidate_1, true); + ASSERT_EQ(update_result.removed_tokens_cnt, 0); + ASSERT_EQ(update_result.inserted_tokens_cnt, 2); + + auto after = m_pipeline.get_generated_requests(); + ASSERT_NE(after.at(0).at(0).token_ids, before.at(0).at(0).token_ids); + ASSERT_NE(after.at(0).at(0).log_probs, before.at(0).at(0).log_probs); + ASSERT_EQ(after.at(0).at(0).token_ids, tokens); + ASSERT_EQ(after.at(0).at(0).log_probs, log_probs); + ASSERT_NE(after.at(0).at(1).token_ids, 
before.at(0).at(1).token_ids); + ASSERT_NE(after.at(0).at(1).log_probs, before.at(0).at(1).log_probs); + ASSERT_EQ(after.at(0).at(1).token_ids, tokens); + ASSERT_EQ(after.at(0).at(1).log_probs, log_probs); +} +
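Taken together, these tests pin down the contract of update_request: the already-generated sequence and the candidate are compared token by token, the mismatching tail of the generated sequence is dropped (reported as removed_tokens_cnt) and the candidate's remaining tail is appended (reported as inserted_tokens_cnt). A self-contained sketch of that prefix-match rule on plain vectors follows; the struct and helper names are illustrative stand-ins, not the pipeline's internal types.

// Stand-alone sketch of the prefix-match rule the tests above exercise.
// UpdateRequestResult mirrors the counters checked in the tests; the helper
// itself is illustrative and not part of the patch.
#include <cstddef>
#include <cstdint>
#include <vector>

struct UpdateRequestResult {
    size_t inserted_tokens_cnt = 0;
    size_t removed_tokens_cnt = 0;
};

UpdateRequestResult update_with_candidate(std::vector<int64_t>& generated,
                                          const std::vector<int64_t>& candidate) {
    // Length of the common prefix between what was already generated and the candidate.
    size_t matched = 0;
    while (matched < generated.size() && matched < candidate.size() &&
           generated[matched] == candidate[matched]) {
        ++matched;
    }

    UpdateRequestResult result;
    result.removed_tokens_cnt = generated.size() - matched;   // drop the mismatching tail
    result.inserted_tokens_cnt = candidate.size() - matched;  // append the candidate's tail

    generated.resize(matched);
    generated.insert(generated.end(), candidate.begin() + matched, candidate.end());
    return result;
}

// For example, generated = {0, 1, 2} and candidate = {0, 1, 4} yields
// removed_tokens_cnt == 1 and inserted_tokens_cnt == 1, matching the
// remove_and_replace_tokens__one_sequence expectations above.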