Skip to content

Commit

Permalink
Merge pull request #17631 from ballard26/write-cache-omb
Browse files Browse the repository at this point in the history
Add write caching to OMB validation tests
  • Loading branch information
travisdowns authored Nov 27, 2024
2 parents 0c52283 + 2e6cfa5 commit 223e84c
Showing 1 changed file with 44 additions and 12 deletions.
56 changes: 44 additions & 12 deletions tests/rptest/redpanda_cloud_tests/omb_validation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from rptest.services.redpanda import get_cloud_provider
from rptest.tests.redpanda_cloud_test import RedpandaCloudTest
from ducktape.tests.test import TestContext
from ducktape.mark import matrix
from rptest.services.producer_swarm import ProducerSwarm
from rptest.clients.rpk import RpkTool
from rptest.services.redpanda_cloud import ProductInfo
Expand Down Expand Up @@ -230,7 +231,8 @@ def _mb_to_mib(self, mb: float | int):
return math.floor(0.9537 * mb)

@cluster(num_nodes=CLUSTER_NODES)
def test_max_connections(self):
@matrix(write_caching=["on", "off"])
def test_max_connections(self, write_caching: str):
tier_limits = self.tier_limits

# Constants
Expand Down Expand Up @@ -281,6 +283,9 @@ def test_max_connections(self):
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
},
"topic_config": {
"write.caching": write_caching,
},
}

validator = self.base_validator() | {
Expand Down Expand Up @@ -396,7 +401,8 @@ def assert_no_rejected():

self.rpk.create_topic(swarm_topic_name,
self._partition_count(),
replicas=3)
replicas=3,
config={"write.caching": write_caching})

def make_swarm():
return ProducerSwarm(
Expand Down Expand Up @@ -558,7 +564,8 @@ def kv_str(k: str, v: Any):
self.logger.warn(str(results))

@cluster(num_nodes=CLUSTER_NODES)
def test_max_partitions(self):
@matrix(write_caching=["on", "off"])
def test_max_partitions(self, write_caching: str):
tier_limits = self.tier_limits

# multiplier for the latencies to log warnings on, but still pass the test
Expand Down Expand Up @@ -596,6 +603,27 @@ def test_max_partitions(self):
producer_rate / (1 * KiB),
}

driver = {
"name": "MaxPartitionsTestDriver",
"replication_factor": 3,
"request_timeout": 300000,
"producer_config": {
"enable.idempotence": "true",
"acks": "all",
"linger.ms": 1,
"max.in.flight.requests.per.connection": 5,
"batch.size": 131072,
},
"consumer_config": {
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"max.partition.fetch.bytes": 131072
},
"topic_config": {
"write.caching": write_caching,
},
}

# validator to check metrics and fail on
fail_validator = self.base_validator(fudge_factor) | {
OMBSampleConfigurations.AVG_THROUGHPUT_MBPS: [
Expand All @@ -612,13 +640,11 @@ def test_max_partitions(self):
],
}

benchmark = OpenMessagingBenchmark(
self._ctx,
self.redpanda,
"ACK_ALL_GROUP_LINGER_1MS_IDEM_MAX_IN_FLIGHT",
(workload, fail_validator),
num_workers=self.CLUSTER_NODES - 1,
topology="ensemble")
benchmark = OpenMessagingBenchmark(self._ctx,
self.redpanda,
driver, (workload, fail_validator),
num_workers=self.CLUSTER_NODES - 1,
topology="ensemble")
benchmark.start()
benchmark_time_min = benchmark.benchmark_time() + 5
benchmark.wait(timeout_sec=benchmark_time_min * 60)
Expand All @@ -644,7 +670,8 @@ def test_max_partitions(self):
self.redpanda.assert_cluster_is_reusable()

@cluster(num_nodes=CLUSTER_NODES)
def test_common_workload(self):
@matrix(write_caching=["on", "off"])
def test_common_workload(self, write_caching: str):
tier_limits = self.tier_limits

subscriptions = max(tier_limits.max_egress // tier_limits.max_ingress,
Expand Down Expand Up @@ -684,6 +711,9 @@ def test_common_workload(self):
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
},
"topic_config": {
"write.caching": write_caching,
},
}

benchmark = OpenMessagingBenchmark(self._ctx,
Expand All @@ -698,7 +728,8 @@ def test_common_workload(self):
self.redpanda.assert_cluster_is_reusable()

@cluster(num_nodes=CLUSTER_NODES)
def test_retention(self):
@matrix(write_caching=["on", "off"])
def test_retention(self, write_caching: str):
tier_limits = self.tier_limits

subscriptions = max(tier_limits.max_egress // tier_limits.max_ingress,
Expand Down Expand Up @@ -740,6 +771,7 @@ def test_retention(self):
"enable.auto.commit": "false",
},
"topic_config": {
"write.caching": write_caching,
"retention.bytes": retention_bytes,
"retention.local.target.bytes": retention_bytes,
"segment.bytes": segment_bytes,
Expand Down

0 comments on commit 223e84c

Please sign in to comment.