Skip to content

Commit

Permalink
Initial cluster configuration
Browse files Browse the repository at this point in the history
## Summary:
Initial cluster configuration and handling
- Defines cluster config structure
- Regularly fetch cluster config from the metadata store
- Single node uses replicated loglet by default
- Proper tooling to reconfigure the cluster

## Configuration file
Introduce a new configuration
```toml
cluster-configuration = "empty|default"
```
By setting this to "default" (that the the default value)
the node seeds the cluster configuration with default config

Setting `cluster-configuration` to empty. Starts the node
with empty cluster configuration. In that case, the node
does not provision the partition table or the logs until the
configuration is set explicitly via `restatectl cluster config set`

Once configuration is set, the configuration of the cluster auto
resumes. No extra actions are needed.

## `cluster config get/set`

`restatectl cluster config` gives users a simple interface to inspect and update
configuration

### Example

```
restatectl cluster config get

⚙️ Version: 1:
―――――――――――――
num-partitions = 24
default-provider = "local"
nodeset-selection-strategy = "strict-fault-tolerant-greedy"
partition-processor-replication-strategy = "on-all-nodes"

[replication-property]
node = 1
```

To change the config, create a new toml file with following content
cluster-config-example.toml

```toml
num-partitions = 24
default-provider = "local"
nodeset-selection-strategy = "strict-fault-tolerant-greedy"
[partition-processor-replication-strategy]
factor = 2

[replication-property]
node = 2
```

Then run

```
restatectl cluster config set cluster-config-example.toml
  num-partitions = 24
  default-provider = "local"
  nodeset-selection-strategy = "strict-fault-tolerant-greedy"
 -partition-processor-replication-strategy = "on-all-nodes"

  [replication-property]
 -node = 1
 +node = 2
 +
 +[partition-processor-replication-strategy]
 +factor = 2


  💡   Please note that changes in configuration can take up to 5 seconds
       before they are seen by all nodes.

       If Configuration is being set for the first time. The system will initialize the logs with the
       given configuration.

       Changes of configuration does not automatically trigger a reconfigure of the logs. Reconfiguration
       will happen when detected to be needed by the system or by using the `restatectl cluster reconfigure`
       command.

? Apply changes? (y/n) › no
```
  • Loading branch information
muhamadazmy committed Nov 27, 2024
1 parent 07882c3 commit e050319
Show file tree
Hide file tree
Showing 28 changed files with 1,493 additions and 270 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ tempfile = "3.6.0"
test-log = { version = "0.2.11", default-features = false, features = [
"trace",
] }
toml = { version = "0.8.12" }
# tikv-jemallocator has not yet been released with musl target support, so we pin a main commit
tikv-jemallocator = { git = "https://github.com/restatedev/jemallocator", rev = "7c32f6e3d6ad5e4e492cc08d6bdb8307acf9afa0", default-features = false }
thiserror = "1.0"
Expand Down
40 changes: 31 additions & 9 deletions crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,41 @@ import "google/protobuf/empty.proto";
package restate.cluster_ctrl;

service ClusterCtrlSvc {
rpc GetClusterState(ClusterStateRequest) returns(ClusterStateResponse);
rpc GetClusterState(ClusterStateRequest) returns (ClusterStateResponse);

rpc ListLogs(ListLogsRequest) returns(ListLogsResponse);
rpc ListLogs(ListLogsRequest) returns (ListLogsResponse);

rpc DescribeLog(DescribeLogRequest) returns(DescribeLogResponse);
rpc DescribeLog(DescribeLogRequest) returns (DescribeLogResponse);

rpc ListNodes(ListNodesRequest) returns(ListNodesResponse);
rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);

rpc TrimLog(TrimLogRequest) returns(google.protobuf.Empty);
rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);

rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest)
returns(CreatePartitionSnapshotResponse);
returns (CreatePartitionSnapshotResponse);

rpc SealAndExtendChain(SealAndExtendChainRequest)
returns(SealAndExtendChainResponse);
returns (SealAndExtendChainResponse);

rpc FindTail(FindTailRequest) returns(FindTailResponse);
rpc FindTail(FindTailRequest) returns (FindTailResponse);

rpc GetClusterConfiguration(GetClusterConfigurationRequest)
returns (GetClusterConfigurationResponse);

rpc SetClusterConfiguration(SetClusterConfigurationRequest)
returns (SetClusterConfigurationResponse);
}

message SetClusterConfigurationResponse {}
message SetClusterConfigurationRequest {
uint32 expected_version = 1;
restate.cluster.ClusterConfiguration cluster_configuration = 2;
}

