Add CLI tool to fetch AIPs (#203)
Added a tool to fetch AIPs, either all at once or in "pages".
mcantelon committed Nov 7, 2023
1 parent 1657ff7 commit babe75b
Showing 3 changed files with 316 additions and 1 deletion.
48 changes: 47 additions & 1 deletion README.md
@@ -165,6 +165,49 @@ adminsitrators.
The test data generator tool, `tools/generate-test-data`, populates
AIPscan's database with randomly generated example data.

#### Fetch script

The AIP fetch tool, `tools/fetch_aips`, lets AIPscan fetch all, or a subset,
of a storage service's packages. AIPs not yet fetched by AIPscan are added,
AIPs that have already been fetched are skipped (so no duplicates are created),
and AIPs newly marked as deleted are removed from AIPscan.
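
For example, run as described under "Running tools" below, the following would
fetch every package from a storage service (the storage service ID `1` and the
session descriptor `somedescriptor` are illustrative values):

    $ ./tools/fetch_aips --ss-id 1 --session-id somedescriptor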

When using the script, the storage service's list of packages can optionally
be grouped into "pages", with each "page" containing a number of packages
specified by a command-line argument. For example, a storage service holding
150 packages could be fetched as three pages of 50 packages; likewise, a
storage service holding anywhere from 101 to 149 packages could also be
fetched as three pages of 50 packages, with the final page only partly full.
A sketch of this follows.
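
Reusing the same session descriptor for each run ensures all three pages are
drawn from the same cached package list (see below):

    $ ./tools/fetch_aips --ss-id 1 --session-id somedescriptor --packages-per-page 50 --page 1
    $ ./tools/fetch_aips --ss-id 1 --session-id somedescriptor --packages-per-page 50 --page 2
    $ ./tools/fetch_aips --ss-id 1 --session-id somedescriptor --packages-per-page 50 --page 3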

If using `cron`, or some other scheduler, to automatically fetch AIPs using
this tool, consider using the `flock` command to prevent overlapping executions
of the tool.
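
As a sketch, a crontab entry like the following would fetch all AIPs nightly
under a fresh, date-based session descriptor; `flock -n` makes a run exit
immediately if the previous one is still going. The schedule, lock file path,
and storage service ID are illustrative, and the `\%` escaping is required
because `cron` treats bare `%` characters specially:

    0 2 * * * cd <path to AIPscan base directory> && flock -n /tmp/aipscan-fetch.lock <path to AIPscan virtual environment>/bin/python tools/fetch_aips --ss-id 1 --session-id nightly-$(date +\%Y\%m\%d)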

##### Cached package list

The script downloads and caches a storage service's list of packages so that
paging, if used, remains consistent between script runs. Each cached list of
packages is identified by a "session descriptor", which is specified by whoever
runs the script and can be any alphanumeric identifier without spaces or
special characters. It is used to name the directory in which fetch-related
files are created.

Below is what the directory structure would look like if the session
descriptor "somedescriptor" were used, showing where the `packages.json` file,
containing the storage service's list of packages, is written.

    AIPscan/Aggregator/downloads/somedescriptor
    ├── mets
    │   └── batch
    └── packages
        └── packages.json

**NOTE:** Each run of the script generates a new fetch job database entry.
Don't delete these individual fetch jobs via the AIPscan web UI until the
fetch jobs for all "pages" have run; otherwise the cached list of packages is
deleted and has to be downloaded again.

### Running tools

These should be run using the same system user and virtual environment that
@@ -175,7 +218,10 @@ Here's how you would run the `generate-test-data` tool, for example:
$ cd <path to AIPscan base directory>
$ sudo -u <AIPscan system user> /bin/bash
$ source <path to AIPscan virtual environment>/bin/activate
$ ./tools/generate-test-data.py
$ ./tools/generate-test-data

To display a tool's CLI arguments and options, enter `<path to tool> --help`.
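
For example, from the AIPscan base directory with the virtual environment
activated:

    $ ./tools/fetch_aips --help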


# Usage
149 changes: 149 additions & 0 deletions tools/fetch_aips
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
import logging
import os
import sys
from datetime import datetime

import click
from app import init
from helpers import fetch

from AIPscan import db
from AIPscan.Aggregator import database_helpers
from AIPscan.Aggregator.task_helpers import create_numbered_subdirs
from AIPscan.models import StorageService
from config import CONFIGS


@click.command()
@click.option("--ss-id", "-s", required=True, help="Storage service ID", type=int)
@click.option("--session-id", "-i", required=True, help="Session descriptor", type=str)
@click.option(
    "--page", "-p", help="Page (if not set then all AIPs will be fetched)", type=int
)
@click.option(
    "--packages-per-page",
    "-n",
    default=1000,
    help="Packages per page (default 1000)",
    type=int,
)
@click.option("--logfile", "-l", default=None, help="Filepath to log to", type=str)
def main(ss_id, session_id, page, packages_per_page, logfile):
    """Fetch a storage service's AIPs into AIPscan, optionally one page at a time."""
    # Set up logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
        filename=logfile,
    )

    logger_name = os.path.basename(sys.argv[0])
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    # Initialize Flask app context
    app = init.create_app_instance(CONFIGS[init.config_name], db)

    package_list_no = "batch"
    with app.app_context():
        # Get storage service configuration
        storage_service = StorageService.query.get(ss_id)

        if storage_service is None:
            message = f"Storage service ID {ss_id} not found."

            logger.critical(message)

            err = click.ClickException(message)
            err.exit_code = 1
            raise err

        # Create a fetch_job record in the AIPscan database and take note of its ID
        datetime_obj_start = datetime.now().replace(microsecond=0)

        fetch_job = database_helpers.create_fetch_job(
            datetime_obj_start, session_id, ss_id
        )
        fetch_job_id = fetch_job.id

        logger.info(f"Created fetch job record {fetch_job_id}.")

        # Create directories, if need be
        packages_dir = fetch.create_packages_directory(session_id)
        fetch.create_mets_directory(session_id)
        create_numbered_subdirs(session_id, package_list_no)

        # Get package data
        packages = fetch.get_packages(storage_service, packages_dir)

        # Determine start and end package
        total_packages = len(packages["objects"])

        if page is None:
            start_item = 1
            end_item = total_packages
        else:
            start_item = ((page - 1) * packages_per_page) + 1
            end_item = start_item + packages_per_page - 1

        # Cap the end package at the total number of packages
        if total_packages < end_item:
            end_item = total_packages

        # Make sure paging window is valid and delete fetch job if not
        if start_item > total_packages:
            db.session.delete(fetch_job)
            db.session.commit()

            message = f"Fetch job deleted. Page {page} would start at package {start_item} but there are only {total_packages} packages."

            logger.error(message)

            err = click.ClickException(message)
            err.exit_code = 1
            raise err

        logger.info(
            f"Processing packages {start_item} to {end_item} of {total_packages}."
        )

        # Import packages
        api_url = fetch.assemble_api_url_dict(storage_service)

        processed_packages = fetch.import_packages(
            packages,
            start_item,
            end_item,
            api_url,
            ss_id,
            session_id,
            package_list_no,
            fetch_job_id,
            packages_per_page,
            logger,
        )

        fetch_job = database_helpers.update_fetch_job(
            fetch_job_id, processed_packages, total_packages
        )

        summary = "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
            fetch_job.total_aips,
            fetch_job.total_sips,
            fetch_job.total_dips,
            fetch_job.total_deleted_aips,
            fetch_job.total_replicas,
        )
        logger.info("%s", summary)
        logger.info(
            f"Updated fetch job record {fetch_job_id} with package type counts."
        )
        logger.info("Processing complete.")


if __name__ == "__main__":
    main()
120 changes: 120 additions & 0 deletions tools/helpers/fetch.py
@@ -0,0 +1,120 @@
import json
import os

from AIPscan.Aggregator import database_helpers
from AIPscan.Aggregator.task_helpers import (
    format_api_url_with_limit_offset,
    get_packages_directory,
    process_package_object,
)
from AIPscan.Aggregator.tasks import delete_aip, get_mets, make_request


def assemble_api_url_dict(storage_service, offset=0, limit=1_000_000):
    """Return storage service API connection details plus the paging window."""
    return {
        "baseUrl": storage_service.url,
        "userName": storage_service.user_name,
        "apiKey": storage_service.api_key,
        "offset": offset,
        "limit": limit,
    }


def fetch_and_write_packages(storage_service, package_filename):
    """Request the storage service's package list and cache it as JSON."""
    api_url = assemble_api_url_dict(storage_service)

    (_, request_url_without_api_key, request_url) = format_api_url_with_limit_offset(
        api_url
    )

    packages = make_request(request_url, request_url_without_api_key)
    with open(package_filename, "w", encoding="utf-8") as f:
        json.dump(packages, f)

    return packages


def create_packages_directory(timestamp_str):
    """Create, if needed, and return the directory that caches the package list."""
    packages_dir = get_packages_directory(timestamp_str)
    if not os.path.isdir(packages_dir):
        os.makedirs(packages_dir)

    return packages_dir


def create_mets_directory(timestamp_str):
    """Create, if needed, the directory that holds downloaded METS files."""
    mets_dir = os.path.join("AIPscan/Aggregator/downloads", timestamp_str, "mets")
    if not os.path.isdir(mets_dir):
        os.makedirs(mets_dir)


def get_packages(storage_service, packages_dir):
    """Return the cached package list if present, otherwise fetch and cache it."""
    package_filename = os.path.join(packages_dir, "packages.json")

    if os.path.isfile(package_filename):
        with open(package_filename) as f:
            packages = json.load(f)
    else:
        packages = fetch_and_write_packages(storage_service, package_filename)

    return packages


def import_packages(
    packages,
    start_item,
    end_item,
    api_url,
    storage_service_id,
    timestamp_str,
    package_list_no,
    fetch_job_id,
    packages_per_page,
    logger,
):
    """Import the packages that fall within the paging window and return them."""
    processed_packages = []

    package_count = 0
    for package_obj in packages["objects"]:
        package_count += 1

        package = process_package_object(package_obj)

        if package_count >= start_item and package_count <= end_item:
            # Calculate current item being processed
            current_item = start_item + len(processed_packages)
            logger.info(f"Processing {package.uuid} ({current_item} of {end_item})")

            processed_packages.append(package)

            if package.is_deleted():
                delete_aip(package.uuid)
                continue

            if not package.is_aip():
                continue

            storage_location = database_helpers.create_or_update_storage_location(
                package.current_location, api_url, storage_service_id
            )

            pipeline = database_helpers.create_or_update_pipeline(
                package.origin_pipeline, api_url
            )

            args = [
                package.uuid,
                package.size,
                package.get_relative_path(),
                api_url,
                timestamp_str,
                package_list_no,
                storage_service_id,
                storage_location.id,
                pipeline.id,
                fetch_job_id,
                logger,
            ]
            get_mets.apply(args=args)

    return processed_packages
