From 985824d05d0412fdef32aa20b1134885ae2a8370 Mon Sep 17 00:00:00 2001 From: Mrunmay Shelar Date: Sat, 7 Oct 2023 20:11:44 +0530 Subject: [PATCH] chore: take in onnx file from s3 --- dozer-types/src/models/udf_config.rs | 65 +++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/dozer-types/src/models/udf_config.rs b/dozer-types/src/models/udf_config.rs index 914e6091e7..dc265bea88 100644 --- a/dozer-types/src/models/udf_config.rs +++ b/dozer-types/src/models/udf_config.rs @@ -2,6 +2,15 @@ use schemars::JsonSchema; use crate::serde::{Deserialize, Serialize}; +use aws_sdk_s3::{ + operation::create_bucket::CreateBucketError, + types::{ + CompletedMultipartUpload, CompletedPart, CreateBucketConfiguration, Delete, + ObjectIdentifier, + }, + Client, +}; + #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] #[serde(deny_unknown_fields)] pub struct UdfConfig { @@ -21,5 +30,59 @@ pub enum UdfType { #[serde(deny_unknown_fields)] pub struct OnnxConfig { /// path to the model file - pub path: String, + pub s3_storage: S3Storage, +} + +pub struct S3Storage { + client: Client, + region: BucketLocationConstraint, + bucket_name: String, } + + +pub use aws_sdk_s3::types::BucketLocationConstraint; + +impl S3Storage { + pub async fn new(region: BucketLocationConstraint, bucket_name: String) -> Result { + let config = aws_config::from_env().load().await; + let client = Client::new(&config); + let create_bucket_configuration = CreateBucketConfiguration::builder() + .location_constraint(region.clone()) + .build(); + if let Err(e) = client + .create_bucket() + .bucket(&bucket_name) + .create_bucket_configuration(create_bucket_configuration) + .send() + .await + { + if !is_bucket_already_owned_by_you(&e) { + return Err(e.into()); + } + } + Ok(Self { + client, + region, + bucket_name, + }) + } + + pub async fn delete(self) -> Result<(), Error> { + loop { + let objects = self.list_objects(String::new(), None).await?.objects; + if objects.is_empty() { + return self + .client + .delete_bucket() + .bucket(&self.bucket_name) + .send() + .await + .map_err(Into::into) + .map(|_| ()); + } + + self.delete_objects(objects.into_iter().map(|object| object.key).collect()) + .await?; + } + } +} \ No newline at end of file