list s3 dirs programmatically
e-maud committed Aug 16, 2024
1 parent 192da84 commit 7bb9cb6
Showing 1 changed file with 88 additions and 18 deletions.
106 changes: 88 additions & 18 deletions impresso_commons/utils/s3.py
@@ -3,19 +3,21 @@
"""

import bz2
import json
import logging
import os
import warnings
from typing import Union

import boto3
import botocore
from dotenv import load_dotenv
from smart_open import open as s_open
from smart_open.s3 import iter_bucket

from impresso_commons.utils import _get_cores
from utils.utils import bytes_to

logger = logging.getLogger(__name__)

@@ -45,7 +47,6 @@ def get_storage_options():


def get_s3_client(host_url="https://os.zhdk.cloud.switch.ch/"):
    # load environment variables from local .env files
    load_dotenv()
    if host_url is None:
@@ -324,7 +325,6 @@ def get_s3_versions(bucket_name, key_name):


def get_s3_versions_client(client, bucket_name, key_name):
    versions = client.Bucket(bucket_name).object_versions.filter(Prefix=key_name)

    version_ids = [
@@ -385,19 +385,18 @@ def readtext_jsonlines(key_name, bucket_name):
                    k: article_json[k]
                    for k in article_json
                    if k == "id"
                    or k == "s3v"
                    or k == "ts"
                    or k == "ft"
                    or k == "tp"
                    or k == "pp"
                    or k == "lg"
                    or k == "t"
                }
                yield json.dumps(article_reduced)


def upload(partition_name, newspaper_prefix=None, bucket_name=None):
    if newspaper_prefix is not None:
        key_name = os.path.join("/", newspaper_prefix, partition_name.split("/")[-1])
    else:
@@ -453,14 +452,15 @@ def fixed_s3fs_glob(path: str, boto3_bucket=None):
"""
    if boto3_bucket is None:
        if path.startswith("s3://"):
            path = path[len("s3://"):]
        bucket_name = path.split("/")[0]
        base_path = "/".join(path.split("/")[1:])  # Remove bucket name
        boto3_bucket = get_boto3_bucket(bucket_name)
    else:
        bucket_name = boto3_bucket.name
        base_path = path

    base_path, suffix_path = base_path.split("*")

    filenames = [
        "s3://"
        + os.path.join(bucket_name, o.key)
        for o in boto3_bucket.objects.filter(Prefix=base_path)
        if o.key.endswith(suffix_path)
    ]

    return filenames
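For context, fixed_s3fs_glob splits its pattern on the single "*": the prefix is filtered server-side via objects.filter(Prefix=...), the suffix is checked client-side, and boto3's auto-paginating collection is what works around the 1000-key page limit. A minimal usage sketch, with a hypothetical bucket and layout (and credentials assumed to be available to get_boto3_bucket):

# Hypothetical example: list rebuilt files for one media title.
files = fixed_s3fs_glob("s3://my-bucket/GDL/*.jsonl.bz2")
# -> ["s3://my-bucket/GDL/GDL-1900.jsonl.bz2", ...]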


def s3_glob_with_size(path: str, boto3_bucket=None):
    """
    Custom glob function to list S3 objects matching a pattern. This function
    works around the 1000-object listing limit in S3 by using boto3 directly.

    Args:
        path (str): The S3 path with a wildcard (*) to match files.
            Example: 's3://bucket_name/path/to/files/*.txt'.
        boto3_bucket (boto3.Bucket, optional): An optional boto3 Bucket
            object. If not provided, it will be created from the path.

    Returns:
        list: A list of tuples containing the full S3 paths of matching files
            and their sizes in megabytes.
    """
    if boto3_bucket is None:
        if path.startswith("s3://"):
            path = path[len("s3://"):]
        bucket_name = path.split("/")[0]
        base_path = "/".join(path.split("/")[1:])  # Remove bucket name
        boto3_bucket = get_boto3_bucket(bucket_name)
    else:
        bucket_name = boto3_bucket.name
        base_path = path

    base_path, suffix_path = base_path.split("*")

    filenames = [
        (
            "s3://" + os.path.join(bucket_name, o.key),
            round(bytes_to(o.size, "m"), 6)
        )
        for o in boto3_bucket.objects.filter(Prefix=base_path)
        if o.key.endswith(suffix_path)
    ]

    return filenames
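The new s3_glob_with_size mirrors fixed_s3fs_glob but pairs each key with its object size, converted to megabytes by the newly imported bytes_to. A usage sketch with the same hypothetical bucket:

# Hypothetical example: report sizes alongside the matched keys.
for key, size_mb in s3_glob_with_size("s3://my-bucket/GDL/*.jsonl.bz2"):
    print(f"{key}: {size_mb} MB")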


def alternative_read_text(
    s3_key: str, s3_credentials: dict, line_by_line: bool = True
) -> Union[list[str], str]:
    """Read from S3 a line-separated text file (e.g. `*.jsonl.bz2`).
@@ -502,6 +543,35 @@ def alternative_read_text(
    return text
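A usage sketch for this reader; its body is elided in this hunk, so treating the dict returned by this module's get_storage_options() as the expected s3_credentials format is an assumption of this note, not something the diff confirms:

# Hypothetical example: read a compressed JSON-lines archive line by line.
creds = get_storage_options()  # assumed credential format; body not shown here
lines = alternative_read_text(
    "s3://my-bucket/GDL/GDL-1900.jsonl.bz2", creds, line_by_line=True
)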


def list_s3_directories(bucket_name, prefix=''):
    """
    Retrieve the 'directory' names (media titles) in an S3 bucket after a
    given path prefix.

    Args:
        bucket_name (str): The name of the S3 bucket.
        prefix (str): The prefix path within the bucket to search. Default
            is the root ('').

    Returns:
        list: A list of 'directory' names found in the specified bucket
            and prefix.
    """
    s3 = get_s3_client()
    result = s3.list_objects_v2(
        Bucket=bucket_name, Prefix=prefix, Delimiter='/'
    )

    directories = []
    if 'CommonPrefixes' in result:
        directories = [
            prefix['Prefix'][:-1].split("/")[-1]
            for prefix in result['CommonPrefixes']
        ]

    return directories
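S3 has no real directories; Delimiter='/' makes list_objects_v2 fold every key sharing the next path segment into a single CommonPrefixes entry, and the comprehension keeps just that segment name. A sketch with a hypothetical bucket layout:

# Hypothetical example: with keys "canonical/GDL/GDL-1900.jsonl.bz2" and
# "canonical/JDG/JDG-1901.jsonl.bz2", the call returns the media titles
# one level below the prefix.
titles = list_s3_directories("my-bucket", prefix="canonical/")
# -> ["GDL", "JDG"]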


def get_s3_object_size(bucket_name, key):
    """
    Get the size of an object (key) in an S3 bucket.
