Merge remote-tracking branch 'origin/main' into zhongzc/index-integration
zhongzc committed Jan 3, 2024
2 parents 3ed9b30 + b9302e4 commit 1a963a4
Showing 100 changed files with 3,828 additions and 915 deletions.
2 changes: 2 additions & 0 deletions .env.example
@@ -19,3 +19,5 @@ GT_GCS_BUCKET = GCS bucket
GT_GCS_SCOPE = GCS scope
GT_GCS_CREDENTIAL_PATH = GCS credential path
GT_GCS_ENDPOINT = GCS end point
# Settings for kafka wal test
GT_KAFKA_ENDPOINTS = localhost:9092
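
For context, a test or tool consuming this setting would typically read it from the environment and fall back to the example value — a minimal Rust sketch (the helper name is hypothetical, not GreptimeDB's actual API):

```rust
use std::env;

/// Hypothetical helper: reads GT_KAFKA_ENDPOINTS, falling back to the
/// .env.example default, and splits a comma-separated broker list.
fn kafka_endpoints() -> Vec<String> {
    env::var("GT_KAFKA_ENDPOINTS")
        .unwrap_or_else(|_| "localhost:9092".to_string())
        .split(',')
        .map(|s| s.trim().to_string())
        .collect()
}

fn main() {
    println!("Kafka WAL test brokers: {:?}", kafka_endpoints());
}
```
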
31 changes: 31 additions & 0 deletions .github/workflows/develop.yml
@@ -104,6 +104,37 @@ jobs:
          path: ${{ runner.temp }}/greptime-*.log
          retention-days: 3

  sqlness-kafka-wal:
    name: Sqlness Test with Kafka Wal
    if: github.event.pull_request.draft == false
    runs-on: ${{ matrix.os }}
    strategy:
      matrix:
        os: [ ubuntu-20.04-8-cores ]
    timeout-minutes: 60
    steps:
      - uses: actions/checkout@v3
      - uses: arduino/setup-protoc@v1
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
      - uses: dtolnay/rust-toolchain@master
        with:
          toolchain: ${{ env.RUST_TOOLCHAIN }}
      - name: Rust Cache
        uses: Swatinem/rust-cache@v2
      - name: Setup kafka server
        working-directory: tests-integration/fixtures/kafka
        run: docker compose -f docker-compose-standalone.yml up -d --wait
      - name: Run sqlness
        run: cargo sqlness -w kafka -k 127.0.0.1:9092
      - name: Upload sqlness logs
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: sqlness-logs
          path: ${{ runner.temp }}/greptime-*.log
          retention-days: 3

  fmt:
    name: Rustfmt
    if: github.event.pull_request.draft == false
11 changes: 11 additions & 0 deletions .github/workflows/doc-label.yml
@@ -18,3 +18,14 @@ jobs:
          enable-versioned-regex: false
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          sync-labels: 1
      - name: create an issue in doc repo
        uses: dacbd/create-issue-action@main
        if: ${{ github.event.action == 'opened' && contains(github.event.pull_request.body, '- [ ] This PR does not require documentation updates.') }}
        with:
          owner: GreptimeTeam
          repo: docs
          token: ${{ secrets.DOCS_REPO_TOKEN }}
          title: Update docs for ${{ github.event.issue.title || github.event.pull_request.title }}
          body: |
            A document change request is generated from
            ${{ github.event.issue.html_url || github.event.pull_request.html_url }}
1 change: 1 addition & 0 deletions Cargo.lock
Some generated files are not rendered by default.
2 changes: 1 addition & 1 deletion config/datanode.example.toml
@@ -53,7 +53,7 @@ sync_write = false
# broker_endpoints = ["127.0.0.1:9092"]
# max_batch_size = "4MB"
# linger = "200ms"
# produce_record_timeout = "100ms"
# consumer_wait_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2
20 changes: 10 additions & 10 deletions config/standalone.example.toml
@@ -100,29 +100,29 @@ provider = "raft_engine"
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# The prefix of topic name.
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# The number of replicas of each partition.
# replication_factor = 1

# The maximum log size a kafka batch producer could buffer.
# The max size of a single producer batch.
# max_batch_size = "4MB"
# The linger duration of a kafka batch producer.
# The linger duration.
# linger = "200ms"
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
# produce_record_timeout = "100ms"
# Above which a topic creation operation will be cancelled.
# The consumer wait timeout.
# consumer_wait_timeout = "100ms"
# Create topic timeout.
# create_topic_timeout = "30s"

# The initial backoff for kafka clients.
# The initial backoff delay.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# The maximum backoff delay.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
# The deadline of retries.
# backoff_deadline = "5mins"

# WAL data directory
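
Taken together, the backoff settings above describe a standard capped exponential backoff: each retry delay is the previous one multiplied by `backoff_base`, clamped to `backoff_max`, and retries stop once the accumulated wait passes `backoff_deadline`. A self-contained sketch of that schedule (illustrative only, not GreptimeDB's actual client code):

```rust
use std::time::Duration;

/// Computes the retry delays implied by backoff_init, backoff_max,
/// backoff_base, and backoff_deadline: next = min(current * base, max),
/// stopping once the total wait would exceed the deadline.
fn backoff_schedule(init: Duration, max: Duration, base: u32, deadline: Duration) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut current = init;
    let mut total = Duration::ZERO;
    while total + current <= deadline {
        delays.push(current);
        total += current;
        current = (current * base).min(max);
    }
    delays
}

fn main() {
    // With the example values: 500ms, 1s, 2s, 4s, 8s, then 10s repeating
    // until the 5-minute deadline is reached.
    let delays = backoff_schedule(
        Duration::from_millis(500),
        Duration::from_secs(10),
        2,
        Duration::from_secs(5 * 60),
    );
    println!("first delays: {:?}", &delays[..6]);
}
```
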
39 changes: 39 additions & 0 deletions src/catalog/src/information_schema.rs
@@ -13,7 +13,9 @@
// limitations under the License.

mod columns;
mod key_column_usage;
mod memory_table;
mod schemata;
mod table_names;
mod tables;

@@ -40,7 +42,9 @@ pub use table_names::*;

use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
use crate::information_schema::schemata::InformationSchemaSchemata;
use crate::information_schema::tables::InformationSchemaTables;
use crate::CatalogManager;

@@ -56,6 +60,17 @@ lazy_static! {
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        TRIGGERS,
        GLOBAL_STATUS,
        SESSION_STATUS,
    ];
}

@@ -126,7 +141,12 @@ impl InformationSchemaProvider {
    fn build_tables(&mut self) {
        let mut tables = HashMap::new();
        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
        tables.insert(
            KEY_COLUMN_USAGE.to_string(),
            self.build_table(KEY_COLUMN_USAGE).unwrap(),
        );

        // Add memory tables
        for name in MEMORY_TABLES.iter() {
@@ -168,6 +188,25 @@ impl InformationSchemaProvider {
            }
            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
            EVENTS => setup_memory_table!(EVENTS),
            FILES => setup_memory_table!(FILES),
            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
            PARAMETERS => setup_memory_table!(PARAMETERS),
            PROFILING => setup_memory_table!(PROFILING),
            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
            ROUTINES => setup_memory_table!(ROUTINES),
            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
            TRIGGERS => setup_memory_table!(TRIGGERS),
            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            _ => None,
        }
    }
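
The new match arms above follow two patterns: most of the added tables are static snapshots served through `setup_memory_table!`, while `KEY_COLUMN_USAGE` and `SCHEMATA` are built dynamically against the live catalog manager. A toy sketch of that dispatch shape, with illustrative stand-in types rather than GreptimeDB's real ones:

```rust
use std::sync::Arc;

trait Table {}

/// Stand-in for a table whose contents are fixed at build time.
struct MemoryTable;
/// Stand-in for a table computed from the live catalog on each scan.
struct CatalogBackedTable;

impl Table for MemoryTable {}
impl Table for CatalogBackedTable {}

/// Dispatch shape of build_table: static tables from memory, dynamic
/// tables constructed against the catalog, None for unknown names.
fn build_table(name: &str) -> Option<Arc<dyn Table>> {
    match name {
        "files" | "triggers" | "global_status" => Some(Arc::new(MemoryTable) as _),
        "key_column_usage" | "schemata" => Some(Arc::new(CatalogBackedTable) as _),
        _ => None,
    }
}

fn main() {
    assert!(build_table("schemata").is_some());
    assert!(build_table("no_such_table").is_none());
}
```
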
45 changes: 40 additions & 5 deletions src/catalog/src/information_schema/columns.rs
@@ -51,6 +51,10 @@ const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const DATA_TYPE: &str = "data_type";
const SEMANTIC_TYPE: &str = "semantic_type";
const COLUMN_DEFAULT: &str = "column_default";
const IS_NULLABLE: &str = "is_nullable";
const COLUMN_TYPE: &str = "column_type";
const COLUMN_COMMENT: &str = "column_comment";

impl InformationSchemaColumns {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
@@ -69,6 +73,10 @@ impl InformationSchemaColumns {
            ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(SEMANTIC_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_DEFAULT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(IS_NULLABLE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_COMMENT, ConcreteDataType::string_datatype(), true),
        ]))
    }

@@ -126,6 +134,11 @@ struct InformationSchemaColumnsBuilder {
    column_names: StringVectorBuilder,
    data_types: StringVectorBuilder,
    semantic_types: StringVectorBuilder,

    column_defaults: StringVectorBuilder,
    is_nullables: StringVectorBuilder,
    column_types: StringVectorBuilder,
    column_comments: StringVectorBuilder,
}

impl InformationSchemaColumnsBuilder {
@@ -144,6 +157,10 @@ impl InformationSchemaColumnsBuilder {
            column_names: StringVectorBuilder::with_capacity(42),
            data_types: StringVectorBuilder::with_capacity(42),
            semantic_types: StringVectorBuilder::with_capacity(42),
            column_defaults: StringVectorBuilder::with_capacity(42),
            is_nullables: StringVectorBuilder::with_capacity(42),
            column_types: StringVectorBuilder::with_capacity(42),
            column_comments: StringVectorBuilder::with_capacity(42),
        }
    }

@@ -187,9 +204,8 @@
                        &catalog_name,
                        &schema_name,
                        &table_name,
                        &column.name,
                        &column.data_type.name(),
                        semantic_type,
                        column,
                    );
                }
            } else {
@@ -206,16 +222,31 @@
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        column_name: &str,
        data_type: &str,
        semantic_type: &str,
        column_schema: &ColumnSchema,
    ) {
        let data_type = &column_schema.data_type.name();

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.column_names.push(Some(column_name));
        self.column_names.push(Some(&column_schema.name));
        self.data_types.push(Some(data_type));
        self.semantic_types.push(Some(semantic_type));
        self.column_defaults.push(
            column_schema
                .default_constraint()
                .map(|s| format!("{}", s))
                .as_deref(),
        );
        if column_schema.is_nullable() {
            self.is_nullables.push(Some("Yes"));
        } else {
            self.is_nullables.push(Some("No"));
        }
        self.column_types.push(Some(data_type));
        self.column_comments
            .push(column_schema.column_comment().map(|x| x.as_ref()));
    }

    fn finish(&mut self) -> Result<RecordBatch> {
@@ -226,6 +257,10 @@
            Arc::new(self.column_names.finish()),
            Arc::new(self.data_types.finish()),
            Arc::new(self.semantic_types.finish()),
            Arc::new(self.column_defaults.finish()),
            Arc::new(self.is_nullables.finish()),
            Arc::new(self.column_types.finish()),
            Arc::new(self.column_comments.finish()),
        ];

        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
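
One detail in add_column worth calling out: formatting the optional default constraint yields an `Option<String>`, and `.as_deref()` turns it into the `Option<&str>` that the string vector builder's push expects, without an extra clone. The idiom in isolation:

```rust
fn main() {
    let default_constraint: Option<i64> = Some(42);

    // map + format! produces Option<String>; as_deref borrows it as Option<&str>.
    let formatted: Option<String> = default_constraint.map(|v| format!("{}", v));
    let as_str: Option<&str> = formatted.as_deref();

    assert_eq!(as_str, Some("42"));
    println!("default = {:?}", as_str);
}
```
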