
Merge pull request ClickHouse#68837 from CurtizJ/fix-async-insert-alter
Fix async inserts with `ALTER ADD/MODIFY COLUMN`
CurtizJ authored Sep 3, 2024
2 parents 9822b2f + 2e4f6f2 commit 34c14a6
Showing 8 changed files with 241 additions and 101 deletions.
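
The buffered data of an asynchronous insert could be flushed against a table whose structure had changed between enqueue and flush. As the diff below shows, the fix is twofold: `preprocessInsertQuery()` now pins an explicit column list on the queued INSERT AST, so a later `ALTER ADD COLUMN` cannot change the set of inserted columns, and at flush time each buffered block is converted to the current pipeline header, so type changes from `ALTER MODIFY COLUMN` are cast rather than mismatched. A standalone sketch of the conversion follows the first file's diff.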
203 changes: 113 additions & 90 deletions src/Interpreters/AsynchronousInsertQueue.cpp
@@ -33,6 +33,8 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/SipHash.h>
#include <Common/logger_useful.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>

namespace CurrentMetrics
{
@@ -308,6 +310,7 @@ void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const
/* no_squash */ false,
/* no_destination */ false,
/* async_insert */ false);

auto table = interpreter.getTable(insert_query);
auto sample_block = InterpreterInsertQuery::getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr(), query_context);

@@ -318,6 +321,10 @@
/// InterpreterInsertQuery::getTable() -> ITableFunction::execute().
if (insert_query.table_id)
query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames());

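/// Pin an explicit column list on the queued INSERT AST, so that a concurrent
/// ALTER ADD COLUMN cannot change the set of columns this insert writes
/// between enqueue and flush.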
insert_query.columns = std::make_shared<ASTExpressionList>();
for (const auto & column : sample_block)
insert_query.columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
}

AsynchronousInsertQueue::PushResult
@@ -696,6 +703,17 @@ catch (...)
tryLogCurrentException("AsynchronousInsertQueue", "Failed to add elements to AsynchronousInsertLog");
}

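/// Convert a buffered block to the current header of the flush pipeline
/// (e.g. after column types were changed by ALTER MODIFY COLUMN).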
void convertBlockToHeader(Block & block, const Block & header)
{
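/// MatchColumnsMode::Name matches source columns to header columns by name,
/// reordering them and adding casts where the types differ.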
auto converting_dag = ActionsDAG::makeConvertingActions(
block.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);

auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
converting_actions->execute(block);
}

String serializeQuery(const IAST & query, size_t max_length)
{
return query.hasSecretParts()
@@ -791,6 +809,61 @@ try
if (async_insert_log)
log_elements.reserve(data->entries.size());

auto add_entry_to_asynchronous_insert_log = [&, query_by_format = NameToNameMap{}](
const InsertData::EntryPtr & entry,
const String & parsing_exception,
size_t num_rows,
size_t num_bytes) mutable
{
if (!async_insert_log)
return;

AsynchronousInsertLogElement elem;
elem.event_time = timeInSeconds(entry->create_time);
elem.event_time_microseconds = timeInMicroseconds(entry->create_time);
elem.database = query_database;
elem.table = query_table;
elem.format = entry->format;
elem.query_id = entry->query_id;
elem.bytes = num_bytes;
elem.rows = num_rows;
elem.exception = parsing_exception;
elem.data_kind = entry->chunk.getDataKind();
elem.timeout_milliseconds = data->timeout_ms.count();
elem.flush_query_id = insert_query_id;

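/// Serialize the query for logging at most once per distinct entry format.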
auto get_query_by_format = [&](const String & format) -> const String &
{
auto [it, inserted] = query_by_format.try_emplace(format);
if (!inserted)
return it->second;

auto query = key.query->clone();
assert_cast<ASTInsertQuery &>(*query).format = format;
it->second = serializeQuery(*query, insert_context->getSettingsRef().log_queries_cut_to_length);
return it->second;
};

if (entry->chunk.getDataKind() == DataKind::Parsed)
elem.query_for_logging = key.query_str;
else
elem.query_for_logging = get_query_by_format(entry->format);

/// If there was a parsing error,
/// the entry won't be flushed anyway,
/// so add the log element immediately.
if (!elem.exception.empty())
{
elem.status = AsynchronousInsertLogElement::ParsingError;
async_insert_log->add(std::move(elem));
}
else
{
elem.status = AsynchronousInsertLogElement::Ok;
log_elements.push_back(std::move(elem));
}
};

try
{
interpreter = std::make_unique<InterpreterInsertQuery>(
@@ -819,49 +892,20 @@
catch (...)
{
logExceptionBeforeStart(query_for_logging, insert_context, key.query, query_span, start_watch.elapsedMilliseconds());
throw;
}

auto add_entry_to_asynchronous_insert_log = [&](const auto & entry,
const auto & entry_query_for_logging,
const auto & exception,
size_t num_rows,
size_t num_bytes,
Milliseconds timeout_ms)
{
if (!async_insert_log)
return;

AsynchronousInsertLogElement elem;
elem.event_time = timeInSeconds(entry->create_time);
elem.event_time_microseconds = timeInMicroseconds(entry->create_time);
elem.query_for_logging = entry_query_for_logging;
elem.database = query_database;
elem.table = query_table;
elem.format = entry->format;
elem.query_id = entry->query_id;
elem.bytes = num_bytes;
elem.rows = num_rows;
elem.exception = exception;
elem.data_kind = entry->chunk.getDataKind();
elem.timeout_milliseconds = timeout_ms.count();
elem.flush_query_id = insert_query_id;

/// If there was a parsing error,
/// the entry won't be flushed anyway,
/// so add the log element immediately.
if (!elem.exception.empty())
{
elem.status = AsynchronousInsertLogElement::ParsingError;
async_insert_log->add(std::move(elem));
}
else
if (async_insert_log)
{
log_elements.push_back(elem);
for (const auto & entry : data->entries)
add_entry_to_asynchronous_insert_log(entry, /*parsing_exception=*/ "", /*num_rows=*/ 0, entry->chunk.byteSize());

auto exception = getCurrentExceptionMessage(false);
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, exception);
}
};
throw;
}

auto finish_entries = [&]
auto finish_entries = [&](size_t num_rows, size_t num_bytes)
{
for (const auto & entry : data->entries)
{
@@ -874,20 +918,7 @@
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, "");
}
};

Chunk chunk;
auto header = pipeline.getHeader();

if (key.data_kind == DataKind::Parsed)
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log);
else
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_asynchronous_insert_log);

ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());

auto log_and_add_finish_to_query_log = [&](size_t num_rows, size_t num_bytes)
{
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
queue_shard_flush_time_history.updateWithCurrentTime();

@@ -896,16 +927,24 @@
query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
};


if (chunk.getNumRows() == 0)
{
finish_entries();
log_and_add_finish_to_query_log(0, 0);
return;
}

try
{
Chunk chunk;
auto header = pipeline.getHeader();

if (key.data_kind == DataKind::Parsed)
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log);
else
chunk = processPreprocessedEntries(data, header, add_entry_to_asynchronous_insert_log);

ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());

if (chunk.getNumRows() == 0)
{
finish_entries(/*num_rows=*/ 0, /*num_bytes=*/ 0);
return;
}

size_t num_rows = chunk.getNumRows();
size_t num_bytes = chunk.bytes();

@@ -915,7 +954,7 @@
CompletedPipelineExecutor completed_executor(pipeline);
completed_executor.execute();

log_and_add_finish_to_query_log(num_rows, num_bytes);
finish_entries(num_rows, num_bytes);
}
catch (...)
{
@@ -929,8 +968,6 @@
}
throw;
}

finish_entries();
}
catch (const Exception & e)
{
@@ -991,7 +1028,6 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(

StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
auto chunk_info = std::make_shared<AsyncInsertInfo>();
auto query_for_logging = serializeQuery(*key.query, insert_context->getSettingsRef().log_queries_cut_to_length);

for (const auto & entry : data->entries)
{
@@ -1009,7 +1045,8 @@
size_t num_rows = executor.execute(*buffer);

total_rows += num_rows;
/// for some reason, client can pass zero rows and bytes to server.

/// For some reason, client can pass zero rows and bytes to server.
/// We don't update offsets in this case, because we assume every insert has some rows during dedup
/// but we have nothing to deduplicate for this insert.
if (num_rows > 0)
@@ -1018,8 +1055,7 @@
chunk_info->tokens.push_back(entry->async_dedup_token);
}

add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms);

add_to_async_insert_log(entry, current_exception, num_rows, num_bytes);
current_exception.clear();
entry->resetChunk();
}
@@ -1031,54 +1067,41 @@

template <typename LogFunc>
Chunk AsynchronousInsertQueue::processPreprocessedEntries(
const InsertQuery & key,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
LogFunc && add_to_async_insert_log)
{
size_t total_rows = 0;
auto chunk_info = std::make_shared<AsyncInsertInfo>();
auto result_columns = header.cloneEmptyColumns();

std::unordered_map<String, String> format_to_query;

auto get_query_by_format = [&](const String & format) -> const String &
{
auto [it, inserted] = format_to_query.try_emplace(format);
if (!inserted)
return it->second;

auto query = key.query->clone();
assert_cast<ASTInsertQuery &>(*query).format = format;
it->second = serializeQuery(*query, insert_context->getSettingsRef().log_queries_cut_to_length);
return it->second;
};

for (const auto & entry : data->entries)
{
const auto * block = entry->chunk.asBlock();
if (!block)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected entry with data kind Preprocessed. Got: {}", entry->chunk.getDataKind());

auto columns = block->getColumns();
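/// The buffered block may predate an ALTER; convert it to the current
/// header before appending its columns to the result.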
Block block_to_insert = *block;
if (!isCompatibleHeader(block_to_insert, header))
convertBlockToHeader(block_to_insert, header);

auto columns = block_to_insert.getColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());

total_rows += block->rows();
/// for some reason, client can pass zero rows and bytes to server.
total_rows += block_to_insert.rows();

/// For some reason, client can pass zero rows and bytes to server.
/// We don't update offsets in this case, because we assume every insert has some rows during dedup,
/// but we have nothing to deduplicate for this insert.
if (block->rows())
if (block_to_insert.rows() > 0)
{
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
}

const auto & query_for_logging = get_query_by_format(entry->format);
add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms);

add_to_async_insert_log(entry, /*parsing_exception=*/ "", block_to_insert.rows(), block_to_insert.bytes());
entry->resetChunk();
}

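The flush-time conversion above is the heart of the fix. Below is a minimal standalone sketch of the same technique, assuming it is compiled against ClickHouse's internal headers; the column name `id` and the UInt32 -> Int64 type change are illustrative, not taken from the commit:

#include <Core/Block.h>
#include <Core/Field.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>

using namespace DB;

int main()
{
    /// A block buffered while `id` was still UInt32.
    auto old_type = std::make_shared<DataTypeUInt32>();
    auto column = old_type->createColumn();
    column->insert(Field(UInt64(42)));
    Block buffered{{std::move(column), old_type, "id"}};

    /// The header the flush pipeline expects after a hypothetical
    /// ALTER TABLE ... MODIFY COLUMN id Int64.
    auto new_type = std::make_shared<DataTypeInt64>();
    Block header{{new_type->createColumn(), new_type, "id"}};

    /// Same steps as convertBlockToHeader() in the diff: match columns
    /// by name and cast types where they differ.
    auto dag = ActionsDAG::makeConvertingActions(
        buffered.getColumnsWithTypeAndName(),
        header.getColumnsWithTypeAndName(),
        ActionsDAG::MatchColumnsMode::Name);

    auto actions = std::make_shared<ExpressionActions>(std::move(dag));
    actions->execute(buffered);

    /// `buffered` now holds a single Int64 column named `id`.
    return 0;
}
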
2 changes: 0 additions & 2 deletions src/Interpreters/AsynchronousInsertQueue.h
@@ -288,10 +288,8 @@ class AsynchronousInsertQueue : public WithContext

template <typename LogFunc>
static Chunk processPreprocessedEntries(
const InsertQuery & key,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
LogFunc && add_to_async_insert_log);

template <typename E>
@@ -9,7 +9,7 @@ written_rows: 0
written_bytes: 0
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query: INSERT INTO default.async_insert_landing (id) SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing']
@@ -26,7 +26,7 @@
written_bytes: 16
result_rows: 4
result_bytes: 16
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query: INSERT INTO default.async_insert_landing (id) SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing']
@@ -54,7 +54,7 @@
written_bytes: 0
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query: INSERT INTO default.async_insert_landing (id) SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
@@ -71,7 +71,7 @@
written_bytes: 24
result_rows: 6
result_bytes: 24
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query: INSERT INTO default.async_insert_landing (id) SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
@@ -118,7 +118,7 @@
written_bytes: 0
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query: INSERT INTO default.async_insert_landing (id) SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
@@ -135,7 +135,7 @@
written_bytes: 12
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query: INSERT INTO default.async_insert_landing (id) SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
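Note that in the updated test reference the flushed queries now carry an explicit column list, e.g. `INSERT INTO default.async_insert_landing (id) ...`, produced by the column pinning in preprocessInsertQuery() above.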
(Diffs for the remaining changed files are not shown.)
