Skip to content

Commit

Permalink
Merge branch 'main' into fuzz-alter
Browse files Browse the repository at this point in the history
  • Loading branch information
poltao committed Jun 1, 2024
2 parents 3c1e8f6 + 45fee94 commit bb045e3
Show file tree
Hide file tree
Showing 23 changed files with 808 additions and 97 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "a15a54a714fe117d7e9f7635e149c4eecac773fa" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ae26136accd82fbdf8be540cd502f2e94951077e" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
6 changes: 3 additions & 3 deletions src/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ impl Client {
Ok(FlightClient { addr, client })
}

pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
let (_, channel) = self.find_channel()?;
/// Creates a raw gRPC region client together with the address of the peer
/// it connects to, so callers can attach the datanode address to error
/// reports (see `Error::RegionServer`).
///
/// The client is configured with the connection's size limits and Zstd
/// compression for both directions.
pub(crate) fn raw_region_client(&self) -> Result<(String, PbRegionClient<Channel>)> {
    let (addr, channel) = self.find_channel()?;
    let client = PbRegionClient::new(channel)
        .max_decoding_message_size(self.max_grpc_recv_message_size())
        .max_encoding_message_size(self.max_grpc_send_message_size())
        .accept_compressed(CompressionEncoding::Zstd)
        .send_compressed(CompressionEncoding::Zstd);
    Ok((addr, client))
}

pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
Expand Down
3 changes: 2 additions & 1 deletion src/client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ pub enum Error {
source: common_grpc::error::Error,
},

#[snafu(display("Failed to request RegionServer, code: {}", code))]
#[snafu(display("Failed to request RegionServer {}, code: {}", addr, code))]
RegionServer {
addr: String,
code: Code,
source: BoxedError,
#[snafu(implicit)]
Expand Down
3 changes: 2 additions & 1 deletion src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl RegionRequester {
.with_label_values(&[request_type.as_str()])
.start_timer();

let mut client = self.client.raw_region_client()?;
let (addr, mut client) = self.client.raw_region_client()?;

let response = client
.handle(request)
Expand All @@ -187,6 +187,7 @@ impl RegionRequester {
let err: error::Error = e.into();
// Uses `Error::RegionServer` instead of `Error::Server`
error::Error::RegionServer {
addr,
code,
source: BoxedError::new(err),
location: location!(),
Expand Down
7 changes: 2 additions & 5 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,7 @@ async fn handle_create_table_task(

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
table_id: Some(table_id),
..Default::default()
table_ids: vec![table_id],
})
}

Expand Down Expand Up @@ -534,7 +533,6 @@ async fn handle_create_logical_table_tasks(
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
table_ids,
..Default::default()
})
}

Expand Down Expand Up @@ -690,8 +688,7 @@ async fn handle_create_view_task(

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
table_id: Some(view_id),
..Default::default()
table_ids: vec![view_id],
})
}

Expand Down
10 changes: 1 addition & 9 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,22 +274,17 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
pub key: Vec<u8>,
// For create physical table
// TODO(jeremy): remove it?
pub table_id: Option<TableId>,
// For create multi logical tables
// `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
pub table_ids: Vec<TableId>,
}

impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
type Error = error::Error;

fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
let table_id = resp.table_id.map(|t| t.id);
let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
Ok(Self {
key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
table_id,
table_ids,
})
}
Expand All @@ -299,9 +294,6 @@ impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
fn from(val: SubmitDdlTaskResponse) -> Self {
Self {
pid: Some(ProcedureId { key: val.key }),
table_id: val
.table_id
.map(|table_id| api::v1::TableId { id: table_id }),
table_ids: val
.table_ids
.into_iter()
Expand Down
10 changes: 2 additions & 8 deletions src/datanode/src/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,13 @@ pub struct RegionServerEventSender(pub(crate) UnboundedSender<RegionServerEvent>
impl RegionServerEventListener for RegionServerEventSender {
fn on_region_registered(&self, region_id: RegionId) {
if let Err(e) = self.0.send(RegionServerEvent::Registered(region_id)) {
error!(
"Failed to send registering region: {region_id} event, source: {}",
e
);
error!(e; "Failed to send registering region: {region_id} event");
}
}

fn on_region_deregistered(&self, region_id: RegionId) {
if let Err(e) = self.0.send(RegionServerEvent::Deregistered(region_id)) {
error!(
"Failed to send deregistering region: {region_id} event, source: {}",
e
);
error!(e; "Failed to send deregistering region: {region_id} event");
}
}
}
Expand Down
15 changes: 9 additions & 6 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ impl EngineInner {
let res = FileRegion::create(region_id, request, &self.object_store).await;
let region = res.inspect_err(|err| {
error!(
"Failed to create region, region_id: {}, err: {}",
region_id, err
err;
"Failed to create region, region_id: {}",
region_id
);
})?;
self.regions.write().unwrap().insert(region_id, region);
Expand Down Expand Up @@ -259,8 +260,9 @@ impl EngineInner {
let res = FileRegion::open(region_id, request, &self.object_store).await;
let region = res.inspect_err(|err| {
error!(
"Failed to open region, region_id: {}, err: {}",
region_id, err
err;
"Failed to open region, region_id: {}",
region_id
);
})?;
self.regions.write().unwrap().insert(region_id, region);
Expand Down Expand Up @@ -302,8 +304,9 @@ impl EngineInner {
let res = FileRegion::drop(&region, &self.object_store).await;
res.inspect_err(|err| {
error!(
"Failed to drop region, region_id: {}, err: {}",
region_id, err
err;
"Failed to drop region, region_id: {}",
region_id
);
})?;
}
Expand Down
20 changes: 10 additions & 10 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub struct FlownodeManager {
table_info_source: TableSource,
frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
/// contains mapping from table name to global id, and table schema
node_context: Mutex<FlownodeContext>,
node_context: RwLock<FlownodeContext>,
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
tick_manager: FlowTickManager,
Expand Down Expand Up @@ -194,7 +194,7 @@ impl FlownodeManager {
query_engine,
table_info_source: srv_map,
frontend_invoker: RwLock::new(None),
node_context: Mutex::new(node_context),
node_context: RwLock::new(node_context),
flow_err_collectors: Default::default(),
src_send_buf_lens: Default::default(),
tick_manager,
Expand Down Expand Up @@ -298,7 +298,7 @@ impl FlownodeManager {
} else {
// TODO(discord9): consider removing buggy auto-create by schema

let node_ctx = self.node_context.lock().await;
let node_ctx = self.node_context.read().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(&table_name)
Expand Down Expand Up @@ -462,7 +462,7 @@ impl FlownodeManager {
let mut output = BTreeMap::new();
for (name, sink_recv) in self
.node_context
.lock()
.write()
.await
.sink_receiver
.iter_mut()
Expand Down Expand Up @@ -542,11 +542,11 @@ impl FlownodeManager {
}
// first check how many inputs were sent
let (flush_res, buf_len) = if blocking {
let mut ctx = self.node_context.lock().await;
(ctx.flush_all_sender(), ctx.get_send_buf_size())
let ctx = self.node_context.read().await;
(ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
} else {
match self.node_context.try_lock() {
Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()),
match self.node_context.try_read() {
Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await),
Err(_) => return Ok(()),
}
};
Expand Down Expand Up @@ -580,7 +580,7 @@ impl FlownodeManager {
rows.len()
);
let table_id = region_id.table_id();
self.node_context.lock().await.send(table_id, rows)?;
self.node_context.read().await.send(table_id, rows).await?;
// TODO(discord9): put it in a background task?
// self.run_available(false).await?;
Ok(())
Expand Down Expand Up @@ -628,7 +628,7 @@ impl FlownodeManager {
}
}

let mut node_ctx = self.node_context.lock().await;
let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in source_table_ids {
node_ctx
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Flownode for FlownodeManager {
let now = self.tick_manager.tick();

let fetch_order = {
let ctx = self.node_context.lock().await;
let ctx = self.node_context.read().await;
let table_col_names = ctx
.table_repr
.get_by_table_id(&table_id)
Expand Down
48 changes: 29 additions & 19 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_telemetry::debug;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{broadcast, mpsc, RwLock};

use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct FlownodeContext {
#[derive(Debug)]
pub struct SourceSender {
sender: broadcast::Sender<DiffRow>,
send_buf: VecDeque<DiffRow>,
send_buf: RwLock<VecDeque<DiffRow>>,
}

impl Default for SourceSender {
Expand All @@ -78,22 +78,24 @@ impl Default for SourceSender {
}
}

// TODO: make all send operations take `&self` (immutable)
impl SourceSender {
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
self.sender.subscribe()
}

/// Send as many rows as possible from the send buffer,
/// until the send buffer is empty or the broadcast channel is full
pub fn try_send_all(&mut self) -> Result<usize, Error> {
pub async fn try_send_all(&self) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
let mut send_buf = self.send_buf.write().await;
// if inner sender channel is empty or send buf is empty, there
// is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
break;
}
if let Some(row) = self.send_buf.pop_front() {
if let Some(row) = send_buf.pop_front() {
self.sender
.send(row)
.map_err(|err| {
Expand All @@ -108,17 +110,20 @@ impl SourceSender {
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
debug!("Remaining Send buf.len() = {}", self.send_buf.len());
debug!(
"Remaining Send buf.len() = {}",
self.send_buf.read().await.len()
);
}

Ok(row_cnt)
}

/// return number of rows it actual send(including what's in the buffer)
pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.extend(rows);
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.write().await.extend(rows);

let row_cnt = self.try_send_all()?;
let row_cnt = self.try_send_all().await?;

Ok(row_cnt)
}
Expand All @@ -128,30 +133,35 @@ impl FlownodeContext {
/// return number of rows it actual send(including what's in the buffer)
///
/// TODO(discord9): make this concurrent
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
let sender = self
.source_sender
.get_mut(&table_id)
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
// debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows)
sender.send_rows(rows).await
}

/// flush all sender's buf
///
/// return numbers being sent
pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
self.source_sender
.iter_mut()
.map(|(_table_id, src_sender)| src_sender.try_send_all())
.try_fold(0, |acc, x| x.map(|x| x + acc))
pub async fn flush_all_sender(&self) -> Result<usize, Error> {
let mut sum = 0;
for sender in self.source_sender.values() {
sender.try_send_all().await.inspect(|x| sum += x)?;
}
Ok(sum)
}

/// Return the sum number of rows in all send buf
pub fn get_send_buf_size(&self) -> usize {
self.source_sender.values().map(|v| v.send_buf.len()).sum()
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
sum += sender.send_buf.read().await.len();
}
sum
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/flow/src/adapter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ impl<'s> Worker<'s> {
Ok(Some((id, resp))) => {
if let Err(err) = self.itc_server.blocking_lock().resp(id, resp) {
common_telemetry::error!(
"Worker's itc server has been closed unexpectedly, shutting down worker: {}",
err
err;
"Worker's itc server has been closed unexpectedly, shutting down worker"
);
break;
};
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/create/sort/external_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl ExternalSorter {
IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_|
debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
).inspect_err(|e|
error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}")
error!(e; "Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}")
)
}

Expand Down
Loading

0 comments on commit bb045e3

Please sign in to comment.