Merge branch 'main' into feature/key-column-usage
dimbtp authored Dec 31, 2023
2 parents aaeaff7 + 6070e88 commit 0e1d37a
Showing 46 changed files with 1,067 additions and 262 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/doc-label.yml
@@ -20,7 +20,7 @@ jobs:
sync-labels: 1
- name: create an issue in doc repo
uses: dacbd/create-issue-action@main
- if: ${{ contains(github.event.pull_request.body, '- [ ] This PR does not require documentation updates.') }}
+ if: ${{ github.event.action == 'opened' && contains(github.event.pull_request.body, '- [ ] This PR does not require documentation updates.') }}
with:
owner: GreptimeTeam
repo: docs
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -111,7 +111,7 @@ prost = "0.12"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
rand = "0.8"
regex = "1.8"
regex-automata = { version = "0.1", features = ["transducer"] }
regex-automata = { version = "0.2", features = ["transducer"] }
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
@@ -169,6 +169,7 @@ datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
frontend = { path = "src/frontend" }
+ index = { path = "src/index" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
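Worth noting alongside this hunk: regex-automata's "transducer" feature implements fst's Automaton trait for its compiled DFAs, and the newly added src/index workspace member suggests that pairing is what the version bump serves. The following is only a minimal sketch under that assumption; the fst crate and the regex-automata 0.2 dfa API are not shown anywhere in this diff.

```rust
use fst::{IntoStreamer, Set, Streamer};
use regex_automata::dfa::dense::DFA;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // FST sets are built from sorted keys.
    let set = Set::from_iter(vec!["bar", "baz", "foo"])?;

    // With the "transducer" feature enabled, this DFA implements
    // fst::Automaton, so it can steer the traversal of the set.
    let dfa = DFA::new("ba[rz]")?;

    let mut stream = set.search(dfa).into_stream();
    while let Some(key) = stream.next() {
        // Prints "bar" and "baz"; non-matching branches are pruned.
        println!("{}", String::from_utf8_lossy(key));
    }
    Ok(())
}
```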
2 changes: 2 additions & 0 deletions src/catalog/src/information_schema.rs
@@ -60,6 +60,7 @@ lazy_static! {
COLLATION_CHARACTER_SET_APPLICABILITY,
CHECK_CONSTRAINTS,
EVENTS,
+ FILES,
];
}

@@ -181,6 +182,7 @@ impl InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
+ FILES => setup_memory_table!(FILES),
SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
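The provider registers FILES twice: once in the MEMORY_TABLES list and once in the match arm above. The setup_memory_table! body is not part of this diff, so the following is a simplified, hypothetical stand-in for its shape, with MemoryTable and the column list reduced to plain placeholders.

```rust
use std::sync::Arc;

const FILES: &str = "files";

// Stand-in for the real in-memory table provider type.
#[derive(Debug)]
struct MemoryTable {
    name: &'static str,
    columns: Vec<&'static str>,
}

// Mirrors the shape of get_schema_columns in this commit: map a table name
// to its static schema (reduced here to column names).
fn get_schema_columns(table_name: &str) -> Vec<&'static str> {
    match table_name {
        FILES => vec!["FILE_ID", "FILE_NAME", "FILE_TYPE"],
        _ => unreachable!("Unknown table in information_schema: {}", table_name),
    }
}

// Hypothetical expansion of setup_memory_table!: build the static table and
// erase it behind an Arc, as the match arm in the provider does.
macro_rules! setup_memory_table {
    ($name:ident) => {
        Some(Arc::new(MemoryTable {
            name: $name,
            columns: get_schema_columns($name),
        }))
    };
}

fn main() {
    let table = setup_memory_table!(FILES);
    println!("{table:?}");
}
```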
44 changes: 44 additions & 0 deletions src/catalog/src/information_schema/memory_table/tables.rs
@@ -183,6 +183,50 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
vec![],
),

+ FILES => (
+ vec![
+ bigint_column("FILE_ID"),
+ string_column("FILE_NAME"),
+ string_column("FILE_TYPE"),
+ string_column("TABLESPACE_NAME"),
+ string_column("TABLE_CATALOG"),
+ string_column("TABLE_SCHEMA"),
+ string_column("TABLE_NAME"),
+ string_column("LOGFILE_GROUP_NAME"),
+ bigint_column("LOGFILE_GROUP_NUMBER"),
+ string_column("ENGINE"),
+ string_column("FULLTEXT_KEYS"),
+ bigint_column("DELETED_ROWS"),
+ bigint_column("UPDATE_COUNT"),
+ bigint_column("FREE_EXTENTS"),
+ bigint_column("TOTAL_EXTENTS"),
+ bigint_column("EXTENT_SIZE"),
+ bigint_column("INITIAL_SIZE"),
+ bigint_column("MAXIMUM_SIZE"),
+ bigint_column("AUTOEXTEND_SIZE"),
+ datetime_column("CREATION_TIME"),
+ datetime_column("LAST_UPDATE_TIME"),
+ datetime_column("LAST_ACCESS_TIME"),
+ datetime_column("RECOVER_TIME"),
+ bigint_column("TRANSACTION_COUNTER"),
+ string_column("VERSION"),
+ string_column("ROW_FORMAT"),
+ bigint_column("TABLE_ROWS"),
+ bigint_column("AVG_ROW_LENGTH"),
+ bigint_column("DATA_LENGTH"),
+ bigint_column("MAX_DATA_LENGTH"),
+ bigint_column("INDEX_LENGTH"),
+ bigint_column("DATA_FREE"),
+ datetime_column("CREATE_TIME"),
+ datetime_column("UPDATE_TIME"),
+ datetime_column("CHECK_TIME"),
+ string_column("CHECKSUM"),
+ string_column("STATUS"),
+ string_column("EXTRA"),
+ ],
+ vec![],
+ ),
+
_ => unreachable!("Unknown table in information_schema: {}", table_name),
};

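For context on the helpers used in that schema: string_column, bigint_column, and datetime_column are defined elsewhere in memory_table and are not part of this diff, so the following is only a plausible reconstruction on top of the datatypes crate's public API; nullability and name casing are assumptions.

```rust
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;

// Each helper maps an upper-case MySQL-style column name to a ColumnSchema
// of the matching concrete type.
fn string_column(name: &str) -> ColumnSchema {
    ColumnSchema::new(name.to_lowercase(), ConcreteDataType::string_datatype(), false)
}

fn bigint_column(name: &str) -> ColumnSchema {
    ColumnSchema::new(name.to_lowercase(), ConcreteDataType::int64_datatype(), false)
}

fn datetime_column(name: &str) -> ColumnSchema {
    ColumnSchema::new(name.to_lowercase(), ConcreteDataType::datetime_datatype(), false)
}
```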
1 change: 1 addition & 0 deletions src/catalog/src/information_schema/table_names.rs
@@ -26,4 +26,5 @@ pub const COLLATION_CHARACTER_SET_APPLICABILITY: &str = "collation_character_set
pub const CHECK_CONSTRAINTS: &str = "check_constraints";
pub const EVENTS: &str = "events";
pub const KEY_COLUMN_USAGE: &str = "key_column_usage";
+ pub const FILES: &str = "files";
pub const SCHEMATA: &str = "schemata";
6 changes: 4 additions & 2 deletions src/common/catalog/src/consts.rs
@@ -54,10 +54,12 @@ pub const INFORMATION_SCHEMA_COLLATION_CHARACTER_SET_APPLICABILITY_TABLE_ID: u32
pub const INFORMATION_SCHEMA_CHECK_CONSTRAINTS_TABLE_ID: u32 = 12;
/// id for information_schema.EVENTS
pub const INFORMATION_SCHEMA_EVENTS_TABLE_ID: u32 = 13;
+ /// id for information_schema.FILES
+ pub const INFORMATION_SCHEMA_FILES_TABLE_ID: u32 = 14;
/// id for information_schema.SCHEMATA
- pub const INFORMATION_SCHEMA_SCHEMATA_TABLE_ID: u32 = 14;
+ pub const INFORMATION_SCHEMA_SCHEMATA_TABLE_ID: u32 = 15;
/// id for information_schema.KEY_COLUMN_USAGE
- pub const INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID: u32 = 15;
+ pub const INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID: u32 = 16;
/// ----- End of information_schema tables -----
pub const MITO_ENGINE: &str = "mito";
2 changes: 1 addition & 1 deletion src/common/config/src/wal/kafka.rs
@@ -42,7 +42,7 @@ pub struct KafkaConfig {
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
- /// The maximum log size a kakfa batch producer could buffer.
+ /// The maximum log size a kafka batch producer could buffer.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table.rs
@@ -191,7 +191,7 @@ impl AlterTableProcedure {
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
- let region_routes = table_route.region_routes();
+ let region_routes = table_route.region_routes()?;

let leaders = find_leaders(region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_table.rs
@@ -217,7 +217,7 @@ impl CreateTableProcedure {
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
- let region_routes = physical_table_route.region_routes();
+ let region_routes = physical_table_route.region_routes()?;

let request_builder = self.new_region_request_builder(Some(physical_table_id))?;

6 changes: 3 additions & 3 deletions src/common/meta/src/ddl/drop_table.rs
@@ -116,7 +116,7 @@ impl DropTableProcedure {

/// Registers dropping regions if they don't exist.
fn register_dropping_regions(&mut self) -> Result<()> {
- let region_routes = self.data.region_routes();
+ let region_routes = self.data.region_routes()?;

let dropping_regions = operating_leader_regions(region_routes);

@@ -190,7 +190,7 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&self) -> Result<Status> {
let table_id = self.data.table_id();

- let region_routes = &self.data.region_routes();
+ let region_routes = &self.data.region_routes()?;
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());

@@ -306,7 +306,7 @@ impl DropTableData {
self.task.table_ref()
}

- fn region_routes(&self) -> &Vec<RegionRoute> {
+ fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
self.table_route_value.region_routes()
}

2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
@@ -278,7 +278,7 @@ async fn handle_truncate_table_task(
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;

- let table_route = table_route_value.into_inner().region_routes().clone();
+ let table_route = table_route_value.into_inner().region_routes()?.clone();

let id = ddl_manager
.submit_truncate_table_task(
6 changes: 5 additions & 1 deletion src/common/meta/src/error.rs
@@ -351,6 +351,9 @@ pub enum Error {

#[snafu(display("The topic pool is empty"))]
EmptyTopicPool { location: Location },

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable { location: Location, err_msg: String },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -392,7 +395,8 @@ impl ErrorExt for Error {
| BuildKafkaPartitionClient { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
- | EmptyTopicPool { .. } => StatusCode::Unexpected,
+ | EmptyTopicPool { .. }
+ | UnexpectedLogicalRouteTable { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
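The new variant explains the region_routes() signature changes threaded through alter_table, create_table, drop_table, and ddl_manager above: a table route can now be logical (the create_table hunk resolves a physical_table_id), and a logical route owns no physical region routes of its own, so the accessor became fallible. A self-contained sketch of that idea; the enum shape and names here are illustrative, not the actual common_meta definitions.

```rust
// Stand-in for the real route type in common_meta.
#[derive(Debug)]
struct RegionRoute;

#[derive(Debug)]
enum Error {
    UnexpectedLogicalRouteTable { err_msg: String },
}

type Result<T> = std::result::Result<T, Error>;

// Hypothetical shape: a route is either physical (owns region routes) or
// logical (points at the physical table that owns them).
enum TableRouteValue {
    Physical { region_routes: Vec<RegionRoute> },
    Logical { physical_table_id: u32 },
}

impl TableRouteValue {
    // Only physical routes can answer; a logical route yields the new
    // error variant, which callers now surface with `?`.
    fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
        match self {
            TableRouteValue::Physical { region_routes } => Ok(region_routes),
            TableRouteValue::Logical { physical_table_id } => {
                Err(Error::UnexpectedLogicalRouteTable {
                    err_msg: format!(
                        "expected a physical route, found logical route to table {physical_table_id}"
                    ),
                })
            }
        }
    }
}

fn main() -> Result<()> {
    let physical = TableRouteValue::Physical { region_routes: vec![RegionRoute] };
    assert_eq!(physical.region_routes()?.len(), 1);

    let logical = TableRouteValue::Logical { physical_table_id: 42 };
    assert!(logical.region_routes().is_err());
    Ok(())
}
```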
42 changes: 25 additions & 17 deletions src/common/meta/src/key.rs
@@ -483,7 +483,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;

// Deletes datanode table key value pairs.
- let distribution = region_distribution(table_route_value.region_routes())?;
+ let distribution = region_distribution(table_route_value.region_routes()?)?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
@@ -608,7 +608,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
- region_distribution(current_table_route_value.region_routes())?;
+ region_distribution(current_table_route_value.region_routes()?)?;
let new_region_distribution = region_distribution(&new_region_routes)?;

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
@@ -621,7 +621,7 @@
)?;

// Updates the table_route.
- let new_table_route_value = current_table_route_value.update(new_region_routes);
+ let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
@@ -656,7 +656,7 @@
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
- let mut new_region_routes = current_table_route_value.region_routes().clone();
+ let mut new_region_routes = current_table_route_value.region_routes()?.clone();

let mut updated = 0;
for route in &mut new_region_routes {
@@ -673,7 +673,7 @@
}

// Updates the table_route.
- let new_table_route_value = current_table_route_value.update(new_region_routes);
+ let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
@@ -897,7 +897,11 @@ mod tests {
table_info
);
assert_eq!(
- remote_table_route.unwrap().into_inner().region_routes(),
+ remote_table_route
+ .unwrap()
+ .into_inner()
+ .region_routes()
+ .unwrap(),
region_routes
);
}
@@ -978,7 +982,7 @@
.unwrap()
.unwrap()
.into_inner();
- assert_eq!(removed_table_route.region_routes(), region_routes);
+ assert_eq!(removed_table_route.region_routes().unwrap(), region_routes);
}

#[tokio::test]
@@ -1173,11 +1177,11 @@
.unwrap();

assert_eq!(
- updated_route_value.region_routes()[0].leader_status,
+ updated_route_value.region_routes().unwrap()[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
- updated_route_value.region_routes()[1].leader_status,
+ updated_route_value.region_routes().unwrap()[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
@@ -1271,7 +1275,8 @@
let current_table_route_value = DeserializedValueWithBytes::from_inner(
current_table_route_value
.inner
- .update(new_region_routes.clone()),
+ .update(new_region_routes.clone())
+ .unwrap(),
);
let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
// it should be ok.
@@ -1295,13 +1300,16 @@

// if the current_table_route_value is wrong, it should return an error.
// The ABA problem.
- let wrong_table_route_value =
- DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![
- new_region_route(1, 1),
- new_region_route(2, 2),
- new_region_route(3, 3),
- new_region_route(4, 4),
- ]));
+ let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
+ current_table_route_value
+ .update(vec![
+ new_region_route(1, 1),
+ new_region_route(2, 2),
+ new_region_route(3, 3),
+ new_region_route(4, 4),
+ ])
+ .unwrap(),
+ );
assert!(table_metadata_manager
.update_table_route(
table_id,
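The last test above guards the compare-and-swap discipline behind update_table_route: an update built from a stale DeserializedValueWithBytes snapshot must be rejected, even if the stored value was changed and changed back in the meantime (the ABA problem the comment names). A minimal, self-contained sketch of that discipline; the store and method names are illustrative, not the real kv-backend API.

```rust
use std::collections::HashMap;

// Minimal compare-and-swap store; all names here are hypothetical.
struct KvStore {
    inner: HashMap<String, Vec<u8>>,
}

impl KvStore {
    fn compare_and_put(&mut self, key: &str, expect: &[u8], new: Vec<u8>) -> bool {
        match self.inner.get(key) {
            // Only swap when the stored bytes still equal the caller's snapshot.
            Some(cur) if cur.as_slice() == expect => {
                self.inner.insert(key.to_owned(), new);
                true
            }
            _ => false,
        }
    }
}

fn main() {
    let mut kv = KvStore { inner: HashMap::new() };
    kv.inner.insert("route".into(), b"v1".to_vec());

    // A writer holding the current snapshot succeeds...
    assert!(kv.compare_and_put("route", b"v1", b"v2".to_vec()));
    // ...while one holding a stale snapshot fails, which is what the
    // wrong_table_route_value assertion above checks.
    assert!(!kv.compare_and_put("route", b"v1", b"v3".to_vec()));
}
```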