Skip to content

Commit

Permalink
PB-756: API to upload collection assets
Browse files Browse the repository at this point in the history
Add upload endpoints for collection assets. Mostly a duplication of the already
existing multipart upload endpoints.
  • Loading branch information
benschs committed Sep 9, 2024
1 parent 9ec93f0 commit cd4977a
Show file tree
Hide file tree
Showing 10 changed files with 2,103 additions and 37 deletions.
137 changes: 137 additions & 0 deletions app/stac_api/migrations/0050_collectionassetupload_and_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Generated by Django 5.0.8 on 2024-09-09 10:59

import pgtrigger.compiler
import pgtrigger.migrations

import django.core.serializers.json
import django.core.validators
import django.db.models.deletion
from django.db import migrations
from django.db import models

import stac_api.models


class Migration(migrations.Migration):
    """Create the ``CollectionAssetUpload`` table.

    Auto-generated by Django 5.0.8 on 2024-09-09.  Mirrors the existing
    ``AssetUpload`` model but its foreign key targets
    ``stac_api.collectionasset`` instead of an item asset.  Do not hand-edit
    field definitions here without regenerating the migration.
    """

    dependencies = [
        ('stac_api', '0049_item_properties_expires'),
    ]

    operations = [
        migrations.CreateModel(
            name='CollectionAssetUpload',
            fields=[
                # BigAutoField primary key; presumably chosen (like the other
                # upload tables) for a large expected row count — confirm.
                ('id', models.BigAutoField(primary_key=True, serialize=False)),
                # Identifier of the multipart upload; assumed to be the S3
                # multipart upload id — verify against s3_multipart_upload.py.
                ('upload_id', models.CharField(max_length=255)),
                (
                    'status',
                    models.CharField(
                        # Upload lifecycle; new rows start as 'in-progress'.
                        choices=[(None, ''), ('in-progress', 'In Progress'),
                                 ('completed', 'Completed'), ('aborted', 'Aborted')],
                        default='in-progress',
                        max_length=32
                    )
                ),
                (
                    'number_parts',
                    models.IntegerField(
                        # Multipart uploads are limited to 1..100 parts.
                        validators=[
                            django.core.validators.MinValueValidator(1),
                            django.core.validators.MaxValueValidator(100)
                        ]
                    )
                ),
                (
                    'md5_parts',
                    models.JSONField(
                        # Per-part MD5 digests; not editable after creation.
                        editable=False, encoder=django.core.serializers.json.DjangoJSONEncoder
                    )
                ),
                (
                    'urls',
                    models.JSONField(
                        # List payload (default=list); presumably the presigned
                        # part-upload URLs — confirm against the API views.
                        blank=True,
                        default=list,
                        encoder=django.core.serializers.json.DjangoJSONEncoder
                    )
                ),
                # Creation timestamp is set automatically; 'ended' stays NULL
                # until the upload completes or is aborted.
                ('created', models.DateTimeField(auto_now_add=True)),
                ('ended', models.DateTimeField(blank=True, default=None, null=True)),
                ('checksum_multihash', models.CharField(max_length=255)),
                # etag default is computed in Python, then kept fresh by the
                # DB triggers declared below.
                ('etag', models.CharField(default=stac_api.models.compute_etag, max_length=56)),
                (
                    'update_interval',
                    models.IntegerField(
                        default=-1,
                        help_text=
                        'Interval in seconds in which the asset data is updated.-1 means that the data is not on a regular basis updated.This field can only be set via the API.',
                        validators=[django.core.validators.MinValueValidator(-1)]
                    )
                ),
                (
                    'content_encoding',
                    models.CharField(
                        blank=True,
                        choices=[(None, ''), ('gzip', 'Gzip'), ('br', 'Br')],
                        default='',
                        max_length=32
                    )
                ),
                (
                    'asset',
                    models.ForeignKey(
                        # Uploads are deleted together with their collection
                        # asset; related_name='+' disables the reverse accessor.
                        on_delete=django.db.models.deletion.CASCADE,
                        related_name='+',
                        to='stac_api.collectionasset'
                    )
                ),
            ],
        ),
        # A given upload id may exist only once per asset.
        migrations.AddConstraint(
            model_name='collectionassetupload',
            constraint=models.UniqueConstraint(
                fields=('asset', 'upload_id'),
                name='unique_asset_upload_collection_asset_upload_id'
            ),
        ),
        # Partial unique index: at most one 'in-progress' upload per asset.
        migrations.AddConstraint(
            model_name='collectionassetupload',
            constraint=models.UniqueConstraint(
                condition=models.Q(('status', 'in-progress')),
                fields=('asset', 'status'),
                name='unique_asset_upload_in_progress'
            ),
        ),
        # BEFORE INSERT trigger: regenerate the etag on every insert.
        pgtrigger.migrations.AddTrigger(
            model_name='collectionassetupload',
            trigger=pgtrigger.compiler.Trigger(
                name='add_asset_upload_trigger',
                sql=pgtrigger.compiler.UpsertTriggerSql(
                    func=
                    '\n    -- update AssetUpload auto variable\n    NEW.etag = public.gen_random_uuid();\n\n    RETURN NEW;\n    ',
                    hash='5f51ec3c72c4d9fbe6b81d2fd881dd5228dc80bf',
                    operation='INSERT',
                    pgid='pgtrigger_add_asset_upload_trigger_8330c',
                    table='stac_api_collectionassetupload',
                    when='BEFORE'
                )
            ),
        ),
        # BEFORE UPDATE trigger: regenerate the etag, but only when the row
        # actually changed (OLD.* IS DISTINCT FROM NEW.*).
        pgtrigger.migrations.AddTrigger(
            model_name='collectionassetupload',
            trigger=pgtrigger.compiler.Trigger(
                name='update_asset_upload_trigger',
                sql=pgtrigger.compiler.UpsertTriggerSql(
                    condition='WHEN (OLD.* IS DISTINCT FROM NEW.*)',
                    func=
                    '\n    -- update AssetUpload auto variable\n    NEW.etag = public.gen_random_uuid();\n\n    RETURN NEW;\n    ',
                    hash='0a7f1aa8f8c0bb2c413a7ce626f75c8da5bf4b6d',
                    operation='UPDATE',
                    pgid='pgtrigger_update_asset_upload_trigger_8d012',
                    table='stac_api_collectionassetupload',
                    when='BEFORE'
                )
            ),
        ),
    ]
