Skip to content

Commit

Permalink
fix: code index has no failed chunk count even when embed failed (#3490)
Browse files Browse the repository at this point in the history
* test: add code index builder test for failed chunk count

Signed-off-by: Wei Zhang <[email protected]>

* fix: indexer return error when embed failed

Signed-off-by: Wei Zhang <[email protected]>

* [autofix.ci] apply automated fixes

* chore: skip chunk if embedding failed

Signed-off-by: Wei Zhang <[email protected]>

* test: revert making test pub

Signed-off-by: Wei Zhang <[email protected]>

---------

Signed-off-by: Wei Zhang <[email protected]>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
zwpaper and autofix-ci[bot] authored Nov 29, 2024
1 parent 722ed16 commit 644c6ad
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 85 deletions.
2 changes: 1 addition & 1 deletion crates/tabby-common/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl IndexSchema {

let field_updated_at = builder.add_date_field(FIELD_UPDATED_AT, INDEXED | STORED);
let field_failed_chunks_count =
builder.add_u64_field(FIELD_FAILED_CHUNKS_COUNT, INDEXED | FAST);
builder.add_u64_field(FIELD_FAILED_CHUNKS_COUNT, INDEXED | FAST | STORED);
let field_attributes = builder.add_text_field("attributes", STORED);

let field_chunk_id = builder.add_text_field(FIELD_CHUNK_ID, STRING | FAST | STORED);
Expand Down
14 changes: 12 additions & 2 deletions crates/tabby-index/src/code/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ async fn add_changed_documents(

let id = SourceCode::to_index_id(&repository.source_id, &key).id;

if cloned_index.is_indexed(&id) {
// Skip if already indexed
// Skip if already indexed and has no failed chunks
if !require_updates(cloned_index.clone(), &id) {
continue;
}

Expand All @@ -110,6 +110,8 @@ async fn add_changed_documents(
}

let (_, s) = builder.build(code).await;
// must delete before adding, otherwise some fields like failed_chunks_count will remain
cloned_index.delete(&id);
for await task in s {
yield task;
}
Expand All @@ -133,6 +135,14 @@ async fn add_changed_documents(
count_docs
}

/// Returns whether the document identified by `id` needs (re)indexing:
/// either it was never indexed, or a previous run recorded failed chunks
/// for it (e.g. embedding errors) that should be retried.
fn require_updates(indexer: Arc<Indexer>, id: &str) -> bool {
    !indexer.is_indexed(id) || indexer.has_failed_chunks(id)
}

fn is_valid_file(file: &SourceCode) -> bool {
file.max_line_length <= MAX_LINE_LENGTH_THRESHOLD
&& file.avg_line_length <= AVG_LINE_LENGTH_THRESHOLD
Expand Down
26 changes: 2 additions & 24 deletions crates/tabby-index/src/code/intelligence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,34 +247,12 @@ mod metrics {

#[cfg(test)]
mod tests {
use std::path::PathBuf;

use serial_test::file_serial;
use tabby_common::{
config::{config_index_to_id, CodeRepository},
path::set_tabby_root,
};
use tabby_common::path::set_tabby_root;
use tracing_test::traced_test;

use super::*;

fn get_tabby_root() -> PathBuf {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("testdata");
path
}

fn get_repository_config() -> CodeRepository {
CodeRepository::new("https://github.com/TabbyML/tabby", &config_index_to_id(0))
}

fn get_rust_source_file() -> PathBuf {
let mut path = get_tabby_root();
path.push("repositories");
path.push("https_github.com_TabbyML_tabby");
path.push("rust.rs");
path
}
use crate::testutils::{get_repository_config, get_rust_source_file, get_tabby_root};

#[test]
#[traced_test]
Expand Down
34 changes: 24 additions & 10 deletions crates/tabby-index/src/code/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::{bail, Result};
use async_stream::stream;
use async_trait::async_trait;
use futures::stream::BoxStream;
Expand Down Expand Up @@ -68,7 +69,7 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
async fn build_chunk_attributes<'a>(
&self,
source_code: &'a SourceCode,
) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>> {
) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>> {
let text = match source_code.read_content() {
Ok(content) => content,
Err(e) => {
Expand All @@ -77,13 +78,22 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
source_code.filepath, e
);

return Box::pin(futures::stream::empty());
return Box::pin(stream! {
let path = source_code.filepath.clone();
yield tokio::spawn(async move {
bail!("Failed to read content of '{}': {}", path, e);
});
});
}
};

let Some(embedding) = self.embedding.clone() else {
warn!("No embedding service found for code indexing");
return Box::pin(futures::stream::empty());
return Box::pin(stream! {
yield tokio::spawn(async move {
bail!("No embedding service found for code indexing");
});
});
};

let source_code = source_code.clone();
Expand All @@ -100,8 +110,10 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
let embedding = embedding.clone();
let rewritten_body = format!("```{}\n{}\n```", source_code.filepath, body);
yield tokio::spawn(async move {
let tokens = build_binarize_embedding_tokens(embedding.clone(), &rewritten_body).await;
(tokens, attributes)
match build_binarize_embedding_tokens(embedding.clone(), &rewritten_body).await {
Ok(tokens) => Ok((tokens, attributes)),
Err(err) => Err(err),
}
});
}
};
Expand All @@ -110,12 +122,14 @@ impl IndexAttributeBuilder<SourceCode> for CodeBuilder {
}
}

async fn build_binarize_embedding_tokens(embedding: Arc<dyn Embedding>, body: &str) -> Vec<String> {
async fn build_binarize_embedding_tokens(
embedding: Arc<dyn Embedding>,
body: &str,
) -> Result<Vec<String>> {
let embedding = match embedding.embed(body).await {
Ok(x) => x,
Err(err) => {
warn!("Failed to embed chunk text: {}", err);
return Vec::new();
bail!("Failed to embed chunk text: {}", err);
}
};

Expand All @@ -124,10 +138,10 @@ async fn build_binarize_embedding_tokens(embedding: Arc<dyn Embedding>, body: &s
tokens.push(token);
}

tokens
Ok(tokens)
}

fn create_code_builder(embedding: Option<Arc<dyn Embedding>>) -> TantivyDocBuilder<SourceCode> {
pub fn create_code_builder(embedding: Option<Arc<dyn Embedding>>) -> TantivyDocBuilder<SourceCode> {
let builder = CodeBuilder::new(embedding);
TantivyDocBuilder::new(corpus::CODE, builder)
}
28 changes: 10 additions & 18 deletions crates/tabby-index/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;

use anyhow::bail;
use anyhow::{bail, Result};
use async_stream::stream;
use futures::{stream::BoxStream, Stream, StreamExt};
use serde_json::json;
Expand Down Expand Up @@ -43,7 +43,7 @@ pub trait IndexAttributeBuilder<T>: Send + Sync {
async fn build_chunk_attributes<'a>(
&self,
document: &'a T,
) -> BoxStream<'a, JoinHandle<(Vec<String>, serde_json::Value)>>;
) -> BoxStream<'a, JoinHandle<Result<(Vec<String>, serde_json::Value)>>>;
}

pub struct TantivyDocBuilder<T> {
Expand Down Expand Up @@ -79,17 +79,15 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
let mut failed_count: u64 = 0;
for await chunk_doc in self.build_chunks(cloned_id, source_id.clone(), updated_at, document).await {
match chunk_doc.await {
Ok((Some(doc), ok)) => {
if !ok {
failed_count += 1;
}
Ok(Ok(doc)) => {
yield tokio::spawn(async move { Some(doc) });
}
Ok((None, _)) => {
Ok(Err(e)) => {
warn!("Failed to build chunk for document '{}': {}", doc_id, e);
failed_count += 1;
}
Err(e) => {
warn!("Failed to build chunk for document '{}': {}", doc_id, e);
warn!("Failed to call build chunk '{}': {}", doc_id, e);
failed_count += 1;
}
}
Expand Down Expand Up @@ -118,23 +116,17 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
source_id: String,
updated_at: tantivy::DateTime,
document: T,
) -> impl Stream<Item = JoinHandle<(Option<TantivyDocument>, bool)>> + '_ {
) -> impl Stream<Item = JoinHandle<Result<TantivyDocument>>> + '_ {
let kind = self.corpus;
stream! {
let schema = IndexSchema::instance();
for await (chunk_id, task) in self.builder.build_chunk_attributes(&document).await.enumerate() {
let id = id.clone();
let source_id = source_id.clone();

// The tokens may be empty if the embedding call fails,
// but the attributes remain useful.
// Therefore, we return:
// the document, and
// a flag indicating whether the tokens were created successfully.
yield tokio::spawn(async move {
let Ok((tokens, chunk_attributes)) = task.await else {
return (None, false);
};
let built_chunk_attributes_result = task.await?;
let (tokens, chunk_attributes) = built_chunk_attributes_result?;

let mut doc = doc! {
schema.field_id => id,
Expand All @@ -149,7 +141,7 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
doc.add_text(schema.field_chunk_tokens, token);
}

(Some(doc), !tokens.is_empty())
Ok(doc)
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ mod mock_embedding {

/// Test double for the `Embedding` trait: `embed` yields a fixed vector,
/// or always fails when `error` is set.
pub struct MockEmbedding {
    // Vector returned by every successful `embed` call.
    result: Vec<f32>,
    // When true, `embed` returns an error instead of `result`.
    error: bool,
}

impl MockEmbedding {
pub fn new(result: Vec<f32>) -> Self {
Self { result }
pub fn new(result: Vec<f32>, error: bool) -> Self {
Self { result, error }
}
}

#[async_trait]
impl Embedding for MockEmbedding {
async fn embed(&self, prompt: &str) -> Result<Vec<f32>> {
if prompt.starts_with("error") {
Err(anyhow::anyhow!(prompt.to_owned()))
if self.error {
Err(anyhow::anyhow!(
"Mock error, prompt length {}",
prompt.len()
))
} else {
Ok(self.result.clone())
}
Expand Down Expand Up @@ -51,7 +55,7 @@ mod structured_doc_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let id = "structured_doc_empty_embedding";
let embedding = MockEmbedding::new(vec![]);
let embedding = MockEmbedding::new(vec![], true);
let embedding = Arc::new(embedding);
let indexer = StructuredDocIndexer::new(embedding.clone());
let doc = StructuredDoc {
Expand Down Expand Up @@ -103,7 +107,7 @@ mod structured_doc_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let id = "structured_doc_with_embedding";
let embedding = MockEmbedding::new(vec![1.0]);
let embedding = MockEmbedding::new(vec![1.0], false);
let embedding = Arc::new(embedding);
let indexer = StructuredDocIndexer::new(embedding.clone());
let doc = StructuredDoc {
Expand Down Expand Up @@ -159,13 +163,59 @@ mod builder_tests {

use super::mock_embedding::MockEmbedding;
use crate::{
indexer::TantivyDocBuilder,
code::{create_code_builder, intelligence::CodeIntelligence},
indexer::{TantivyDocBuilder, ToIndexId},
structured_doc::{
public::{StructuredDoc, StructuredDocFields, StructuredDocIssueFields},
StructuredDocBuilder,
},
testutils::{get_repository_config, get_rust_source_file, get_tabby_root},
};

/// Verifies that when every chunk embedding fails, the code builder still
/// emits the document itself with `failed_chunks_count` recording the number
/// of skipped chunks, and emits no chunk documents.
#[test]
#[file_serial(set_tabby_root)]
fn test_builder_code_empty_embedding() {
    // Point the global tabby root at the test fixture; restored at the end
    // because the root is process-global state (hence `file_serial`).
    let origin_root = tabby_common::path::tabby_root();
    tabby_common::path::set_tabby_root(get_tabby_root());

    // `error = true`: every embed call fails, so every chunk should fail.
    let embedding = MockEmbedding::new(vec![], true);
    let builder = Arc::new(create_code_builder(Some(Arc::new(embedding))));

    let repo = get_repository_config();
    let code = CodeIntelligence::compute_source_file(&repo, &get_rust_source_file()).unwrap();
    let index_id = code.to_index_id();

    let (id, s) = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async { builder.build(code).await });
    assert_eq!(id, index_id.id);

    // Drain the stream of spawned chunk/document tasks concurrently.
    let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
        s.buffer_unordered(std::cmp::max(
            std::thread::available_parallelism().unwrap().get() * 2,
            32,
        ))
        .collect::<Vec<_>>()
        .await
    });

    // All chunks failed (no working embedding), so only the document-level
    // entry remains in the output.
    assert_eq!(res.len(), 1);
    let doc = res.last().unwrap().as_ref().unwrap().as_ref().unwrap();

    let schema = IndexSchema::instance();
    let failed_count = doc
        .get_first(schema.field_failed_chunks_count)
        .and_then(|v| v.as_u64())
        .unwrap();

    // The fixture source file produces three chunks, and all of them failed
    // to embed, so the stored failed-chunk count is 3.
    assert_eq!(failed_count, 3);

    // Restore the global tabby root for subsequent tests.
    tabby_common::path::set_tabby_root(origin_root);
}

/// Test that the indexer returns only the document itself, with no chunks,
/// when the embedding is empty
#[test]
Expand All @@ -176,7 +226,7 @@ mod builder_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let test_id = "builder_empty_embedding";
let embedding = MockEmbedding::new(vec![]);
let embedding = MockEmbedding::new(vec![], true);
let builder = StructuredDocBuilder::new(Arc::new(embedding));
let tantivy_builder = TantivyDocBuilder::new(corpus::STRUCTURED_DOC, builder);

Expand Down Expand Up @@ -204,10 +254,12 @@ mod builder_tests {
.await
});

// the last element is the document itself
// the rest are the chunks
assert_eq!(res.len(), 2);
let doc = res[1].as_ref().unwrap().as_ref().unwrap();
// The last element is the document itself,
// while the preceding elements are the chunks.
// Given that the embedding is empty,
// all chunks should be considered failed and skipped.
assert_eq!(res.len(), 1);
let doc = res.last().unwrap().as_ref().unwrap().as_ref().unwrap();

let schema = IndexSchema::instance();
let failed_count = doc
Expand All @@ -231,7 +283,7 @@ mod builder_tests {
tabby_common::path::set_tabby_root(temp_dir.to_owned());

let test_id = "builder_with_embedding";
let embedding = MockEmbedding::new(vec![1.0]);
let embedding = MockEmbedding::new(vec![1.0], false);
let builder = StructuredDocBuilder::new(Arc::new(embedding));
let tantivy_builder = TantivyDocBuilder::new(corpus::STRUCTURED_DOC, builder);

Expand Down
5 changes: 4 additions & 1 deletion crates/tabby-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ mod code;
mod indexer;
mod tantivy_utils;

#[cfg(test)]
mod testutils;

use indexer::{IndexAttributeBuilder, Indexer};

mod structured_doc;

#[cfg(test)]
mod structured_doc_tests;
mod indexer_tests;

pub mod public {
use indexer::IndexGarbageCollector;
Expand Down
Loading

0 comments on commit 644c6ad

Please sign in to comment.