Merge pull request ClickHouse#69148 from Algunenano/rmt_retries
RMT: Do not block retries when establishing a new keeper connection
Algunenano authored Sep 3, 2024
2 parents 5d3d2c2 + fda6a48 commit 7ba6eff
Showing 3 changed files with 94 additions and 16 deletions.
11 changes: 7 additions & 4 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -221,14 +221,17 @@ void StorageReplicatedMergeTree::setZooKeeper()
     /// strange effects. So we always use only one session for all tables.
     /// (excluding auxiliary zookeepers)
 
-    std::lock_guard lock(current_zookeeper_mutex);
     if (zookeeper_name == default_zookeeper_name)
     {
-        current_zookeeper = getContext()->getZooKeeper();
+        auto new_keeper = getContext()->getZooKeeper();
+        std::lock_guard lock(current_zookeeper_mutex);
+        current_zookeeper = new_keeper;
     }
     else
     {
-        current_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
+        auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
+        std::lock_guard lock(current_zookeeper_mutex);
+        current_zookeeper = new_keeper;
     }
 }

@@ -365,7 +368,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name);
     if (has_zookeeper)
     {
-        /// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
+        /// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
         /// be reached. In such cases Poco::Exception is thrown after a connection
         /// timeout - refer to src/Common/ZooKeeper/ZooKeeperImpl.cpp:866 for more info.
         ///
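Why the change above matters: establishing a new Keeper session can take seconds when hosts are unreachable, and before this patch setZooKeeper() held current_zookeeper_mutex for the entire connection attempt, so any concurrent reader of the session (for example, an INSERT performing keeper retries) was stuck behind the reconnect. The patch narrows the critical section: connect first, then take the lock only to swap the pointer in. Below is a minimal Python sketch of the same pattern (hypothetical names, standard threading module only; the authoritative code is the C++ diff above):

import threading
import time


class ReplicatedTable:
    """Sketch: refresh a shared connection without holding the lock while connecting."""

    def __init__(self):
        self._mutex = threading.Lock()
        self._connection = None

    def _connect(self):
        time.sleep(2)  # stand-in for a slow Keeper connection attempt
        return object()

    def set_connection_blocking(self):
        # Before the fix: the lock is held for the whole (slow) connect,
        # so readers stall until the attempt finishes or times out.
        with self._mutex:
            self._connection = self._connect()

    def set_connection_nonblocking(self):
        # After the fix: connect outside the lock, then lock only to swap.
        # Readers are blocked only for the duration of the assignment.
        new_connection = self._connect()
        with self._mutex:
            self._connection = new_connection

    def get_connection(self):
        # Reader path (e.g. a retrying insert): should never wait long.
        with self._mutex:
            return self._connection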
8 changes: 8 additions & 0 deletions tests/integration/test_inserts_with_keeper_retries/configs/storage_conf.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<clickhouse>
+    <logger>
+        <level>test</level>
+    </logger>
+
+</clickhouse>
91 changes: 79 additions & 12 deletions tests/integration/test_inserts_with_keeper_retries/test.py
@@ -3,15 +3,20 @@
 import pytest
 import time
 import threading
 import uuid
 from helpers.cluster import ClickHouseCluster
 from multiprocessing.dummy import Pool
 from helpers.network import PartitionManager
 from helpers.client import QueryRuntimeException
 from helpers.test_tools import assert_eq_with_retry
 
 cluster = ClickHouseCluster(__file__)
 
-node1 = cluster.add_instance("node1", with_zookeeper=True)
+node1 = cluster.add_instance(
+    "node1",
+    main_configs=["configs/storage_conf.xml"],
+    with_zookeeper=True,
+    with_minio=True,
+)
 
 
 @pytest.fixture(scope="module")
@@ -25,10 +30,16 @@ def started_cluster():
         cluster.shutdown()
 
 
-def test_replica_inserts_with_keeper_restart(started_cluster):
+@pytest.mark.parametrize(
+    "engine,storage_policy",
+    [
+        ("ReplicatedMergeTree", "default"),
+    ],
+)
+def test_replica_inserts_with_keeper_restart(started_cluster, engine, storage_policy):
     try:
         node1.query(
-            "CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()"
+            f"CREATE TABLE r (a UInt64, b String) ENGINE={engine}('/test/r', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
         )
 
         p = Pool(1)
@@ -60,10 +71,18 @@ def zoo_restart(zk_stopped_event):
node1.query("DROP TABLE IF EXISTS r SYNC")


def test_replica_inserts_with_keeper_disconnect(started_cluster):
@pytest.mark.parametrize(
"engine,storage_policy",
[
("ReplicatedMergeTree", "default"),
],
)
def test_replica_inserts_with_keeper_disconnect(
started_cluster, engine, storage_policy
):
try:
node1.query(
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()"
f"CREATE TABLE r2 (a UInt64, b String) ENGINE={engine}('/test/r2', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
)

p = Pool(1)
@@ -84,26 +103,32 @@ def keeper_disconnect(node, event):
         disconnect_event.wait(90)
 
         node1.query(
-            "INSERT INTO r SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20"
+            "INSERT INTO r2 SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20"
         )
         node1.query(
-            "INSERT INTO r SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20"
+            "INSERT INTO r2 SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20"
         )
 
         job.wait()
         p.close()
         p.join()
 
-        assert node1.query("SELECT COUNT() FROM r") == "20\n"
+        assert node1.query("SELECT COUNT() FROM r2") == "20\n"
 
     finally:
-        node1.query("DROP TABLE IF EXISTS r SYNC")
+        node1.query("DROP TABLE IF EXISTS r2 SYNC")
 
 
-def test_query_timeout_with_zk_down(started_cluster):
+@pytest.mark.parametrize(
+    "engine,storage_policy",
+    [
+        ("ReplicatedMergeTree", "default"),
+    ],
+)
+def test_query_timeout_with_zk_down(started_cluster, engine, storage_policy):
     try:
         node1.query(
-            "CREATE TABLE zk_down (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/zk_down', '0') ORDER BY tuple()"
+            f"CREATE TABLE zk_down (a UInt64, b String) ENGINE={engine}('/test/zk_down', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
         )
 
         cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
@@ -118,3 +143,45 @@ def test_query_timeout_with_zk_down(started_cluster):
     finally:
         cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
         node1.query("DROP TABLE IF EXISTS zk_down SYNC")
+
+
+@pytest.mark.parametrize(
+    "engine,storage_policy",
+    [
+        ("ReplicatedMergeTree", "default"),
+    ],
+)
+def test_retries_should_not_wait_for_global_connection(
+    started_cluster, engine, storage_policy
+):
+    pm = PartitionManager()
+    try:
+        node1.query(
+            f"CREATE TABLE zk_down_retries (a UInt64, b String) ENGINE={engine}('/test/zk_down', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
+        )
+
+        cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
+        # Apart from stopping the keepers, introduce a network delay to make
+        # connection retries slower; retries must not be blocked during that time.
+        pm.add_network_delay(node1, 1000)
+
+        query_id = uuid.uuid4()
+
+        with pytest.raises(QueryRuntimeException):
+            node1.query(
+                "INSERT INTO zk_down_retries SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=10, insert_keeper_retry_max_backoff_ms=100",
+                query_id=str(query_id),
+            )
+        pm.heal_all()
+        # Use query_log for the execution time, since we want to ignore the
+        # network delay we introduced (it also affects the client connection).
+        node1.query("SYSTEM FLUSH LOGS")
+        res = node1.query(
+            f"SELECT query_duration_ms FROM system.query_log WHERE type != 'QueryStart' AND query_id = '{query_id}'"
+        )
+        query_duration = int(res)
+        # Should be around 1 second; 5 seconds is generous (debug builds etc.).
+        # It used to take 35 seconds without the fix.
+        assert query_duration < 5000
+    finally:
+        pm.heal_all()
+        cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
+        node1.query("DROP TABLE IF EXISTS zk_down_retries SYNC")
