diff --git a/Cargo.lock b/Cargo.lock index 92d72052d1db6..1cce157fa2f39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5994,6 +5994,7 @@ dependencies = [ "prost 0.11.2", "rand 0.8.5", "regex", + "risingwave_backup", "risingwave_common", "risingwave_common_service", "risingwave_hummock_sdk", diff --git a/dashboard/proto/gen/backup_service.ts b/dashboard/proto/gen/backup_service.ts index 107bb48773ef8..a30c09c2ae231 100644 --- a/dashboard/proto/gen/backup_service.ts +++ b/dashboard/proto/gen/backup_service.ts @@ -55,6 +55,10 @@ export function backupJobStatusToJSON(object: BackupJobStatus): string { } } +export interface MetaBackupManifestId { + id: number; +} + export interface BackupMetaRequest { } @@ -78,6 +82,47 @@ export interface DeleteMetaSnapshotRequest { export interface DeleteMetaSnapshotResponse { } +export interface GetMetaSnapshotManifestRequest { +} + +export interface GetMetaSnapshotManifestResponse { + manifest: MetaSnapshotManifest | undefined; +} + +export interface MetaSnapshotManifest { + manifestId: number; + snapshotMetadata: MetaSnapshotMetadata[]; +} + +export interface MetaSnapshotMetadata { + id: number; + hummockVersionId: number; + maxCommittedEpoch: number; + safeEpoch: number; +} + +function createBaseMetaBackupManifestId(): MetaBackupManifestId { + return { id: 0 }; +} + +export const MetaBackupManifestId = { + fromJSON(object: any): MetaBackupManifestId { + return { id: isSet(object.id) ? Number(object.id) : 0 }; + }, + + toJSON(message: MetaBackupManifestId): unknown { + const obj: any = {}; + message.id !== undefined && (obj.id = Math.round(message.id)); + return obj; + }, + + fromPartial, I>>(object: I): MetaBackupManifestId { + const message = createBaseMetaBackupManifestId(); + message.id = object.id ?? 0; + return message; + }, +}; + function createBaseBackupMetaRequest(): BackupMetaRequest { return {}; } @@ -215,6 +260,119 @@ export const DeleteMetaSnapshotResponse = { }, }; +function createBaseGetMetaSnapshotManifestRequest(): GetMetaSnapshotManifestRequest { + return {}; +} + +export const GetMetaSnapshotManifestRequest = { + fromJSON(_: any): GetMetaSnapshotManifestRequest { + return {}; + }, + + toJSON(_: GetMetaSnapshotManifestRequest): unknown { + const obj: any = {}; + return obj; + }, + + fromPartial, I>>(_: I): GetMetaSnapshotManifestRequest { + const message = createBaseGetMetaSnapshotManifestRequest(); + return message; + }, +}; + +function createBaseGetMetaSnapshotManifestResponse(): GetMetaSnapshotManifestResponse { + return { manifest: undefined }; +} + +export const GetMetaSnapshotManifestResponse = { + fromJSON(object: any): GetMetaSnapshotManifestResponse { + return { manifest: isSet(object.manifest) ? MetaSnapshotManifest.fromJSON(object.manifest) : undefined }; + }, + + toJSON(message: GetMetaSnapshotManifestResponse): unknown { + const obj: any = {}; + message.manifest !== undefined && + (obj.manifest = message.manifest ? MetaSnapshotManifest.toJSON(message.manifest) : undefined); + return obj; + }, + + fromPartial, I>>( + object: I, + ): GetMetaSnapshotManifestResponse { + const message = createBaseGetMetaSnapshotManifestResponse(); + message.manifest = (object.manifest !== undefined && object.manifest !== null) + ? MetaSnapshotManifest.fromPartial(object.manifest) + : undefined; + return message; + }, +}; + +function createBaseMetaSnapshotManifest(): MetaSnapshotManifest { + return { manifestId: 0, snapshotMetadata: [] }; +} + +export const MetaSnapshotManifest = { + fromJSON(object: any): MetaSnapshotManifest { + return { + manifestId: isSet(object.manifestId) ? Number(object.manifestId) : 0, + snapshotMetadata: Array.isArray(object?.snapshotMetadata) + ? object.snapshotMetadata.map((e: any) => MetaSnapshotMetadata.fromJSON(e)) + : [], + }; + }, + + toJSON(message: MetaSnapshotManifest): unknown { + const obj: any = {}; + message.manifestId !== undefined && (obj.manifestId = Math.round(message.manifestId)); + if (message.snapshotMetadata) { + obj.snapshotMetadata = message.snapshotMetadata.map((e) => e ? MetaSnapshotMetadata.toJSON(e) : undefined); + } else { + obj.snapshotMetadata = []; + } + return obj; + }, + + fromPartial, I>>(object: I): MetaSnapshotManifest { + const message = createBaseMetaSnapshotManifest(); + message.manifestId = object.manifestId ?? 0; + message.snapshotMetadata = object.snapshotMetadata?.map((e) => MetaSnapshotMetadata.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseMetaSnapshotMetadata(): MetaSnapshotMetadata { + return { id: 0, hummockVersionId: 0, maxCommittedEpoch: 0, safeEpoch: 0 }; +} + +export const MetaSnapshotMetadata = { + fromJSON(object: any): MetaSnapshotMetadata { + return { + id: isSet(object.id) ? Number(object.id) : 0, + hummockVersionId: isSet(object.hummockVersionId) ? Number(object.hummockVersionId) : 0, + maxCommittedEpoch: isSet(object.maxCommittedEpoch) ? Number(object.maxCommittedEpoch) : 0, + safeEpoch: isSet(object.safeEpoch) ? Number(object.safeEpoch) : 0, + }; + }, + + toJSON(message: MetaSnapshotMetadata): unknown { + const obj: any = {}; + message.id !== undefined && (obj.id = Math.round(message.id)); + message.hummockVersionId !== undefined && (obj.hummockVersionId = Math.round(message.hummockVersionId)); + message.maxCommittedEpoch !== undefined && (obj.maxCommittedEpoch = Math.round(message.maxCommittedEpoch)); + message.safeEpoch !== undefined && (obj.safeEpoch = Math.round(message.safeEpoch)); + return obj; + }, + + fromPartial, I>>(object: I): MetaSnapshotMetadata { + const message = createBaseMetaSnapshotMetadata(); + message.id = object.id ?? 0; + message.hummockVersionId = object.hummockVersionId ?? 0; + message.maxCommittedEpoch = object.maxCommittedEpoch ?? 0; + message.safeEpoch = object.safeEpoch ?? 0; + return message; + }, +}; + type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; export type DeepPartial = T extends Builtin ? T diff --git a/dashboard/proto/gen/batch_plan.ts b/dashboard/proto/gen/batch_plan.ts index 56436c2b71b6d..3f8a4e3413929 100644 --- a/dashboard/proto/gen/batch_plan.ts +++ b/dashboard/proto/gen/batch_plan.ts @@ -1,6 +1,6 @@ /* eslint-disable */ import { SourceInfo } from "./catalog"; -import { Buffer, HostAddress, WorkerNode } from "./common"; +import { BatchQueryEpoch, Buffer, HostAddress, WorkerNode } from "./common"; import { IntervalUnit } from "./data"; import { AggCall, ExprNode, InputRefExpr, ProjectSetSelectItem, TableFunction } from "./expr"; import { @@ -221,7 +221,7 @@ export interface TaskOutputId { export interface LocalExecutePlan { plan: PlanFragment | undefined; - epoch: number; + epoch: BatchQueryEpoch | undefined; } /** ExchangeSource describes where to read results from children operators */ @@ -1431,21 +1431,21 @@ export const TaskOutputId = { }; function createBaseLocalExecutePlan(): LocalExecutePlan { - return { plan: undefined, epoch: 0 }; + return { plan: undefined, epoch: undefined }; } export const LocalExecutePlan = { fromJSON(object: any): LocalExecutePlan { return { plan: isSet(object.plan) ? PlanFragment.fromJSON(object.plan) : undefined, - epoch: isSet(object.epoch) ? Number(object.epoch) : 0, + epoch: isSet(object.epoch) ? BatchQueryEpoch.fromJSON(object.epoch) : undefined, }; }, toJSON(message: LocalExecutePlan): unknown { const obj: any = {}; message.plan !== undefined && (obj.plan = message.plan ? PlanFragment.toJSON(message.plan) : undefined); - message.epoch !== undefined && (obj.epoch = Math.round(message.epoch)); + message.epoch !== undefined && (obj.epoch = message.epoch ? BatchQueryEpoch.toJSON(message.epoch) : undefined); return obj; }, @@ -1454,7 +1454,9 @@ export const LocalExecutePlan = { message.plan = (object.plan !== undefined && object.plan !== null) ? PlanFragment.fromPartial(object.plan) : undefined; - message.epoch = object.epoch ?? 0; + message.epoch = (object.epoch !== undefined && object.epoch !== null) + ? BatchQueryEpoch.fromPartial(object.epoch) + : undefined; return message; }, }; diff --git a/dashboard/proto/gen/common.ts b/dashboard/proto/gen/common.ts index c4d081800507d..1042efe39a81a 100644 --- a/dashboard/proto/gen/common.ts +++ b/dashboard/proto/gen/common.ts @@ -213,6 +213,13 @@ export interface ParallelUnitMapping { data: number[]; } +export interface BatchQueryEpoch { + epoch?: { $case: "committed"; committed: number } | { $case: "current"; current: number } | { + $case: "backup"; + backup: number; + }; +} + function createBaseStatus(): Status { return { code: Status_Code.UNSPECIFIED, message: "" }; } @@ -438,6 +445,48 @@ export const ParallelUnitMapping = { }, }; +function createBaseBatchQueryEpoch(): BatchQueryEpoch { + return { epoch: undefined }; +} + +export const BatchQueryEpoch = { + fromJSON(object: any): BatchQueryEpoch { + return { + epoch: isSet(object.committed) + ? { $case: "committed", committed: Number(object.committed) } + : isSet(object.current) + ? { $case: "current", current: Number(object.current) } + : isSet(object.backup) + ? { $case: "backup", backup: Number(object.backup) } + : undefined, + }; + }, + + toJSON(message: BatchQueryEpoch): unknown { + const obj: any = {}; + message.epoch?.$case === "committed" && (obj.committed = Math.round(message.epoch?.committed)); + message.epoch?.$case === "current" && (obj.current = Math.round(message.epoch?.current)); + message.epoch?.$case === "backup" && (obj.backup = Math.round(message.epoch?.backup)); + return obj; + }, + + fromPartial, I>>(object: I): BatchQueryEpoch { + const message = createBaseBatchQueryEpoch(); + if ( + object.epoch?.$case === "committed" && object.epoch?.committed !== undefined && object.epoch?.committed !== null + ) { + message.epoch = { $case: "committed", committed: object.epoch.committed }; + } + if (object.epoch?.$case === "current" && object.epoch?.current !== undefined && object.epoch?.current !== null) { + message.epoch = { $case: "current", current: object.epoch.current }; + } + if (object.epoch?.$case === "backup" && object.epoch?.backup !== undefined && object.epoch?.backup !== null) { + message.epoch = { $case: "backup", backup: object.epoch.backup }; + } + return message; + }, +}; + declare var self: any | undefined; declare var window: any | undefined; declare var global: any | undefined; diff --git a/dashboard/proto/gen/meta.ts b/dashboard/proto/gen/meta.ts index 0fc8b56c042a3..bb585a3e5dd68 100644 --- a/dashboard/proto/gen/meta.ts +++ b/dashboard/proto/gen/meta.ts @@ -1,4 +1,5 @@ /* eslint-disable */ +import { MetaBackupManifestId } from "./backup_service"; import { Database, Index, Schema, Sink, Source, Table, View } from "./catalog"; import { HostAddress, @@ -379,6 +380,7 @@ export interface MetaSnapshot { hummockSnapshot: HummockSnapshot | undefined; hummockVersion: HummockVersion | undefined; version: MetaSnapshot_SnapshotVersion | undefined; + metaBackupManifestId: MetaBackupManifestId | undefined; } export interface MetaSnapshot_SnapshotVersion { @@ -404,7 +406,8 @@ export interface SubscribeResponse { | { $case: "node"; node: WorkerNode } | { $case: "hummockSnapshot"; hummockSnapshot: HummockSnapshot } | { $case: "hummockVersionDeltas"; hummockVersionDeltas: HummockVersionDeltas } - | { $case: "snapshot"; snapshot: MetaSnapshot }; + | { $case: "snapshot"; snapshot: MetaSnapshot } + | { $case: "metaBackupManifestId"; metaBackupManifestId: MetaBackupManifestId }; } export const SubscribeResponse_Operation = { @@ -1462,6 +1465,7 @@ function createBaseMetaSnapshot(): MetaSnapshot { hummockSnapshot: undefined, hummockVersion: undefined, version: undefined, + metaBackupManifestId: undefined, }; } @@ -1485,6 +1489,9 @@ export const MetaSnapshot = { hummockSnapshot: isSet(object.hummockSnapshot) ? HummockSnapshot.fromJSON(object.hummockSnapshot) : undefined, hummockVersion: isSet(object.hummockVersion) ? HummockVersion.fromJSON(object.hummockVersion) : undefined, version: isSet(object.version) ? MetaSnapshot_SnapshotVersion.fromJSON(object.version) : undefined, + metaBackupManifestId: isSet(object.metaBackupManifestId) + ? MetaBackupManifestId.fromJSON(object.metaBackupManifestId) + : undefined, }; }, @@ -1546,6 +1553,9 @@ export const MetaSnapshot = { (obj.hummockVersion = message.hummockVersion ? HummockVersion.toJSON(message.hummockVersion) : undefined); message.version !== undefined && (obj.version = message.version ? MetaSnapshot_SnapshotVersion.toJSON(message.version) : undefined); + message.metaBackupManifestId !== undefined && (obj.metaBackupManifestId = message.metaBackupManifestId + ? MetaBackupManifestId.toJSON(message.metaBackupManifestId) + : undefined); return obj; }, @@ -1570,6 +1580,9 @@ export const MetaSnapshot = { message.version = (object.version !== undefined && object.version !== null) ? MetaSnapshot_SnapshotVersion.fromPartial(object.version) : undefined; + message.metaBackupManifestId = (object.metaBackupManifestId !== undefined && object.metaBackupManifestId !== null) + ? MetaBackupManifestId.fromPartial(object.metaBackupManifestId) + : undefined; return message; }, }; @@ -1651,6 +1664,11 @@ export const SubscribeResponse = { } : isSet(object.snapshot) ? { $case: "snapshot", snapshot: MetaSnapshot.fromJSON(object.snapshot) } + : isSet(object.metaBackupManifestId) + ? { + $case: "metaBackupManifestId", + metaBackupManifestId: MetaBackupManifestId.fromJSON(object.metaBackupManifestId), + } : undefined, }; }, @@ -1686,6 +1704,9 @@ export const SubscribeResponse = { : undefined); message.info?.$case === "snapshot" && (obj.snapshot = message.info?.snapshot ? MetaSnapshot.toJSON(message.info?.snapshot) : undefined); + message.info?.$case === "metaBackupManifestId" && (obj.metaBackupManifestId = message.info?.metaBackupManifestId + ? MetaBackupManifestId.toJSON(message.info?.metaBackupManifestId) + : undefined); return obj; }, @@ -1756,6 +1777,16 @@ export const SubscribeResponse = { if (object.info?.$case === "snapshot" && object.info?.snapshot !== undefined && object.info?.snapshot !== null) { message.info = { $case: "snapshot", snapshot: MetaSnapshot.fromPartial(object.info.snapshot) }; } + if ( + object.info?.$case === "metaBackupManifestId" && + object.info?.metaBackupManifestId !== undefined && + object.info?.metaBackupManifestId !== null + ) { + message.info = { + $case: "metaBackupManifestId", + metaBackupManifestId: MetaBackupManifestId.fromPartial(object.info.metaBackupManifestId), + }; + } return message; }, }; diff --git a/dashboard/proto/gen/task_service.ts b/dashboard/proto/gen/task_service.ts index 70d84a1d90b62..8446c702d48b5 100644 --- a/dashboard/proto/gen/task_service.ts +++ b/dashboard/proto/gen/task_service.ts @@ -1,6 +1,6 @@ /* eslint-disable */ import { PlanFragment, TaskId as TaskId1, TaskOutputId } from "./batch_plan"; -import { Status } from "./common"; +import { BatchQueryEpoch, Status } from "./common"; import { DataChunk } from "./data"; import { StreamMessage } from "./stream_plan"; @@ -81,7 +81,7 @@ export function taskInfo_TaskStatusToJSON(object: TaskInfo_TaskStatus): string { export interface CreateTaskRequest { taskId: TaskId1 | undefined; plan: PlanFragment | undefined; - epoch: number; + epoch: BatchQueryEpoch | undefined; } export interface AbortTaskRequest { @@ -109,7 +109,7 @@ export interface GetDataResponse { export interface ExecuteRequest { taskId: TaskId1 | undefined; plan: PlanFragment | undefined; - epoch: number; + epoch: BatchQueryEpoch | undefined; } export interface GetDataRequest { @@ -206,7 +206,7 @@ export const TaskInfo = { }; function createBaseCreateTaskRequest(): CreateTaskRequest { - return { taskId: undefined, plan: undefined, epoch: 0 }; + return { taskId: undefined, plan: undefined, epoch: undefined }; } export const CreateTaskRequest = { @@ -214,7 +214,7 @@ export const CreateTaskRequest = { return { taskId: isSet(object.taskId) ? TaskId1.fromJSON(object.taskId) : undefined, plan: isSet(object.plan) ? PlanFragment.fromJSON(object.plan) : undefined, - epoch: isSet(object.epoch) ? Number(object.epoch) : 0, + epoch: isSet(object.epoch) ? BatchQueryEpoch.fromJSON(object.epoch) : undefined, }; }, @@ -222,7 +222,7 @@ export const CreateTaskRequest = { const obj: any = {}; message.taskId !== undefined && (obj.taskId = message.taskId ? TaskId1.toJSON(message.taskId) : undefined); message.plan !== undefined && (obj.plan = message.plan ? PlanFragment.toJSON(message.plan) : undefined); - message.epoch !== undefined && (obj.epoch = Math.round(message.epoch)); + message.epoch !== undefined && (obj.epoch = message.epoch ? BatchQueryEpoch.toJSON(message.epoch) : undefined); return obj; }, @@ -234,7 +234,9 @@ export const CreateTaskRequest = { message.plan = (object.plan !== undefined && object.plan !== null) ? PlanFragment.fromPartial(object.plan) : undefined; - message.epoch = object.epoch ?? 0; + message.epoch = (object.epoch !== undefined && object.epoch !== null) + ? BatchQueryEpoch.fromPartial(object.epoch) + : undefined; return message; }, }; @@ -375,7 +377,7 @@ export const GetDataResponse = { }; function createBaseExecuteRequest(): ExecuteRequest { - return { taskId: undefined, plan: undefined, epoch: 0 }; + return { taskId: undefined, plan: undefined, epoch: undefined }; } export const ExecuteRequest = { @@ -383,7 +385,7 @@ export const ExecuteRequest = { return { taskId: isSet(object.taskId) ? TaskId1.fromJSON(object.taskId) : undefined, plan: isSet(object.plan) ? PlanFragment.fromJSON(object.plan) : undefined, - epoch: isSet(object.epoch) ? Number(object.epoch) : 0, + epoch: isSet(object.epoch) ? BatchQueryEpoch.fromJSON(object.epoch) : undefined, }; }, @@ -391,7 +393,7 @@ export const ExecuteRequest = { const obj: any = {}; message.taskId !== undefined && (obj.taskId = message.taskId ? TaskId1.toJSON(message.taskId) : undefined); message.plan !== undefined && (obj.plan = message.plan ? PlanFragment.toJSON(message.plan) : undefined); - message.epoch !== undefined && (obj.epoch = Math.round(message.epoch)); + message.epoch !== undefined && (obj.epoch = message.epoch ? BatchQueryEpoch.toJSON(message.epoch) : undefined); return obj; }, @@ -403,7 +405,9 @@ export const ExecuteRequest = { message.plan = (object.plan !== undefined && object.plan !== null) ? PlanFragment.fromPartial(object.plan) : undefined; - message.epoch = object.epoch ?? 0; + message.epoch = (object.epoch !== undefined && object.epoch !== null) + ? BatchQueryEpoch.fromPartial(object.epoch) + : undefined; return message; }, }; diff --git a/e2e_test/batch/catalog/pg_namespace.slt.part b/e2e_test/batch/catalog/pg_namespace.slt.part index 6396ec03214c6..3ea9fd4141484 100644 --- a/e2e_test/batch/catalog/pg_namespace.slt.part +++ b/e2e_test/batch/catalog/pg_namespace.slt.part @@ -4,3 +4,4 @@ SELECT nspname FROM pg_catalog.pg_namespace; information_schema pg_catalog public +rw_catalog \ No newline at end of file diff --git a/e2e_test/database/test.slt b/e2e_test/database/test.slt index d177d4e419c6f..c851104925e15 100644 --- a/e2e_test/database/test.slt +++ b/e2e_test/database/test.slt @@ -6,6 +6,7 @@ show schemas; information_schema pg_catalog public +rw_catalog statement ok create table ddl_t (v1 int); diff --git a/e2e_test/ddl/show.slt b/e2e_test/ddl/show.slt index 0419746d60cf9..bbc0f45bd5560 100644 --- a/e2e_test/ddl/show.slt +++ b/e2e_test/ddl/show.slt @@ -43,6 +43,7 @@ show schemas; information_schema pg_catalog public +rw_catalog query T show tables; diff --git a/proto/backup_service.proto b/proto/backup_service.proto index c5fd3ec01cc78..4f33769016858 100644 --- a/proto/backup_service.proto +++ b/proto/backup_service.proto @@ -4,6 +4,10 @@ package backup_service; option optimize_for = SPEED; +message MetaBackupManifestId { + uint64 id = 1; +} + enum BackupJobStatus { UNSPECIFIED = 0; RUNNING = 1; @@ -29,9 +33,24 @@ message DeleteMetaSnapshotRequest { repeated uint64 snapshot_ids = 1; } message DeleteMetaSnapshotResponse {} +message GetMetaSnapshotManifestRequest {} +message GetMetaSnapshotManifestResponse { + MetaSnapshotManifest manifest = 1; +} +message MetaSnapshotManifest { + uint64 manifest_id = 1; + repeated MetaSnapshotMetadata snapshot_metadata = 2; +} +message MetaSnapshotMetadata { + uint64 id = 1; + uint64 hummock_version_id = 2; + uint64 max_committed_epoch = 3; + uint64 safe_epoch = 4; +} service BackupService { rpc BackupMeta(BackupMetaRequest) returns (BackupMetaResponse); rpc GetBackupJobStatus(GetBackupJobStatusRequest) returns (GetBackupJobStatusResponse); rpc DeleteMetaSnapshot(DeleteMetaSnapshotRequest) returns (DeleteMetaSnapshotResponse); + rpc GetMetaSnapshotManifest(GetMetaSnapshotManifestRequest) returns (GetMetaSnapshotManifestResponse); } diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 6852551150e42..8f7fc044a6436 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -191,7 +191,7 @@ message TaskOutputId { message LocalExecutePlan { batch_plan.PlanFragment plan = 1; - uint64 epoch = 2; + common.BatchQueryEpoch epoch = 2; } // ExchangeSource describes where to read results from children operators diff --git a/proto/common.proto b/proto/common.proto index 5cadbe01dce2d..9233241b071cd 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -66,3 +66,11 @@ message ParallelUnitMapping { repeated uint64 original_indices = 2; repeated uint32 data = 3; } + +message BatchQueryEpoch { + oneof epoch { + uint64 committed = 1; + uint64 current = 2; + uint64 backup = 3; + } +} diff --git a/proto/meta.proto b/proto/meta.proto index 067c2bd92b449..e8af12e17b4e4 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package meta; +import "backup_service.proto"; import "catalog.proto"; import "common.proto"; import "hummock.proto"; @@ -206,6 +207,7 @@ message MetaSnapshot { hummock.HummockVersion hummock_version = 12; SnapshotVersion version = 13; + backup_service.MetaBackupManifestId meta_backup_manifest_id = 14; } message SubscribeResponse { @@ -233,6 +235,7 @@ message SubscribeResponse { hummock.HummockSnapshot hummock_snapshot = 14; hummock.HummockVersionDeltas hummock_version_deltas = 15; MetaSnapshot snapshot = 16; + backup_service.MetaBackupManifestId meta_backup_manifest_id = 17; } } diff --git a/proto/task_service.proto b/proto/task_service.proto index 255aaa34a6a0e..ed3f6de5458d4 100644 --- a/proto/task_service.proto +++ b/proto/task_service.proto @@ -33,7 +33,7 @@ message TaskInfo { message CreateTaskRequest { batch_plan.TaskId task_id = 1; batch_plan.PlanFragment plan = 2; - uint64 epoch = 3; + common.BatchQueryEpoch epoch = 3; } message AbortTaskRequest { @@ -61,7 +61,7 @@ message GetDataResponse { message ExecuteRequest { batch_plan.TaskId task_id = 1; batch_plan.PlanFragment plan = 2; - uint64 epoch = 3; + common.BatchQueryEpoch epoch = 3; } service TaskService { diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index aee76a2281b47..49bd4ed4ceb18 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -312,6 +312,7 @@ mod tests { ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await?; diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index e3c73f9cbee61..d9709e3f3e94c 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -27,8 +27,8 @@ use risingwave_common::util::scan_range::ScanRange; use risingwave_common::util::sort_util::OrderType; use risingwave_expr::expr::expr_unary::new_unary_expr; use risingwave_expr::expr::{build_from_prost, BoxedExpression, LiteralExpression}; -use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::expr::expr_node::Type; use risingwave_pb::plan_common::OrderType as ProstOrderType; use risingwave_storage::table::batch_table::storage_table::StorageTable; @@ -314,7 +314,7 @@ struct InnerSideExecutorBuilder { outer_side_key_types: Vec, inner_side_key_types: Vec, lookup_prefix_len: usize, - epoch: u64, + epoch: BatchQueryEpoch, row_list: Vec, table: StorageTable, chunk_size: usize, @@ -325,7 +325,7 @@ impl InnerSideExecutorBuilder { outer_side_key_types: Vec, inner_side_key_types: Vec, lookup_prefix_len: usize, - epoch: u64, + epoch: BatchQueryEpoch, row_list: Vec, table: StorageTable, chunk_size: usize, @@ -385,7 +385,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder { if self.lookup_prefix_len == self.table.pk_indices().len() { let row = self .table - .get_row(&pk_prefix, HummockReadEpoch::Committed(self.epoch)) + .get_row(&pk_prefix, self.epoch.clone().into()) .await?; if let Some(row) = row { @@ -394,12 +394,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder { } else { let iter = self .table - .batch_iter_with_pk_bounds( - HummockReadEpoch::Committed(self.epoch), - &pk_prefix, - .., - false, - ) + .batch_iter_with_pk_bounds(self.epoch.clone().into(), &pk_prefix, .., false) .await?; pin_mut!(iter); diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 2d711cc70ef27..27dc138479369 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -36,7 +36,7 @@ use risingwave_pb::batch_plan::{ ExchangeInfo, ExchangeNode, ExchangeSource as ProstExchangeSource, LocalExecutePlan, PlanFragment, PlanNode, RowSeqScanNode, TaskId as ProstTaskId, TaskOutputId, }; -use risingwave_pb::common::WorkerNode; +use risingwave_pb::common::{BatchQueryEpoch, WorkerNode}; use risingwave_pb::expr::expr_node::Type; use risingwave_pb::plan_common::StorageTableDesc; use uuid::Uuid; @@ -58,7 +58,7 @@ struct InnerSideExecutorBuilder { lookup_prefix_len: usize, context: C, task_id: TaskId, - epoch: u64, + epoch: BatchQueryEpoch, pu_to_worker_mapping: HashMap, pu_to_scan_range_mapping: HashMap>, chunk_size: usize, @@ -136,7 +136,7 @@ impl InnerSideExecutorBuilder { ..Default::default() }), }), - epoch: self.epoch, + epoch: Some(self.epoch.clone()), }; let prost_exchange_source = ProstExchangeSource { @@ -237,8 +237,12 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder let task_id = self.task_id.clone(); - let executor_builder = - ExecutorBuilder::new(&plan_node, &task_id, self.context.clone(), self.epoch); + let executor_builder = ExecutorBuilder::new( + &plan_node, + &task_id, + self.context.clone(), + self.epoch.clone(), + ); executor_builder.build().await } diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index 9b0b8f38a2805..da35ad8e9d4a6 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -62,6 +62,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::PlanNode; +use risingwave_pb::common::BatchQueryEpoch; pub use row_seq_scan::*; pub use sort_agg::*; pub use source::*; @@ -120,7 +121,7 @@ pub struct ExecutorBuilder<'a, C> { pub plan_node: &'a PlanNode, pub task_id: &'a TaskId, context: C, - epoch: u64, + epoch: BatchQueryEpoch, } macro_rules! build_executor { @@ -136,7 +137,12 @@ macro_rules! build_executor { } impl<'a, C: Clone> ExecutorBuilder<'a, C> { - pub fn new(plan_node: &'a PlanNode, task_id: &'a TaskId, context: C, epoch: u64) -> Self { + pub fn new( + plan_node: &'a PlanNode, + task_id: &'a TaskId, + context: C, + epoch: BatchQueryEpoch, + ) -> Self { Self { plan_node, task_id, @@ -147,7 +153,12 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> { #[must_use] pub fn clone_for_plan(&self, plan_node: &'a PlanNode) -> Self { - ExecutorBuilder::new(plan_node, self.task_id, self.context.clone(), self.epoch) + ExecutorBuilder::new( + plan_node, + self.task_id, + self.context.clone(), + self.epoch.clone(), + ) } pub fn plan_node(&self) -> &PlanNode { @@ -158,8 +169,8 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> { &self.context } - pub fn epoch(&self) -> u64 { - self.epoch + pub fn epoch(&self) -> BatchQueryEpoch { + self.epoch.clone() } } @@ -218,7 +229,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { #[cfg(test)] mod tests { - + use risingwave_hummock_sdk::to_committed_batch_query_epoch; use risingwave_pb::batch_plan::PlanNode; use crate::executor::ExecutorBuilder; @@ -238,7 +249,7 @@ mod tests { &plan_node, task_id, ComputeNodeContext::for_test(), - u64::MAX, + to_committed_batch_query_epoch(u64::MAX), ); let child_plan = &PlanNode { ..Default::default() diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 558d4194eb376..b580c99b01714 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -28,9 +28,9 @@ use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::select_all; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::deserialize_datum; -use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, ScanRange as ProstScanRange}; +use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::plan_common::{OrderType as ProstOrderType, StorageTableDesc}; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{Distribution, TableIter}; @@ -53,7 +53,7 @@ pub struct RowSeqScanExecutor { table: StorageTable, scan_ranges: Vec, - epoch: u64, + epoch: BatchQueryEpoch, } /// Range for batch scan. @@ -132,7 +132,7 @@ impl RowSeqScanExecutor { pub fn new( table: StorageTable, scan_ranges: Vec, - epoch: u64, + epoch: BatchQueryEpoch, chunk_size: usize, identity: String, metrics: Option, @@ -236,7 +236,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { } }; - let epoch = source.epoch; + let epoch = source.epoch.clone(); let chunk_size = source.context.get_config().developer.batch_chunk_size; let metrics = source.context().task_metrics(); @@ -291,7 +291,6 @@ impl RowSeqScanExecutor { scan_ranges, epoch, } = *self; - let table = Arc::new(table); // Create collector. @@ -317,7 +316,9 @@ impl RowSeqScanExecutor { for point_get in point_gets { let table = table.clone(); let histogram = histogram.clone(); - if let Some(row) = Self::execute_point_get(table, point_get, epoch, histogram).await? { + if let Some(row) = + Self::execute_point_get(table, point_get, epoch.clone(), histogram).await? + { if let Some(chunk) = data_chunk_builder.append_one_row(row) { yield chunk; } @@ -332,7 +333,11 @@ impl RowSeqScanExecutor { let table = table.clone(); let histogram = histogram.clone(); Box::pin(Self::execute_range( - table, range_scan, epoch, chunk_size, histogram, + table, + range_scan, + epoch.clone(), + chunk_size, + histogram, )) })); #[for_await] @@ -344,7 +349,7 @@ impl RowSeqScanExecutor { async fn execute_point_get( table: Arc>, scan_range: ScanRange, - epoch: u64, + epoch: BatchQueryEpoch, histogram: Option, ) -> Result> { let pk_prefix = scan_range.pk_prefix; @@ -353,9 +358,7 @@ impl RowSeqScanExecutor { let timer = histogram.as_ref().map(|histogram| histogram.start_timer()); // Point Get. - let row = table - .get_row(&pk_prefix, HummockReadEpoch::Committed(epoch)) - .await?; + let row = table.get_row(&pk_prefix, epoch.into()).await?; if let Some(timer) = timer { timer.observe_duration() @@ -368,7 +371,7 @@ impl RowSeqScanExecutor { async fn execute_range( table: Arc>, scan_range: ScanRange, - epoch: u64, + epoch: BatchQueryEpoch, chunk_size: usize, histogram: Option, ) { @@ -381,7 +384,7 @@ impl RowSeqScanExecutor { assert!(pk_prefix.len() < table.pk_indices().len()); let iter = table .batch_iter_with_pk_bounds( - HummockReadEpoch::Committed(epoch), + epoch.into(), &pk_prefix, ( next_col_bounds.0.map(|x| OwnedRow::new(vec![x])), diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs index 281d636bf3bd1..d24737d36d1a2 100644 --- a/src/batch/src/rpc/service/task_service.rs +++ b/src/batch/src/rpc/service/task_service.rs @@ -64,7 +64,7 @@ impl TaskService for BatchServiceImpl { .fire_task( task_id.as_ref().expect("no task id found"), plan.expect("no plan found").clone(), - epoch, + epoch.expect("no epoch found"), ComputeNodeContext::new( self.env.clone(), TaskId::from(task_id.as_ref().expect("no task id found")), @@ -111,6 +111,7 @@ impl TaskService for BatchServiceImpl { } = req.into_inner(); let task_id = task_id.expect("no task id found"); let plan = plan.expect("no plan found").clone(); + let epoch = epoch.expect("no epoch found"); let context = ComputeNodeContext::new_for_local(self.env.clone()); trace!( "local execute request: plan:{:?} with task id:{:?}", diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index c1ea463c3d47c..0d33db551c48c 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -24,6 +24,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_pb::batch_plan::{ PlanFragment, TaskId as ProstTaskId, TaskOutputId as ProstOutputId, }; +use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::task_service::task_info::TaskStatus; use risingwave_pb::task_service::{GetDataResponse, TaskInfo, TaskInfoResponse}; use tokio::runtime::Runtime; @@ -216,7 +217,7 @@ pub struct BatchTaskExecution { /// This is a hack, cuz there is no easy way to get out the receiver. state_rx: Mutex>>, - epoch: u64, + epoch: BatchQueryEpoch, /// Runtime for the batch tasks. runtime: &'static Runtime, @@ -227,7 +228,7 @@ impl BatchTaskExecution { prost_tid: &ProstTaskId, plan: PlanFragment, context: C, - epoch: u64, + epoch: BatchQueryEpoch, runtime: &'static Runtime, ) -> Result { let task_id = TaskId::from(prost_tid); @@ -266,7 +267,7 @@ impl BatchTaskExecution { self.plan.root.as_ref().unwrap(), &self.task_id, self.context.clone(), - self.epoch, + self.epoch.clone(), ) .build() .await?; diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index 9eff31ab0bfbc..b10a15a286daf 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -23,6 +23,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::{ PlanFragment, TaskId as ProstTaskId, TaskOutputId as ProstTaskOutputId, }; +use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::task_service::GetDataResponse; use tokio::runtime::Runtime; use tokio::sync::mpsc::Sender; @@ -72,7 +73,7 @@ impl BatchManager { &self, tid: &ProstTaskId, plan: PlanFragment, - epoch: u64, + epoch: BatchQueryEpoch, context: ComputeNodeContext, ) -> Result<()> { trace!("Received task id: {:?}, plan: {:?}", tid, plan); @@ -214,6 +215,7 @@ mod tests { use risingwave_common::config::BatchConfig; use risingwave_common::types::DataType; use risingwave_expr::expr::make_i32_literal; + use risingwave_hummock_sdk::to_committed_batch_query_epoch; use risingwave_pb::batch_plan::exchange_info::DistributionMode; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{ @@ -279,11 +281,16 @@ mod tests { task_id: 0, }; manager - .fire_task(&task_id, plan.clone(), 0, context.clone()) + .fire_task( + &task_id, + plan.clone(), + to_committed_batch_query_epoch(0), + context.clone(), + ) .await .unwrap(); let err = manager - .fire_task(&task_id, plan, 0, context) + .fire_task(&task_id, plan, to_committed_batch_query_epoch(0), context) .await .unwrap_err(); assert!(err @@ -324,7 +331,12 @@ mod tests { task_id: 0, }; manager - .fire_task(&task_id, plan.clone(), 0, context.clone()) + .fire_task( + &task_id, + plan.clone(), + to_committed_batch_query_epoch(0), + context.clone(), + ) .await .unwrap(); manager.abort_task(&task_id); diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index dcf7129af8506..c0d3dbe0496c8 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -130,6 +130,7 @@ where version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id } Info::HummockSnapshot(_) => true, + Info::MetaBackupManifestId(_) => true, Info::Snapshot(_) => unreachable!(), }); diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 5ba4653f44cbc..4a8faab664f56 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -36,6 +36,7 @@ pub const DEFAULT_DATABASE_NAME: &str = "dev"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog"; pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema"; +pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog"; pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_"; pub const DEFAULT_SUPER_USER: &str = "root"; pub const DEFAULT_SUPER_USER_ID: u32 = 1; @@ -46,6 +47,12 @@ pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2; pub const NON_RESERVED_USER_ID: i32 = 11; pub const NON_RESERVED_PG_CATALOG_TABLE_ID: i32 = 1001; +pub const SYSTEM_SCHEMAS: [&str; 3] = [ + PG_CATALOG_SCHEMA_NAME, + INFORMATION_SCHEMA_SCHEMA_NAME, + RW_CATALOG_SCHEMA_NAME, +]; + /// The local system catalog reader in the frontend node. #[async_trait] pub trait SysCatalogReader: Sync + Send + 'static { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index de76c082f9095..6b957cdef18c1 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -24,10 +24,11 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; use crate::error::{ErrorCode, RwError}; use crate::session_config::transaction_isolation_level::IsolationLevel; +use crate::util::epoch::Epoch; // This is a hack, &'static str is not allowed as a const generics argument. // TODO: refine this using the adt_const_params feature. -const CONFIG_KEYS: [&str; 10] = [ +const CONFIG_KEYS: [&str; 11] = [ "RW_IMPLICIT_FLUSH", "CREATE_COMPACTION_GROUP_FOR_MV", "QUERY_MODE", @@ -38,6 +39,7 @@ const CONFIG_KEYS: [&str; 10] = [ "MAX_SPLIT_RANGE_GAP", "SEARCH_PATH", "TRANSACTION ISOLATION LEVEL", + "QUERY_EPOCH", ]; // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] = @@ -52,6 +54,7 @@ const BATCH_ENABLE_LOOKUP_JOIN: usize = 6; const MAX_SPLIT_RANGE_GAP: usize = 7; const SEARCH_PATH: usize = 8; const TRANSACTION_ISOLATION_LEVEL: usize = 9; +const QUERY_EPOCH: usize = 10; trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { fn entry_name() -> &'static str; @@ -184,6 +187,51 @@ impl TryFrom<&[&str]> for ConfigI32(u64); + +impl Default for ConfigU64 { + fn default() -> Self { + ConfigU64(DEFAULT) + } +} + +impl Deref for ConfigU64 { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ConfigEntry for ConfigU64 { + fn entry_name() -> &'static str { + CONFIG_KEYS[NAME] + } +} + +impl TryFrom<&[&str]> for ConfigU64 { + type Error = RwError; + + fn try_from(value: &[&str]) -> Result { + if value.len() != 1 { + return Err(ErrorCode::InternalError(format!( + "SET {} takes only one argument", + Self::entry_name() + )) + .into()); + } + + let s = value[0]; + s.parse::().map(ConfigU64).map_err(|_e| { + ErrorCode::InvalidConfigValue { + config_entry: Self::entry_name().to_string(), + config_value: s.to_string(), + } + .into() + }) + } +} + pub struct VariableInfo { pub name: String, pub setting: String, @@ -198,6 +246,7 @@ type ExtraFloatDigit = ConfigI32; type DateStyle = ConfigString; type BatchEnableLookupJoin = ConfigBool; type MaxSplitRangeGap = ConfigI32; +type QueryEpoch = ConfigU64; #[derive(Default)] pub struct ConfigMap { @@ -234,6 +283,9 @@ pub struct ConfigMap { /// see transaction_isolation_level: IsolationLevel, + + /// select as of specific epoch + query_epoch: QueryEpoch, } impl ConfigMap { @@ -257,6 +309,8 @@ impl ConfigMap { self.max_split_range_gap = val.as_slice().try_into()?; } else if key.eq_ignore_ascii_case(SearchPath::entry_name()) { self.search_path = val.as_slice().try_into()?; + } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { + self.query_epoch = val.as_slice().try_into()?; } else { return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()); } @@ -285,6 +339,8 @@ impl ConfigMap { Ok(self.search_path.to_string()) } else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) { Ok(self.transaction_isolation_level.to_string()) + } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { + Ok(self.query_epoch.to_string()) } else { Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()) } @@ -336,6 +392,11 @@ impl ConfigMap { name: SearchPath::entry_name().to_lowercase(), setting : self.search_path.to_string(), description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified") + }, + VariableInfo { + name: QueryEpoch::entry_name().to_lowercase(), + setting : self.query_epoch.to_string(), + description : String::from("Sets the historical epoch for querying data. If 0, querying latest data.") } ] } @@ -379,4 +440,11 @@ impl ConfigMap { pub fn get_search_path(&self) -> SearchPath { self.search_path.clone() } + + pub fn get_query_epoch(&self) -> Option { + if self.query_epoch.0 != 0 { + return Some((self.query_epoch.0).into()); + } + None + } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c96101605c18b..3cf7b26be28f3 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -116,7 +116,7 @@ pub async fn compute_node_serve( let state_store = StateStoreImpl::new( &opts.state_store, &opts.file_cache_dir, - storage_config.clone(), + &config, hummock_meta_client.clone(), state_store_metrics.clone(), object_store_metrics, diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 38b709238ee8b..ce86a6c981005 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -35,6 +35,7 @@ use risingwave_common::test_prelude::DataChunkTestExt; use risingwave_common::types::{DataType, IntoOrdered}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::sort_util::{OrderPair, OrderType}; +use risingwave_hummock_sdk::to_committed_batch_query_epoch; use risingwave_source::table_test_utils::create_table_source_desc_builder; use risingwave_source::{TableSourceManager, TableSourceManagerRef}; use risingwave_storage::memory::MemoryStateStore; @@ -220,7 +221,7 @@ async fn test_table_materialize() -> StreamResult<()> { let scan = Box::new(RowSeqScanExecutor::new( table.clone(), vec![ScanRange::full()], - u64::MAX, + to_committed_batch_query_epoch(u64::MAX), 1024, "RowSeqExecutor2".to_string(), None, @@ -282,7 +283,7 @@ async fn test_table_materialize() -> StreamResult<()> { let scan = Box::new(RowSeqScanExecutor::new( table.clone(), vec![ScanRange::full()], - u64::MAX, + to_committed_batch_query_epoch(u64::MAX), 1024, "RowSeqScanExecutor2".to_string(), None, @@ -354,7 +355,7 @@ async fn test_table_materialize() -> StreamResult<()> { let scan = Box::new(RowSeqScanExecutor::new( table, vec![ScanRange::full()], - u64::MAX, + to_committed_batch_query_epoch(u64::MAX), 1024, "RowSeqScanExecutor2".to_string(), None, @@ -421,7 +422,7 @@ async fn test_row_seq_scan() -> Result<()> { let executor = Box::new(RowSeqScanExecutor::new( table, vec![ScanRange::full()], - u64::MAX, + to_committed_batch_query_epoch(u64::MAX), 1, "RowSeqScanExecutor2".to_string(), None, diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index 4d289ec848b55..a93cff37b1d6e 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -38,6 +38,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> { table_id: TableId { table_id }, retention_seconds: None, check_bloom_filter: false, + read_version_from_backup: false, }, ) .await? diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 44d79ac522d63..12007216dfdca 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; -use risingwave_common::config::StorageConfig; +use risingwave_common::config::{RwConfig, StorageConfig}; use risingwave_rpc_client::MetaClient; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; use risingwave_storage::hummock::{HummockStorage, TieredCacheMetricsBuilder}; @@ -100,6 +100,10 @@ For `./risedev apply-compose-deploy` users, share_buffer_compaction_worker_threads_number: 0, ..Default::default() }; + let rw_config = RwConfig { + storage: config.clone(), + ..Default::default() + }; tracing::info!("using Hummock config: {:#?}", config); @@ -112,7 +116,7 @@ For `./risedev apply-compose-deploy` users, let state_store_impl = StateStoreImpl::new( &self.hummock_url, "", - Arc::new(config), + &rw_config, Arc::new(MonitoredHummockMetaClient::new( meta_client.clone(), metrics.hummock_metrics.clone(), diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 24ebb20997b50..b6bf86d272f0c 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -16,9 +16,7 @@ use std::ops::Deref; use std::sync::Arc; use itertools::Itertools; -use risingwave_common::catalog::{ - ColumnDesc, Field, INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME, -}; +use risingwave_common::catalog::{ColumnDesc, Field, SYSTEM_SCHEMAS}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_sqlparser::ast::{Statement, TableAlias}; @@ -78,7 +76,7 @@ impl Binder { alias: Option, ) -> Result { fn is_system_schema(schema_name: &str) -> bool { - schema_name == PG_CATALOG_SCHEMA_NAME || schema_name == INFORMATION_SCHEMA_SCHEMA_NAME + SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name) } // define some helper functions converting catalog to bound relation diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 37477c06ff53e..1b15746ba7cde 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -14,6 +14,7 @@ pub mod information_schema; pub mod pg_catalog; +pub mod rw_catalog; use std::collections::HashMap; use std::sync::{Arc, LazyLock}; @@ -22,7 +23,7 @@ use async_trait::async_trait; use paste::paste; use risingwave_common::catalog::{ ColumnDesc, SysCatalogReader, TableDesc, TableId, DEFAULT_SUPER_USER_ID, - INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME, + INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, }; use risingwave_common::error::Result; use risingwave_common::row::OwnedRow; @@ -32,6 +33,7 @@ use crate::catalog::catalog_service::CatalogReader; use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::system_catalog::information_schema::*; use crate::catalog::system_catalog::pg_catalog::*; +use crate::catalog::system_catalog::rw_catalog::*; use crate::meta_client::FrontendMetaClient; use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::session::AuthContext; @@ -197,4 +199,5 @@ prepare_sys_catalog! { { PG_CATALOG, PG_KEYWORDS, vec![0], read_keywords_info }, { INFORMATION_SCHEMA, COLUMNS, vec![], read_columns_info }, { INFORMATION_SCHEMA, TABLES, vec![], read_tables_info }, + { RW_CATALOG, RW_META_SNAPSHOT, vec![], read_meta_snapshot await }, } diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs index 2793f263676f2..57e85f52bfedf 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs @@ -53,7 +53,8 @@ pub use pg_views::*; use risingwave_common::array::ListValue; use risingwave_common::error::Result; use risingwave_common::row::OwnedRow; -use risingwave_common::types::ScalarImpl; +use risingwave_common::types::{NaiveDateTimeWrapper, ScalarImpl}; +use risingwave_common::util::epoch::Epoch; use risingwave_pb::user::grant_privilege::{Action, Object}; use risingwave_pb::user::UserInfo; use serde_json::json; @@ -179,6 +180,38 @@ impl SysCatalogReaderImpl { .collect_vec()) } + pub(super) async fn read_meta_snapshot(&self) -> Result> { + let try_get_date_time = |epoch: u64| { + if epoch == 0 { + return None; + } + let time_millis = Epoch::from(epoch).as_unix_millis(); + NaiveDateTimeWrapper::with_secs_nsecs( + (time_millis / 1000) as i64, + (time_millis % 1000 * 1_000_000) as u32, + ) + .map(ScalarImpl::NaiveDateTime) + .ok() + }; + let meta_snapshots = self + .meta_client + .list_meta_snapshots() + .await? + .into_iter() + .map(|s| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(s.id as i64)), + Some(ScalarImpl::Int64(s.hummock_version_id as i64)), + Some(ScalarImpl::Int64(s.safe_epoch as i64)), + try_get_date_time(s.safe_epoch), + Some(ScalarImpl::Int64(s.max_committed_epoch as i64)), + try_get_date_time(s.max_committed_epoch), + ]) + }) + .collect_vec(); + Ok(meta_snapshots) + } + // FIXME(noel): Tracked by pub(super) fn read_opclass_info(&self) -> Result> { Ok(vec![]) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs new file mode 100644 index 0000000000000..9938b46462464 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod rw_meta_snapshot; +pub use rw_meta_snapshot::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_meta_snapshot.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_meta_snapshot.rs new file mode 100644 index 0000000000000..cf0c19a630718 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_meta_snapshot.rs @@ -0,0 +1,32 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; + +use crate::catalog::system_catalog::SystemCatalogColumnsDef; + +pub const RW_META_SNAPSHOT_TABLE_NAME: &str = "rw_meta_snapshot"; + +pub const RW_META_SNAPSHOT_COLUMNS: &[SystemCatalogColumnsDef<'_>] = &[ + (DataType::Int64, "meta_snapshot_id"), + (DataType::Int64, "hummock_version_id"), + // the smallest epoch this meta snapshot includes + (DataType::Int64, "safe_epoch"), + // human-readable timestamp of safe_epoch + (DataType::TIMESTAMP, "safe_epoch_ts"), + // the largest epoch this meta snapshot includes + (DataType::Int64, "max_committed_epoch"), + // human-readable timestamp of max_committed_epoch + (DataType::TIMESTAMP, "max_committed_epoch_ts"), +]; diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index d3386c1ed760e..9e8803665dbb7 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -35,7 +35,7 @@ use crate::planner::Planner; use crate::scheduler::plan_fragmenter::Query; use crate::scheduler::{ BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef, - HummockSnapshotGuard, LocalQueryExecution, LocalQueryStream, + LocalQueryExecution, LocalQueryStream, PinnedHummockSnapshot, }; use crate::session::SessionImpl; use crate::PlanRef; @@ -128,22 +128,27 @@ pub async fn handle_query( .collect_vec(); let mut row_stream = { - // Acquire hummock snapshot for execution. - // TODO: if there's no table scan, we don't need to acquire snapshot. - let hummock_snapshot_manager = session.env().hummock_snapshot_manager(); - let query_id = query.query_id().clone(); - let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?; - + let query_epoch = session.config().get_query_epoch(); + let query_snapshot = if let Some(query_epoch) = query_epoch { + PinnedHummockSnapshot::Other(query_epoch) + } else { + // Acquire hummock snapshot for execution. + // TODO: if there's no table scan, we don't need to acquire snapshot. + let hummock_snapshot_manager = session.env().hummock_snapshot_manager(); + let query_id = query.query_id().clone(); + let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?; + PinnedHummockSnapshot::FrontendPinned(pinned_snapshot) + }; match query_mode { QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( - local_execute(session.clone(), query, pinned_snapshot).await?, + local_execute(session.clone(), query, query_snapshot).await?, column_types, format, )), // Local mode do not support cancel tasks. QueryMode::Distributed => { PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( - distribute_execute(session.clone(), query, pinned_snapshot).await?, + distribute_execute(session.clone(), query, query_snapshot).await?, column_types, format, )) @@ -224,7 +229,7 @@ fn to_statement_type(stmt: &Statement) -> Result { pub async fn distribute_execute( session: Arc, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: PinnedHummockSnapshot, ) -> Result { let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into(); let query_manager = session.env().query_manager().clone(); @@ -238,7 +243,7 @@ pub async fn distribute_execute( pub async fn local_execute( session: Arc, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: PinnedHummockSnapshot, ) -> Result { let front_env = session.env(); diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 1b152d12c1db8..2df881cf0f487 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::hummock::HummockSnapshot; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_rpc_client::error::Result; @@ -40,6 +41,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn unpin_snapshot(&self) -> Result<()>; async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()>; + + async fn list_meta_snapshots(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -72,4 +75,9 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()> { self.0.unpin_snapshot_before(epoch).await } + + async fn list_meta_snapshots(&self) -> Result> { + let manifest = self.0.get_meta_snapshot_manifest().await?; + Ok(manifest.snapshot_metadata) + } } diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 01be3d159ec0e..16e902cdde46c 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -75,6 +75,9 @@ impl ObserverState for FrontendObserverNode { Info::HummockVersionDeltas(_) => { panic!("frontend node should not receive HummockVersionDeltas"); } + Info::MetaBackupManifestId(_) => { + panic!("frontend node should not receive MetaBackupManifestId"); + } } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 54b85ac9de2e7..28ecea83841ad 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -36,8 +36,7 @@ use crate::scheduler::distributed::StageExecution; use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID}; use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::scheduler::{ - ExecutionContextRef, HummockSnapshotGuard, PinnedHummockSnapshot, SchedulerError, - SchedulerResult, + ExecutionContextRef, PinnedHummockSnapshot, SchedulerError, SchedulerResult, }; /// Message sent to a `QueryRunner` to control its execution. @@ -115,7 +114,7 @@ impl QueryExecution { &self, context: ExecutionContextRef, worker_node_manager: WorkerNodeManagerRef, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: PinnedHummockSnapshot, compute_client_pool: ComputeClientPoolRef, catalog_reader: CatalogReader, query_execution_info: QueryExecutionInfoRef, @@ -209,7 +208,7 @@ impl QueryExecution { let stage_exec = Arc::new(StageExecution::new( // TODO: Add support to use current epoch when needed - pinned_snapshot.get_committed_epoch(), + pinned_snapshot.get_batch_query_epoch(), self.query.stage_graph.stages[&stage_id].clone(), worker_node_manager.clone(), self.shutdown_tx.clone(), @@ -424,7 +423,7 @@ pub(crate) mod tests { .start( ExecutionContext::new(SessionImpl::mock().into()).into(), worker_node_manager, - pinned_snapshot, + pinned_snapshot.into(), compute_client_pool, catalog_reader, query_execution_info, diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index c64280707c63f..c96621299e7ba 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -34,7 +34,7 @@ use crate::catalog::catalog_service::CatalogReader; use crate::scheduler::plan_fragmenter::{Query, QueryId}; use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::scheduler::{ - ExecutionContextRef, HummockSnapshotGuard, HummockSnapshotManagerRef, SchedulerResult, + ExecutionContextRef, HummockSnapshotManagerRef, PinnedHummockSnapshot, SchedulerResult, }; pub struct DistributedQueryStream { @@ -160,7 +160,7 @@ impl QueryManager { &self, context: ExecutionContextRef, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: PinnedHummockSnapshot, ) -> SchedulerResult { let query_id = query.query_id.clone(); let query_execution = Arc::new(QueryExecution::new(query, context.session().id())); diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index c1d3000cbbe4d..2d36dd77feffb 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -37,7 +37,7 @@ use risingwave_pb::batch_plan::{ DistributedLookupJoinNode, ExchangeNode, ExchangeSource, MergeSortExchangeNode, PlanFragment, PlanNode as PlanNodeProst, PlanNode, TaskId as TaskIdProst, TaskOutputId, }; -use risingwave_pb::common::{HostAddress, WorkerNode}; +use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode}; use risingwave_pb::task_service::{AbortTaskRequest, TaskInfoResponse}; use risingwave_rpc_client::ComputeClientPoolRef; use tokio::spawn; @@ -106,7 +106,7 @@ struct TaskStatusHolder { } pub struct StageExecution { - epoch: u64, + epoch: BatchQueryEpoch, stage: QueryStageRef, worker_node_manager: WorkerNodeManagerRef, tasks: Arc>, @@ -124,7 +124,7 @@ pub struct StageExecution { } struct StageRunner { - epoch: u64, + epoch: BatchQueryEpoch, state: Arc>, stage: QueryStageRef, worker_node_manager: WorkerNodeManagerRef, @@ -158,7 +158,7 @@ impl TaskStatusHolder { impl StageExecution { #[allow(clippy::too_many_arguments)] pub fn new( - epoch: u64, + epoch: BatchQueryEpoch, stage: QueryStageRef, worker_node_manager: WorkerNodeManagerRef, msg_sender: Sender, @@ -192,7 +192,7 @@ impl StageExecution { match cur_state { StageState::Pending { msg_sender } => { let runner = StageRunner { - epoch: self.epoch, + epoch: self.epoch.clone(), stage: self.stage.clone(), worker_node_manager: self.worker_node_manager.clone(), tasks: self.tasks.clone(), @@ -469,7 +469,7 @@ impl StageRunner { &plan_node, &task_id, self.ctx.to_batch_task_context(), - self.epoch, + self.epoch.clone(), ); let executor = executor.build().await?; @@ -700,7 +700,7 @@ impl StageRunner { let t_id = task_id.task_id; let stream_status = compute_client - .create_task(task_id, plan_fragment, self.epoch) + .create_task(task_id, plan_fragment, self.epoch.clone()) .await .map_err(|e| anyhow!(e))?; diff --git a/src/frontend/src/scheduler/hummock_snapshot_manager.rs b/src/frontend/src/scheduler/hummock_snapshot_manager.rs index ad0c3cdbf7e3b..cd8f8cd9677d5 100644 --- a/src/frontend/src/scheduler/hummock_snapshot_manager.rs +++ b/src/frontend/src/scheduler/hummock_snapshot_manager.rs @@ -19,7 +19,8 @@ use std::time::{Duration, Instant}; use anyhow::anyhow; use arc_swap::ArcSwap; -use risingwave_common::util::epoch::INVALID_EPOCH; +use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; +use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use risingwave_pb::hummock::HummockSnapshot; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot::{channel as once_channel, Sender as Callback}; @@ -32,7 +33,37 @@ use crate::scheduler::{SchedulerError, SchedulerResult}; const UNPIN_INTERVAL_SECS: u64 = 10; pub type HummockSnapshotManagerRef = Arc; -pub type PinnedHummockSnapshot = HummockSnapshotGuard; +pub enum PinnedHummockSnapshot { + FrontendPinned(HummockSnapshotGuard), + /// Other arbitrary epoch, e.g. user specified. + /// Availability and consistency of underlying data should be guaranteed accordingly. + /// Currently it's only used for querying meta snapshot backup. + Other(Epoch), +} + +impl PinnedHummockSnapshot { + pub fn get_batch_query_epoch(&self) -> BatchQueryEpoch { + match self { + PinnedHummockSnapshot::FrontendPinned(s) => { + // extend Epoch::Current here + BatchQueryEpoch { + epoch: Some(batch_query_epoch::Epoch::Committed( + s.snapshot.committed_epoch, + )), + } + } + PinnedHummockSnapshot::Other(e) => BatchQueryEpoch { + epoch: Some(batch_query_epoch::Epoch::Backup(e.0)), + }, + } + } +} + +impl From for PinnedHummockSnapshot { + fn from(s: HummockSnapshotGuard) -> Self { + PinnedHummockSnapshot::FrontendPinned(s) + } +} type SnapshotRef = Arc>; @@ -165,7 +196,7 @@ impl HummockSnapshotManager { } } - pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult { + pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult { let (sender, rc) = once_channel(); let msg = EpochOperation::RequestEpoch { query_id: query_id.clone(), diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index d0328a4d07e13..d7d50d8112f16 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -39,11 +39,10 @@ use tracing::debug; use uuid::Uuid; use super::plan_fragmenter::{PartitionInfo, QueryStageRef}; -use super::HummockSnapshotGuard; use crate::optimizer::plan_node::PlanNodeType; use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId}; use crate::scheduler::task_context::FrontendBatchTaskContext; -use crate::scheduler::SchedulerResult; +use crate::scheduler::{PinnedHummockSnapshot, SchedulerResult}; use crate::session::{AuthContext, FrontendEnv}; pub struct LocalQueryStream { @@ -72,7 +71,7 @@ pub struct LocalQueryExecution { query: Query, front_env: FrontendEnv, // The snapshot will be released when LocalQueryExecution is dropped. - snapshot: HummockSnapshotGuard, + snapshot: PinnedHummockSnapshot, auth_context: Arc, } @@ -81,7 +80,7 @@ impl LocalQueryExecution { query: Query, front_env: FrontendEnv, sql: S, - snapshot: HummockSnapshotGuard, + snapshot: PinnedHummockSnapshot, auth_context: Arc, ) -> Self { Self { @@ -116,7 +115,7 @@ impl LocalQueryExecution { &task_id, context, // TODO: Add support to use current epoch when needed - self.snapshot.get_committed_epoch(), + self.snapshot.get_batch_query_epoch(), ); let executor = executor.build().await?; @@ -247,7 +246,7 @@ impl LocalQueryExecution { let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), // TODO: Add support to use current epoch when needed - epoch: self.snapshot.get_committed_epoch(), + epoch: Some(self.snapshot.get_batch_query_epoch()), }; let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { @@ -280,7 +279,7 @@ impl LocalQueryExecution { let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), // TODO: Add support to use current epoch when needed - epoch: self.snapshot.get_committed_epoch(), + epoch: Some(self.snapshot.get_batch_query_epoch()), }; // NOTE: select a random work node here. let worker_node = self.front_env.worker_node_manager().next_random()?; @@ -313,7 +312,7 @@ impl LocalQueryExecution { let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), // TODO: Add support to use current epoch when needed - epoch: self.snapshot.get_committed_epoch(), + epoch: Some(self.snapshot.get_batch_query_epoch()), }; let workers = if second_stage.parallelism == 1 { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 3781e02e6c535..f8cfb36113e72 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{ DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, }; use risingwave_common::error::Result; +use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ Database as ProstDatabase, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, @@ -651,6 +652,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn unpin_snapshot_before(&self, _epoch: u64) -> RpcResult<()> { Ok(()) } + + async fn list_meta_snapshots(&self) -> RpcResult> { + Ok(vec![]) + } } #[cfg(test)] diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 85ecb58def363..9b039b97e69b1 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -18,11 +18,12 @@ use std::time::Instant; use itertools::Itertools; use prometheus::Registry; use risingwave_backup::error::BackupError; -use risingwave_backup::storage::BackupStorageRef; -use risingwave_backup::{MetaBackupJobId, MetaSnapshotId}; +use risingwave_backup::storage::MetaSnapshotStorageRef; +use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest}; use risingwave_common::bail; use risingwave_hummock_sdk::HummockSstableId; -use risingwave_pb::backup_service::BackupJobStatus; +use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId}; +use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::task::JoinHandle; use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder; @@ -40,6 +41,7 @@ pub enum BackupJobResult { /// `BackupJobHandle` tracks running job. struct BackupJobHandle { job_id: u64, + #[expect(dead_code)] hummock_version_safe_point: HummockVersionSafePoint, start_time: Instant, } @@ -60,7 +62,7 @@ pub type BackupManagerRef = Arc>; pub struct BackupManager { env: MetaSrvEnv, hummock_manager: HummockManagerRef, - backup_store: BackupStorageRef, + backup_store: MetaSnapshotStorageRef, /// Tracks the running backup job. Concurrent jobs is not supported. running_backup_job: tokio::sync::Mutex>, metrics: BackupManagerMetrics, @@ -70,7 +72,7 @@ impl BackupManager { pub fn new( env: MetaSrvEnv, hummock_manager: HummockManagerRef, - backup_store: BackupStorageRef, + backup_store: MetaSnapshotStorageRef, registry: Registry, ) -> Self { Self { @@ -87,7 +89,7 @@ impl BackupManager { Self::new( env, hummock_manager, - Arc::new(risingwave_backup::storage::DummyBackupStorage {}), + Arc::new(risingwave_backup::storage::DummyMetaSnapshotStorage::default()), Registry::new(), ) } @@ -132,8 +134,8 @@ impl BackupManager { } if self .backup_store - .list() - .await? + .manifest() + .snapshot_metadata .iter() .any(|m| m.id == job_id) { @@ -153,6 +155,14 @@ impl BackupManager { BackupJobResult::Succeeded => { self.metrics.job_latency_success.observe(job_latency); tracing::info!("succeeded backup job {}", job_id); + self.env + .notification_manager() + .notify_hummock_without_version( + Operation::Update, + Info::MetaBackupManifestId(MetaBackupManifestId { + id: self.backup_store.manifest().manifest_id, + }), + ); } BackupJobResult::Failed(e) => { self.metrics.job_latency_failure.observe(job_latency); @@ -183,16 +193,18 @@ impl BackupManager { } /// List all `SSTables` required by backups. - pub async fn list_pinned_ssts(&self) -> MetaResult> { - let r = self - .backup_store - .list() - .await? - .into_iter() - .flat_map(|s| s.ssts) + pub fn list_pinned_ssts(&self) -> Vec { + self.backup_store + .manifest() + .snapshot_metadata + .iter() + .flat_map(|s| s.ssts.clone()) .dedup() - .collect_vec(); - Ok(r) + .collect_vec() + } + + pub fn manifest(&self) -> Arc { + self.backup_store.manifest() } } diff --git a/src/meta/src/backup_restore/mod.rs b/src/meta/src/backup_restore/mod.rs index 3390fc3d635be..420b30cdc4698 100644 --- a/src/meta/src/backup_restore/mod.rs +++ b/src/meta/src/backup_restore/mod.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![allow(dead_code)] - mod backup_manager; pub use backup_manager::*; mod error; diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index d6da77b11641c..55b9e3729812f 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -16,7 +16,7 @@ use clap::Parser; use itertools::Itertools; use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot::MetaSnapshot; -use risingwave_backup::storage::BackupStorageRef; +use risingwave_backup::storage::MetaSnapshotStorageRef; use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl}; use crate::hummock::compaction_group::CompactionGroup; @@ -123,7 +123,7 @@ async fn restore_metadata(meta_store: S, snapshot: MetaSnapshot) - async fn restore_impl( opts: RestoreOpts, meta_store: Option, - backup_store: Option, + backup_store: Option, ) -> BackupResult<()> { if cfg!(not(test)) { assert!(meta_store.is_none()); @@ -138,7 +138,7 @@ async fn restore_impl( Some(b) => b, }; let target_id = opts.meta_snapshot_id; - let snapshot_list = backup_store.list().await?; + let snapshot_list = backup_store.manifest().snapshot_metadata.clone(); if !snapshot_list.iter().any(|m| m.id == target_id) { return Err(BackupError::Other(anyhow::anyhow!( "snapshot id {} not found", @@ -184,7 +184,7 @@ pub async fn restore(opts: RestoreOpts) -> BackupResult<()> { tracing::info!("restore succeeded"); } Err(e) => { - tracing::error!("restore failed: {}", e); + tracing::warn!("restore failed: {}", e); } } result diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 67fcad7234cba..eec0a3e0355cf 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -17,10 +17,9 @@ use std::time::Duration; use etcd_client::ConnectOptions; use risingwave_backup::error::BackupResult; -use risingwave_backup::storage::{BackupStorageRef, ObjectStoreMetaSnapshotStorage}; +use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; -use xxhash_rust::xxh64; use crate::backup_restore::RestoreOpts; use crate::storage::{EtcdMetaStore, MemStore, WrappedEtcdClient as EtcdClient}; @@ -42,9 +41,6 @@ macro_rules! dispatch_meta_store { }}; } -pub fn xxhash64_checksum(data: &[u8]) -> u64 { - xxh64::xxh64(data, 0) -} // Code is copied from src/meta/src/rpc/server.rs. TODO #6482: extract method. pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult { let meta_store_backend = match opts.meta_store_type { @@ -80,7 +76,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult BackupResult { +pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult { let object_store = parse_remote_object_store( &opts.storage_url, Arc::new(ObjectStoreMetrics::unused()), diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 123830f3d73ee..4e749ea2522b1 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -165,12 +165,8 @@ where &self, ssts_to_delete: &mut Vec, ) -> MetaResult<()> { - let reject: HashSet = self - .backup_manager - .list_pinned_ssts() - .await? - .into_iter() - .collect(); + let reject: HashSet = + self.backup_manager.list_pinned_ssts().into_iter().collect(); // Ack these pinned SSTs directly. Otherwise delta log containing them cannot be GCed. // These SSTs will be GCed during full GC when they are no longer pinned. let to_ack = ssts_to_delete diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 8ed5347039cc4..bb81670f16c90 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -17,6 +17,7 @@ mod fragment; mod user; use std::collections::{HashMap, HashSet, VecDeque}; +use std::iter; use std::option::Option::Some; use std::sync::Arc; @@ -27,7 +28,7 @@ use itertools::Itertools; use risingwave_common::catalog::{ valid_table_name, TableId as StreamingJobId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG, DEFAULT_SUPER_USER_FOR_PG_ID, - DEFAULT_SUPER_USER_ID, INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME, + DEFAULT_SUPER_USER_ID, SYSTEM_SCHEMAS, }; use risingwave_common::{bail, ensure}; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; @@ -170,11 +171,7 @@ where let mut schemas = BTreeMapTransaction::new(&mut database_core.schemas); databases.insert(database.id, database.clone()); let mut schemas_added = vec![]; - for schema_name in [ - DEFAULT_SCHEMA_NAME, - PG_CATALOG_SCHEMA_NAME, - INFORMATION_SCHEMA_SCHEMA_NAME, - ] { + for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) { let schema = Schema { id: self .env diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 34433ed087e7e..dfc1ecd73b9d8 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -395,6 +395,7 @@ pub async fn rpc_serve_with_store( cluster_manager.clone(), hummock_manager.clone(), fragment_manager.clone(), + backup_manager.clone(), ); let health_srv = HealthServiceImpl::new(); let backup_srv = BackupServiceImpl::new(backup_manager); diff --git a/src/meta/src/rpc/service/backup_service.rs b/src/meta/src/rpc/service/backup_service.rs index c31205c5527b9..5b2ac4188c45a 100644 --- a/src/meta/src/rpc/service/backup_service.rs +++ b/src/meta/src/rpc/service/backup_service.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Deref; + use risingwave_pb::backup_service::backup_service_server::BackupService; use risingwave_pb::backup_service::{ BackupMetaRequest, BackupMetaResponse, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse, - GetBackupJobStatusRequest, GetBackupJobStatusResponse, + GetBackupJobStatusRequest, GetBackupJobStatusResponse, GetMetaSnapshotManifestRequest, + GetMetaSnapshotManifestResponse, }; use tonic::{Request, Response, Status}; @@ -68,4 +71,13 @@ where self.backup_manager.delete_backups(&snapshot_ids).await?; Ok(Response::new(DeleteMetaSnapshotResponse {})) } + + async fn get_meta_snapshot_manifest( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(GetMetaSnapshotManifestResponse { + manifest: Some(self.backup_manager.manifest().deref().into()), + })) + } } diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs index 49fa61d729772..d016101b02b45 100644 --- a/src/meta/src/rpc/service/notification_service.rs +++ b/src/meta/src/rpc/service/notification_service.rs @@ -13,6 +13,7 @@ // limitations under the License. use itertools::Itertools; +use risingwave_pb::backup_service::MetaBackupManifestId; use risingwave_pb::catalog::Table; use risingwave_pb::common::worker_node::State::Running; use risingwave_pb::common::{ParallelUnitMapping, WorkerNode, WorkerType}; @@ -24,6 +25,7 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{Request, Response, Status}; +use crate::backup_restore::BackupManagerRef; use crate::hummock::HummockManagerRef; use crate::manager::{ Catalog, CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, MetaSrvEnv, Notification, @@ -38,6 +40,7 @@ pub struct NotificationServiceImpl { cluster_manager: ClusterManagerRef, hummock_manager: HummockManagerRef, fragment_manager: FragmentManagerRef, + backup_manager: BackupManagerRef, } impl NotificationServiceImpl @@ -50,6 +53,7 @@ where cluster_manager: ClusterManagerRef, hummock_manager: HummockManagerRef, fragment_manager: FragmentManagerRef, + backup_manager: BackupManagerRef, ) -> Self { Self { env, @@ -57,6 +61,7 @@ where cluster_manager, hummock_manager, fragment_manager, + backup_manager, } } @@ -148,6 +153,7 @@ where .await .current_version .clone(); + let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id; MetaSnapshot { tables, @@ -156,6 +162,9 @@ where catalog_version, ..Default::default() }), + meta_backup_manifest_id: Some(MetaBackupManifestId { + id: meta_backup_manifest_id, + }), ..Default::default() } } diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index 61b43c91e235f..df8a41a4d0ddf 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -20,6 +20,7 @@ use futures::StreamExt; use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; use risingwave_common::util::addr::HostAddr; use risingwave_pb::batch_plan::{PlanFragment, TaskId, TaskOutputId}; +use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::compute::config_service_client::ConfigServiceClient; use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse}; use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient; @@ -138,7 +139,7 @@ impl ComputeClient { &self, task_id: TaskId, plan: PlanFragment, - epoch: u64, + epoch: BatchQueryEpoch, ) -> Result> { Ok(self .task_client @@ -146,7 +147,7 @@ impl ComputeClient { .create_task(CreateTaskRequest { task_id: Some(task_id), plan: Some(plan), - epoch, + epoch: Some(epoch), }) .await? .into_inner()) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index cd3463ab3cd5d..5d794d60a041e 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -668,6 +668,12 @@ impl MetaClient { let _resp = self.inner.delete_meta_snapshot(req).await?; Ok(()) } + + pub async fn get_meta_snapshot_manifest(&self) -> Result { + let req = GetMetaSnapshotManifestRequest {}; + let resp = self.inner.get_meta_snapshot_manifest(req).await?; + Ok(resp.manifest.expect("should exist")) + } } #[async_trait] @@ -986,6 +992,7 @@ macro_rules! for_all_meta_rpc { ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse } ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse } ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse} + ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse} } }; } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index a0465b74c403b..422a0dd5711ec 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -45,6 +45,7 @@ prometheus = { version = "0.13", features = ["process"] } prost = "0.11" rand = "0.8" regex = "1" +risingwave_backup = { path = "../storage/backup" } risingwave_common = { path = "../common" } risingwave_common_service = { path = "../common/common_service" } risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } diff --git a/src/storage/backup/integration_tests/Makefile.toml b/src/storage/backup/integration_tests/Makefile.toml index a3270221a501d..ce0a7195b45a4 100644 --- a/src/storage/backup/integration_tests/Makefile.toml +++ b/src/storage/backup/integration_tests/Makefile.toml @@ -1,5 +1,6 @@ [tasks.meta-backup-restore-test] category = "RiseDev - Test" +dependencies = ["pre-start-dev"] description = "Run meta backup/restore test" condition = { env_set = [ "PREFIX_BIN", "PREFIX_DATA" ] } script = """ diff --git a/src/storage/backup/integration_tests/common.sh b/src/storage/backup/integration_tests/common.sh index 6e9474796bf5b..3f01832d230b6 100644 --- a/src/storage/backup/integration_tests/common.sh +++ b/src/storage/backup/integration_tests/common.sh @@ -89,12 +89,12 @@ function execute_sql() { } function get_max_committed_epoch() { - mce=$(${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock list-version | grep max_committed_epoch | sed -n 's/max_committed_epoch: \(.*\),/\1/p') + mce=$(${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock list-version | grep max_committed_epoch | sed -n 's/^.*max_committed_epoch: \(.*\),/\1/p') echo "${mce}" } function get_safe_epoch() { - safe_epoch=$(${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock list-version | grep safe_epoch | sed -n 's/safe_epoch: \(.*\),/\1/p') + safe_epoch=$(${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock list-version | grep safe_epoch | sed -n 's/^.*safe_epoch: \(.*\),/\1/p') echo "${safe_epoch}" } diff --git a/src/storage/backup/integration_tests/run_all.sh b/src/storage/backup/integration_tests/run_all.sh index 9b6aa91434b23..73b377efa9979 100644 --- a/src/storage/backup/integration_tests/run_all.sh +++ b/src/storage/backup/integration_tests/run_all.sh @@ -2,5 +2,12 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -bash "${DIR}/test_basic.sh" -bash "${DIR}/test_pin_sst.sh" +tests=( \ +"test_basic.sh" \ +"test_pin_sst.sh" \ +"test_query_backup.sh" \ +) +for t in "${tests[@]}" +do + bash "${DIR}/${t}" +done diff --git a/src/storage/backup/integration_tests/test_query_backup.sh b/src/storage/backup/integration_tests/test_query_backup.sh new file mode 100644 index 0000000000000..8cd7514412fcf --- /dev/null +++ b/src/storage/backup/integration_tests/test_query_backup.sh @@ -0,0 +1,116 @@ +#!/bin/bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +. "${DIR}/common.sh" + +stop_cluster +clean_all_data +start_cluster + +execute_sql " +SET RW_IMPLICIT_FLUSH TO true; +create table t1(v1 int, v2 int); +insert into t1 values (2,1),(1,2),(1,1); +" + +result=$( +execute_sql " +select * from t1; +" | grep "3 row" +) +[ -n "${result}" ] + +# backup before delete rows + +job_id=$(backup) +echo "${job_id}" +backup_mce=$(get_max_committed_epoch_in_backup "${job_id}") +backup_safe_epoch=$(get_safe_epoch_in_backup "${job_id}") +echo "backup MCE: ${backup_mce}" +echo "backup safe_epoch: ${backup_safe_epoch}" + +execute_sql " +SET RW_IMPLICIT_FLUSH TO true; +delete from t1 where v1=1; +" + +result=$( +execute_sql " +select * from t1; +" | grep "1 row" +) +[ -n "${result}" ] + +result=$( +execute_sql " +select * from t1; +" | grep "1 row" +) +[ -n "${result}" ] + +min_pinned_snapshot=$(get_min_pinned_snapshot) +while [ "${min_pinned_snapshot}" -le "${backup_mce}" ] ; +do + echo "wait frontend to unpin snapshot. current: ${min_pinned_snapshot}, expect: ${backup_mce}" + sleep 5 + min_pinned_snapshot=$(get_min_pinned_snapshot) +done +# safe epoch equals to 0 because no compaction has been done +safe_epoch=$(get_safe_epoch) +[ "${safe_epoch}" -eq 0 ] +# trigger a compaction to increase safe_epoch +manual_compaction -c 3 -l 0 +# wait until compaction is done +while [ "${safe_epoch}" -le "${backup_mce}" ] ; +do + safe_epoch=$(get_safe_epoch) + sleep 5 +done +echo "safe epoch after compaction: ${safe_epoch}" + +echo "QUERY_EPOCH=safe_epoch. It should fail because it's not covered by any backup" +result=$( +execute_sql " +SET QUERY_EPOCH TO ${safe_epoch}; +select * from t1; +" | grep "Read backup error backup include epoch ${safe_epoch} not found" +) +[ -n "${result}" ] + +echo "QUERY_EPOCH=0 aka disabling query backup" +result=$( +execute_sql " +SET QUERY_EPOCH TO 0; +select * from t1; +" | grep "1 row" +) +[ -n "${result}" ] + +echo "QUERY_EPOCH=backup_safe_epoch + 1, it's < safe_epoch but covered by backup" +[ $((backup_safe_epoch + 1)) -eq 1 ] +result=$( +execute_sql " +SET QUERY_EPOCH TO $((backup_safe_epoch + 1)); +select * from t1; +" | grep "0 row" +) +[ -n "${result}" ] + +echo "QUERY_EPOCH=backup_mce < safe_epoch, it's < safe_epoch but covered by backup" +result=$( +execute_sql " +SET QUERY_EPOCH TO $((backup_mce)); +select * from t1; +" | grep "3 row" +) +[ -n "${result}" ] + +echo "QUERY_EPOCH=future epoch. It should fail because it's not covered by any backup" +future_epoch=18446744073709551615 +result=$( +execute_sql " +SET QUERY_EPOCH TO ${future_epoch}; +select * from t1; +" | grep "Read backup error backup include epoch ${future_epoch} not found" +) +[ -n "${result}" ] \ No newline at end of file diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 7d5eadf9748ef..00e7d4db2420a 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -30,7 +30,6 @@ #![feature(error_generic_member_access)] #![feature(provide_any)] #![cfg_attr(coverage, feature(no_coverage))] -#![allow(dead_code)] pub mod error; pub mod meta_snapshot; @@ -38,8 +37,13 @@ pub mod storage; use std::hash::Hasher; +use itertools::Itertools; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::{HummockSstableId, HummockVersionId}; +use risingwave_pb::backup_service::{ + MetaSnapshotManifest as ProstMetaSnapshotManifest, + MetaSnapshotMetadata as ProstMetaSnapshotMetadata, +}; use risingwave_pb::hummock::HummockVersion; use serde::{Deserialize, Serialize}; @@ -54,6 +58,8 @@ pub struct MetaSnapshotMetadata { pub id: MetaSnapshotId, pub hummock_version_id: HummockVersionId, pub ssts: Vec, + pub max_committed_epoch: u64, + pub safe_epoch: u64, } impl MetaSnapshotMetadata { @@ -62,10 +68,19 @@ impl MetaSnapshotMetadata { id, hummock_version_id: v.id, ssts: v.get_sst_ids(), + max_committed_epoch: v.max_committed_epoch, + safe_epoch: v.safe_epoch, } } } +/// `MetaSnapshotManifest` is the source of truth for valid `MetaSnapshot`. +#[derive(Serialize, Deserialize, Default, Clone)] +pub struct MetaSnapshotManifest { + pub manifest_id: u64, + pub snapshot_metadata: Vec, +} + // Code is copied from storage crate. TODO #6482: extract method. pub fn xxhash64_checksum(data: &[u8]) -> u64 { let mut hasher = twox_hash::XxHash64::with_seed(0); @@ -83,3 +98,23 @@ pub fn xxhash64_verify(data: &[u8], checksum: u64) -> BackupResult<()> { } Ok(()) } + +impl From<&MetaSnapshotMetadata> for ProstMetaSnapshotMetadata { + fn from(m: &MetaSnapshotMetadata) -> Self { + Self { + id: m.id, + hummock_version_id: m.hummock_version_id, + max_committed_epoch: m.max_committed_epoch, + safe_epoch: m.safe_epoch, + } + } +} + +impl From<&MetaSnapshotManifest> for ProstMetaSnapshotManifest { + fn from(m: &MetaSnapshotManifest) -> Self { + Self { + manifest_id: m.manifest_id, + snapshot_metadata: m.snapshot_metadata.iter().map_into().collect_vec(), + } + } +} diff --git a/src/storage/backup/src/storage.rs b/src/storage/backup/src/storage.rs index 16dd5749294b0..9157d933ca6b7 100644 --- a/src/storage/backup/src/storage.rs +++ b/src/storage/backup/src/storage.rs @@ -17,12 +17,13 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_object_store::object::{ObjectError, ObjectStoreRef}; -use serde::{Deserialize, Serialize}; use crate::meta_snapshot::MetaSnapshot; -use crate::{BackupError, BackupResult, MetaSnapshotId, MetaSnapshotMetadata}; +use crate::{ + BackupError, BackupResult, MetaSnapshotId, MetaSnapshotManifest, MetaSnapshotMetadata, +}; -pub type BackupStorageRef = Arc; +pub type MetaSnapshotStorageRef = Arc; #[async_trait::async_trait] pub trait MetaSnapshotStorage: 'static + Sync + Send { @@ -32,40 +33,32 @@ pub trait MetaSnapshotStorage: 'static + Sync + Send { /// Gets a snapshot by id. async fn get(&self, id: MetaSnapshotId) -> BackupResult; - /// List all snapshots' metadata. - async fn list(&self) -> BackupResult>; + /// Gets local snapshot manifest. + fn manifest(&self) -> Arc; + + /// Refreshes local snapshot manifest. + async fn refresh_manifest(&self) -> BackupResult<()>; /// Deletes snapshots by ids. async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()>; } -/// `MetaSnapshotManifest` is the source of truth for valid `MetaSnapshot`. -#[derive(Serialize, Deserialize, Default, Clone)] -struct MetaSnapshotManifest { - pub manifest_id: u64, - pub snapshot_metadata: Vec, -} - #[derive(Clone)] pub struct ObjectStoreMetaSnapshotStorage { path: String, store: ObjectStoreRef, - manifest: Arc>, + manifest: Arc>>, } // TODO #6482: purge stale snapshots that is not in manifest. impl ObjectStoreMetaSnapshotStorage { pub async fn new(path: &str, store: ObjectStoreRef) -> BackupResult { - let mut instance = Self { + let instance = Self { path: path.to_string(), store, manifest: Default::default(), }; - let manifest = match instance.get_manifest().await? { - None => MetaSnapshotManifest::default(), - Some(manifest) => manifest, - }; - instance.manifest = Arc::new(parking_lot::RwLock::new(manifest)); + instance.refresh_manifest().await?; Ok(instance) } @@ -75,7 +68,7 @@ impl ObjectStoreMetaSnapshotStorage { self.store .upload(&self.get_manifest_path(), bytes.into()) .await?; - *self.manifest.write() = new_manifest; + *self.manifest.write() = Arc::new(new_manifest); Ok(()) } @@ -107,6 +100,7 @@ impl ObjectStoreMetaSnapshotStorage { format!("{}/{}.snapshot", self.path, id) } + #[allow(dead_code)] fn get_snapshot_id_from_path(path: &str) -> MetaSnapshotId { let split = path.split(&['/', '.']).collect_vec(); debug_assert!(split.len() > 2); @@ -124,7 +118,7 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage { self.store.upload(&path, snapshot.encode().into()).await?; // update manifest last - let mut new_manifest = self.manifest.read().clone(); + let mut new_manifest = (**self.manifest.read()).clone(); new_manifest.manifest_id += 1; new_manifest .snapshot_metadata @@ -142,14 +136,24 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage { MetaSnapshot::decode(&data) } - async fn list(&self) -> BackupResult> { - Ok(self.manifest.read().snapshot_metadata.clone()) + fn manifest(&self) -> Arc { + self.manifest.read().clone() + } + + async fn refresh_manifest(&self) -> BackupResult<()> { + if let Some(manifest) = self.get_manifest().await? { + let mut guard = self.manifest.write(); + if manifest.manifest_id > guard.manifest_id { + *guard = Arc::new(manifest); + } + } + Ok(()) } async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()> { // update manifest first let to_delete: HashSet = HashSet::from_iter(ids.iter().cloned()); - let mut new_manifest = self.manifest.read().clone(); + let mut new_manifest = (**self.manifest.read()).clone(); new_manifest.manifest_id += 1; new_manifest .snapshot_metadata @@ -171,10 +175,13 @@ impl From for BackupError { } } -pub struct DummyBackupStorage {} +#[derive(Default)] +pub struct DummyMetaSnapshotStorage { + manifest: Arc, +} #[async_trait::async_trait] -impl MetaSnapshotStorage for DummyBackupStorage { +impl MetaSnapshotStorage for DummyMetaSnapshotStorage { async fn create(&self, _snapshot: &MetaSnapshot) -> BackupResult<()> { panic!("should not create from DummyBackupStorage") } @@ -183,9 +190,12 @@ impl MetaSnapshotStorage for DummyBackupStorage { panic!("should not get from DummyBackupStorage") } - async fn list(&self) -> BackupResult> { - // Satisfy `BackupManager` - Ok(vec![]) + fn manifest(&self) -> Arc { + self.manifest.clone() + } + + async fn refresh_manifest(&self) -> BackupResult<()> { + Ok(()) } async fn delete(&self, _ids: &[MetaSnapshotId]) -> BackupResult<()> { diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index c9ed4d001a5d8..0ffaf679821e0 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -27,6 +27,7 @@ extern crate num_derive; use std::cmp::Ordering; pub use key_cmp::*; +use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use risingwave_pb::hummock::SstableInfo; use crate::compaction_group::StaticCompactionGroupId; @@ -168,6 +169,24 @@ pub enum HummockReadEpoch { Current(HummockEpoch), /// We don't need to wait epoch, we usually do stream reading with it. NoWait(HummockEpoch), + /// We don't need to wait epoch. + Backup(HummockEpoch), +} + +impl From for HummockReadEpoch { + fn from(e: BatchQueryEpoch) -> Self { + match e.epoch.unwrap() { + batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::Committed(epoch), + batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::Current(epoch), + batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch), + } + } +} + +pub fn to_committed_batch_query_epoch(epoch: u64) -> BatchQueryEpoch { + BatchQueryEpoch { + epoch: Some(batch_query_epoch::Epoch::Committed(epoch)), + } } impl HummockReadEpoch { @@ -176,6 +195,7 @@ impl HummockReadEpoch { HummockReadEpoch::Committed(epoch) => epoch, HummockReadEpoch::Current(epoch) => epoch, HummockReadEpoch::NoWait(epoch) => epoch, + HummockReadEpoch::Backup(epoch) => epoch, } } } diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index e02b7aed36fe6..102cc20025b29 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -100,6 +100,7 @@ fn criterion_benchmark(c: &mut Criterion) { check_bloom_filter: false, retention_seconds: None, table_id: Default::default(), + read_version_from_backup: false, }, )) .unwrap(); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index b19751779b3c7..8c040614c9c0f 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -302,6 +302,7 @@ pub(crate) mod tests { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -320,6 +321,7 @@ pub(crate) mod tests { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await; @@ -431,6 +433,7 @@ pub(crate) mod tests { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -783,6 +786,7 @@ pub(crate) mod tests { prefix_hint: None, table_id: TableId::from(existing_table_ids), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -955,6 +959,7 @@ pub(crate) mod tests { prefix_hint: None, table_id: TableId::from(existing_table_id), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -1128,6 +1133,7 @@ pub(crate) mod tests { prefix_hint: Some(bloom_filter_key), table_id: TableId::from(existing_table_id), retention_seconds: None, + read_version_from_backup: false, }, ) .await diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index ea065df09d3c3..7b6766dacd461 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -91,6 +91,7 @@ async fn test_failpoints_state_store_read_upload() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -136,6 +137,7 @@ async fn test_failpoints_state_store_read_upload() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await; @@ -150,6 +152,7 @@ async fn test_failpoints_state_store_read_upload() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await; @@ -165,6 +168,7 @@ async fn test_failpoints_state_store_read_upload() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -199,6 +203,7 @@ async fn test_failpoints_state_store_read_upload() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -215,6 +220,7 @@ async fn test_failpoints_state_store_read_upload() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 671ff5cf4705d..b6f3292fd8844 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -229,6 +229,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -245,6 +246,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -263,6 +265,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -293,6 +296,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -325,6 +329,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -342,6 +347,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -359,6 +365,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -391,6 +398,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -409,6 +417,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -426,6 +435,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -465,6 +475,7 @@ async fn test_storage_basic() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -631,6 +642,7 @@ async fn test_state_store_sync() { retention_seconds: None, check_bloom_filter: false, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -674,6 +686,7 @@ async fn test_state_store_sync() { retention_seconds: None, check_bloom_filter: false, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -695,6 +708,7 @@ async fn test_state_store_sync() { retention_seconds: None, check_bloom_filter: false, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -734,6 +748,7 @@ async fn test_state_store_sync() { retention_seconds: None, check_bloom_filter: false, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -855,6 +870,7 @@ async fn test_delete_get() { check_bloom_filter: true, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -965,6 +981,7 @@ async fn test_multiple_epoch_sync() { retention_seconds: None, check_bloom_filter: false, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -982,6 +999,7 @@ async fn test_multiple_epoch_sync() { retention_seconds: None, check_bloom_filter: false, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -998,6 +1016,7 @@ async fn test_multiple_epoch_sync() { retention_seconds: None, check_bloom_filter: false, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -1132,6 +1151,7 @@ async fn test_iter_with_min_epoch() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -1154,6 +1174,7 @@ async fn test_iter_with_min_epoch() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -1174,6 +1195,7 @@ async fn test_iter_with_min_epoch() { retention_seconds: Some(1), check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -1213,6 +1235,7 @@ async fn test_iter_with_min_epoch() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -1235,6 +1258,7 @@ async fn test_iter_with_min_epoch() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -1257,6 +1281,7 @@ async fn test_iter_with_min_epoch() { retention_seconds: Some(1), check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, ) .await @@ -1407,6 +1432,7 @@ async fn test_hummock_version_reader() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1436,6 +1462,7 @@ async fn test_hummock_version_reader() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1465,6 +1492,7 @@ async fn test_hummock_version_reader() { retention_seconds: Some(1), check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1532,6 +1560,7 @@ async fn test_hummock_version_reader() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1570,6 +1599,7 @@ async fn test_hummock_version_reader() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1608,6 +1638,7 @@ async fn test_hummock_version_reader() { retention_seconds: Some(1), check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1646,6 +1677,7 @@ async fn test_hummock_version_reader() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1690,6 +1722,7 @@ async fn test_hummock_version_reader() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) @@ -1728,6 +1761,7 @@ async fn test_hummock_version_reader() { retention_seconds: None, check_bloom_filter: true, prefix_hint: None, + read_version_from_backup: false, }, read_snapshot, ) diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index bec858d2c284a..551cd5cdb09dd 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -52,6 +52,7 @@ macro_rules! assert_count_range_scan { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -85,6 +86,7 @@ macro_rules! assert_count_backward_range_scan { epoch: $epoch, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 4e08c91f5cd1c..9485e9828edb5 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -125,6 +125,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -141,6 +142,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -159,6 +161,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -190,6 +193,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -222,6 +226,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -239,6 +244,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -256,6 +262,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -274,6 +281,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -292,6 +300,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -309,6 +318,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -327,6 +337,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -353,6 +364,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -369,6 +381,7 @@ async fn test_basic_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -571,6 +584,7 @@ async fn test_reload_storage() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -589,6 +603,7 @@ async fn test_reload_storage() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -620,6 +635,7 @@ async fn test_reload_storage() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -638,6 +654,7 @@ async fn test_reload_storage() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -656,6 +673,7 @@ async fn test_reload_storage() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -674,6 +692,7 @@ async fn test_reload_storage() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -691,6 +710,7 @@ async fn test_reload_storage() { prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -735,6 +755,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -753,6 +774,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -771,6 +793,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -791,6 +814,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -856,6 +880,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -872,6 +897,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -889,6 +915,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -908,6 +935,7 @@ async fn test_write_anytime_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, }, ) .await @@ -1062,6 +1090,7 @@ async fn test_delete_get_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -1147,6 +1176,7 @@ async fn test_multiple_epoch_sync_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -1164,6 +1194,7 @@ async fn test_multiple_epoch_sync_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await @@ -1180,6 +1211,7 @@ async fn test_multiple_epoch_sync_inner( prefix_hint: None, table_id: Default::default(), retention_seconds: None, + read_version_from_backup: false, } ) .await diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index fb7a2e85e0101..f5336d475bd76 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -334,6 +334,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { prefix_hint: None, table_id: TableId::from(existing_table_id), retention_seconds: None, + read_version_from_backup: false, }; let get_result = storage .get(b"hhh", 120, read_options.clone()) diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index ee31093590270..4ef449649ef17 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -37,10 +37,12 @@ use risingwave_meta::hummock::test_utils::{ use risingwave_meta::hummock::{HummockManager, HummockManagerRef, MockHummockMetaClient}; use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey}; use risingwave_meta::storage::{MemStore, MetaStore}; +use risingwave_pb::backup_service::MetaBackupManifestId; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::pin_version_response; use risingwave_pb::meta::{MetaSnapshot, SubscribeResponse, SubscribeType}; use risingwave_storage::error::StorageResult; +use risingwave_storage::hummock::backup_reader::BackupReader; use risingwave_storage::hummock::event_handler::HummockEvent; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; @@ -112,6 +114,7 @@ impl NotificationClient for TestNotificationClient { let meta_snapshot = MetaSnapshot { hummock_version: Some(hummock_version), version: Some(Default::default()), + meta_backup_manifest_id: Some(MetaBackupManifestId { id: 0 }), ..Default::default() }; @@ -146,9 +149,14 @@ pub async fn prepare_first_valid_version( let (tx, mut rx) = unbounded_channel(); let notification_client = get_test_notification_client(env, hummock_manager_ref.clone(), worker_node.clone()); + let backup_manager = BackupReader::unused(); let observer_manager = ObserverManager::new( notification_client, - HummockObserverNode::new(Arc::new(FilterKeyExtractorManager::default()), tx.clone()), + HummockObserverNode::new( + Arc::new(FilterKeyExtractorManager::default()), + backup_manager, + tx.clone(), + ), ) .await; observer_manager.start().await; diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs new file mode 100644 index 0000000000000..71e8dcdd30554 --- /dev/null +++ b/src/storage/src/hummock/backup_reader.rs @@ -0,0 +1,199 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Duration; + +use futures::future::Shared; +use futures::FutureExt; +use risingwave_backup::error::BackupError; +use risingwave_backup::meta_snapshot::MetaSnapshot; +use risingwave_backup::storage::{ + DummyMetaSnapshotStorage, MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage, +}; +use risingwave_backup::MetaSnapshotId; +use risingwave_common::config::RwConfig; +use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; +use risingwave_object_store::object::parse_remote_object_store; + +use crate::error::{StorageError, StorageResult}; +use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion}; +use crate::hummock::HummockError; + +pub type BackupReaderRef = Arc; + +type VersionHolder = ( + PinnedVersion, + tokio::sync::mpsc::UnboundedReceiver, +); + +pub async fn parse_meta_snapshot_storage( + config: &RwConfig, +) -> StorageResult { + let backup_object_store = Arc::new( + parse_remote_object_store( + &config.backup.storage_url, + Arc::new(ObjectStoreMetrics::unused()), + true, + ) + .await, + ); + let store = Arc::new( + ObjectStoreMetaSnapshotStorage::new(&config.backup.storage_directory, backup_object_store) + .await?, + ); + Ok(store) +} + +type InflightRequest = Shared> + Send>>>; +/// `BackupReader` helps to access historical hummock versions, +/// which are persisted in meta snapshots (aka backups). +pub struct BackupReader { + versions: parking_lot::RwLock>, + inflight_request: parking_lot::Mutex>, + store: MetaSnapshotStorageRef, + refresh_tx: tokio::sync::mpsc::UnboundedSender, +} + +impl BackupReader { + pub fn new(store: MetaSnapshotStorageRef) -> BackupReaderRef { + let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel(); + let instance = Arc::new(Self { + store, + versions: Default::default(), + inflight_request: Default::default(), + refresh_tx, + }); + tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx)); + instance + } + + pub fn unused() -> BackupReaderRef { + Self::new(Arc::new(DummyMetaSnapshotStorage::default())) + } + + async fn start_manifest_refresher( + backup_reader: BackupReaderRef, + mut refresh_rx: tokio::sync::mpsc::UnboundedReceiver, + ) { + loop { + let expect_manifest_id = refresh_rx.recv().await; + if expect_manifest_id.is_none() { + break; + } + let expect_manifest_id = expect_manifest_id.unwrap(); + let previous_id = backup_reader.store.manifest().manifest_id; + if expect_manifest_id <= previous_id { + continue; + } + if let Err(e) = backup_reader.store.refresh_manifest().await { + // reschedule refresh request + tracing::warn!("failed to refresh backup manifest, will retry. {}", e); + let backup_reader_clone = backup_reader.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(60)).await; + backup_reader_clone.try_refresh_manifest(expect_manifest_id); + }); + continue; + } + // purge stale version cache + let manifest: HashSet = backup_reader + .store + .manifest() + .snapshot_metadata + .iter() + .map(|s| s.id) + .collect(); + backup_reader + .versions + .write() + .retain(|k, _v| manifest.contains(k)); + } + } + + pub fn try_refresh_manifest(self: &BackupReaderRef, min_manifest_id: u64) { + let _ = self + .refresh_tx + .send(min_manifest_id) + .inspect_err(|e| tracing::warn!("failed to send refresh_manifest request {}", e)); + } + + /// Tries to get a hummock version eligible for querying `epoch`. + /// SSTs of the returned version are expected to be guarded by corresponding backup. + /// Otherwise, reading the version may encounter object store error, due to SST absence. + pub async fn try_get_hummock_version( + self: &BackupReaderRef, + epoch: u64, + ) -> StorageResult> { + // 1. check manifest to locate snapshot, if any. + let snapshot_id = self + .store + .manifest() + .snapshot_metadata + .iter() + .find(|v| epoch >= v.safe_epoch && epoch <= v.max_committed_epoch) + .map(|s| s.id); + let snapshot_id = match snapshot_id { + None => { + return Ok(None); + } + Some(s) => s, + }; + // 2. load hummock version of chosen snapshot. + let future = { + let mut req_guard = self.inflight_request.lock(); + if let Some((v, _)) = self.versions.read().get(&snapshot_id) { + return Ok(Some(v.clone())); + } + if let Some(f) = req_guard.get(&snapshot_id) { + f.clone() + } else { + let this = self.clone(); + let f = async move { + let snapshot = this.store.get(snapshot_id).await.map_err(|e| { + format!("failed to get meta snapshot {}. {}", snapshot_id, e) + })?; + let version_holder = build_version_holder(snapshot); + let version_clone = version_holder.0.clone(); + this.versions.write().insert(snapshot_id, version_holder); + Ok(version_clone) + } + .boxed() + .shared(); + req_guard.insert(snapshot_id, f.clone()); + f + } + }; + let result = future + .await + .map(Some) + .map_err(|e| HummockError::read_backup_error(e).into()); + self.inflight_request.lock().remove(&snapshot_id); + result + } +} + +fn build_version_holder(s: MetaSnapshot) -> VersionHolder { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + (PinnedVersion::new(s.metadata.hummock_version, tx), rx) +} + +impl From for StorageError { + fn from(e: BackupError) -> Self { + HummockError::other(e).into() + } +} diff --git a/src/storage/src/hummock/compaction_group_client.rs b/src/storage/src/hummock/compaction_group_client.rs deleted file mode 100644 index 4ead9bb960986..0000000000000 --- a/src/storage/src/hummock/compaction_group_client.rs +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; -use std::ops::DerefMut; -use std::sync::Arc; - -use parking_lot::{Mutex, RwLock}; -use risingwave_hummock_sdk::compaction_group::StateTableId; -use risingwave_hummock_sdk::CompactionGroupId; -use risingwave_pb::hummock::CompactionGroup; -use risingwave_rpc_client::HummockMetaClient; -use tokio::sync::oneshot; - -use crate::hummock::{HummockError, HummockResult}; - -pub enum CompactionGroupClientImpl { - Meta(Arc), - Dummy(DummyCompactionGroupClient), -} - -impl CompactionGroupClientImpl { - pub async fn get_compaction_group_id( - &self, - table_id: StateTableId, - ) -> HummockResult { - match self { - CompactionGroupClientImpl::Meta(c) => c.get_compaction_group_id(table_id).await, - CompactionGroupClientImpl::Dummy(c) => Ok(c.get_compaction_group_id()), - } - } - - pub fn update_by( - &self, - compaction_groups: Vec, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - match self { - CompactionGroupClientImpl::Meta(c) => { - c.update_by(compaction_groups, is_complete_snapshot, all_table_ids) - } - CompactionGroupClientImpl::Dummy(_) => (), - } - } -} - -/// `CompactionGroupClientImpl` maintains compaction group metadata cache. -pub struct MetaCompactionGroupClient { - // Lock order: wait_queue before cache - wait_queue: Mutex>>>, - cache: RwLock, - hummock_meta_client: Arc, -} - -impl MetaCompactionGroupClient { - /// TODO: cache is synced on need currently. We can refactor it to push based after #3679. - async fn get_compaction_group_id( - self: &Arc, - table_id: StateTableId, - ) -> HummockResult { - // We wait for cache update for at most twice. - // For the first time there may already be an inflight RPC when cache miss, whose response - // may not contain wanted cache entry. For the second time the new RPC must contain - // wanted cache entry, no matter the RPC is fired by this task or other. Otherwise, - // the caller is trying to get an inexistent cache entry, which indicates a bug. - let mut wait_counter = 0; - while wait_counter <= 2 { - // 1. Get from cache - if let Some(id) = self.cache.read().get(&table_id) { - return Ok(id); - } - // 2. Otherwise either update cache, or wait for previous update if any. - let waiter = { - let mut guard = self.wait_queue.lock(); - if let Some(id) = self.cache.read().get(&table_id) { - return Ok(id); - } - let wait_queue = guard.deref_mut(); - if let Some(wait_queue) = wait_queue { - let (tx, rx) = oneshot::channel(); - wait_queue.push(tx); - Some(rx) - } else { - *wait_queue = Some(vec![]); - None - } - }; - if let Some(waiter) = waiter { - // Wait for previous update - if let Ok(success) = waiter.await && success { - wait_counter += 1; - } - continue; - } - // Update cache - let this = self.clone(); - tokio::spawn(async move { - let result = this.update().await; - let mut guard = this.wait_queue.lock(); - let wait_queue = guard.deref_mut().take().unwrap(); - for notify in wait_queue { - let _ = notify.send(result.is_ok()); - } - result - }) - .await - .unwrap()?; - wait_counter += 1; - } - Err(HummockError::compaction_group_error(format!( - "compaction group not found for table id {}", - table_id - ))) - } -} - -impl MetaCompactionGroupClient { - pub fn new(hummock_meta_client: Arc) -> Self { - Self { - wait_queue: Default::default(), - cache: Default::default(), - hummock_meta_client, - } - } - - async fn update(&self) -> HummockResult<()> { - let compaction_groups = self - .hummock_meta_client - .get_compaction_groups() - .await - .map_err(HummockError::meta_error)?; - let mut guard = self.cache.write(); - guard.supply_index(compaction_groups, true, false, &[]); - Ok(()) - } - - fn update_by( - &self, - compaction_groups: Vec, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - let mut guard = self.cache.write(); - guard.supply_index( - compaction_groups, - false, - is_complete_snapshot, - all_table_ids, - ); - } -} - -#[derive(Default)] -struct CompactionGroupClientInner { - index: HashMap, -} - -impl CompactionGroupClientInner { - fn get(&self, table_id: &StateTableId) -> Option { - self.index.get(table_id).cloned() - } - - fn update_member_ids( - &mut self, - member_ids: &[StateTableId], - is_pull: bool, - cg_id: CompactionGroupId, - ) { - for table_id in member_ids { - match self.index.entry(*table_id) { - Entry::Occupied(mut entry) => { - if !is_pull { - entry.insert(cg_id); - } - } - Entry::Vacant(entry) => { - entry.insert(cg_id); - } - } - } - } - - fn supply_index( - &mut self, - compaction_groups: Vec, - is_pull: bool, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - if is_complete_snapshot { - self.index.clear(); - } else if !all_table_ids.is_empty() { - let all_table_set: HashSet = all_table_ids.iter().cloned().collect(); - self.index - .retain(|table_id, _| all_table_set.contains(table_id)); - } - for compaction_group in compaction_groups { - let member_ids = compaction_group.get_member_table_ids(); - self.update_member_ids(member_ids, is_pull, compaction_group.get_id()); - } - } -} - -pub struct DummyCompactionGroupClient { - /// Always return this `compaction_group_id`. - compaction_group_id: CompactionGroupId, -} - -impl DummyCompactionGroupClient { - pub fn new(compaction_group_id: CompactionGroupId) -> Self { - Self { - compaction_group_id, - } - } -} - -impl DummyCompactionGroupClient { - fn get_compaction_group_id(&self) -> CompactionGroupId { - self.compaction_group_id - } -} diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index 05b7d1d4b1252..9f466e77c5441 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -56,6 +56,8 @@ enum HummockErrorInner { CompactionGroupError(String), #[error("SstableUpload error {0}.")] SstableUploadError(String), + #[error("Read backup error {0}.")] + ReadBackupError(String), #[error("Other error {0}.")] Other(String), } @@ -117,6 +119,10 @@ impl HummockError { HummockErrorInner::ExpiredEpoch { safe_epoch, epoch }.into() } + pub fn is_expired_epoch(&self) -> bool { + matches!(self.inner, HummockErrorInner::ExpiredEpoch { .. }) + } + pub fn compaction_executor(error: impl ToString) -> HummockError { HummockErrorInner::CompactionExecutor(error.to_string()).into() } @@ -137,6 +143,10 @@ impl HummockError { HummockErrorInner::SstableUploadError(error.to_string()).into() } + pub fn read_backup_error(error: impl ToString) -> HummockError { + HummockErrorInner::ReadBackupError(error.to_string()).into() + } + pub fn other(error: impl ToString) -> HummockError { HummockErrorInner::Other(error.to_string()).into() } diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 40a40b263febb..004fd7004c6a4 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -47,7 +47,6 @@ pub use tiered_cache::*; pub mod sstable; pub use sstable::*; -pub mod compaction_group_client; pub mod compactor; pub mod conflict_detector; mod error; @@ -62,6 +61,7 @@ pub mod test_utils; pub mod utils; pub use compactor::{CompactorMemoryCollector, CompactorSstableStore}; pub use utils::MemoryLimiter; +pub mod backup_reader; pub mod event_handler; pub mod local_version; pub mod observer_manager; @@ -85,6 +85,7 @@ use self::iterator::{BackwardUserIterator, HummockIterator, UserIterator}; pub use self::sstable_store::*; use super::monitor::StateStoreMetrics; use crate::error::StorageResult; +use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::Context; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::{HummockEvent, HummockEventHandler}; @@ -137,6 +138,8 @@ pub struct HummockStorage { read_version_mapping: Arc, tracing: Arc, + + backup_reader: BackupReaderRef, } impl HummockStorage { @@ -144,6 +147,7 @@ impl HummockStorage { pub async fn new( options: Arc, sstable_store: SstableStoreRef, + backup_reader: BackupReaderRef, hummock_meta_client: Arc, notification_client: impl NotificationClient, // TODO: separate `HummockStats` from `StateStoreMetrics`. @@ -160,7 +164,11 @@ impl HummockStorage { let observer_manager = ObserverManager::new( notification_client, - HummockObserverNode::new(filter_key_extractor_manager.clone(), event_tx.clone()), + HummockObserverNode::new( + filter_key_extractor_manager.clone(), + backup_reader.clone(), + event_tx.clone(), + ), ) .await; observer_manager.start().await; @@ -206,6 +214,7 @@ impl HummockStorage { }), read_version_mapping: hummock_event_handler.read_version_mapping(), tracing, + backup_reader, }; tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker()); @@ -301,6 +310,7 @@ impl HummockStorage { Self::new( options, sstable_store, + BackupReader::unused(), hummock_meta_client, notification_client, Arc::new(StateStoreMetrics::unused()), @@ -492,10 +502,14 @@ impl HummockStorageV1 { let filter_key_extractor_manager = Arc::new(FilterKeyExtractorManager::default()); let (event_tx, mut event_rx) = unbounded_channel(); - + let backup_manager = BackupReader::unused(); let observer_manager = ObserverManager::new( notification_client, - HummockObserverNode::new(filter_key_extractor_manager.clone(), event_tx.clone()), + HummockObserverNode::new( + filter_key_extractor_manager.clone(), + backup_manager, + event_tx.clone(), + ), ) .await; observer_manager.start().await; diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 41c338259f930..e55e6b26c5423 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -25,11 +25,14 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::SubscribeResponse; use tokio::sync::mpsc::UnboundedSender; +use crate::hummock::backup_reader::BackupReaderRef; use crate::hummock::event_handler::HummockEvent; pub struct HummockObserverNode { filter_key_extractor_manager: FilterKeyExtractorManagerRef, + backup_reader: BackupReaderRef, + version_update_sender: UnboundedSender, version: u64, @@ -68,6 +71,10 @@ impl ObserverState for HummockObserverNode { }); } + Info::MetaBackupManifestId(id) => { + self.backup_reader.try_refresh_manifest(id.id); + } + _ => { panic!("error type notification"); } @@ -80,6 +87,12 @@ impl ObserverState for HummockObserverNode { }; self.handle_catalog_snapshot(snapshot.tables); + self.backup_reader.try_refresh_manifest( + snapshot + .meta_backup_manifest_id + .expect("should get meta backup manifest id") + .id, + ); let _ = self .version_update_sender .send(HummockEvent::VersionUpdate( @@ -100,10 +113,12 @@ impl ObserverState for HummockObserverNode { impl HummockObserverNode { pub fn new( filter_key_extractor_manager: FilterKeyExtractorManagerRef, + backup_reader: BackupReaderRef, version_update_sender: UnboundedSender, ) -> Self { Self { filter_key_extractor_manager, + backup_reader, version_update_sender, version: 0, } diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index ab8912d9e4996..1fb0cc1bc9dbe 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -61,8 +61,11 @@ impl HummockStorage { Bound::Included(TableKey(key.to_vec())), ); - let read_version_tuple = - self.build_read_version_tuple(epoch, read_options.table_id, &key_range)?; + let read_version_tuple = if read_options.read_version_from_backup { + self.build_read_version_tuple_from_backup(epoch).await? + } else { + self.build_read_version_tuple(epoch, read_options.table_id, &key_range)? + }; self.hummock_version_reader .get(TableKey(key), epoch, read_options, read_version_tuple) @@ -75,14 +78,35 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult> { - let read_version_tuple = - self.build_read_version_tuple(epoch, read_options.table_id, &key_range)?; + let read_version_tuple = if read_options.read_version_from_backup { + self.build_read_version_tuple_from_backup(epoch).await? + } else { + self.build_read_version_tuple(epoch, read_options.table_id, &key_range)? + }; self.hummock_version_reader .iter(key_range, epoch, read_options, read_version_tuple) .await } + async fn build_read_version_tuple_from_backup( + &self, + epoch: u64, + ) -> StorageResult<(Vec, Vec, CommittedVersion)> { + match self.backup_reader.try_get_hummock_version(epoch).await { + Ok(Some(backup_version)) => { + validate_epoch(backup_version.safe_epoch(), epoch)?; + Ok((Vec::default(), Vec::default(), backup_version)) + } + Ok(None) => Err(HummockError::read_backup_error(format!( + "backup include epoch {} not found", + epoch + )) + .into()), + Err(e) => Err(e), + } + } + fn build_read_version_tuple( &self, epoch: u64, @@ -172,7 +196,7 @@ impl StateStore for HummockStorage { ); return Ok(()); } - HummockReadEpoch::NoWait(_) => return Ok(()), + HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => return Ok(()), }; if wait_epoch == HummockEpoch::MAX { panic!("epoch should not be u64::MAX"); diff --git a/src/storage/src/hummock/state_store_v1.rs b/src/storage/src/hummock/state_store_v1.rs index 307f3e8f749d0..01c9b718a9c6c 100644 --- a/src/storage/src/hummock/state_store_v1.rs +++ b/src/storage/src/hummock/state_store_v1.rs @@ -502,7 +502,7 @@ impl StateStore for HummockStorageV1 { ); return Ok(()); } - HummockReadEpoch::NoWait(_) => return Ok(()), + HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => return Ok(()), }; if wait_epoch == HummockEpoch::MAX { panic!("epoch should not be u64::MAX"); diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 25ce7a38854d3..2927956f70c9c 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -282,6 +282,9 @@ pub struct ReadOptions { pub retention_seconds: Option, pub table_id: TableId, + /// Read from historical hummock version of meta snapshot backup. + /// It should only be used by `StorageTable` for batch query. + pub read_version_from_backup: bool, } pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 { diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 2884c8fe248af..487287790584d 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; use std::sync::Arc; use enum_as_inner::EnumAsInner; -use risingwave_common::config::StorageConfig; +use risingwave_common::config::RwConfig; use risingwave_common_service::observer_manager::RpcNotificationClient; use risingwave_hummock_sdk::filter_key_extractor::FilterKeyExtractorManagerRef; use risingwave_object_store::object::{ @@ -24,6 +24,7 @@ use risingwave_object_store::object::{ }; use crate::error::StorageResult; +use crate::hummock::backup_reader::{parse_meta_snapshot_storage, BackupReader}; use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::{ @@ -464,13 +465,14 @@ impl StateStoreImpl { pub async fn new( s: &str, file_cache_dir: &str, - config: Arc, + rw_config: &RwConfig, hummock_meta_client: Arc, state_store_stats: Arc, object_store_metrics: Arc, tiered_cache_metrics_builder: TieredCacheMetricsBuilder, tracing: Arc, ) -> StorageResult { + let config = Arc::new(rw_config.storage.clone()); #[cfg(not(target_os = "linux"))] let tiered_cache = TieredCache::none(); @@ -531,9 +533,12 @@ impl StateStoreImpl { RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); if !config.enable_state_store_v1 { + let backup_store = parse_meta_snapshot_storage(rw_config).await?; + let backup_reader = BackupReader::new(backup_store); let inner = HummockStorage::new( config.clone(), sstable_store, + backup_reader, hummock_meta_client.clone(), notification_client, state_store_stats.clone(), diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 6c21530fc4c4e..e80e9b83cf820 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -241,6 +241,7 @@ impl StorageTable { wait_epoch: HummockReadEpoch, ) -> StorageResult> { let epoch = wait_epoch.get_epoch(); + let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); self.store.try_wait_epoch(wait_epoch).await?; let serialized_pk = serialize_pk_with_vnode(&pk, &self.pk_serializer, self.compute_vnode_by_pk(&pk)); @@ -253,6 +254,7 @@ impl StorageTable { retention_seconds: self.table_option.retention_seconds, ignore_range_tombstone: false, table_id: self.table_id, + read_version_from_backup: read_backup, }; if let Some(value) = self.store.get(&serialized_pk, epoch, read_options).await? { let full_row = self.row_deserializer.deserialize(value)?; @@ -330,6 +332,7 @@ impl StorageTable { let iterators: Vec<_> = try_join_all(raw_key_ranges.map(|raw_key_range| { let prefix_hint = prefix_hint.clone(); let wait_epoch = wait_epoch.clone(); + let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); async move { let check_bloom_filter = prefix_hint.is_some(); let read_options = ReadOptions { @@ -338,6 +341,7 @@ impl StorageTable { ignore_range_tombstone: false, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, + read_version_from_backup: read_backup, }; let iter = StorageTableIterInner::::new( &self.store, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 9d1cd18fb2105..e78d061a54824 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -457,6 +457,7 @@ impl StateTable { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, ignore_range_tombstone: false, + read_version_from_backup: false, }; if let Some(storage_row_bytes) = self .local_store @@ -761,6 +762,7 @@ impl StateTable { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, ignore_range_tombstone: false, + read_version_from_backup: false, }; let stored_value = self.local_store.get(key, epoch, read_options).await?; @@ -793,6 +795,7 @@ impl StateTable { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, ignore_range_tombstone: false, + read_version_from_backup: false, }; let stored_value = self.local_store.get(key, epoch, read_options).await?; @@ -827,6 +830,7 @@ impl StateTable { check_bloom_filter: false, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, + read_version_from_backup: false, }; let stored_value = self.local_store.get(key, epoch, read_options).await?; @@ -1031,6 +1035,7 @@ impl StateTable { ignore_range_tombstone: false, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, + read_version_from_backup: false, }; // Storage iterator. diff --git a/src/tests/compaction_test/src/runner.rs b/src/tests/compaction_test/src/runner.rs index 6e6ffabdd5cc9..40725b07ed07e 100644 --- a/src/tests/compaction_test/src/runner.rs +++ b/src/tests/compaction_test/src/runner.rs @@ -14,7 +14,7 @@ use std::collections::{BTreeMap, HashSet}; use std::net::SocketAddr; -use std::ops::Bound; +use std::ops::{Bound, Deref}; use std::pin::Pin; use std::sync::Arc; use std::thread::JoinHandle; @@ -26,7 +26,7 @@ use clap::Parser; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::{load_config, StorageConfig}; +use risingwave_common::config::{load_config, RwConfig, StorageConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; use risingwave_pb::common::WorkerType; @@ -596,6 +596,7 @@ async fn open_hummock_iters( retention_seconds: None, check_bloom_filter: false, ignore_range_tombstone: false, + read_version_from_backup: false, }, ) .await?; @@ -657,11 +658,15 @@ pub async fn create_hummock_store_with_metrics( state_store_metrics: Arc::new(StateStoreMetrics::unused()), object_store_metrics: Arc::new(ObjectStoreMetrics::unused()), }; + let rw_config = RwConfig { + storage: storage_config.deref().clone(), + ..Default::default() + }; let state_store_impl = StateStoreImpl::new( &opts.state_store, "", - storage_config, + &rw_config, Arc::new(MonitoredHummockMetaClient::new( meta_client.clone(), metrics.hummock_metrics.clone(),