76 changes: 64 additions & 12 deletions app/stac_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,19 +710,10 @@ def get_asset_path(self):
return get_collection_asset_path(self.collection, self.name)


class AssetUpload(models.Model):
class BaseAssetUpload(models.Model):

class Meta:
constraints = [
models.UniqueConstraint(fields=['asset', 'upload_id'], name='unique_together'),
# Make sure that there is only one asset upload in progress per asset
models.UniqueConstraint(
fields=['asset', 'status'],
condition=Q(status='in-progress'),
name='unique_in_progress'
)
]
triggers = generates_asset_upload_triggers()
abstract = True

class Status(models.TextChoices):
# pylint: disable=invalid-name
Expand All @@ -741,7 +732,6 @@ class ContentEncoding(models.TextChoices):

# using BigIntegerField as primary_key to deal with the expected large number of assets.
id = models.BigAutoField(primary_key=True)
asset = models.ForeignKey(Asset, related_name='+', on_delete=models.CASCADE)
upload_id = models.CharField(max_length=255, blank=False, null=False)
status = models.CharField(
choices=Status.choices, max_length=32, default=Status.IN_PROGRESS, blank=False, null=False
Expand Down Expand Up @@ -777,6 +767,23 @@ class ContentEncoding(models.TextChoices):
# Custom Manager that preselects the collection
objects = AssetUploadManager()


class AssetUpload(BaseAssetUpload):

class Meta:
constraints = [
models.UniqueConstraint(fields=['asset', 'upload_id'], name='unique_together'),
# Make sure that there is only one asset upload in progress per asset
models.UniqueConstraint(
fields=['asset', 'status'],
condition=Q(status='in-progress'),
name='unique_in_progress'
)
]
triggers = generates_asset_upload_triggers()

asset = models.ForeignKey(Asset, related_name='+', on_delete=models.CASCADE)

def update_asset_from_upload(self):
'''Updating the asset's file:checksum and update_interval from the upload
Expand Down Expand Up @@ -804,6 +811,51 @@ def update_asset_from_upload(self):
self.asset.save()


class CollectionAssetUpload(BaseAssetUpload):
    """Multipart upload bookkeeping for a collection-level asset.

    Counterpart of ``AssetUpload`` (which handles item assets): same base
    fields from ``BaseAssetUpload``, but the foreign key points at
    ``CollectionAsset`` and the log context has no ``item`` entry.
    """

    class Meta:
        constraints = [
            # A given upload id may exist only once per asset.
            models.UniqueConstraint(
                fields=['asset', 'upload_id'],
                name='unique_asset_upload_collection_asset_upload_id'
            ),
            # Make sure that there is only one asset upload in progress per asset
            models.UniqueConstraint(
                fields=['asset', 'status'],
                condition=Q(status='in-progress'),
                name='unique_asset_upload_in_progress'
            )
        ]
        triggers = generates_asset_upload_triggers()

    # related_name='+' disables the reverse accessor on CollectionAsset.
    asset = models.ForeignKey(CollectionAsset, related_name='+', on_delete=models.CASCADE)

    def update_asset_from_upload(self):
        '''Updating the asset's file:checksum and update_interval from the upload

        When the upload is completed, the new file:checksum and update interval
        from the upload is set to its asset parent and the asset is saved.
        '''
        logger.debug(
            'Updating asset %s file:checksum from %s to %s and update_interval from %d to %d '
            'due to upload complete',
            self.asset.name,
            self.asset.checksum_multihash,
            self.checksum_multihash,
            self.asset.update_interval,
            self.update_interval,
            extra={
                'upload_id': self.upload_id,
                'asset': self.asset.name,
                'collection': self.asset.collection.name
            }
        )

        self.asset.checksum_multihash = self.checksum_multihash
        self.asset.update_interval = self.update_interval
        self.asset.save()


class CountBase(models.Model):
'''CountBase tables are used to help calculate the summary on a collection.
This is only performant if the distinct number of values is small, e.g. we currently only have
Expand Down
46 changes: 22 additions & 24 deletions app/stac_api/s3_multipart_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ def list_multipart_uploads(self, key=None, limit=100, start=None):
response.get('NextUploadIdMarker', None),
)

