Skip to content

Commit

Permalink
add config for easy of testing
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Mar 5, 2024
1 parent cefa584 commit 8e3c736
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 6 deletions.
4 changes: 2 additions & 2 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ if ${is_not_ci}
no_rust_log = not ${rust_log}
if ${no_rust_log}
set_env RUST_LOG "pgwire_query_log=info"
set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info"
else
set_env RUST_LOG "pgwire_query_log=info,${rust_log}"
set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info,${rust_log}"
end
end
''',
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ pub struct ConfigMap {
#[parameter(default = false)]
background_ddl: bool,

/// Run DDL statements in background
#[parameter(default = true)]
enable_reusable_source: bool,

/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
#[parameter(default = SERVER_ENCODING)]
server_encoding: String,
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ macro_rules! impl_set_system_param {
$(
key_of!($field) => {
let v = if let Some(v) = value {
v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))?
v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
} else {
$default.ok_or_else(|| format!("{} does not have a default value", key))?
};
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ impl BrokerAddrRewriter {
role: PrivateLinkContextRole,
broker_rewrite_map: Option<HashMap<String, String>>,
) -> ConnectorResult<Self> {
tracing::info!("[{}] rewrite map {:?}", role, broker_rewrite_map);
let rewrite_map: ConnectorResult<BTreeMap<BrokerAddr, BrokerAddr>> = broker_rewrite_map
.map_or(Ok(BTreeMap::new()), |addr_map| {
tracing::info!("[{}] rewrite map {:?}", role, addr_map);
addr_map
.into_iter()
.map(|(old_addr, new_addr)| {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ pub async fn handle_create_source(

let catalog_writer = session.catalog_writer()?;

if has_streaming_job {
if has_streaming_job && session.config().enable_reusable_source() {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
let source_node = LogicalSource::with_catalog(
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/planner/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Planner {
}

pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
if source.can_backfill() {
if source.can_backfill() && self.ctx().session_ctx().config().enable_reusable_source() {
Ok(LogicalSourceBackfill::new(Rc::new(source.catalog), self.ctx())?.into())
} else {
Ok(LogicalSource::with_catalog(
Expand Down

0 comments on commit 8e3c736

Please sign in to comment.