Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(index): delete skipped documents for StructuredDoc #3463

Merged
merged 9 commits into from
Nov 25, 2024
9 changes: 9 additions & 0 deletions crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@
true
}

pub async fn delete(&self, id: &str) -> bool {
if self.indexer.is_indexed(id) {
self.indexer.delete(id);
true

Check warning on line 50 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L47-L50

Added lines #L47 - L50 were not covered by tests
} else {
false

Check warning on line 52 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L52

Added line #L52 was not covered by tests
}
}

Check warning on line 54 in crates/tabby-index/src/structured_doc/public.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-index/src/structured_doc/public.rs#L54

Added line #L54 was not covered by tests

pub fn commit(self) {
self.indexer.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,23 @@
stream! {
let mut count = 0;
let mut num_updated = 0;
for await (updated_at, doc) in issue_stream.chain(pull_stream) {
if index.add(updated_at, doc).await {
let mut num_deleted = 0;
for await (state, doc) in issue_stream.chain(pull_stream) {
let id = &doc.id().to_owned();
if !state.should_clean && index.add(state.updated_at, doc).await {

Check warning on line 146 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L143-L146

Added lines #L143 - L146 were not covered by tests
zwpaper marked this conversation as resolved.
Show resolved Hide resolved
num_updated += 1
}
if state.should_clean && index.delete(id).await {
num_deleted += 1;
}

Check warning on line 151 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L149-L151

Added lines #L149 - L151 were not covered by tests

count += 1;
if count % 100 == 0 {
logkit::info!("{} docs seen, {} docs updated", count, num_updated);
logkit::info!("{} docs seen, {} docs updated, {} docs deleted", count, num_updated, num_deleted);

Check warning on line 155 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L155

Added line #L155 was not covered by tests
zwpaper marked this conversation as resolved.
Show resolved Hide resolved
};
}

logkit::info!("{} docs seen, {} docs updated", count, num_updated);
logkit::info!("{} docs seen, {} docs updated, {} docs deleted", count, num_updated, num_deleted);

Check warning on line 159 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L159

Added line #L159 was not covered by tests
zwpaper marked this conversation as resolved.
Show resolved Hide resolved
index.commit();
}
.count()
Expand Down Expand Up @@ -179,11 +184,16 @@
}
}

pub struct FetchState {
updated_at: DateTime<Utc>,
should_clean: bool,
}
zwpaper marked this conversation as resolved.
Show resolved Hide resolved

async fn fetch_all_issues(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = match &integration.kind {
) -> tabby_schema::Result<BoxStream<'static, (FetchState, StructuredDoc)>> {
let s: BoxStream<(FetchState, StructuredDoc)> = match &integration.kind {

Check warning on line 196 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L195-L196

Added lines #L195 - L196 were not covered by tests
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => list_github_issues(
&repository.source_id(),
integration.api_base(),
Expand All @@ -207,8 +217,8 @@
async fn fetch_all_pulls(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (DateTime<Utc>, StructuredDoc)>> {
let s: BoxStream<(DateTime<Utc>, StructuredDoc)> = list_github_pulls(
) -> tabby_schema::Result<BoxStream<'static, (FetchState, StructuredDoc)>> {
let s: BoxStream<(FetchState, StructuredDoc)> = list_github_pulls(

Check warning on line 221 in ee/tabby-webserver/src/service/background_job/third_party_integration.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration.rs#L220-L221

Added lines #L220 - L221 were not covered by tests
&repository.source_id(),
integration.api_base(),
&repository.display_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
use serde::Deserialize;
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocIssueFields};

use super::FetchState;
use crate::service::create_gitlab_client;

pub async fn list_github_issues(
source_id: &str,
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (FetchState, StructuredDoc)>> {

Check warning on line 18 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L18

Added line #L18 was not covered by tests
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -62,7 +63,10 @@
closed: issue.state == octocrab::models::IssueState::Closed,
})
};
yield (issue.updated_at, doc);
yield (FetchState {
updated_at: issue.updated_at,
should_clean: false,
}, doc);

Check warning on line 69 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L66-L69

Added lines #L66 - L69 were not covered by tests
}

page += 1;
Expand All @@ -89,7 +93,7 @@
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (FetchState, StructuredDoc)>> {

Check warning on line 96 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L96

Added line #L96 was not covered by tests
let gitlab = create_gitlab_client(api_base, access_token).await?;

let source_id = source_id.to_owned();
Expand Down Expand Up @@ -118,7 +122,10 @@
body: issue.description.unwrap_or_default(),
closed: issue.state == "closed",
})};
yield (issue.updated_at, doc);
yield (FetchState {
updated_at: issue.updated_at,
should_clean: false,
}, doc);

Check warning on line 128 in ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs#L125-L128

Added lines #L125 - L128 were not covered by tests
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::{anyhow, Result};
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::Stream;
use octocrab::{models::IssueState, Octocrab};
use tabby_index::public::{StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields};

use super::FetchState;

pub async fn list_github_pulls(
source_id: &str,
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (DateTime<Utc>, StructuredDoc)>> {
) -> Result<impl Stream<Item = (FetchState, StructuredDoc)>> {

Check warning on line 14 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L14

Added line #L14 was not covered by tests
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand Down Expand Up @@ -43,14 +44,32 @@
let pages = response.number_of_pages().unwrap_or_default();

for pull in response.items {
let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);
let title = pull.title.clone().unwrap_or_default();
let body = pull.body.clone().unwrap_or_default();
let doc = StructuredDoc {
source_id: source_id.to_string(),
fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
link: url.clone(),
title,
body,
merged: pull.merged_at.is_some(),
diff: String::new(),
}),
};

Check warning on line 60 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L47-L60

Added lines #L47 - L60 were not covered by tests
// skip closed but not merged pulls
if let Some(state) = pull.state {
if state == IssueState::Closed && pull.merged_at.is_none() {
continue
yield (FetchState{
updated_at: pull.updated_at.unwrap(),
should_clean: true,
}, doc);
continue;

Check warning on line 68 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L64-L68

Added lines #L64 - L68 were not covered by tests
}
}

let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);

Check warning on line 72 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L72

Added line #L72 was not covered by tests
let diff = match octocrab.pulls(&owner, &repo).get_diff(pull.number).await {
Ok(x) if x.len() < 1024*1024*10 => x,
Ok(_) => {
Expand All @@ -71,10 +90,13 @@
body: pull.body.unwrap_or_default(),
diff,
merged: pull.merged_at.is_some(),
})
};
})};

Check warning on line 94 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L93-L94

Added lines #L93 - L94 were not covered by tests

yield (pull.updated_at.unwrap(), doc);
yield (FetchState{
updated_at: pull.updated_at.unwrap(),
should_clean: false,
}, doc);

Check warning on line 99 in ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

View check run for this annotation

Codecov / codecov/patch

ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs#L96-L99

Added lines #L96 - L99 were not covered by tests
}

page += 1;
Expand Down
Loading