fix(ingest/partitionExecutor): Fetch ready items for non-empty batch when _pending is empty (datahub-project#11885)

asikowitz authored Nov 19, 2024
1 parent bf16e58 commit 94f1f39
Showing 2 changed files with 42 additions and 16 deletions.
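In the previous code, _build_batch only consulted pending_key_completion (items parked because another batch for the same key was in flight) when the batch under construction was still empty. Once the batch held even one item and the _pending queue drained, the parked items were never re-fetched, so a saturated max_pending queue could leave submit() blocked indefinitely. The fix passes a max_to_add cap to _find_ready_items and calls it on every blocking queue.Empty wakeup, topping up a non-empty batch without exceeding max_per_batch. Below is a minimal sketch of the failure scenario, mirroring the new deadlock test in this commit (import path inferred from the file layout above; constructor arguments as used in the tests):

import time
from datetime import timedelta

from datahub.utilities.partition_executor import BatchPartitionExecutor


def process_batch(batch):
    time.sleep(0.1)  # Simulate batch processing time.


with BatchPartitionExecutor(
    max_workers=5,
    max_pending=2,  # submit() blocks once two items are outstanding.
    process_batch=process_batch,
    max_per_batch=2,
    min_process_interval=timedelta(seconds=30),  # Effectively disables timed non-blocking flushes.
    read_from_pending_interval=timedelta(seconds=0.01),
) as executor:
    executor.submit("key3", "key3", "task0")  # First batch: puts key3 in flight.
    executor.submit("key3", "key3", "task1")
    executor.submit("key1", "key1", "task2")  # Second batch starts non-empty.
    for i in range(3, 20):
        # These park behind the in-flight key3 batch. Before the fix they were
        # only re-fetched while the batch being built was empty, so the
        # non-empty [key1] batch stalled and submit() blocked forever.
        executor.submit("key3", "key3", f"task{i}")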
22 changes: 10 additions & 12 deletions metadata-ingestion/src/datahub/utilities/partition_executor.py
@@ -270,7 +270,7 @@ def __init__(
         self.read_from_pending_interval = read_from_pending_interval
         assert self.max_workers > 1
 
-        self.state_lock = threading.Lock()
+        self._state_lock = threading.Lock()
         self._executor = ThreadPoolExecutor(
             # We add one here to account for the clearinghouse worker thread.
             max_workers=max_workers + 1,
@@ -323,7 +323,7 @@ def _handle_batch_completion(
                 if item.done_callback:
                     item.done_callback(future)
 
-        def _find_ready_items() -> List[_BatchPartitionWorkItem]:
+        def _find_ready_items(max_to_add: int) -> List[_BatchPartitionWorkItem]:
             with clearinghouse_state_lock:
                 # First, update the keys in flight.
                 for key in keys_no_longer_in_flight:
@@ -336,18 +336,15 @@ def _find_ready_items() -> List[_BatchPartitionWorkItem]:

                 ready: List[_BatchPartitionWorkItem] = []
                 for item in pending:
-                    if (
-                        len(ready) < self.max_per_batch
-                        and item.key not in keys_in_flight
-                    ):
+                    if len(ready) < max_to_add and item.key not in keys_in_flight:
                         ready.append(item)
                     else:
                         pending_key_completion.append(item)
 
                 return ready
 
         def _build_batch() -> List[_BatchPartitionWorkItem]:
-            next_batch = _find_ready_items()
+            next_batch = _find_ready_items(self.max_per_batch)
 
             while (
                 not self._queue_empty_for_shutdown
@@ -382,11 +379,12 @@ def _build_batch() -> List[_BatchPartitionWorkItem]:
                             pending_key_completion.append(next_item)
                         else:
                             next_batch.append(next_item)
-
-                    if not next_batch:
-                        next_batch = _find_ready_items()
                 except queue.Empty:
-                    if not blocking:
+                    if blocking:
+                        next_batch.extend(
+                            _find_ready_items(self.max_per_batch - len(next_batch))
+                        )
+                    else:
                         break
 
             return next_batch
@@ -458,7 +456,7 @@ def _ensure_clearinghouse_started(self) -> None:
f"{self.__class__.__name__} is shutting down; cannot submit new work items."
)

with self.state_lock:
with self._state_lock:
# Lazily start the clearinghouse worker.
if not self._clearinghouse_started:
self._clearinghouse_started = True
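Taken together, the clearinghouse's batch-building loop now behaves roughly as follows. This is a simplified sketch of the changed code above, with shutdown signalling, the None sentinel, and metrics omitted; the names match the diff:

def _build_batch() -> List[_BatchPartitionWorkItem]:
    # Seed the batch with parked items whose keys are no longer in flight.
    next_batch = _find_ready_items(self.max_per_batch)

    while len(next_batch) < self.max_per_batch:
        # Block unless a partial batch has already waited min_process_interval.
        blocking = not (
            next_batch
            and pending_key_completion
            and last_submit_time + self.min_process_interval < datetime.now()
        )
        try:
            next_item = self._pending.get(
                block=blocking,
                timeout=self.read_from_pending_interval.total_seconds(),
            )
            if next_item.key in keys_in_flight:
                pending_key_completion.append(next_item)  # Park behind the in-flight batch.
            else:
                next_batch.append(next_item)
        except queue.Empty:
            if blocking:
                # The fix: top up a (possibly non-empty) batch from the parked
                # items, capped so it never exceeds max_per_batch.
                next_batch.extend(
                    _find_ready_items(self.max_per_batch - len(next_batch))
                )
            else:
                break

    return next_batch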
36 changes: 32 additions & 4 deletions metadata-ingestion/tests/unit/utilities/test_partition_executor.py
@@ -133,9 +133,9 @@ def process_batch(batch):
     }
 
 
-@pytest.mark.timeout(10)
+@pytest.mark.timeout(5)
 def test_batch_partition_executor_max_batch_size():
-    n = 20  # Exceed max_pending to test for deadlocks when max_pending exceeded
+    n = 5
     batches_processed = []
 
     def process_batch(batch):
@@ -147,8 +147,8 @@ def process_batch(batch):
         max_pending=10,
         process_batch=process_batch,
         max_per_batch=2,
-        min_process_interval=timedelta(seconds=1),
-        read_from_pending_interval=timedelta(seconds=1),
+        min_process_interval=timedelta(seconds=0.1),
+        read_from_pending_interval=timedelta(seconds=0.1),
     ) as executor:
         # Submit more tasks than the max_per_batch to test batching limits.
         for i in range(n):
@@ -161,6 +161,34 @@ def process_batch(batch):
         assert len(batch) <= 2, "Batch size exceeded max_per_batch limit"
 
 
+@pytest.mark.timeout(10)
+def test_batch_partition_executor_deadlock():
+    n = 20  # Exceed max_pending to test for deadlocks when max_pending exceeded
+    batch_size = 2
+    batches_processed = []
+
+    def process_batch(batch):
+        batches_processed.append(batch)
+        time.sleep(0.1)  # Simulate batch processing time
+
+    with BatchPartitionExecutor(
+        max_workers=5,
+        max_pending=2,
+        process_batch=process_batch,
+        max_per_batch=batch_size,
+        min_process_interval=timedelta(seconds=30),
+        read_from_pending_interval=timedelta(seconds=0.01),
+    ) as executor:
+        # Submit more tasks than the max_per_batch to test batching limits.
+        executor.submit("key3", "key3", "task0")
+        executor.submit("key3", "key3", "task1")
+        executor.submit("key1", "key1", "task1")  # Populates second batch
+        for i in range(3, n):
+            executor.submit("key3", "key3", f"task{i}")
+
+    assert sum(len(batch) for batch in batches_processed) == n
+
+
 def test_empty_batch_partition_executor():
     # We want to test that even if no submit() calls are made, cleanup works fine.
     with BatchPartitionExecutor(
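Note how the deadlock test's parameters line up with the fix: max_pending=2 makes submit() block almost immediately, min_process_interval=timedelta(seconds=30) keeps the loop in blocking mode so the old `if not blocking: break` escape never fires, and the single key1 submission guarantees the batch under construction is non-empty while the remaining key3 items sit parked behind an in-flight batch. Under the old code the loop would then wait on an empty _pending queue forever; with the fix, each blocking queue.Empty wakeup drains up to max_per_batch - len(next_batch) parked items, and the final assertion verifies that all n submitted items were eventually processed.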
