Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataframe API v2 p2: MVP implementation #7560

Merged
merged 9 commits into from
Oct 2, 2024
Merged

Conversation

teh-cmc
Copy link
Member

@teh-cmc teh-cmc commented Oct 1, 2024

A first implementation of the new dataframe APIs.
The name is now very misleading though: there isn't anything dataframe-y left in here, it is a row-based iterator with Rerun semantics baked in, driven by a sorted streaming join.

It is rather slow (related: #7558 (comment)), lacks many features and is full of edge cases, but it works.
It does support dedupe-latest semantics (slowly), view contents and selections, chunk overlaps, and pagination (horribly, by virtue of implementing Iterator).
It does not support Clears, nor latest-at sparse-filling, nor PoVs, nor index sampling. Yet.

Upcoming PRs will be all about fixing these shortcomings one by one.

It should look somewhat familiar:

let query_cache = QueryCache::new(store);
let query_engine = QueryEngine {
    store,
    cache: &query_cache,
};

let mut query = QueryExpression2::new(timeline);
query.view_contents = Some(
    query_engine
        .iter_entity_paths(&entity_path_filter)
        .map(|entity_path| (entity_path, None))
        .collect(),
);
query.filtered_index_range = Some(ResolvedTimeRange::new(time_from, time_to));
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
// eprintln!("{:#?}", query_handle.selected_contents());
for batch in query_handle.into_batch_iter().skip(offset).take(len) {
    eprintln!("{batch}");
}

No tests until we have the guarantee that these are the semantics we will commit to.

Checklist

  • I have read and agree to Contributor Guide and the Code of Conduct
  • I've included a screenshot or gif (if applicable)
  • I have tested the web demo (if applicable):
  • The PR title and labels are set such as to maximize their usefulness for the next release's CHANGELOG
  • If applicable, add a new check to the release checklist!
  • If have noted any breaking changes to the log API in CHANGELOG.md and the migration guide

To run all checks from main, comment on the PR with @rerun-bot full-check.

@teh-cmc teh-cmc added 🔍 re_query affects re_query itself do-not-merge Do not merge this PR include in changelog labels Oct 1, 2024
@teh-cmc teh-cmc force-pushed the cmc/dataframev2_2_api_impl branch from 82d2b01 to e37a795 Compare October 1, 2024 15:21
@teh-cmc teh-cmc marked this pull request as ready for review October 1, 2024 15:28
@teh-cmc teh-cmc force-pushed the cmc/dataframev2_1_api_def branch from 18886e3 to a989ac2 Compare October 1, 2024 16:55
@teh-cmc teh-cmc force-pushed the cmc/dataframev2_2_api_impl branch from e37a795 to ae03b16 Compare October 1, 2024 17:01
Comment on lines 460 to 483
let cur_index_value = streaming_state_per_component
.values()
// NOTE: We're purposefully ignoring RowId-related semantics here: we just want to know
// the value we're looking for on the "main" index (dedupe semantics).
.min_by_key(|streaming_state| streaming_state.index_value)
.map(|streaming_state| streaming_state.index_value)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than doing this on every call to next_row, I suspect it might be clearer to do this whole thing as a 2-phased process.

First, work just with the Timeline data from every view-relevant chunk to materialize a new column of sorted/unique TimeInt values (note, as an added benefit this is the same input you'll want to be able to feed into sample_index_values() anyways). This could still be done incrementally, "batch-wise" by only looking at overlapping chunks on some horizon.

Then, once we have the ability to iterate over batches of TimeInts, we iterate through them incrementally and look for the matching values from the relevant chunks, as you're doing below, which now becomes a common code-path between this implementation and sampled_index_values()

Additionally, my gut is that having batches of unique TimeInts in advance sets us up nicely for some future optimizations.

  • It lets us fairly easily parallelize the per-selected-column work. Each worker can independently yield a sequence of rows matching the requested sequence of TimeInts.
  • It lets us look ahead to check for matching runs in the given columns. Any time we have a matching run in a range with a single column (happy path) we can directly yield a slice of multiple rows from our column-generator.
  • Similarly, null runs can quickly be identified and generated when the last TimeInt in the requested batch is less than the next available time-int for the column.
  • The aggregator consuming from each of the parallel columns generators can then yield RecordBatches based on overlapping row-runs from the separate columns, which means in the happy path of dense non-overlapping chunks we return to getting nice contiguous slices again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do these improvements in follow up PRs, let's focus on landing all semantics first.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed -- not a requested change. Just an observation about the structure to keep in mind as you refactor in the direction of supporting sampled_index_values()

@teh-cmc teh-cmc force-pushed the cmc/dataframev2_1_api_def branch from 7d1cb72 to 39cfb1a Compare October 2, 2024 10:02
Base automatically changed from cmc/dataframev2_1_api_def to main October 2, 2024 10:07
@teh-cmc teh-cmc force-pushed the cmc/dataframev2_2_api_impl branch from 1d76116 to 94a9c09 Compare October 2, 2024 10:07
@teh-cmc teh-cmc force-pushed the cmc/dataframev2_2_api_impl branch from 94a9c09 to 9ce5152 Compare October 2, 2024 12:42
@teh-cmc teh-cmc force-pushed the cmc/dataframev2_2_api_impl branch from bc4f392 to 9412150 Compare October 2, 2024 12:53
@teh-cmc teh-cmc removed the do-not-merge Do not merge this PR label Oct 2, 2024
@teh-cmc
Copy link
Member Author

teh-cmc commented Oct 2, 2024

We've integrated all of this in @abey79's work-in-progress dataframe-view -- everything works semantics-wise.

Next steps (future PRs):

  • implement all missing features
  • make it fast
  • minimal testing

Copy link
Member

@jleibs jleibs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

@teh-cmc teh-cmc merged commit aab3ed9 into main Oct 2, 2024
33 of 34 checks passed
@teh-cmc teh-cmc deleted the cmc/dataframev2_2_api_impl branch October 2, 2024 15:01
@teh-cmc teh-cmc changed the title Dataframe API v2 #2: MVP implementation Dataframe API v2 [2: MVP implementation Oct 3, 2024
@teh-cmc teh-cmc changed the title Dataframe API v2 [2: MVP implementation Dataframe API v2 p2: MVP implementation Oct 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
🔍 re_query affects re_query itself
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants