[DPE-2119] fix issues and tests when reusing storage (#272)
## Issue
When attaching existing storage to a new unit, two issues occur:

- The snap install fails because of the permissions / ownership of the existing directories.
- The snap's common directory (`snap_common`) gets completely deleted.

## Solution
- Bump the snap version and use the fixed one (the fixed revision is 47; this is already outdated, as a newer snap version was merged to main before this PR). A rough sketch of pinning a revision follows this list.
- Enhance the coverage of the integration tests.
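The snap install itself is not part of this diff, so the following is an illustration only of what pinning a snap to a fixed revision can look like (the snap name `opensearch` is taken from the test paths, revision 47 from the description above):

```python
import subprocess

# Illustration only: install the OpenSearch snap pinned to the fixed revision
# mentioned above. The charm's actual install code path is not shown in this diff.
subprocess.run(
    ["sudo", "snap", "install", "opensearch", "--revision=47"],
    check=True,
)
```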

## Integration Testing
Tests for attaching existing storage can be found in
`tests/integration/ha/test_storage.py`. There are now three test cases (a
condensed sketch of the shared storage-reuse pattern follows this list):
1. `test_storage_reuse_after_scale_down`: remove one unit from the deployment,
then add a new one that reuses the storage from the removed unit. Check that
the continuous writes are intact and that a test file created initially is
still there.
2. `test_storage_reuse_after_scale_to_zero`: remove both units from the
deployment while keeping the application, then add two new units that reuse
the storage. Check the continuous writes.
3. `test_storage_reuse_in_new_cluster_after_app_removal`: starting from a
cluster of three units, remove all of them and remove the application. Deploy
a new application (with one unit) to the same model, attach the storage, then
add two more units with the remaining storage volumes. Check the continuous
writes.
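All three cases rely on the same pattern: record a unit's storage ID before removing it, then pass that ID to `add-unit --attach-storage` when scaling back up. A condensed sketch of that pattern, using the `storage_id()` helper from `tests/integration/ha/helpers.py` as the tests below do:

```python
# Condensed sketch of the storage-reuse pattern shared by the three tests.
# Assumes a pytest-operator model with the app already deployed and the
# storage_id() helper from tests/integration/ha/helpers.py.
from ..ha.helpers import storage_id


async def reuse_storage_of_unit(ops_test, app: str, unit_id: int) -> None:
    # remember the storage volume of the unit we are about to remove
    old_storage_id = storage_id(ops_test, app, unit_id)

    # remove the unit; the persistent volume survives the removal
    await ops_test.model.applications[app].destroy_unit(f"{app}/{unit_id}")
    await ops_test.model.wait_for_idle(apps=[app], timeout=1000)

    # add a new unit that re-attaches the old volume
    add_unit_cmd = (
        f"add-unit {app} --model={ops_test.model.info.name} "
        f"--attach-storage={old_storage_id}"
    )
    return_code, _, _ = await ops_test.juju(*add_unit_cmd.split())
    assert return_code == 0, f"Failed to add unit with storage {old_storage_id}"
```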

## Other Issues
- This PR also addresses another issue,
#306, which is
resolved with commit
19f843c.
- Furthermore, this PR works around problems with acquiring the OpenSearch lock, especially when the shards of the locking index are not assigned to a new primary after the former primary is removed. This was also reported in #243 and will be investigated further in #327 (see the illustrative shard check below).
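For background on the lock issue: the charm keeps its node lock in the hidden `.charm_node_lock` index, and the workaround in `opensearch_locking.py` (see the diff below) falls back to the peer-databag lock when that index cannot be queried. A purely illustrative way to check whether the lock index still has an assigned primary shard (host, port, and credentials are placeholders, not values from this PR):

```python
# Illustration only: inspect shard allocation of the ".charm_node_lock" index.
# Host, port, and credentials are placeholders, not values from this PR.
import requests

resp = requests.get(
    "https://localhost:9200/_cat/shards/.charm_node_lock",
    params={"v": "true", "h": "index,shard,prirep,state,node"},
    auth=("admin", "admin"),
    verify=False,  # test deployments typically use self-signed certificates
)
print(resp.text)  # an unassigned primary ("p") here is what triggers the fallback
```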
reneradoi authored Jun 11, 2024 · 1 parent f97f015 · commit 070182a
Showing 3 changed files with 137 additions and 42 deletions.
lib/charms/opensearch/v0/opensearch_base_charm.py (0 additions & 3 deletions)
@@ -530,9 +530,6 @@ def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent):  # no
self.peers_data.delete(Scope.APP, "bootstrap_contributors_count")
self.peers_data.delete(Scope.APP, "nodes_config")

# todo: remove this if snap storage reuse is solved.
self.peers_data.delete(Scope.APP, "security_index_initialised")

# we attempt to flush the translog to disk
if self.opensearch.is_node_up():
try:
lib/charms/opensearch/v0/opensearch_locking.py (11 additions & 4 deletions)
@@ -205,8 +205,8 @@ def _unit_with_lock(self, host) -> str | None:
retries=3,
)
except OpenSearchHttpError as e:
if e.response_code == 404:
# No unit has lock
if e.response_code in [404, 503]:
# No unit has lock or index not available
return
raise
return document_data["unit-name"]
@@ -240,7 +240,13 @@ def acquired(self) -> bool:  # noqa: C901
unit = self._unit_with_lock(host)
except OpenSearchHttpError:
logger.exception("Error checking which unit has OpenSearch lock")
return False
# if the node lock cannot be acquired, fall back to peer databag lock
# this avoids hitting deadlock situations in cases where
# the .charm_node_lock index is not available
if online_nodes <= 1:
return self._peer.acquired
else:
return False
# If online_nodes == 1, we should acquire the lock via the peer databag.
# If we acquired the lock via OpenSearch and this unit was stopping, we would be unable
# to release the OpenSearch lock. For example, when scaling to 0.
@@ -274,7 +280,8 @@ def acquired(self) -> bool:  # noqa: C901
return False
else:
logger.exception("Error creating OpenSearch lock document")
return False
# in this case, try to acquire peer databag lock as fallback
return self._peer.acquired
else:
# Ensure write was successful on all nodes
# "It is important to note that this setting [`wait_for_active_shards`] greatly
tests/integration/ha/test_storage.py (126 additions & 35 deletions)
@@ -4,14 +4,21 @@

import asyncio
import logging
import subprocess
import time

import pytest
from pytest_operator.plugin import OpsTest

from ..ha.helpers import app_name, storage_id, storage_type
from ..ha.helpers import (
app_name,
assert_continuous_writes_increasing,
storage_id,
storage_type,
)
from ..ha.test_horizontal_scaling import IDLE_PERIOD
from ..helpers import APP_NAME, MODEL_CONFIG, SERIES, get_application_unit_ids
from ..helpers_deployments import wait_until
from ..tls.test_tls import TLS_CERTIFICATES_APP_NAME
from .continuous_writes import ContinuousWrites

@@ -29,11 +36,14 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:

my_charm = await ops_test.build_charm(".")
await ops_test.model.set_config(MODEL_CONFIG)
# this assumes the test is run on a lxd cloud
await ops_test.model.create_storage_pool("opensearch-pool", "lxd")
storage = {"opensearch-data": {"pool": "opensearch-pool", "size": 2048}}
# Deploy TLS Certificates operator.
config = {"ca-common-name": "CN_CA"}
await asyncio.gather(
ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=config),
ops_test.model.deploy(my_charm, num_units=1, series=SERIES),
ops_test.model.deploy(my_charm, num_units=1, series=SERIES, storage=storage),
)

# Relate it to OpenSearch to set up TLS.
@@ -60,33 +70,34 @@ async def test_storage_reuse_after_scale_down(
"reuse of storage can only be used on deployments with persistent storage not on rootfs deployments"
)

# scale-down to 1 if multiple units
unit_ids = get_application_unit_ids(ops_test, app)
if len(unit_ids) > 1:
for unit_id in unit_ids[1:]:
await ops_test.model.applications[app].destroy_unit(f"{app}/{unit_id}")

await ops_test.model.wait_for_idle(
apps=[app],
status="active",
timeout=1000,
wait_for_exact_units=1,
idle_period=IDLE_PERIOD,
)
else:
# wait for enough data to be written
time.sleep(60)
# scale up to 2 units
await ops_test.model.applications[app].add_unit(count=1)
await ops_test.model.wait_for_idle(
apps=[app],
status="active",
timeout=1000,
wait_for_exact_units=2,
)

writes_result = await c_writes.stop()

# get unit info
unit_id = get_application_unit_ids(ops_test, app)[0]
unit_id = get_application_unit_ids(ops_test, app)[1]
unit_storage_id = storage_id(ops_test, app, unit_id)

# scale-down to 0
# create a testfile on the newly added unit to check if data in storage is persistent
testfile = "/var/snap/opensearch/common/testfile"
create_testfile_cmd = f"juju ssh {app}/{unit_id} -q sudo touch {testfile}"
subprocess.run(create_testfile_cmd, shell=True)

# scale-down to 1
await ops_test.model.applications[app].destroy_unit(f"{app}/{unit_id}")
await ops_test.model.wait_for_idle(
apps=[app], status="active", timeout=1000, wait_for_exact_units=0
# app status will not be active because after scaling down not all shards are assigned
apps=[app],
timeout=1000,
wait_for_exact_units=1,
idle_period=IDLE_PERIOD,
)

# add unit with storage attached
@@ -97,23 +108,86 @@ async def test_storage_reuse_in_new_cluster_after_app_removal(
assert return_code == 0, "Failed to add unit with storage"

await ops_test.model.wait_for_idle(
apps=[app], status="active", timeout=1000, wait_for_exact_units=1
apps=[app],
status="active",
timeout=1000,
wait_for_exact_units=2,
idle_period=IDLE_PERIOD,
)

# check the storage of the new unit
new_unit_id = get_application_unit_ids(ops_test, app)[0]
new_unit_id = get_application_unit_ids(ops_test, app)[1]
new_unit_storage_id = storage_id(ops_test, app, new_unit_id)
assert unit_storage_id == new_unit_storage_id, "Storage IDs mismatch."

# check if data is also imported
assert writes_result.count == (await c_writes.count())
assert writes_result.max_stored_id == (await c_writes.max_stored_id())

# check if the testfile is still there or was overwritten on installation
check_testfile_cmd = f"juju ssh {app}/{new_unit_id} -q sudo ls {testfile}"
assert testfile == subprocess.getoutput(check_testfile_cmd)


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_storage_reuse_in_new_cluster_after_app_removal(
async def test_storage_reuse_after_scale_to_zero(
ops_test: OpsTest, c_writes: ContinuousWrites, c_writes_runner
):
"""Check storage is reused and data accessible after scaling down and up."""
app = (await app_name(ops_test)) or APP_NAME

if storage_type(ops_test, app) == "rootfs":
pytest.skip(
"reuse of storage can only be used on deployments with persistent storage not on rootfs deployments"
)

writes_result = await c_writes.stop()

# scale down to zero units in reverse order
unit_ids = get_application_unit_ids(ops_test, app)
storage_ids = {}
for unit_id in unit_ids[::-1]:
storage_ids[unit_id] = storage_id(ops_test, app, unit_id)
await ops_test.model.applications[app].destroy_unit(f"{app}/{unit_id}")
# give some time for removing each unit
time.sleep(60)

await ops_test.model.wait_for_idle(
# app status will not be active because after scaling down not all shards are assigned
apps=[app],
timeout=1000,
wait_for_exact_units=0,
)

# scale up again
for unit_id in unit_ids:
add_unit_cmd = f"add-unit {app} --model={ops_test.model.info.name} --attach-storage={storage_ids[unit_id]}"
return_code, _, _ = await ops_test.juju(*add_unit_cmd.split())
assert return_code == 0, f"Failed to add unit with storage {storage_ids[unit_id]}"
await ops_test.model.wait_for_idle(apps=[app], timeout=1000)

await ops_test.model.wait_for_idle(
apps=[app],
status="active",
timeout=1000,
wait_for_exact_units=len(unit_ids),
)

# check if data is also imported
assert writes_result.count == (await c_writes.count())
assert writes_result.max_stored_id == (await c_writes.max_stored_id())

# restart continuous writes and check if they can be written
await c_writes.start()
time.sleep(30)
await assert_continuous_writes_increasing(c_writes)


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_storage_reuse_in_new_cluster_after_app_removal(
ops_test: OpsTest, c_writes: ContinuousWrites, c_balanced_writes_runner
):
"""Check storage is reused and data accessible after removing app and deploying new cluster."""
app = (await app_name(ops_test)) or APP_NAME
@@ -123,7 +197,7 @@ async def test_storage_reuse_in_new_cluster_after_app_removal(
"reuse of storage can only be used on deployments with persistent storage not on rootfs deployments"
)

# scale-down to 1 if multiple units
# scale-up to 3 to make it a cluster
unit_ids = get_application_unit_ids(ops_test, app)
if len(unit_ids) < 3:
await ops_test.model.applications[app].add_unit(count=3 - len(unit_ids))
@@ -146,11 +220,8 @@ async def test_storage_reuse_in_new_cluster_after_app_removal(
for unit_id in get_application_unit_ids(ops_test, app):
storage_ids.append(storage_id(ops_test, app, unit_id))

# remove application
await ops_test.model.applications[app].destroy()

# wait a bit until all app deleted
time.sleep(60)
# remove the remaining application
await ops_test.model.remove_application(app, block_until_done=True)

# deploy new cluster
my_charm = await ops_test.build_charm(".")
@@ -159,6 +230,18 @@
)
return_code, _, _ = await ops_test.juju(*deploy_cluster_with_storage_cmd.split())
assert return_code == 0, f"Failed to deploy app with storage {storage_ids[0]}"
await ops_test.model.integrate(app, TLS_CERTIFICATES_APP_NAME)

# wait for cluster to be deployed
await wait_until(
ops_test,
apps=[app],
apps_statuses=["active", "blocked"],
units_statuses=["active"],
wait_for_exact_units=1,
idle_period=IDLE_PERIOD,
timeout=2400,
)

# add unit with storage attached
for unit_storage_id in storage_ids[1:]:
@@ -168,12 +251,15 @@
return_code, _, _ = await ops_test.juju(*add_unit_cmd.split())
assert return_code == 0, f"Failed to add unit with storage {unit_storage_id}"

await ops_test.model.integrate(app, TLS_CERTIFICATES_APP_NAME)
await ops_test.model.wait_for_idle(
apps=[TLS_CERTIFICATES_APP_NAME, APP_NAME],
status="active",
timeout=1000,
# wait for new cluster to settle down
await wait_until(
ops_test,
apps=[app],
apps_statuses=["active"],
units_statuses=["active"],
wait_for_exact_units=len(storage_ids),
idle_period=IDLE_PERIOD,
timeout=2400,
)
assert len(ops_test.model.applications[app].units) == len(storage_ids)

@@ -187,3 +273,8 @@ async def test_storage_reuse_in_new_cluster_after_app_removal(
# check if data is also imported
assert writes_result.count == (await c_writes.count())
assert writes_result.max_stored_id == (await c_writes.max_stored_id())

# restart continuous writes and check if they can be written
await c_writes.start()
time.sleep(60)
assert (await c_writes.count()) > 0, "Continuous writes not increasing"