def log_extra(self, asset, upload_id=None, parts=None):
    """Assemble the structured logging context for an asset operation.

    Item assets (objects exposing an ``item`` attribute) are tagged with
    their collection, item and asset names; collection assets only with
    their collection and asset names.  ``upload_id`` and ``parts`` are
    included only when they are provided.
    """
    extra = {}
    if hasattr(asset, 'item'):
        extra['collection'] = asset.item.collection.name
        extra['item'] = asset.item.name
    else:
        extra['collection'] = asset.collection.name
    extra['asset'] = asset.name
    # Optional fields: skip the key entirely when the value was not given.
    for key, value in (('upload_id', upload_id), ('parts', parts)):
        if value is not None:
            extra[key] = value
    return extra

def create_multipart_upload(
self, key, asset, checksum_multihash, update_interval, content_encoding
):
Expand Down Expand Up @@ -99,11 +117,7 @@ def create_multipart_upload(
CacheControl=get_s3_cache_control_value(update_interval),
ContentType=asset.media_type,
**extra_params,
log_extra={
'collection': asset.item.collection.name,
'item': asset.item.name,
'asset': asset.name
}
log_extra=self.log_extra(asset)
)
logger.info(
'S3 Multipart upload successfully created: upload_id=%s',
Expand Down Expand Up @@ -148,12 +162,7 @@ def create_presigned_url(self, key, asset, part, upload_id, part_md5):
Params=params,
ExpiresIn=settings.AWS_PRESIGNED_URL_EXPIRES,
HttpMethod='PUT',
log_extra={
'collection': asset.item.collection.name,
'item': asset.item.name,
'asset': asset.name,
'upload_id': upload_id
}
log_extra=self.log_extra(asset, upload_id=upload_id)
)

logger.info(
Expand Down Expand Up @@ -191,13 +200,7 @@ def complete_multipart_upload(self, key, asset, parts, upload_id):
Key=key,
MultipartUpload={'Parts': parts},
UploadId=upload_id,
log_extra={
'parts': parts,
'upload_id': upload_id,
'collection': asset.item.collection.name,
'item': asset.item.name,
'asset': asset.name
}
log_extra=self.log_extra(asset, upload_id=upload_id, parts=parts)
)
except ClientError as error:
raise serializers.ValidationError(str(error)) from None
Expand Down Expand Up @@ -271,12 +274,7 @@ def list_upload_parts(self, key, asset, upload_id, limit, offset):
UploadId=upload_id,
MaxParts=limit,
PartNumberMarker=offset,
log_extra={
'collection': asset.item.collection.name,
'item': asset.item.name,
'asset': asset.name,
'upload_id': upload_id
}
log_extra=self.log_extra(asset, upload_id=upload_id)
)
return response, response.get('IsTruncated', False)

Expand Down
Loading

0 comments on commit cd4977a

Please sign in to comment.