Skip to content

Commit

Permalink
Add backup collection kqp part (ydb-platform#11022)
Browse files Browse the repository at this point in the history
  • Loading branch information
Enjection authored Oct 29, 2024
1 parent 103d800 commit 06b8cb8
Show file tree
Hide file tree
Showing 13 changed files with 587 additions and 29 deletions.
18 changes: 18 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,24 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {

}

case NKqpProto::TKqpSchemeOperation::kCreateBackupCollection: {
const auto& modifyScheme = schemeOp.GetCreateBackupCollection();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAlterBackupCollection: {
const auto& modifyScheme = schemeOp.GetAlterBackupCollection();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropBackupCollection: {
const auto& modifyScheme = schemeOp.GetDropBackupCollection();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,18 @@ class TKikimrIcGateway : public IKqpGateway {
}
}

TFuture<TGenericResult> CreateBackupCollection(const TString&, const NYql::TCreateBackupCollectionSettings&) override {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> AlterBackupCollection(const TString&, const NYql::TAlterBackupCollectionSettings&) override {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> DropBackupCollection(const TString&, const NYql::TDropBackupCollectionSettings&) override {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

Expand Down
160 changes: 160 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <ydb/services/metadata/abstract/kqp_common.h>
#include <ydb/services/lib/actors/pq_schema_actor.h>

#include <util/generic/overloaded.h>

namespace NKikimr::NKqp {

using namespace NThreading;
Expand Down Expand Up @@ -1121,6 +1123,164 @@ class TKqpGatewayProxy : public IKikimrGateway {
}
}

TFuture<TGenericResult> CreateBackupCollection(const TString& cluster, const NYql::TCreateBackupCollectionSettings& settings) override {
CHECK_PREPARED_DDL(CreateBackupCollection);

try {
if (cluster != SessionCtx->GetCluster()) {
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
}

TString path;

if (!settings.Name.StartsWith(SessionCtx->GetDatabase())) {
path = JoinPath({SessionCtx->GetDatabase(), ".backups/collections", settings.Name});
} else {
path = settings.Name;
}

TString error;
std::pair<TString, TString> pathPair;
if (!NSchemeHelpers::SplitTablePath(path, GetDatabase(), pathPair, error, true)) {
return MakeFuture(ResultFromError<TGenericResult>(error));
}

NKikimrSchemeOp::TModifyScheme tx;
tx.SetWorkingDir(pathPair.first);
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateBackupCollection);

auto& op = *tx.MutableCreateBackupCollection();
op.SetName(pathPair.second);

if (settings.Settings.IncrementalBackupEnabled) {
op.MutableIncrementalBackupConfig();
}

auto errOpt = std::visit(
TOverloaded {
[](const TCreateBackupCollectionSettings::TDatabase&) -> std::optional<TString> {
return "Unimplemented";
},
[&](const TVector<TCreateBackupCollectionSettings::TTable>& tables) -> std::optional<TString> {
auto& dstTables = *op.MutableExplicitEntryList();
for (const auto& table : tables) {
auto& entry = *dstTables.AddEntries();
entry.SetType(NKikimrSchemeOp::TBackupCollectionDescription::TBackupEntry::ETypeTable);
entry.SetPath(table.Path);
}
return std::nullopt;
},
}, settings.Entries);

if (errOpt) {
return MakeFuture(ResultFromError<TGenericResult>(*errOpt));
}

op.MutableCluster();

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
phyTx.MutableSchemeOperation()->MutableCreateBackupCollection()->Swap(&tx);

TGenericResult result;
result.SetSuccess();
return MakeFuture(result);
} else {
return Gateway->ModifyScheme(std::move(tx));
}
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> AlterBackupCollection(const TString& cluster, const NYql::TAlterBackupCollectionSettings& settings) override {
CHECK_PREPARED_DDL(AlterBackupCollection);

try {
if (cluster != SessionCtx->GetCluster()) {
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
}

TString error;
std::pair<TString, TString> pathPair;
if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) {
return MakeFuture(ResultFromError<TGenericResult>(error));
}

NKikimrSchemeOp::TModifyScheme tx;
tx.SetWorkingDir(pathPair.first);
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterBackupCollection);

auto& op = *tx.MutableAlterBackupCollection();
op.SetName(pathPair.second);

// TODO(innokentii): handle settings
// TODO(innokentii): add/remove entries

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
phyTx.MutableSchemeOperation()->MutableAlterBackupCollection()->Swap(&tx);

TGenericResult result;
result.SetSuccess();
return MakeFuture(result);
} else {
return Gateway->ModifyScheme(std::move(tx));
}
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> DropBackupCollection(const TString& cluster, const NYql::TDropBackupCollectionSettings& settings) override {
CHECK_PREPARED_DDL(DropBackupCollection);

try {
if (cluster != SessionCtx->GetCluster()) {
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
}

TString error;
std::pair<TString, TString> pathPair;
if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) {
return MakeFuture(ResultFromError<TGenericResult>(error));
}

NKikimrSchemeOp::TModifyScheme tx;
tx.SetWorkingDir(pathPair.first);
if (settings.Cascade) {
return MakeFuture(ResultFromError<TGenericResult>("Unimplemented"));
} else {
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropBackupCollection);
}

auto& op = *tx.MutableDrop();
op.SetName(pathPair.second);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
phyTx.MutableSchemeOperation()->MutableDropBackupCollection()->Swap(&tx);

TGenericResult result;
result.SetSuccess();
return MakeFuture(result);
} else {
return Gateway->ModifyScheme(std::move(tx));
}
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) override {
CHECK_PREPARED_DDL(CreateUser);

Expand Down
82 changes: 82 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,24 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
return TStatus::Error;
}

TStatus HandleCreateBackupCollection(TKiCreateBackupCollection node, TExprContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(node);
return TStatus::Ok;
}

TStatus HandleAlterBackupCollection(TKiAlterBackupCollection node, TExprContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(node);
return TStatus::Ok;
}

TStatus HandleDropBackupCollection(TKiDropBackupCollection node, TExprContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(node);
return TStatus::Ok;
}

TStatus HandleCreateUser(TKiCreateUser node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "CreateUser is not yet implemented for intent determination transformer"));
Expand Down Expand Up @@ -353,6 +371,8 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
return TStatus::Ok;
case TKikimrKey::Type::Replication:
return TStatus::Ok;
case TKikimrKey::Type::BackupCollection:
return TStatus::Ok;
}

ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid table key type."));
Expand Down Expand Up @@ -572,6 +592,13 @@ class TKikimrDataSink : public TDataProviderBase
}
}

