Skip to content

Commit

Permalink
fix database error in commit_batch_update
Browse files Browse the repository at this point in the history
  • Loading branch information
jigold committed Feb 6, 2024
1 parent 1030f59 commit 229d8b6
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 22 deletions.
4 changes: 2 additions & 2 deletions batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ async def insert_jobs_into_db(tx):
(
batch_id,
update_id,
ROOT_JOB_GROUP_ID,
job_group_id,
inst_coll,
rand_token,
resources['n_jobs'],
Expand All @@ -1422,7 +1422,7 @@ async def insert_jobs_into_db(tx):
(
batch_id,
update_id,
ROOT_JOB_GROUP_ID,
job_group_id,
inst_coll,
rand_token,
resources['n_ready_cancellable_jobs'],
Expand Down
6 changes: 2 additions & 4 deletions batch/sql/estimated-current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ CREATE TABLE IF NOT EXISTS `batch_updates` (
`time_created` BIGINT NOT NULL,
`time_committed` BIGINT,
PRIMARY KEY (`batch_id`, `update_id`, `start_job_group_id`, `start_job_id`),
FOREIGN KEY (`batch_id`) REFERENCES batches(`id`),
UNIQUE KEY (`batch_id`, `start_job_id`)
FOREIGN KEY (`batch_id`) REFERENCES batches(`id`)
) ENGINE = InnoDB;
CREATE INDEX `batch_updates_committed` ON `batch_updates` (`batch_id`, `committed`);
CREATE INDEX `batch_updates_start_job_id` ON `batch_updates` (`batch_id`, `start_job_id`);
Expand Down Expand Up @@ -1098,10 +1097,9 @@ BEGIN
ELSE
SELECT COALESCE(SUM(n_jobs), 0) INTO staging_n_jobs
FROM job_groups_inst_coll_staging
WHERE batch_id = in_batch_id AND update_id = in_update_id AND job_group_id = 0
WHERE batch_id = in_batch_id AND update_id = in_update_id
FOR UPDATE;

# we can only check staged equals expected for the root job group
IF staging_n_jobs = expected_n_jobs THEN
UPDATE batch_updates
SET committed = 1, time_committed = in_timestamp
Expand Down
201 changes: 201 additions & 0 deletions batch/sql/finalize-job-groups.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ START TRANSACTION;

SET foreign_key_checks = 0;

# Drop the unique index on (batch_id, start_job_id): start_job_id can repeat across updates
# when an update contains 0 jobs, so the uniqueness constraint no longer holds.
# NOTE: MySQL auto-named this unique index `batch_id`; the DROP below relies on that name — verify it matches in production.
ALTER TABLE batch_updates DROP INDEX `batch_id`, ALGORITHM=INPLACE, LOCK=NONE;

ALTER TABLE batch_updates ADD COLUMN start_job_group_id INT NOT NULL DEFAULT 1, ALGORITHM=INSTANT;
ALTER TABLE batch_updates ADD COLUMN n_job_groups INT NOT NULL DEFAULT 0, ALGORITHM=INSTANT;
CREATE INDEX `batch_updates_start_job_group_id` ON `batch_updates` (`batch_id`, `start_job_group_id`);
Expand Down Expand Up @@ -415,6 +419,203 @@ BEGIN
COMMIT;
END $$

DROP PROCEDURE IF EXISTS commit_batch_update $$
CREATE PROCEDURE commit_batch_update(
IN in_batch_id BIGINT,
IN in_update_id INT,
IN in_timestamp BIGINT
)
BEGIN
DECLARE cur_update_committed BOOLEAN;
DECLARE expected_n_jobs INT;
DECLARE staging_n_jobs INT;
DECLARE cur_update_start_job_id INT;

START TRANSACTION;

SELECT committed, n_jobs INTO cur_update_committed, expected_n_jobs
FROM batch_updates
WHERE batch_id = in_batch_id AND update_id = in_update_id
FOR UPDATE;

IF cur_update_committed THEN
COMMIT;
SELECT 0 as rc;
ELSE
SELECT COALESCE(SUM(n_jobs), 0) INTO staging_n_jobs
FROM job_groups_inst_coll_staging
WHERE batch_id = in_batch_id AND update_id = in_update_id AND job_group_id = 0
FOR UPDATE;

# we can only check staged equals expected for the root job group
IF staging_n_jobs = expected_n_jobs THEN
UPDATE batch_updates
SET committed = 1, time_committed = in_timestamp
WHERE batch_id = in_batch_id AND update_id = in_update_id;

UPDATE batches SET
`state` = 'running',
time_completed = NULL,
n_jobs = n_jobs + expected_n_jobs
WHERE id = in_batch_id;

UPDATE job_groups
INNER JOIN (
SELECT batch_id, job_group_id, CAST(COALESCE(SUM(n_jobs), 0) AS SIGNED) AS staged_n_jobs
FROM job_groups_inst_coll_staging
WHERE batch_id = in_batch_id AND update_id = in_update_id
GROUP BY batch_id, job_group_id
) AS t ON job_groups.batch_id = t.batch_id AND job_groups.job_group_id = t.job_group_id
SET `state` = 'running', time_completed = NULL, n_jobs = n_jobs + t.staged_n_jobs;

# compute global number of new ready jobs from root job group
INSERT INTO user_inst_coll_resources (user, inst_coll, token, n_ready_jobs, ready_cores_mcpu)
SELECT user, inst_coll, 0, @n_ready_jobs := COALESCE(SUM(n_ready_jobs), 0), @ready_cores_mcpu := COALESCE(SUM(ready_cores_mcpu), 0)
FROM job_groups_inst_coll_staging
JOIN batches ON batches.id = job_groups_inst_coll_staging.batch_id
WHERE batch_id = in_batch_id AND update_id = in_update_id AND job_group_id = 0
GROUP BY `user`, inst_coll
ON DUPLICATE KEY UPDATE
n_ready_jobs = n_ready_jobs + @n_ready_jobs,
ready_cores_mcpu = ready_cores_mcpu + @ready_cores_mcpu;

DELETE FROM job_groups_inst_coll_staging WHERE batch_id = in_batch_id AND update_id = in_update_id;

IF in_update_id != 1 THEN
SELECT start_job_id INTO cur_update_start_job_id FROM batch_updates WHERE batch_id = in_batch_id AND update_id = in_update_id;

UPDATE jobs
LEFT JOIN `jobs_telemetry` ON `jobs_telemetry`.batch_id = jobs.batch_id AND `jobs_telemetry`.job_id = jobs.job_id
LEFT JOIN (
SELECT `job_parents`.batch_id, `job_parents`.job_id,
COALESCE(SUM(1), 0) AS n_parents,
COALESCE(SUM(state IN ('Pending', 'Ready', 'Creating', 'Running')), 0) AS n_pending_parents,
COALESCE(SUM(state = 'Success'), 0) AS n_succeeded
FROM `job_parents`
LEFT JOIN `jobs` ON jobs.batch_id = `job_parents`.batch_id AND jobs.job_id = `job_parents`.parent_id
WHERE job_parents.batch_id = in_batch_id AND
`job_parents`.job_id >= cur_update_start_job_id AND
`job_parents`.job_id < cur_update_start_job_id + staging_n_jobs
GROUP BY `job_parents`.batch_id, `job_parents`.job_id
FOR UPDATE
) AS t
ON jobs.batch_id = t.batch_id AND
jobs.job_id = t.job_id
SET jobs.state = IF(COALESCE(t.n_pending_parents, 0) = 0, 'Ready', 'Pending'),
jobs.n_pending_parents = COALESCE(t.n_pending_parents, 0),
jobs.cancelled = IF(COALESCE(t.n_succeeded, 0) = COALESCE(t.n_parents - t.n_pending_parents, 0), jobs.cancelled, 1),
jobs_telemetry.time_ready = IF(COALESCE(t.n_pending_parents, 0) = 0 AND jobs_telemetry.time_ready IS NULL, in_timestamp, jobs_telemetry.time_ready)
WHERE jobs.batch_id = in_batch_id AND jobs.job_id >= cur_update_start_job_id AND
jobs.job_id < cur_update_start_job_id + staging_n_jobs;
END IF;

COMMIT;
SELECT 0 as rc;
ELSE
ROLLBACK;
SELECT 1 as rc, expected_n_jobs, staging_n_jobs as actual_n_jobs, 'wrong number of jobs' as message;
END IF;
END IF;
END $$

DROP PROCEDURE IF EXISTS commit_batch_update $$
CREATE PROCEDURE commit_batch_update(
IN in_batch_id BIGINT,
IN in_update_id INT,
IN in_timestamp BIGINT
)
BEGIN
DECLARE cur_update_committed BOOLEAN;
DECLARE expected_n_jobs INT;
DECLARE staging_n_jobs INT;
DECLARE cur_update_start_job_id INT;

START TRANSACTION;

SELECT committed, n_jobs INTO cur_update_committed, expected_n_jobs
FROM batch_updates
WHERE batch_id = in_batch_id AND update_id = in_update_id
FOR UPDATE;

IF cur_update_committed THEN
COMMIT;
SELECT 0 as rc;
ELSE
SELECT COALESCE(SUM(n_jobs), 0) INTO staging_n_jobs
FROM job_groups_inst_coll_staging
WHERE batch_id = in_batch_id AND update_id = in_update_id
FOR UPDATE;

IF staging_n_jobs = expected_n_jobs THEN
UPDATE batch_updates
SET committed = 1, time_committed = in_timestamp
WHERE batch_id = in_batch_id AND update_id = in_update_id;

UPDATE batches SET
`state` = 'running',
time_completed = NULL,
n_jobs = n_jobs + expected_n_jobs
WHERE id = in_batch_id;

UPDATE job_groups
INNER JOIN (
SELECT batch_id, job_group_id, CAST(COALESCE(SUM(n_jobs), 0) AS SIGNED) AS staged_n_jobs
FROM job_groups_inst_coll_staging
WHERE batch_id = in_batch_id AND update_id = in_update_id
GROUP BY batch_id, job_group_id
) AS t ON job_groups.batch_id = t.batch_id AND job_groups.job_group_id = t.job_group_id
SET `state` = 'running', time_completed = NULL, n_jobs = n_jobs + t.staged_n_jobs;

# compute global number of new ready jobs from root job group
INSERT INTO user_inst_coll_resources (user, inst_coll, token, n_ready_jobs, ready_cores_mcpu)
SELECT user, inst_coll, 0, @n_ready_jobs := COALESCE(SUM(n_ready_jobs), 0), @ready_cores_mcpu := COALESCE(SUM(ready_cores_mcpu), 0)
FROM job_groups_inst_coll_staging
JOIN batches ON batches.id = job_groups_inst_coll_staging.batch_id
WHERE batch_id = in_batch_id AND update_id = in_update_id AND job_group_id = 0
GROUP BY `user`, inst_coll
ON DUPLICATE KEY UPDATE
n_ready_jobs = n_ready_jobs + @n_ready_jobs,
ready_cores_mcpu = ready_cores_mcpu + @ready_cores_mcpu;

DELETE FROM job_groups_inst_coll_staging WHERE batch_id = in_batch_id AND update_id = in_update_id;

IF in_update_id != 1 THEN
SELECT start_job_id INTO cur_update_start_job_id FROM batch_updates WHERE batch_id = in_batch_id AND update_id = in_update_id;

UPDATE jobs
LEFT JOIN `jobs_telemetry` ON `jobs_telemetry`.batch_id = jobs.batch_id AND `jobs_telemetry`.job_id = jobs.job_id
LEFT JOIN (
SELECT `job_parents`.batch_id, `job_parents`.job_id,
COALESCE(SUM(1), 0) AS n_parents,
COALESCE(SUM(state IN ('Pending', 'Ready', 'Creating', 'Running')), 0) AS n_pending_parents,
COALESCE(SUM(state = 'Success'), 0) AS n_succeeded
FROM `job_parents`
LEFT JOIN `jobs` ON jobs.batch_id = `job_parents`.batch_id AND jobs.job_id = `job_parents`.parent_id
WHERE job_parents.batch_id = in_batch_id AND
`job_parents`.job_id >= cur_update_start_job_id AND
`job_parents`.job_id < cur_update_start_job_id + staging_n_jobs
GROUP BY `job_parents`.batch_id, `job_parents`.job_id
FOR UPDATE
) AS t
ON jobs.batch_id = t.batch_id AND
jobs.job_id = t.job_id
SET jobs.state = IF(COALESCE(t.n_pending_parents, 0) = 0, 'Ready', 'Pending'),
jobs.n_pending_parents = COALESCE(t.n_pending_parents, 0),
jobs.cancelled = IF(COALESCE(t.n_succeeded, 0) = COALESCE(t.n_parents - t.n_pending_parents, 0), jobs.cancelled, 1),
jobs_telemetry.time_ready = IF(COALESCE(t.n_pending_parents, 0) = 0 AND jobs_telemetry.time_ready IS NULL, in_timestamp, jobs_telemetry.time_ready)
WHERE jobs.batch_id = in_batch_id AND jobs.job_id >= cur_update_start_job_id AND
jobs.job_id < cur_update_start_job_id + staging_n_jobs;
END IF;

COMMIT;
SELECT 0 as rc;
ELSE
ROLLBACK;
SELECT 1 as rc, expected_n_jobs, staging_n_jobs as actual_n_jobs, 'wrong number of jobs' as message;
END IF;
END IF;
END $$

DELIMITER ;

COMMIT;
4 changes: 2 additions & 2 deletions batch/test/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1939,9 +1939,9 @@ def test_get_job_group_from_client_batch(client: BatchClient):
b_copy = client.get_batch(b.id)
jg_copy = b_copy.get_job_group(jg.job_group_id)
jg_copy.create_job(DOCKER_ROOT_IMAGE, ['true'])
b.submit()
b_copy.submit()
status = jg_copy.wait()
assert status['n_jobs'] == 1, str(b.debug_info())
assert status['n_jobs'] == 1, str(status)


def test_cancellation_doesnt_cancel_other_job_groups(client: BatchClient):
Expand Down
22 changes: 8 additions & 14 deletions hail/python/hailtop/batch_client/aioclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,20 +955,20 @@ async def _update_fast(
job_progress_task: BatchProgressBarTask,
) -> Tuple[int, int]:
self._raise_if_not_created()
byte_job_specs = [spec.spec_bytes for spec in byte_specs_bunch if spec.typ == SpecType.JOB]
byte_job_group_specs = [spec.spec_bytes for spec in byte_specs_bunch if spec.typ == SpecType.JOB_GROUP]
byte_job_specs = [spec.spec_bytes for spec in byte_specs_bunch if spec.typ == SpecType.JOB]

b = bytearray()
b.extend(b'{"bunch":')
b.extend(b'{"job_groups":')
b.append(ord('['))
for i, spec in enumerate(byte_job_specs):
for i, spec in enumerate(byte_job_group_specs):
if i > 0:
b.append(ord(','))
b.extend(spec)
b.append(ord(']'))
b.extend(b',"job_groups":')
b.extend(b',"bunch":')
b.append(ord('['))
for i, spec in enumerate(byte_job_group_specs):
for i, spec in enumerate(byte_job_specs):
if i > 0:
b.append(ord(','))
b.extend(spec)
Expand All @@ -992,17 +992,15 @@ def _create_bunches(
job_specs: List[dict],
max_bunch_bytesize: int,
max_bunch_size: int,
) -> Tuple[List[List[SpecBytes]], List[int]]:
) -> List[List[SpecBytes]]:
assert max_bunch_bytesize > 0
assert max_bunch_size > 0
job_group_byte_specs = [SpecBytes(orjson.dumps(spec), SpecType.JOB_GROUP) for spec in job_group_specs]
job_byte_specs = [SpecBytes(orjson.dumps(spec), SpecType.JOB) for spec in job_specs]

byte_specs_bunches: List[List[SpecBytes]] = []
bunch_sizes = []
bunch: List[SpecBytes] = []
bunch_n_bytes = 0
bunch_n_specs = 0
for spec in [*job_group_byte_specs, *job_byte_specs]:
n_bytes = spec.n_bytes
assert n_bytes < max_bunch_bytesize, (
Expand All @@ -1012,18 +1010,14 @@ def _create_bunches(
if bunch_n_bytes + n_bytes < max_bunch_bytesize and len(bunch) < max_bunch_size:
bunch.append(spec)
bunch_n_bytes += n_bytes
bunch_n_specs += 1
else:
byte_specs_bunches.append(bunch)
bunch_sizes.append(bunch_n_specs)
bunch = [spec]
bunch_n_bytes = n_bytes
bunch_n_specs = 1
if bunch:
byte_specs_bunches.append(bunch)
bunch_sizes.append(bunch_n_specs)

return (byte_specs_bunches, bunch_sizes)
return byte_specs_bunches

async def _submit_spec_bunch(self, url: str, byte_spec_bunch: List[bytes], progress_task: BatchProgressBarTask):
self._raise_if_not_created()
Expand Down Expand Up @@ -1143,7 +1137,7 @@ async def _submit(
) -> Tuple[Optional[int], Optional[int]]:
n_job_groups = len(self._job_groups)
n_jobs = len(self._jobs)
byte_specs_bunches, bunch_sizes = self._create_bunches(
byte_specs_bunches = self._create_bunches(
self._job_group_specs, self._job_specs, max_bunch_bytesize, max_bunch_size
)
n_bunches = len(byte_specs_bunches)
Expand Down

0 comments on commit 229d8b6

Please sign in to comment.