chore: add log inserter
paomian committed May 27, 2024
1 parent a02d0de commit faffb6b
Showing 6 changed files with 164 additions and 69 deletions.
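In short, the commit wires up a minimal log-ingestion path end to end: the servers crate gains a LogHandler trait and an HTTP /event/logs route, the frontend Instance implements the trait in a new log_handler module, and the operator-layer Inserter learns to create append-mode log tables on demand (handle_log_inserts / create_log_table).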
4 changes: 3 additions & 1 deletion src/frontend/src/instance.rs
@@ -15,6 +15,7 @@
pub mod builder;
mod grpc;
mod influxdb;
mod log_handler;
mod opentsdb;
mod otlp;
mod prom_store;
@@ -66,7 +67,7 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
InfluxdbLineProtocolHandler, LogHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::ServerHandlers;
@@ -100,6 +101,7 @@ pub trait FrontendInstance:
+ OpenTelemetryProtocolHandler
+ ScriptHandler
+ PrometheusHandler
+ LogHandler
+ Send
+ Sync
+ 'static
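The new log_handler module itself is not part of this diff. A minimal sketch of what it plausibly contains, written in the style of the crate's other protocol-handler impls (hypothetical: it assumes Instance exposes inserter and statement_executor fields, and it elides the conversion from the operator error type into servers::error::Result):

// src/frontend/src/instance/log_handler.rs (sketch, not the committed code)
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_query::Output;
use servers::error::Result;
use servers::query_handler::LogHandler;
use session::context::QueryContextRef;

use crate::instance::Instance;

#[async_trait]
impl LogHandler for Instance {
    async fn insert_log(&self, log: RowInsertRequests, ctx: QueryContextRef) -> Result<Output> {
        // Delegate to the operator-layer inserter introduced in insert.rs below;
        // a real impl maps the operator error into the servers error type here.
        self.inserter
            .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
            .await
    }
}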
3 changes: 1 addition & 2 deletions src/frontend/src/server.rs
@@ -87,8 +87,7 @@ where
Some(self.instance.clone()),
);

builder = builder
.with_log_ingest_handler(ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()));
builder = builder.with_log_ingest_handler(self.instance.clone());

if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
builder = builder.with_user_provider(user_provider);
152 changes: 120 additions & 32 deletions src/operator/src/insert.rs
@@ -67,6 +67,12 @@ pub struct Inserter {

pub type InserterRef = Arc<Inserter>;

/// How `create_or_alter_tables_on_demand` should provision tables that are
/// missing from the catalog.
enum TableType {
    /// Logical table on top of the named physical table (metric engine).
    Logical(String),
    /// Ordinary physical table.
    Physical,
    /// Log table, created in append mode.
    Log,
}
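The enum replaces the on_physical_table: Option<String> parameter of create_or_alter_tables_on_demand below: Logical(physical_table) covers the old Some case, Physical covers the old None case, and Log is the new case that creates missing tables in append mode.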

impl Inserter {
pub fn new(
catalog_manager: CatalogManagerRef,
@@ -109,7 +115,37 @@ impl Inserter {
validate_column_count_match(&requests)?;

let table_name_to_ids = self
.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
.create_or_alter_tables_on_demand(
&requests,
&ctx,
TableType::Physical,
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
.await?;

self.do_request(inserts, &ctx).await
}

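/// Same flow as `handle_row_inserts`, except that tables which do not exist
/// yet are created as append-mode log tables (`TableType::Log`) instead of
/// ordinary physical tables.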
pub async fn handle_log_inserts(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
req.rows
.as_ref()
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_column_count_match(&requests)?;

let table_name_to_ids = self
.create_or_alter_tables_on_demand(&requests, &ctx, TableType::Log, statement_executor)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
@@ -144,7 +180,7 @@
.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
TableType::Logical(physical_table.to_string()),
statement_executor,
)
.await?;
@@ -366,7 +402,7 @@ impl Inserter {
&self,
requests: &RowInsertRequests,
ctx: &QueryContextRef,
on_physical_table: Option<String>,
table_type: TableType,
statement_executor: &StatementExecutor,
) -> Result<HashMap<String, TableId>> {
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
@@ -394,42 +430,56 @@
}
}

// -- old version (deleted) --
if let Some(on_physical_table) = on_physical_table {
    if !create_tables.is_empty() {
        // Creates logical tables in batch.
        let tables = self
            .create_logical_tables(
                create_tables,
                ctx,
                &on_physical_table,
                statement_executor,
            )
            .await?;

        for table in tables {
            let table_info = table.table_info();
            table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
        }
    }
    if !alter_tables.is_empty() {
        // Alter logical tables in batch.
        statement_executor
            .alter_logical_tables(alter_tables, ctx.clone())
            .await?;
    }
} else {
    for req in create_tables {
        let table = self.create_table(req, ctx, statement_executor).await?;
        let table_info = table.table_info();
        table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
    }
    for alter_expr in alter_tables.into_iter() {
        statement_executor
            .alter_table_inner(alter_expr, ctx.clone())
            .await?;
    }
}
// -- new version (added) --
match table_type {
    TableType::Logical(on_physical_table) => {
        if !create_tables.is_empty() {
            // Creates logical tables in batch.
            let tables = self
                .create_logical_tables(
                    create_tables,
                    ctx,
                    &on_physical_table,
                    statement_executor,
                )
                .await?;

            for table in tables {
                let table_info = table.table_info();
                table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
            }
        }
        if !alter_tables.is_empty() {
            // Alter logical tables in batch.
            statement_executor
                .alter_logical_tables(alter_tables, ctx.clone())
                .await?;
        }
    }
    TableType::Physical => {
        for req in create_tables {
            let table = self.create_table(req, ctx, statement_executor).await?;
            let table_info = table.table_info();
            table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
        }
        for alter_expr in alter_tables.into_iter() {
            statement_executor
                .alter_table_inner(alter_expr, ctx.clone())
                .await?;
        }
    }
    TableType::Log => {
        for req in create_tables {
            let table = self.create_log_table(req, ctx, statement_executor).await?;
            let table_info = table.table_info();
            table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
        }
        for alter_expr in alter_tables.into_iter() {
            statement_executor
                .alter_table_inner(alter_expr, ctx.clone())
                .await?;
        }
    }
}

Ok(table_name_to_ids)
}

@@ -571,6 +621,44 @@
}
}

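/// Log-specific variant of `create_table`: the table is created with the
/// `append_mode` table option set to "true".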
async fn create_log_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;

info!("Table `{table_ref}` does not exist, try creating table");
create_table_expr
.table_options
.insert("append_mode".to_string(), "true".to_string());
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
.await;

match res {
Ok(table) => {
info!(
"Successfully created table {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
Ok(table)
}
Err(err) => {
error!(
"Failed to create table {}.{}.{}: {}",
table_ref.catalog, table_ref.schema, table_ref.table, err
);
Err(err)
}
}
}
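The only difference from the plain create_table path is the append_mode option. Assuming the usual semantics of append mode in GreptimeDB's storage engine, the table keeps every ingested row rather than deduplicating on primary key and timestamp, which is what log ingestion wants.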

async fn create_logical_tables(
&self,
create_tables: Vec<&RowInsertRequest>,
11 changes: 5 additions & 6 deletions src/servers/src/http.rs
@@ -65,11 +65,10 @@ use crate::http::prometheus::{
use crate::metrics::http_metrics_layer;
use crate::metrics_handler::MetricsHandler;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef,
PromStoreProtocolHandlerRef, ScriptHandlerRef,
InfluxdbLineProtocolHandlerRef, LogHandlerRef, OpenTelemetryProtocolHandlerRef,
OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef,
};
use crate::server::Server;

@@ -589,7 +588,7 @@
}

// FIXME(qtang): This is a temporary solution to handle the log ingest metrics.
pub fn with_log_ingest_handler(self, handler: ServerGrpcQueryHandlerRef) -> Self {
pub fn with_log_ingest_handler(self, handler: LogHandlerRef) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/event"),
@@ -711,10 +710,10 @@ impl HttpServer {
.with_state(metrics_handler)
}

fn route_log<S>(grpc_handler: ServerGrpcQueryHandlerRef) -> Router<S> {
fn route_log<S>(log_handler: LogHandlerRef) -> Router<S> {
Router::new()
.route("/logs", routing::post(handler::log_ingester))
.with_state(grpc_handler)
.with_state(log_handler)
}
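Because with_log_ingest_handler nests this router under &format!("/{HTTP_API_VERSION}/event"), the ingester is served at POST /{HTTP_API_VERSION}/event/logs, i.e. POST /v1/event/logs when HTTP_API_VERSION is "v1".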

fn route_sql<S>(api_state: ApiState) -> ApiRouter<S> {
56 changes: 28 additions & 28 deletions src/servers/src/http/handler.rs
@@ -49,8 +49,8 @@ use crate::http::{
HttpResponse, ResponseFormat,
};
use crate::metrics_handler::MetricsHandler;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::LogHandlerRef;

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct SqlQuery {
@@ -73,7 +73,7 @@ pub struct SqlQuery {
/// Handler for log ingestion.
#[axum_macros::debug_handler]
pub async fn log_ingester(
State(_state): State<ServerGrpcQueryHandlerRef>,
State(_state): State<LogHandlerRef>,
Query(_query_params): Query<SqlQuery>,
Extension(_query_ctx): Extension<QueryContextRef>,
Json(_payload): Json<Value>,
@@ -85,34 +85,34 @@ pub async fn log_ingester(
// state.do_query(payload, query_ctx).await.to_string();
let _processors = _payload["processors"].as_str().unwrap();
let _data = _payload["data"].as_array().unwrap();
let mut rows = Rows::default();

// need a ColumnSchema for rows
rows.schema = vec![
ColumnSchema {
column_name: "log".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
datatype_extension: None,
},
ColumnSchema {
column_name: "ts".to_string(),
datatype: ColumnDataType::TimestampSecond as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
},
];
let mut rows = Rows {
schema: vec![
ColumnSchema {
column_name: "log".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
datatype_extension: None,
},
ColumnSchema {
column_name: "ts".to_string(),
datatype: ColumnDataType::TimestampSecond as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
},
],
..Default::default()
};

for row in _data {
let _row = row.as_str().unwrap();
let mut value = ApiValue::default();
value.value_data = Some(ValueData::StringValue(_row.to_string()));
let value = ApiValue {
value_data: Some(ValueData::StringValue(_row.to_string())),
};
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let now =
since_the_epoch.as_secs();
let now = since_the_epoch.as_secs();
rows.rows.push(Row {
values: vec![
value,
Expand All @@ -122,14 +122,14 @@ pub async fn log_ingester(
],
})
}
let mut insert_request = RowInsertRequest::default();
insert_request.rows = Some(rows);
insert_request.table_name = "log".to_string();
let insert_request = RowInsertRequest {
rows: Some(rows),
table_name: "log".to_string(),
};
let insert_requests = RowInsertRequests {
inserts: vec![insert_request],
};
let test_insert_request = api::v1::greptime_request::Request::RowInserts(insert_requests);
let insert_result = _state.do_query(test_insert_request, _query_ctx).await;
let insert_result = _state.insert_log(insert_requests, _query_ctx).await;
match insert_result {
Ok(_) => String::from("ok"),
Err(e) => {
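Given the field accesses above, the endpoint expects a JSON body with a string processors field and a data array of strings; anything else panics on the unwrap() calls. A sketch of a valid body built with serde_json (the processor name is an invented placeholder; the commit reads the field but never uses it):

use serde_json::json;

// Each entry of "data" becomes one row in the hard-coded `log` table,
// tagged with the ingestion time in seconds.
let payload = json!({
    "processors": "my_pipeline",
    "data": ["error: disk full", "warn: retrying"]
});
// POST this to /{HTTP_API_VERSION}/event/logs.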
7 changes: 7 additions & 0 deletions src/servers/src/query_handler.rs
@@ -48,6 +48,7 @@ pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler +
pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;
pub type LogHandlerRef = Arc<dyn LogHandler + Send + Sync>;

#[async_trait]
pub trait ScriptHandler {
@@ -118,3 +119,9 @@ pub trait OpenTelemetryProtocolHandler {
ctx: QueryContextRef,
) -> Result<Output>;
}

#[async_trait]
pub trait LogHandler {
async fn insert_log(&self, log: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
}