if (node.IsCallable(TKiCreateBackupCollection::CallableName())
|| node.IsCallable(TKiAlterBackupCollection::CallableName())
|| node.IsCallable(TKiDropBackupCollection::CallableName()))
{
return true;
}

return false;
}

Expand Down Expand Up @@ -1433,6 +1460,49 @@ class TKikimrDataSink : public TDataProviderBase
}
break;
}
case TKikimrKey::Type::BackupCollection: {
auto settings = ParseWriteBackupCollectionSettings(TExprList(node->Child(4)), ctx);
YQL_ENSURE(settings.Mode);
auto mode = settings.Mode.Cast();

if (mode == "create") {
return Build<TKiCreateBackupCollection>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.BackupCollection().Build(key.GetBackupCollectionPath().Name)
.Prefix().Build(key.GetBackupCollectionPath().Prefix)
.Entries(settings.Entries.Cast())
.BackupCollectionSettings(settings.BackupCollectionSettings.Cast())
.Settings(settings.Other)
.Done()
.Ptr();
} else if (mode == "alter") {
return Build<TKiAlterBackupCollection>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.BackupCollection().Build(key.GetBackupCollectionPath().Name)
.Prefix().Build(key.GetBackupCollectionPath().Prefix)
.BackupCollectionSettings(settings.BackupCollectionSettings.Cast())
.Settings(settings.Other)
.Done()
.Ptr();
} else if (mode == "drop") {
return Build<TKiDropBackupCollection>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.BackupCollection().Build(key.GetBackupCollectionPath().Name)
.Prefix().Build(key.GetBackupCollectionPath().Prefix)
.Cascade<TCoAtom>()
.Value(false) // TODO(innokentii): handle cascade drop
.Build()
.Done()
.Ptr();
} else {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown operation type for backup collection: " << TString(mode)));
return nullptr;
}
break;
}
}

ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Failed to rewrite IO."));
Expand Down Expand Up @@ -1686,6 +1756,18 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleAnalyze(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiCreateBackupCollection>(input)) {
return HandleCreateBackupCollection(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiAlterBackupCollection>(input)) {
return HandleAlterBackupCollection(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiDropBackupCollection>(input)) {
return HandleDropBackupCollection(node.Cast(), ctx);
}

ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: "
<< callable.CallableName()));
return TStatus::Error;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class TKiSourceIntentDeterminationTransformer: public TKiSourceVisitorTransforme
return TStatus::Ok;
case TKikimrKey::Type::Replication:
return TStatus::Ok;
case TKikimrKey::Type::BackupCollection:
return TStatus::Ok;
}

return TStatus::Error;
Expand Down
Loading

0 comments on commit 06b8cb8

Please sign in to comment.