
refactor(storage): add table_id parameter in state store write and sync method for future partial checkpoint support #1812

Closed
wants to merge 9 commits

Conversation

wenym1
Contributor

@wenym1 wenym1 commented Apr 13, 2022

What's changed and what's your intention?

This PR is for supporting partial checkpoint proposed in #1157.

In this PR, we add a table_id parameter in ingest_batch, sync and wait_epoch method of trait StateStore so that we are able to keep track of where the kvs are written from. Besides, we group the write batches by their table_id in shared_buffer_uploader and enable syncing a subset of tables in an epoch.

This concept of a table is introduced for finer-grained tracking of where the kvs are written from, so that we can support partial checkpoint, vertical grouping, and shared arrangement in the future.

A storage table id is usually the operator id, or the table id of the relational table. Currently table_ids are not assigned yet, and all of them are GLOBAL_STORAGE_TABLE_ID for the time being. In future development we will assign table_id according to the operator id.

Some unit tests and related structs are modified accordingly.
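The interface change can be sketched with a toy in-memory store. All names and the simplified synchronous signatures below are illustrative (the real StateStore trait is async and returns Results); the point is only how writes are tagged with a table_id and how sync can flush a subset of tables:

```rust
use std::collections::{BTreeMap, HashMap};

type TableId = u32;

/// All writes share this placeholder id until per-operator ids are assigned.
const GLOBAL_STORAGE_TABLE_ID: TableId = 0;

/// Toy in-memory store: unsynced data is kept per table, as the PR does in
/// shared_buffer_uploader, so `sync` can flush only some tables of an epoch.
#[derive(Default)]
struct ToyStateStore {
    unsynced: HashMap<TableId, BTreeMap<Vec<u8>, Vec<u8>>>,
    synced: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl ToyStateStore {
    /// Write a batch at `epoch`, tagged with the table it originates from.
    fn ingest_batch(&mut self, kv_pairs: Vec<(Vec<u8>, Vec<u8>)>, _epoch: u64, table_id: TableId) {
        self.unsynced.entry(table_id).or_default().extend(kv_pairs);
    }

    /// Flush the given tables (or every table when `table_ids` is `None`).
    fn sync(&mut self, _epoch: u64, table_ids: Option<Vec<TableId>>) {
        let ids = table_ids.unwrap_or_else(|| self.unsynced.keys().copied().collect());
        for id in ids {
            if let Some(data) = self.unsynced.remove(&id) {
                self.synced.extend(data);
            }
        }
    }
}
```
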

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests

Refer to a related PR or issue link (optional)

#1157

@wenym1 wenym1 requested a review from hzxa21 April 13, 2022 07:35
@codecov

codecov bot commented Apr 13, 2022

Codecov Report

Merging #1812 (7ca89ea) into main (4f31c18) will increase coverage by 0.08%.
The diff coverage is 70.34%.

@@            Coverage Diff             @@
##             main    #1812      +/-   ##
==========================================
+ Coverage   70.76%   70.85%   +0.08%     
==========================================
  Files         607      609       +2     
  Lines       79619    79943     +324     
==========================================
+ Hits        56344    56644     +300     
- Misses      23275    23299      +24     
Flag Coverage Δ
rust 70.85% <70.34%> (+0.08%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
src/bench/ss_bench/operations/write_batch.rs 0.00% <0.00%> (ø)
src/storage/src/monitor/monitored_store.rs 2.64% <0.00%> (-0.17%) ⬇️
src/storage/src/panic_store.rs 0.00% <0.00%> (ø)
src/storage/src/rocksdb_local_mock.rs 0.00% <0.00%> (ø)
src/storage/src/tikv_mock.rs 0.00% <0.00%> (ø)
src/stream/src/task/stream_manager.rs 0.00% <0.00%> (ø)
...rc/hummock/shared_buffer/shared_buffer_uploader.rs 82.97% <72.72%> (-4.10%) ⬇️
src/storage/src/hummock/state_store_tests.rs 68.67% <72.72%> (+0.11%) ⬆️
src/storage/src/write_batch.rs 79.12% <75.00%> (-0.19%) ⬇️
src/storage/src/memory.rs 71.61% <76.78%> (+0.11%) ⬆️
... and 32 more


@BugenZhao
Member

Why don't we directly store the "table id" in the Keyspace when creating the instance?

Also, could you please explain the meaning of table here? It sounds confusing to me.

@wenym1 wenym1 self-assigned this Apr 13, 2022
@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

Why don't we directly store the "table id" in the Keyspace when creating the instance?

In the best case, we will put this table_id in Keyspace, the general key encoding in storage will become (table_id, 'user space key', epoch), and then executors can read and write through Keyspace by providing only the user-space key.

However, much extra work is still needed to reach that best case, since all code related to key encoding in executors would have to change, and that is not strongly relevant to the topic of this PR. Besides, in this PR table_id is not included in the key encoding yet; it is just a marker of where the data is written from. Therefore I'd prefer not to put table_id in Keyspace yet and to do it when the relational table is implemented.
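For illustration only, the "best case" encoding described here could look like the sketch below. This is not what the PR implements, and the inverted epoch is an assumption borrowed from common LSM-style designs so that newer versions of a key sort first (key-length framing is also elided):

```rust
/// Hypothetical sketch of the (table_id, user-space key, epoch) encoding.
/// The inverted epoch is an assumption, not taken from this PR: it makes
/// newer versions of the same key sort first in a byte-ordered store.
fn encode_key(table_id: u32, user_key: &[u8], epoch: u64) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + user_key.len() + 8);
    buf.extend_from_slice(&table_id.to_be_bytes()); // 4-byte table prefix
    buf.extend_from_slice(user_key);                // user-space key
    buf.extend_from_slice(&(!epoch).to_be_bytes()); // inverted epoch
    buf
}
```
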

Also, could you please explain the meaning of table here? It sounds confusing to me.

I've been struggling with the naming for days. In general, such a table is a concept in the storage layer, corresponding to the relational table written by the executors of an individual stateful operator. Since it corresponds to the relational table, I call it a table.

I originally introduced this table for partial checkpoint. We will keep track of all table ids of each mview in the meta node, and when all tables of an mview have collected the barrier and synced their state, we can commit the epoch for the mview. In the future we can also use this table_id for vertical grouping or shared arrangement.
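The meta-side bookkeeping described above might look roughly like this sketch (illustrative names, not actual meta-node code): commit an mview's epoch only once every storage table in its group has synced that epoch.

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative tracker: an mview's epoch commits only when all of its
/// storage tables have reported a sync for that epoch.
#[derive(Default)]
struct CheckpointTracker {
    /// mview id -> the table ids belonging to it.
    tables_of_mview: HashMap<u32, HashSet<u32>>,
    /// (mview id, epoch) -> tables that have synced so far.
    synced: HashMap<(u32, u64), HashSet<u32>>,
}

impl CheckpointTracker {
    /// Record a table sync; returns true when the whole group is done and
    /// the epoch can be committed for the mview. Panics if `mview` is
    /// unknown (fine for a sketch).
    fn on_table_synced(&mut self, mview: u32, table_id: u32, epoch: u64) -> bool {
        self.synced.entry((mview, epoch)).or_default().insert(table_id);
        self.synced[&(mview, epoch)] == self.tables_of_mview[&mview]
    }
}
```
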

Do you have any naming suggestions for this concept?

@skyzh
Contributor

skyzh commented Apr 13, 2022

In this PR, we add a table_id parameter in ingest_batch, sync and wait_epoch method of trait StateStore so that we are able to keep track of where the kvs are written from. Besides, we group the write batches by their table_id in shared_buffer_uploader and enable syncing a subset of tables in an epoch.

Why not simply use the key prefix as table id? e.g. the first 4 bytes? All states are now prefixed by either table id or executor id.

And this table_id looks somewhat similar to a hummock group?

@skyzh
Contributor

skyzh commented Apr 13, 2022

Okay, I see the comments above.

it is just a marker of where the data is written from

This is exactly the same as the executor id? Do you want to know which actor the data is written from?

@BugenZhao
Member

Therefore I'd prefer not to put table_id in Keyspace yet and to do it when the relational table is implemented.

I mean putting the table_id into the struct of Keyspace, since we can extract it at creation time. Under the current design, the Keyspace is per-executor or per-operator, so it will always belong to a single "table". Thus there's no need to pass table_id as a parameter everywhere, since this info is fixed and thus redundant. A lot of changes in this PR could be avoided.

Do you have any naming suggestion on this concept?

How about "group" or "checkpoint group"?

@skyzh
Contributor

skyzh commented Apr 13, 2022

I mean putting the table_id into the struct of Keyspace

Yes, I'd agree with that. We already have the keyspace to determine the prefix; we can make it also determine the so-called "checkpoint group". Life is already hard for stateful executors, as they are required to store an epoch. This PR requires them to also store a group id, and there will only be more such ids in the future. Executors themselves don't care about this information, so we can make it part of the keyspace. 🤣

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

Thus there's no need to pass table_id as a parameter everywhere, since this info is fixed and thus redundant. A lot of changes in this PR could be avoided.

@BugenZhao
@BugenZhao
I tried putting the table_id in Keyspace in the first place, and the PR was of the same size, or even larger. Code involving both Keyspace and table_id is almost all keyspace.state_store().start_write_batch(table_id). The diff is caused by the interface change in start_write_batch of StateStore rather than in Keyspace.

Besides, if we want to put table_id in Keyspace, we must change the interface that initializes Keyspace, and any code initializing a Keyspace will also have to change to pass the table_id parameter in. Wherever there is a Keyspace initialization, there will be a keyspace.scan(...) or keyspace.state_store().start_write_batch. Under the current PR, only code with keyspace.state_store().start_write_batch has a diff, which is less than changing Keyspace initialization. In general, a lot of change is inevitable either way.

@BugenZhao
Member

The diff is caused by the interface change in the start_write_batch of StateStore rather than the Keyspace.

I think there's no need to even change the signature of the Keyspace initialization function.

@skyzh
Contributor

skyzh commented Apr 13, 2022

For test cases, simply have something like new_without_id. For production code, I think there are no more than 20 occurrences of creating a key space throughout the codebase, which should be easy to work through.

@skyzh
Contributor

skyzh commented Apr 13, 2022

Also, we can have write_batch and write_batch_with_group_id. Even having the group_id in the write_batch struct sounds okay to me.

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

I mean putting the table_id into the struct of Keyspace

Yes, I'd agree with that. We already have the keyspace to determine the prefix; we can make it also determine the so-called "checkpoint group". Life is already hard for stateful executors, as they are required to store an epoch. This PR requires them to also store a group id, and there will only be more such ids in the future. Executors themselves don't care about this information, so we can make it part of the keyspace. 🤣

@skyzh Yes, I agree on avoiding having executors store this table_id or group_id or whatever. My point is that this table_id is not part of the key encoding yet; it is only used as a marker on the write path, while Keyspace is only for reads for the time being. So I don't think we should store the table_id in Keyspace in this PR.

As for where to store the table id: in the near future, when the relational table is implemented, the table itself will definitely store its own id, and it can pass that id when it wants to write to the state store.

@skyzh
Contributor

skyzh commented Apr 13, 2022

... and if table_id conveys the same meaning as described in the PR body (table_id or operator_id), it's simply the first 5 bytes of the key.

@skyzh
Contributor

skyzh commented Apr 13, 2022

My point is that this table_id is not part of the key encoding yet,

It is of course part of the key encoding -- every keyspace begins with either table_root, executor_root or shared_executor_root. Therefore the prefix generated for a keyspace's write batch begins with one of these prefixes.

https://github.com/singularity-data/risingwave/blob/59b689860a12f1bd887f7041c5a488359f2351eb/src/storage/src/keyspace.rs#L32-L70
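The shape being described (and the "first 5 bytes" mentioned elsewhere in this thread) can be sketched as a 1-byte root tag followed by a 4-byte id. The tag byte values below are made up for illustration; see the linked keyspace.rs for the real ones:

```rust
/// Illustrative keyspace roots: a 1-byte tag plus a 4-byte big-endian id
/// gives the 5-byte prefix discussed in this thread. Tag values are
/// invented here, not copied from keyspace.rs.
fn executor_root(executor_id: u32) -> Vec<u8> {
    let mut buf = vec![b'e']; // root tag (illustrative value)
    buf.extend_from_slice(&executor_id.to_be_bytes());
    buf
}

fn table_root(table_id: u32) -> Vec<u8> {
    let mut buf = vec![b't']; // root tag (illustrative value)
    buf.extend_from_slice(&table_id.to_be_bytes());
    buf
}
```
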

@BugenZhao
Member

BugenZhao commented Apr 13, 2022

while Keyspace is only for reads for the time being

The design of Keyspace is for both reading and writing. We extracted WriteBatch just to make things easier for callers, and you can find a reference to Keyspace inside it as well.

My point is that since it never changes from the executor's point of view, there's no need to pass it everywhere. We should minimize the impact on the codebase as much as possible.

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

My point is that this table_id is not part of the key encoding yet,

It is of course part of the key encoding -- every keyspace begins with either table_root, executor_root or shared_executor_root. Therefore the prefix generated for a keyspace's write batch begins with one of these prefixes.

https://github.com/singularity-data/risingwave/blob/59b689860a12f1bd887f7041c5a488359f2351eb/src/storage/src/keyspace.rs#L32-L70

I mean the table_id passed to state store writes is not yet part of the encoding (though after partial checkpoint is implemented their values will be the same). And I don't want to touch the logic on the executor side (the key encoding part) in this PR.

My development plan is: at the very beginning, the table_id passed to state store writes is independent of the relational table id or operator id, and all storage tables share GLOBAL_STORAGE_TABLE_ID. Then I change the metadata schema to support partial checkpoint, and then the other components. During development, partial checkpoint is enabled step by step, but the whole graph will still commit all together because everything shares the same GLOBAL_STORAGE_TABLE_ID. When everything is ready, we assign the real operator id or relational table id as the storage table id and start testing partial checkpoint.

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

while Keyspace is only for reads for the time being

The design of Keyspace is for both reading and writing. We extracted WriteBatch just to make things easier for callers, and you can find a reference to Keyspace inside it as well.

My point is that since it never changes from the executor's point of view, there's no need to pass it everywhere. We should minimize the impact on the codebase as much as possible.

I think I'll do something like the following.

The signature for Keyspace initialization stays unchanged, and the default GLOBAL_STORAGE_TABLE_ID will be assigned to an extra field called table_id in Keyspace during initialization. Then we will not expose the state_store in Keyspace, and instead provide a method called start_write_batch() on Keyspace. Though code with keyspace.state_store().start_write_batch will still have a diff, GLOBAL_STORAGE_TABLE_ID is not passed everywhere.
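A minimal sketch of this plan (illustrative names, not the actual risingwave API): the keyspace carries the default id and hands out write batches itself, so executors never touch the table_id.

```rust
/// Placeholder id shared by all storage tables until real ids are assigned.
const GLOBAL_STORAGE_TABLE_ID: u32 = 0;

struct WriteBatch {
    table_id: u32,
    pairs: Vec<(Vec<u8>, Vec<u8>)>,
}

struct Keyspace {
    prefix: Vec<u8>,
    table_id: u32,
}

impl Keyspace {
    /// Signature unchanged: callers don't pass a table_id; it defaults
    /// until per-operator ids are assigned.
    fn new(prefix: Vec<u8>) -> Self {
        Self { prefix, table_id: GLOBAL_STORAGE_TABLE_ID }
    }

    /// The keyspace, not the raw state store, hands out write batches,
    /// so table_id never has to be passed around by executors.
    fn start_write_batch(&self) -> WriteBatch {
        WriteBatch { table_id: self.table_id, pairs: Vec::new() }
    }
}
```
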

@skyzh
Contributor

skyzh commented Apr 13, 2022

I think I'll do something like the following.

The signature for Keyspace initialization stays unchanged, and the default GLOBAL_STORAGE_TABLE_ID will be assigned to an extra field called table_id in Keyspace during initialization. Then we will not expose the state_store in Keyspace, and instead provide a method called start_write_batch() on Keyspace. Though code with keyspace.state_store().start_write_batch will still have a diff, GLOBAL_STORAGE_TABLE_ID is not passed everywhere.

I would +1 for that.

Also, I think there's one problem to solve before we can actually work on partial checkpoint. How do you plan to update table_id (or checkpoint_group_id) across the graph when there's scale-in or scale-out?

@skyzh
Contributor

skyzh commented Apr 13, 2022

Also, how can we know a checkpoint group's data are updated by all required executors for a given epoch?

@skyzh
Contributor

skyzh commented Apr 13, 2022

... furthermore, why do we even need table_id or checkpoint_group_id on the compute-node side? Meta knows the mapping from fragment id to each actor, and it can simply pass a "prefix to commit" to the shared buffer manager when calling sync.

@skyzh
Contributor

skyzh commented Apr 13, 2022

Time to go off work 😇 let's come back tomorrow

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

Also, we can have write_batch and write_batch_with_group_id. Even having the group_id in the write_batch struct sounds okay to me.

My first thought was also to pass group_id (the id of a checkpoint group for partial checkpoint) to write_batch. But I had a discussion with @hzxa21, and we agreed that instead of passing group_id, we pass a finer-grained table_id. Then in the hummock storage layer we don't track anything about the checkpoint group; we only track the MCE of each storage table. A checkpoint group contains the storage tables of an mview, this information is tracked by the barrier manager, and the epoch alignment of all storage tables of an mview is controlled by the barrier manager.

Another benefit of passing a finer-grained table_id is that in the future, if we want to implement vertical grouping and shared arrangement, we can make use of the table_id directly when necessary.

@CLAassistant

CLAassistant commented Apr 13, 2022

CLA assistant check
All committers have signed the CLA.

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

I think I'll do something like the following.
The signature for Keyspace initialization stays unchanged, and the default GLOBAL_STORAGE_TABLE_ID will be assigned to an extra field called table_id in Keyspace during initialization. Then we will not expose the state_store in Keyspace, and instead provide a method called start_write_batch() on Keyspace. Though code with keyspace.state_store().start_write_batch will still have a diff, GLOBAL_STORAGE_TABLE_ID is not passed everywhere.

I would +1 for that.

Also, I think there's one problem to solve before we can actually work on partial checkpoint. How do you plan to update table_id (or checkpoint_group_id) across the graph when there's scale-in or scale-out?

I think scaling in/out is in the physical execution layer, while this table_id is for an operator, a logical concept? If we just add more actors to a fragment when scaling out, as long as the logical graph is unchanged, the table_id is unchanged.

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

... furthermore, why do we even need table_id or checkpoint_group_id on the compute-node side? Meta knows the mapping from fragment id to each actor, and it can simply pass a "prefix to commit" to the shared buffer manager when calling sync.

I think when we have the relational table in the future, there will be a table_id stored on the compute-node side?

@wenym1
Contributor Author

wenym1 commented Apr 13, 2022

Time to go off work 😇 let's come back tomorrow

Off off. Stop juan-ing ("卷", i.e. overworking).

@BugenZhao
Member

while Keyspace is only for reads for the time being

The design of Keyspace is for both reading and writing. We extracted WriteBatch just to make things easier for callers, and you can find a reference to Keyspace inside it as well.
My point is that since it never changes from the executor's point of view, there's no need to pass it everywhere. We should minimize the impact on the codebase as much as possible.

I think I'll do something like the following.

The signature for Keyspace initialization stays unchanged, and the default GLOBAL_STORAGE_TABLE_ID will be assigned to an extra field called table_id in Keyspace during initialization. Then we will not expose the state_store in Keyspace, and instead provide a method called start_write_batch() on Keyspace. Though code with keyspace.state_store().start_write_batch will still have a diff, GLOBAL_STORAGE_TABLE_ID is not passed everywhere.

+1 for this. :)

@skyzh
Contributor

skyzh commented Apr 14, 2022

... furthermore, why do we even need table_id or checkpoint_group_id on the compute-node side? Meta knows the mapping from fragment id to each actor, and it can simply pass a "prefix to commit" to the shared buffer manager when calling sync.

So what about this comment? Currently, every write batch only contains data from one executor's state. The first 5 bytes of the keys in a write batch will always be the same (we can add a sanity check for this). Therefore, we can store shared buffer memtables grouped by this prefix, and when sync is called from the meta side, we specify a "prefix to sync".
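The prefix-based alternative and its sanity check can be sketched as follows (illustrative only, not the actual shared buffer code): derive the group from the 5-byte key prefix and reject batches that mix prefixes.

```rust
use std::convert::TryInto;

/// Return the shared 5-byte prefix of a write batch, or None when the
/// batch is empty, a key is too short, or the prefixes disagree (the
/// sanity check proposed above). Illustrative sketch only.
fn batch_prefix(batch: &[(Vec<u8>, Vec<u8>)]) -> Option<[u8; 5]> {
    let first: [u8; 5] = batch.first()?.0.get(..5)?.try_into().ok()?;
    batch
        .iter()
        .all(|(key, _)| key.get(..5) == Some(&first[..]))
        .then_some(first)
}
```
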

@skyzh
Contributor

skyzh commented Apr 14, 2022

The checkpoint group -> state id mapping can be stored on the meta side. I'd prefer not to make too many changes or add too many fancy functionalities to compute-node-side interfaces.

@wenym1 wenym1 closed this Apr 14, 2022
@wenym1 wenym1 deleted the yiming/support-partial-checkpoint branch November 29, 2022 05:25