message GetClusterConfigurationRequest {}
message GetClusterConfigurationResponse {
uint32 version = 1;
restate.cluster.ClusterConfiguration cluster_configuration = 2;
}

message ClusterStateRequest {}
Expand All @@ -46,7 +64,11 @@ message ListLogsResponse {
bytes logs = 1;
}

enum TailState { TailState_UNKNOWN = 0; OPEN = 1; SEALED = 2; }
enum TailState {
TailState_UNKNOWN = 0;
OPEN = 1;
SEALED = 2;
}

message DescribeLogRequest { uint32 log_id = 1; }

Expand Down
93 changes: 91 additions & 2 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
use std::time::Duration;

use bytes::{Bytes, BytesMut};
use restate_types::cluster_controller::{ClusterConfiguration, ClusterConfigurationSeed};
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError};
use restate_core::MetadataWriter;
use restate_metadata_store::MetadataStoreClient;
use restate_metadata_store::{MetadataStoreClient, Precondition};
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::{Logs, ProviderKind, SegmentIndex};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
use restate_types::metadata_store::keys::{
BIFROST_CONFIG_KEY, CLUSTER_CONFIG_KEY, NODES_CONFIG_KEY,
};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::storage::{StorageCodec, StorageEncode};
use restate_types::{Version, Versioned};
Expand All @@ -34,6 +37,10 @@ use crate::cluster_controller::protobuf::{
TrimLogRequest,
};

use super::protobuf::{
GetClusterConfigurationRequest, GetClusterConfigurationResponse,
SetClusterConfigurationRequest, SetClusterConfigurationResponse,
};
use super::ClusterControllerHandle;

pub(crate) struct ClusterCtrlSvcHandler {
Expand Down Expand Up @@ -286,6 +293,88 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {

Ok(Response::new(response))
}

async fn get_cluster_configuration(
&self,
_request: tonic::Request<GetClusterConfigurationRequest>,
) -> Result<Response<GetClusterConfigurationResponse>, Status> {
// todo(azmy): cluster configuration manager is buried deep
// since it's only usable by a `leader` admin. We still can do direct
// config set/get here.

let config: ClusterConfiguration = self
.metadata_store_client
.get(CLUSTER_CONFIG_KEY.clone())
.await
.map_err(|err| Status::internal(err.to_string()))?
.ok_or_else(|| Status::not_found("configuration not set"))?;

let response = GetClusterConfigurationResponse {
version: config.version.into(),
cluster_configuration: Some(config.configuration.into()),
};

Ok(Response::new(response))
}

async fn set_cluster_configuration(
&self,
request: Request<SetClusterConfigurationRequest>,
) -> Result<Response<SetClusterConfigurationResponse>, Status> {
let request = request.into_inner();

let expected_version: Version = request.expected_version.into();
let configuration = request
.cluster_configuration
.ok_or_else(|| Status::invalid_argument("ClusterConfiguration is required fields"))?;

let configuration = ClusterConfigurationSeed::try_from(configuration)
.map_err(|err| Status::invalid_argument(err.to_string()))?;

// verify changes. we need to get current config
if let Some(current) = self
.metadata_store_client
.get::<ClusterConfiguration>(CLUSTER_CONFIG_KEY.clone())
.await
.map_err(|err| Status::internal(err.to_string()))?
{
// changing number of partitions is not supported at the moment
if current.num_partitions != configuration.num_partitions {
return Err(Status::invalid_argument(
"Changing number of partitions is not supported at the moment.",
));
}

// change of provider type should not also be supported.
// Normally this is because updating from `local` to `replicated`
// is not possible atm without introducing too much complexity
// it's also not safe to go from replicated to local. hence
// it better to prevent changing the provider-kind completely
if current.default_provider != configuration.default_provider {
return Err(Status::invalid_argument(
"Changing default provider is not supported at the moment.",
));
}
}

let configuration = ClusterConfiguration {
version: expected_version.next(),
configuration,
};

let precondition = if expected_version == Version::INVALID {
Precondition::DoesNotExist
} else {
Precondition::MatchesVersion(expected_version)
};

self.metadata_store_client
.put(CLUSTER_CONFIG_KEY.clone(), &configuration, precondition)
.await
.map_err(|err| Status::internal(err.to_string()))?;

Ok(Response::new(SetClusterConfigurationResponse {}))
}
}

fn serialize_value<T: StorageEncode>(value: T) -> Bytes {
Expand Down
Loading

0 comments on commit e050319

Please sign in to comment.