diff --git a/catalog/app/containers/Bucket/Queries/Athena/model/requests.spec.ts b/catalog/app/containers/Bucket/Queries/Athena/model/requests.spec.ts
index bf8a0793922..4faf6dbbcee 100644
--- a/catalog/app/containers/Bucket/Queries/Athena/model/requests.spec.ts
+++ b/catalog/app/containers/Bucket/Queries/Athena/model/requests.spec.ts
@@ -852,6 +852,48 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
unmount()
})
})
+
+ it('return "not ready" if database is not ready', async () => {
+ startQueryExecution.mockImplementation(
+ reqThen
(() => ({})),
+ )
+ await act(async () => {
+ const { result, unmount, waitForNextUpdate } = renderHook(() =>
+ requests.useQueryRun({
+ workgroup: 'a',
+ catalogName: 'b',
+ database: Model.Loading,
+ queryBody: 'd',
+ }),
+ )
+ await waitForNextUpdate()
+ expect(result.current[0]).toBeUndefined()
+ unmount()
+ })
+ })
+
+ it('marks as ready to run but returns an error asking for confirmation if database is empty', async () => {
+ startQueryExecution.mockImplementation(
+ reqThen(() => ({})),
+ )
+ await act(async () => {
+ const { result, unmount, waitForValueToChange } = renderHook(() =>
+ requests.useQueryRun({
+ workgroup: 'a',
+ catalogName: 'b',
+ database: '',
+ queryBody: 'd',
+ }),
+ )
+ await waitForValueToChange(() => result.current)
+ await waitForValueToChange(() => result.current[0])
+ expect(result.current[0]).toBeNull()
+ const run = await result.current[1](false)
+ expect(run).toBeInstanceOf(Error)
+ expect(run).toBe(requests.NO_DATABASE)
+ unmount()
+ })
+ })
})
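For orientation, the two new `useQueryRun` cases above pin down a small readiness/confirmation contract: the first tuple slot stays `undefined` while any input is still loading, becomes `null` once the hook is ready to run, and running against an empty database without confirmation short-circuits to the shared `NO_DATABASE` error instance (which is why the spec can follow `toBeInstanceOf(Error)` with an identity check via `toBe`). Below is a minimal sketch of that contract; `Loading`, `Value`, and the function bodies are illustrative assumptions, not the real hook:

```typescript
// Condensed model of the behavior the two tests above assert.
const Loading = Symbol('loading')
type Value<T> = T | typeof Loading | Error | null | undefined

// A single shared Error instance, so identity comparison with `toBe` works.
const NO_DATABASE = new Error('Database is not selected')

interface QueryRunInput {
  workgroup: Value<string>
  catalogName: Value<string>
  database: Value<string>
  queryBody: Value<string>
}

type QueryRun = [
  Value<null>, // `undefined` while inputs load, `null` once ready to run
  (confirmed: boolean) => Promise<Error | { id: string }>,
]

function queryRunContract(input: QueryRunInput): QueryRun {
  const ready = Object.values(input).every((v) => v !== undefined && v !== Loading)
  const run = async (confirmed: boolean) => {
    // An empty database is runnable in principle, but only after the user
    // explicitly confirms -- otherwise surface the NO_DATABASE error.
    if (!input.database && !confirmed) return NO_DATABASE
    return { id: 'fake-execution-id' } // hypothetical successful execution
  }
  return [ready ? null : undefined, run]
}
```
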
describe('useWorkgroup', () => {
@@ -1036,7 +1078,7 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
}
})
- it('does not change query if a valid query is already selected', async () => {
+ it('retains execution query when the list is changed', async () => {
const queries = {
list: [
{ key: 'foo', name: 'Foo', body: 'SELECT * FROM foo' },
@@ -1063,8 +1105,7 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
{
list: [
{ key: 'baz', name: 'Baz', body: 'SELECT * FROM baz' },
- { key: 'foo', name: 'Foo', body: 'SELECT * FROM foo' },
- { key: 'bar', name: 'Bar', body: 'SELECT * FROM bar' },
+ ...queries.list,
],
},
execution,
@@ -1077,6 +1118,45 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
throw new Error('No data')
}
})
+
+ it('does not change query when list is updated if a valid query is already selected', async () => {
+ const queries = {
+ list: [
+ { key: 'foo', name: 'Foo', body: 'SELECT * FROM foo' },
+ { key: 'bar', name: 'Bar', body: 'SELECT * FROM bar' },
+ ],
+ }
+ const execution = null
+ const { result, rerender, waitForNextUpdate } = renderHook(
+ (props: Parameters<typeof useWrapper>[0]) => useWrapper(props),
+ {
+ initialProps: [queries, execution],
+ },
+ )
+
+ if (Model.hasData(result.current.value)) {
+ expect(result.current.value.body).toBe('SELECT * FROM foo')
+ } else {
+ throw new Error('No data')
+ }
+ await act(async () => {
+ rerender([
+ {
+ list: [
+ { key: 'baz', name: 'Baz', body: 'SELECT * FROM baz' },
+ ...queries.list,
+ ],
+ },
+ execution,
+ ])
+ await waitForNextUpdate()
+ })
+ if (Model.hasData(result.current.value)) {
+ expect(result.current.value.body).toBe('SELECT * FROM foo')
+ } else {
+ throw new Error('No data')
+ }
+ })
})
describe('useQueryBody', () => {
@@ -1086,7 +1166,7 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
it('sets query body from query if query is ready', () => {
const query = { name: 'Foo', key: 'foo', body: 'SELECT * FROM foo' }
- const execution = {}
+ const execution = null
const setQuery = jest.fn()
const { result } = renderHook(() => useWrapper([query, setQuery, execution]))
@@ -1098,7 +1178,7 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
}
})
- it('sets query body from execution if query is not ready', () => {
+ it('sets query body from execution if query is not selected', () => {
const query = null
const execution = { query: 'SELECT * FROM bar' }
const setQuery = jest.fn()
@@ -1127,8 +1207,8 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
})
it('does not change value if query and execution are both not ready', async () => {
- const query = null
- const execution = null
+ const query = undefined
+ const execution = undefined
const setQuery = jest.fn()
const { result, rerender, waitForNextUpdate } = renderHook(
@@ -1139,11 +1219,15 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
)
expect(result.current.value).toBeUndefined()
+ // That's not possible from the UI now,
+ // but let's pretend the UI is ready to handle user input
act(() => {
result.current.setValue('foo')
})
expect(result.current.value).toBe('foo')
+ // We rerendered the hook, but the internal useEffect didn't rewrite the value
+ // to `undefined` as it was supposed to do on the first render
await act(async () => {
rerender([query, setQuery, execution])
await waitForNextUpdate()
@@ -1166,7 +1250,7 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
expect(setQuery).toHaveBeenCalledWith(null)
})
- it('retains value when execution and query are initially empty but later updates', async () => {
+ it('obtains value when execution and query are initially empty but update later', async () => {
const initialQuery = null
const initialExecution = null
const setQuery = jest.fn()
@@ -1178,8 +1262,10 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
},
)
- expect(result.current.value).toBeUndefined()
+ expect(result.current.value).toBeNull()
+ // Query was loaded with some value
+ // Execution is ready but it's still null
await act(async () => {
rerender([
{ key: 'up', name: 'Updated', body: 'SELECT * FROM updated' },
@@ -1195,5 +1281,68 @@ describe('containers/Bucket/Queries/Athena/model/requests', () => {
throw new Error('No data')
}
})
+
+ it('sets query body to null if query is null after being loaded', async () => {
+ const initialQuery = Model.Loading
+ const initialExecution = null
+ const setQuery = jest.fn()
+
+ const { result, rerender, waitForNextUpdate } = renderHook(
+ (props: Parameters<typeof useWrapper>[0]) => useWrapper(props),
+ {
+ initialProps: [
+ initialQuery as Model.Value,
+ setQuery,
+ initialExecution,
+ ],
+ },
+ )
+
+ expect(result.current.value).toBe(Model.Loading)
+
+ await act(async () => {
+ rerender([null, setQuery, initialExecution])
+ await waitForNextUpdate()
+ })
+
+ if (Model.hasValue(result.current.value)) {
+ expect(result.current.value).toBeNull()
+ } else {
+ throw new Error('Unexpected state')
+ }
+ })
+
+ it('retains value if selected query is null and we switch from some execution', async () => {
+ // That's not ideal,
+ // but we don't know what changed the query body: the execution page or the user.
+ // So, at least, it is documented here.
+ const initialQuery = null
+ const initialExecution = { id: 'any', query: 'SELECT * FROM updated' }
+ const setQuery = jest.fn()
+
+ const { result, rerender, waitForNextUpdate } = renderHook(
+ (props: Parameters<typeof useWrapper>[0]) => useWrapper(props),
+ {
+ initialProps: [
+ initialQuery as Model.Value,
+ setQuery,
+ initialExecution,
+ ],
+ },
+ )
+
+ expect(result.current.value).toBe('SELECT * FROM updated')
+
+ await act(async () => {
+ rerender([initialQuery, setQuery, null])
+ await waitForNextUpdate()
+ })
+
+ if (Model.hasValue(result.current.value)) {
+ expect(result.current.value).toBe('SELECT * FROM updated')
+ } else {
+ throw new Error('Unexpected state')
+ }
+ })
})
})
diff --git a/catalog/app/containers/Bucket/Queries/Athena/model/requests.ts b/catalog/app/containers/Bucket/Queries/Athena/model/requests.ts
index 227e6ecbde2..8da2450012b 100644
--- a/catalog/app/containers/Bucket/Queries/Athena/model/requests.ts
+++ b/catalog/app/containers/Bucket/Queries/Athena/model/requests.ts
@@ -630,6 +630,9 @@ export function useQueryBody(
if (Model.isError(query)) return null
if (Model.hasData(query)) return query.body
if (Model.hasData(execution) && execution.query) return execution.query
+ if (!Model.isReady(v) && Model.isReady(query) && Model.isReady(execution)) {
+ return null
+ }
return v
})
}, [execution, query])
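Taken together with the hunk above, the `useQueryBody` updater resolves the next value in a fixed order, and the added branch covers the "everything finished loading but nothing was selected" case. A standalone sketch of that resolution order follows, with simplified stand-ins for the `Model` helpers (assumptions for illustration, not the real implementations):

```typescript
// Simplified stand-ins for Model.Value / Model.isReady / Model.hasData /
// Model.isError; the real helpers live in the Athena model module.
const Loading = Symbol('loading')
type Value<T> = T | typeof Loading | Error | null | undefined

const isError = (v: unknown): v is Error => v instanceof Error
const isReady = <T>(v: Value<T>): boolean => v !== undefined && v !== Loading
const hasData = <T>(v: Value<T>): v is T =>
  isReady(v) && v !== null && !isError(v)

interface Query { key: string; name: string; body: string }
interface Execution { id?: string; query?: string }

// Mirrors the patched updater: error -> null, selected query -> its body,
// execution -> its query text, the new branch -> null once both inputs are
// ready but empty; otherwise keep the previous value untouched.
function nextQueryBody(
  prev: Value<string>,
  query: Value<Query>,
  execution: Value<Execution>,
): Value<string> {
  if (isError(query)) return null
  if (hasData(query)) return query.body
  if (hasData(execution) && execution.query) return execution.query
  if (!isReady(prev) && isReady(query) && isReady(execution)) return null
  return prev
}

// The case the new spec pins down: the query finished loading as `null` and
// the execution is ready but empty, so a stale Loading value collapses to
// null instead of sticking around.
console.log(nextQueryBody(Loading, null, null)) // -> null
```

Under this ordering the "retains value" test also falls out: with `query === null` and the execution switching to `null`, a previously set string value is already "ready", so the new branch does not fire and the value is kept.
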
diff --git a/lambdas/pkgselect/.python-version b/lambdas/pkgselect/.python-version
deleted file mode 100644
index cc1923a40b1..00000000000
--- a/lambdas/pkgselect/.python-version
+++ /dev/null
@@ -1 +0,0 @@
-3.8
diff --git a/lambdas/pkgselect/__init__.py b/lambdas/pkgselect/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/lambdas/pkgselect/index.py b/lambdas/pkgselect/index.py
deleted file mode 100644
index 5840cbe8c53..00000000000
--- a/lambdas/pkgselect/index.py
+++ /dev/null
@@ -1,274 +0,0 @@
-"""
-Provide a virtual-file-system view of a package's logical keys.
-"""
-
-import asyncio
-import dataclasses
-import functools
-import json
-import typing as T
-
-import boto3
-import pandas as pd
-
-from t4_lambda_shared.utils import query_manifest_content, sql_escape
-
-
-async def run_async(fn, executor=None, loop=None):
- if loop is None:
- loop = asyncio.get_running_loop()
- return await loop.run_in_executor(executor, fn)
-
-
-class PkgselectException(Exception):
- def __str__(self):
- s = self.__class__.__name__
- if self.args:
- s = f"{s}: {self.args[0]}"
- return s
-
-
-class BadInputParameters(PkgselectException):
- pass
-
-
-class AccessDenied(PkgselectException):
- pass
-
-
-class NotFound(PkgselectException):
- pass
-
-
-def validate(condition: T.Any, message: str):
- if not condition:
- raise BadInputParameters(message)
-
-
-def file_list_to_folder(df: pd.DataFrame, limit: int, offset: int) -> dict:
- """
- Post process a set of logical keys to return only the top-level folder view.
- """
- if {'physical_key', 'logical_key', 'size'}.issubset(df.columns):
- groups = df.groupby(df.logical_key.str.extract('([^/]+/?).*')[0], dropna=True)
- folder = groups.agg(
- size=('size', 'sum'),
- physical_key=('physical_key', 'first')
- )
- folder.reset_index(inplace=True) # move the logical_key from the index to column[0]
- folder.rename(columns={0: 'logical_key'}, inplace=True) # name the new column
-
- # Sort to ensure consistent paging
- folder.sort_values(by=['logical_key'], inplace=True)
-
- # Page response (folders and files) based on limit & offset
- total_results = len(folder.index)
- folder = folder.iloc[offset:offset+limit]
-
- # Do not return physical_key for prefixes
- prefixes = folder[folder.logical_key.str.contains('/')].drop(
- ['physical_key'],
- axis=1
- ).to_dict(orient='records')
- objects = folder[~folder.logical_key.str.contains('/')].to_dict(orient='records')
- else:
- # df might not have the expected columns if either:
- # (1) the package is empty (has zero package entries) or,
- # (2) zero package entries match the prefix filter.
- # In either case, the folder view is empty.
- prefixes = []
- objects = []
- total_results = 0
-
- return dict(
- total=total_results,
- prefixes=prefixes,
- objects=objects,
- )
-
-
-@functools.lru_cache(maxsize=None)
-def get_s3_client():
- return boto3.client("s3")
-
-
-async def select(bucket: str, key: str, stmt: str):
- s3 = get_s3_client()
- try:
- return await run_async(functools.partial(
- query_manifest_content,
- s3,
- bucket=bucket,
- key=key,
- sql_stmt=stmt,
- ))
- except (s3.exceptions.NoSuchKey, s3.exceptions.NoSuchBucket):
- raise NotFound
- except s3.exceptions.ClientError as ex:
- if ex.response.get("Error", {}).get("Code") == "AccessDenied":
- raise AccessDenied
- raise ex
-
-
-async def select_meta(bucket: str, manifest: str, path: T.Optional[str] = None) -> dict:
- """
- Fetch package-level, directory-level or object-level metadata
- """
- if path:
- sql_stmt = f"SELECT s.meta FROM s3object s WHERE s.logical_key = '{sql_escape(path)}' LIMIT 1"
- else:
- sql_stmt = "SELECT s.* FROM s3object s WHERE s.logical_key is NULL LIMIT 1"
-
- result = await select(bucket, manifest, sql_stmt)
- return json.load(result) if result else {}
-
-
-@dataclasses.dataclass
-class FileView:
- physical_key: str
- size: int
- hash: str
- meta: T.Optional[dict]
-
-
-async def file_view(bucket: str, manifest: str, path: str) -> T.Optional[FileView]:
- """
- Get details of a single file in the package.
- """
- validate(
- isinstance(bucket, str) and bucket,
- f"file_view: bucket must be a non-empty string (given: {bucket!r})",
- )
- validate(
- isinstance(manifest, str) and manifest,
- f"file_view: manifest must be a non-empty string (given: {manifest!r})",
- )
- validate(
- isinstance(path, str) and path,
- f"file_view: path must be a non-empty string (given: {path!r})",
- )
-
- details = await select(
- bucket,
- manifest,
- f"""
- SELECT s.physical_keys[0] as physical_key, s."size", s.hash."value" as hash, s.meta
- FROM s3object s
- WHERE s.logical_key = '{sql_escape(path)}'
- LIMIT 1
- """,
- )
- return FileView(**json.load(details)) if details is not None else None
-
-
-@dataclasses.dataclass
-class DirView:
- total: int
- prefixes: T.List[dict] # {logical_key: str, size: float}
- objects: T.List[dict] # {logical_key: str, size: float, physical_key: str}
- meta: dict
-
-
-async def dir_view(
- bucket: str,
- manifest: str,
- path: T.Optional[str],
- limit: T.Optional[int] = None,
- offset: T.Optional[int] = None,
-) -> DirView:
- validate(
- isinstance(bucket, str) and bucket,
- f"dir_view: bucket must be a non-empty string (given: {bucket!r})",
- )
- validate(
- isinstance(manifest, str) and manifest,
- f"dir_view: manifest must be a non-empty string (given: {manifest!r})",
- )
- validate(
- path is None or isinstance(path, str),
- f"dir_view: path must be a string if provided (given: {path!r})",
- )
- validate(
- limit is None or isinstance(limit, int) and limit > 0,
- f"dir_view: limit must be a positive int if provided (given: {limit!r})",
- )
- validate(
- offset is None or isinstance(offset, int) and offset >= 0,
- f"dir_view: offset must be a non-negative int if provided (given: {offset!r})",
- )
-
- if limit is None:
- limit = 1000
- if offset is None:
- offset = 0
-
- path = path.rstrip("/")
- if path:
- path += "/"
-
- meta = asyncio.create_task(select_meta(bucket, manifest, path))
-
- # Call s3 select to fetch only logical keys matching the desired prefix (folder path)
- prefix_length = len(path) if path is not None else 0
- sql_stmt = \
- f"""
- SELECT
- SUBSTRING(s.logical_key, {prefix_length + 1}) as logical_key,
- s."size",
- s.physical_keys[0] as physical_key
- FROM s3object s
- """
-
- if path:
- sql_stmt += f" WHERE SUBSTRING(s.logical_key, 1, {prefix_length}) = '{sql_escape(path)}'"
-
- result = await select(bucket, manifest, sql_stmt)
-
- # Parse the response into a logical folder view
- if result is not None:
- df = pd.read_json(
- result,
- lines=True,
- dtype=dict(logical_key="string", physical_key="string"),
- )
- else:
- df = pd.DataFrame()
-
- return DirView(
- **file_list_to_folder(df, limit, offset),
- meta=await meta,
- )
-
-
-actions = {
- "file": file_view,
- "dir": dir_view,
-}
-
-
-def lambda_handler(evt, _ctx):
- """
- Parse a manifest to return a folder-like view of its contents (logical keys).
- Payload format:
- bucket: str
- manifest: str
- action: see actions mapping
- params: see *_view functions
- Returns: {result} or {error} (see *_view functions for result format)
- """
- try:
- action = evt.get("action")
- validate(
- action in actions,
- f"action must be one of: {', '.join(actions)} (given: {action!r})",
- )
-
- result = asyncio.run(actions[action](
- evt.get("bucket"),
- evt.get("manifest"),
- **evt.get("params", {}),
- ))
- return {"result": dataclasses.asdict(result) if result is not None else None}
-
- except PkgselectException as ex:
- return {"error": str(ex)}
diff --git a/lambdas/pkgselect/requirements.txt b/lambdas/pkgselect/requirements.txt
deleted file mode 100644
index 86849527514..00000000000
--- a/lambdas/pkgselect/requirements.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-boto3==1.17.100
-botocore==1.20.100
-jmespath==0.10.0
-numpy==1.22.0
-pandas==1.1.0
-python-dateutil==2.8.2
-pytz==2023.3
-s3transfer==0.4.2
-six==1.16.0
-urllib3==1.26.19
diff --git a/lambdas/pkgselect/setup.py b/lambdas/pkgselect/setup.py
deleted file mode 100644
index 1a58f98d407..00000000000
--- a/lambdas/pkgselect/setup.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from setuptools import setup
-
-setup(
- name='quilt3_package_browse',
- version='0.0.1',
- py_modules=['index'],
-)
diff --git a/lambdas/pkgselect/test-requirements.txt b/lambdas/pkgselect/test-requirements.txt
deleted file mode 100644
index c6d95fdf08a..00000000000
--- a/lambdas/pkgselect/test-requirements.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-coverage==5.5
-pytest==4.3.0
-pytest-cov==2.6.1
-responses==0.10.5
diff --git a/lambdas/pkgselect/test/__init__.py b/lambdas/pkgselect/test/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/lambdas/pkgselect/test/test_pkgselect.py b/lambdas/pkgselect/test/test_pkgselect.py
deleted file mode 100644
index 2be9be05e73..00000000000
--- a/lambdas/pkgselect/test/test_pkgselect.py
+++ /dev/null
@@ -1,537 +0,0 @@
-"""
-Test functions for pkgselect endpoint
-"""
-
-import json
-import os
-from unittest import TestCase, skip
-from unittest.mock import patch
-
-import boto3
-import pandas as pd
-import responses
-
-from t4_lambda_shared.utils import buffer_s3response, read_body
-
-from .. import index as pkgselect
-
-
-@skip("TODO: fix tests")
-class TestPackageSelect(TestCase):
- """
- Unit tests for the PackageSelect API endpoint.
- """
-
- def make_s3response(self, payload_bytes):
- """
- Generate a mock s3 select response
- """
- return {
- 'Payload': [
- {
- 'Records': {
- 'Payload': payload_bytes
- }
- },
- {
- 'Progress': {
- 'Details': {
- 'BytesScanned': 123,
- 'BytesProcessed': 123,
- 'BytesReturned': 123
- }
- }
- },
- {
- 'Stats': {
- 'Details': {
- 'BytesScanned': 123,
- 'BytesProcessed': 123,
- 'BytesReturned': 123
- }
- }
- },
- {
- 'End': {}
- }
- ]
- }
-
- def make_s3response_empty(self):
- """
- Generate a mock s3 select response
- """
- return {
- 'Payload': [
- {
- 'Stats': {
- 'Details': {
- 'BytesScanned': 123,
- 'BytesProcessed': 123,
- 'BytesReturned': 0
- }
- }
- },
- {
- 'End': {}
- }
- ]
- }
-
- def make_manifest_query(self, logical_keys):
- entries = []
- for key in logical_keys:
- entry = dict(
- logical_key=key,
- physical_key=f"{key}?versionid=1234",
- size=100
- )
- entries.append(json.dumps(entry))
- jsonl = "\n".join(entries)
- streambytes = jsonl.encode()
-
- return self.make_s3response(streambytes)
-
- def setUp(self):
- """
- Mocks to tests calls to S3 Select
- """
-
- logical_keys = [
- "foo.csv",
- "bar/file1.txt",
- "bar/file2.txt",
- "bar/baz/file3.txt",
- "bar/baz/file4.txt"
- ]
-
- manifest_row = dict(
- logical_key="bar/file1.txt",
- physical_keys=["s3://test-bucket/bar/file1.txt"],
- size=1234,
- hash={"type": "SHA256", "value": "0123456789ABCDEF"},
- meta={}
- )
- detailbytes = json.dumps(manifest_row).encode()
-
- self.s3response = self.make_manifest_query(logical_keys)
- self.s3response_detail = self.make_s3response(detailbytes)
- self.s3response_detail_empty = self.make_s3response_empty()
- self.s3response_incomplete = {
- 'Payload': [
- {
- 'Records': {
- 'Payload': self.s3response['Payload'][0]['Records']['Payload']
- }
- },
- {
- 'Stats': {
- 'Details': {
- 'BytesScanned': 123,
- 'BytesProcessed': 123,
- 'BytesReturned': 123
- }
- }
- }
- ]
- }
-
- meta = {
- "version": "v0",
- "user_meta": {
- "somefield": "somevalue"
- },
- "message": "Commit message"
- }
- metabytes = json.dumps(meta).encode()
- self.s3response_meta = self.make_s3response(metabytes)
-
- requests_mock = responses.RequestsMock(assert_all_requests_are_fired=False)
- requests_mock.start()
- self.addCleanup(requests_mock.stop)
-
- env_patcher = patch.dict(os.environ, {
- 'AWS_ACCESS_KEY_ID': 'test_key',
- 'AWS_SECRET_ACCESS_KEY': 'test_secret',
- })
- env_patcher.start()
- self.addCleanup(env_patcher.stop)
-
- def test_browse_top_level(self):
- """
- Test that the S3 Select response is parsed
- into the correct top-level folder view.
- """
- df = pd.read_json(buffer_s3response(self.s3response), lines=True)
- assert isinstance(df, pd.DataFrame)
-
- folder = pkgselect.file_list_to_folder(df, 1000, 0)
- assert len(folder['prefixes']) == 1
- assert len(folder['objects']) == 1
- assert folder['objects'][0]['logical_key'] == 'foo.csv'
- assert folder['prefixes'][0]['logical_key'] == 'bar/'
-
- def test_limit(self):
- """
- Test that the S3 Select response is parsed
- into the correct top-level folder view.
- """
- df = pd.read_json(buffer_s3response(self.s3response), lines=True)
- assert isinstance(df, pd.DataFrame)
-
- folder = pkgselect.file_list_to_folder(df, 1, 0)
- assert len(folder['prefixes']) == 1
- assert len(folder['objects']) == 0
- assert folder['prefixes'][0]['logical_key'] == 'bar/'
-
- def test_offset(self):
- """
- Test that the S3 Select response is parsed
- into the correct top-level folder view.
- """
- df = pd.read_json(buffer_s3response(self.s3response), lines=True)
- assert isinstance(df, pd.DataFrame)
-
- folder = pkgselect.file_list_to_folder(df, 1000, 1)
- assert len(folder['prefixes']) == 0
- assert len(folder['objects']) == 1
- assert folder['objects'][0]['logical_key'] == 'foo.csv'
-
- def test_browse_subfolder(self):
- """
- Test that the S3 Select response is parsed
- into the correct sub-folder view.
- """
- prefix = "bar/"
- df = pd.read_json(buffer_s3response(self.s3response), lines=True)
- assert isinstance(df, pd.DataFrame)
- filtered_df = df[df['logical_key'].str.startswith(prefix)]
- stripped = filtered_df['logical_key'].str.slice(start=len(prefix))
- stripped_df = stripped.to_frame('logical_key')
- s3_df = pd.concat(
- [stripped_df['logical_key'], filtered_df['size'], filtered_df['physical_key']],
- axis=1,
- keys=['logical_key', 'size', 'physical_key']
- )
-
- folder = pkgselect.file_list_to_folder(s3_df, 1000, 0)
- assert len(folder['prefixes']) == 1
- assert len(folder['objects']) == 2
- object_keys = [obj['logical_key'] for obj in folder['objects']]
- assert "file1.txt" in object_keys
- assert "file2.txt" in object_keys
- assert folder['prefixes'][0]['logical_key'] == "baz/"
-
- def test_browse_subsubfolder(self):
- """
- Test that the S3 Select response is parsed
- into the correct sub-sub-folder view.
- """
- prefix = "bar/baz/"
- df = pd.read_json(buffer_s3response(self.s3response), lines=True)
- assert isinstance(df, pd.DataFrame)
- filtered_df = df[df['logical_key'].str.startswith(prefix)]
- stripped = filtered_df['logical_key'].str.slice(start=len(prefix))
- stripped_df = stripped.to_frame('logical_key')
- s3_df = pd.concat(
- [stripped_df['logical_key'], filtered_df['size'], filtered_df['physical_key']],
- axis=1,
- keys=['logical_key', 'size', 'physical_key']
- )
- folder = pkgselect.file_list_to_folder(s3_df, 1000, 0)
- assert "objects" in folder
- assert "prefixes" in folder
- assert not folder['prefixes']
- assert len(folder['objects']) == 2
- object_keys = [obj['logical_key'] for obj in folder['objects']]
- assert "file3.txt" in object_keys
- assert "file4.txt" in object_keys
-
- def test_folder_view(self):
- """
- End-to-end test (folder view without a prefix)
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
- params = dict(
- bucket=bucket,
- manifest=key,
- action="dir",
- )
-
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s",
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- side_effect=[
- self.s3response,
- self.s3response_meta,
- ]
- ) as client_patch, patch('boto3.Session.client', return_value=mock_s3):
- response = pkgselect.lambda_handler(params, None)
- print(response)
- folder = json.loads(read_body(response))['result']
- assert len(folder['prefixes']) == 1
- assert len(folder['objects']) == 1
- assert folder['objects'][0]['logical_key'] == 'foo.csv'
- assert folder['prefixes'][0]['logical_key'] == 'bar/'
-
- def test_folder_view_paging(self):
- """
- End-to-end test (top-level folder view with a limit & offset)
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
- params = dict(
- bucket=bucket,
- manifest=key,
- action="dir",
- params={
- "path": "paging_test/",
- "limit": 10,
- "offset": 10,
- },
- )
-
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s",
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- paging_logical_keys = [
- f"f{i:03d}.csv" for i in range(1000)
- ]
- s3response_paging = self.make_manifest_query(paging_logical_keys)
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- side_effect=[
- s3response_paging,
- self.s3response_meta
- ]
- ) as client_patch, patch(
- 'boto3.Session.client',
- return_value=mock_s3
- ):
- response = pkgselect.lambda_handler(params, None)
- print(response)
- folder = json.loads(read_body(response))['result']
- assert len(folder['prefixes']) == 0
- assert len(folder['objects']) == 10
- assert folder['total'] == 1000
- assert folder['objects'][0]['logical_key'] == 'f010.csv'
-
- def test_detail_view(self):
- """
- End-to-end test (detail view)
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
- logical_key = "bar/file1.txt"
- params = dict(
- bucket=bucket,
- manifest=key,
- action="file",
- params={"path": logical_key},
- )
-
- expected_sql = "SELECT s.* FROM s3object s WHERE s.logical_key = 'bar/file1.txt' LIMIT 1"
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s",
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- return_value=self.s3response_detail
- ) as client_patch, patch(
- 'boto3.Session.client',
- return_value=mock_s3
- ):
- response = pkgselect.lambda_handler(params, None)
- print(response)
- json.loads(read_body(response))['result']
-
- def test_non_existing_logical_key(self):
- """
- End-to-end test (detail view)
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
- logical_key = "non-existing.txt"
- params = dict(
- bucket=bucket,
- manifest=key,
- action="file",
- params={"path": logical_key},
- )
-
- expected_sql = f"SELECT s.* FROM s3object s WHERE s.logical_key = '{logical_key}' LIMIT 1"
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s",
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- return_value=self.s3response_detail_empty
- ) as client_patch, patch(
- 'boto3.Session.client',
- return_value=mock_s3
- ):
- response = pkgselect.lambda_handler(params, None)
- print(response)
- assert response['statusCode'] == 404
-
- def test_non_string_keys(self):
- """
- End-to-end test (folder view without a prefix)
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
- params = dict(
- bucket=bucket,
- manifest=key,
- action="dir",
- )
-
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s",
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- # Return a response with keys that are not strings (integers here)
- # The important test case is where all members of a column are
- # non-string
- logical_keys = [
- "1",
- "2",
- "3",
- ]
- entries = []
- for key in logical_keys:
- entry = dict(
- logical_key=key,
- physical_key=key,
- size=100
- )
- entries.append(json.dumps(entry))
- jsonl = "\n".join(entries)
- streambytes = jsonl.encode()
- non_string_s3response = self.make_s3response(streambytes)
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- side_effect=[
- non_string_s3response,
- self.s3response_meta
- ]
- ) as client_patch, patch(
- 'boto3.Session.client',
- return_value=mock_s3
- ):
- response = pkgselect.lambda_handler(params, None)
- print(response)
- folder = json.loads(read_body(response))['result']
- assert not folder['prefixes']
- assert len(folder['objects']) == 3
- assert folder['objects'][0]['logical_key'] == '1'
- assert folder['objects'][1]['logical_key'] == '2'
- assert folder['objects'][2]['logical_key'] == '3'
-
- def test_empty_manifest(self):
- """
- End-to-end test (folder view without a prefix) for an
- empty package manifest
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
- params = dict(
- bucket=bucket,
- manifest=key,
- action="dir",
- )
-
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s",
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- # Empty manifest
- jsonl = '{"version": "v0", "message": null}'
- streambytes = jsonl.encode()
- non_string_s3response = self.make_s3response(streambytes)
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- side_effect=[
- non_string_s3response,
- self.s3response_meta
- ]
- ) as client_patch, patch(
- 'boto3.Session.client',
- return_value=mock_s3
- ):
- response = pkgselect.lambda_handler(params, None)
- print(response)
- folder = json.loads(read_body(response))['result']
- assert not folder['prefixes']
- assert not folder['objects']
- assert folder['total'] == 0
diff --git a/lambdas/s3select/.python-version b/lambdas/s3select/.python-version
deleted file mode 100644
index cc1923a40b1..00000000000
--- a/lambdas/s3select/.python-version
+++ /dev/null
@@ -1 +0,0 @@
-3.8
diff --git a/lambdas/s3select/requirements.txt b/lambdas/s3select/requirements.txt
deleted file mode 100644
index d60d4aa053c..00000000000
--- a/lambdas/s3select/requirements.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-attrs==19.1.0
-botocore==1.21.44
-certifi==2024.7.4
-chardet==3.0.4
-docutils==0.14
-idna==3.7
-jmespath==0.9.4
-jsonschema==3.0.1
-pyrsistent==0.15.3
-python-dateutil==2.8.0
-requests==2.32.0
-six==1.12.0
-urllib3==1.26.19
diff --git a/lambdas/s3select/setup.py b/lambdas/s3select/setup.py
deleted file mode 100644
index 0e4b32174d8..00000000000
--- a/lambdas/s3select/setup.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from setuptools import find_packages, setup
-
-setup(
- name='t4_lambda_s3select',
- version='0.0.1',
- packages=find_packages(where="src"),
- package_dir={"": "src"},
-)
diff --git a/lambdas/s3select/src/t4_lambda_s3select/__init__.py b/lambdas/s3select/src/t4_lambda_s3select/__init__.py
deleted file mode 100644
index 2a13e51f1cb..00000000000
--- a/lambdas/s3select/src/t4_lambda_s3select/__init__.py
+++ /dev/null
@@ -1,75 +0,0 @@
-"""
-Sign S3 select requests (because S3 select does not allow anonymous access).
-
-The implementation doesn't care what the request is, and just signs it using
-the current AWS credentials.
-"""
-import os
-from urllib.parse import urlencode
-
-import requests
-from botocore.auth import SigV4Auth
-from botocore.awsrequest import AWSRequest
-from botocore.session import Session
-
-from t4_lambda_shared.decorator import api
-from t4_lambda_shared.utils import get_default_origins
-
-SERVICE = 's3'
-REGION = os.environ.get('AWS_REGION', '')
-
-REQUEST_HEADERS_TO_FORWARD = {'content-type', 'cache-control', 'pragma', 'x-amz-content-sha256', 'x-amz-user-agent'}
-REQUEST_HEADERS_TO_SIGN = {'host', 'x-amz-content-sha256', 'x-amz-user-agent'}
-RESPONSE_HEADERS_TO_FORWARD = {'content-type'}
-
-session = requests.Session()
-
-
-@api(cors_origins=get_default_origins())
-def lambda_handler(request):
- """
- Sign the request and forward it to S3.
- """
- if not (request.method == 'POST' and 'select' in request.args):
- return requests.codes.bad_request, 'Not an S3 select', {'content-type': 'text/plain'}
-
- bucket, key = request.pathParameters['proxy'].split('/', 1)
- host = f'{bucket}.s3.amazonaws.com'
-
- # Make an unsigned HEAD request to test anonymous access.
-
- object_url = f'https://{host}/{key}'
- head_response = session.head(object_url)
- if not head_response.ok:
- return requests.codes.forbidden, 'Not allowed', {'content-type': 'text/plain'}
-
- # Sign the full S3 select request.
-
- url = f'{object_url}?{urlencode(request.args)}'
-
- headers = {k: v for k, v in request.headers.items() if k in REQUEST_HEADERS_TO_FORWARD}
- headers['host'] = host
-
- aws_request = AWSRequest(
- method=request.method,
- url=url,
- data=request.data,
- headers={k: v for k, v in headers.items() if k in REQUEST_HEADERS_TO_SIGN}
- )
- credentials = Session().get_credentials()
- auth = SigV4Auth(credentials, SERVICE, REGION)
- auth.add_auth(aws_request)
-
- headers.update(aws_request.headers)
-
- response = session.post(
- url=url,
- data=request.data, # Forward the POST data.
- headers=headers,
- )
-
- response_headers = {k: v for k, v in response.headers.items() if k in RESPONSE_HEADERS_TO_FORWARD}
- # Add a default content type to prevent API Gateway from setting it to application/json.
- response_headers.setdefault('content-type', 'application/octet-stream')
-
- return response.status_code, response.content, response_headers
diff --git a/lambdas/s3select/test-requirements.txt b/lambdas/s3select/test-requirements.txt
deleted file mode 100644
index 7e23afb8f36..00000000000
--- a/lambdas/s3select/test-requirements.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-atomicwrites==1.3.0
-importlib-metadata==0.18
-more-itertools==7.1.0
-pluggy==0.13.1
-py==1.10.0
-pyparsing==2.4.0
-pytest==4.3.0
-pytest-cov==2.6.1
-responses==0.10.6
-wcwidth==0.1.7
-zipp==3.19.1
diff --git a/lambdas/s3select/tests/__init__.py b/lambdas/s3select/tests/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/lambdas/s3select/tests/test_s3select.py b/lambdas/s3select/tests/test_s3select.py
deleted file mode 100644
index 13e61696508..00000000000
--- a/lambdas/s3select/tests/test_s3select.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import os
-from base64 import b64decode
-from unittest import TestCase
-from unittest.mock import patch
-
-import responses
-
-import t4_lambda_s3select
-
-
-@patch('t4_lambda_s3select.REGION', 'us-east-1')
-class TestS3Select(TestCase):
- """Tests S3 Select"""
- def setUp(self):
- self.requests_mock = responses.RequestsMock(assert_all_requests_are_fired=False)
- self.requests_mock.start()
-
- self.env_patcher = patch.dict(os.environ, {
- 'AWS_ACCESS_KEY_ID': 'test_key',
- 'AWS_SECRET_ACCESS_KEY': 'test_secret',
- })
- self.env_patcher.start()
-
- def tearDown(self):
- self.env_patcher.stop()
- self.requests_mock.stop()
-
- @classmethod
- def _make_event(cls, path, query, headers, body):
- return {
- 'httpMethod': 'POST',
- 'path': f'/lambda/{path}',
- 'pathParameters': {
- 'proxy': path
- },
- 'queryStringParameters': query or None,
- 'headers': headers or None,
- 'body': body,
- 'isBase64Encoded': False,
- }
-
- def test_signature(self):
- url = 'https://bucket.s3.amazonaws.com/object.csv'
-
- self.requests_mock.add(
- responses.HEAD,
- url,
- status=200)
-
- def _callback(request):
- assert 'X-Amz-Date' in request.headers
- assert 'Authorization' in request.headers
- assert request.headers['content-type'] == 'application/octet-stream'
- assert request.headers['cache-control'] == 'no-cache'
- assert request.headers['pragma'] == 'no-cache'
- assert 'referer' not in request.headers
- return 200, {}, b'results'
-
- self.requests_mock.add_callback(
- responses.POST,
- url,
- _callback)
-
- query = {
- 'select': '',
- 'select-type': '2',
- }
- headers = {
- 'content-type': 'application/octet-stream',
- 'x-amz-content-sha256': '123456',
- 'x-amz-user-agent': 'test',
- 'cache-control': 'no-cache',
- 'pragma': 'no-cache',
- 'referer': 'http://example.com'
- }
- body = b's3 select request body'
-
- event = self._make_event('bucket/object.csv', query, headers, body)
- resp = t4_lambda_s3select.lambda_handler(event, None)
- assert resp['statusCode'] == 200
- assert resp['isBase64Encoded']
- assert b64decode(resp['body']) == b'results'
-
- def test_not_public(self):
- url = 'https://bucket.s3.amazonaws.com/object.csv'
-
- self.requests_mock.add(
- responses.HEAD,
- url,
- status=403)
-
- event = self._make_event('bucket/object.csv', {'select': None}, {}, b'test')
- resp = t4_lambda_s3select.lambda_handler(event, None)
- assert resp['statusCode'] == 403
-
- def test_bad_request(self):
- event = self._make_event('bucket/object.csv', {}, {}, b'test')
- resp = t4_lambda_s3select.lambda_handler(event, None)
- assert resp['statusCode'] == 400
-
- event = self._make_event('bucket/object.csv', {'select': None}, {}, b'test')
- event['httpMethod'] = 'PUT'
- resp = t4_lambda_s3select.lambda_handler(event, None)
- assert resp['statusCode'] == 400
diff --git a/lambdas/shared/t4_lambda_shared/utils.py b/lambdas/shared/t4_lambda_shared/utils.py
index 9fb161dc15b..8d94dc7cef8 100644
--- a/lambdas/shared/t4_lambda_shared/utils.py
+++ b/lambdas/shared/t4_lambda_shared/utils.py
@@ -2,7 +2,6 @@
Helper functions.
"""
import gzip
-import io
import json
import logging
import os
@@ -96,13 +95,6 @@ def read_body(resp):
return body
-class IncompleteResultException(Exception):
- """
- Exception indicating an incomplete response
- (e.g., from S3 Select)
- """
-
-
def sql_escape(s):
"""
Escape strings that might contain single quotes for use in Athena
@@ -110,60 +102,3 @@ def sql_escape(s):
"""
escaped = s or ""
return escaped.replace("'", "''")
-
-
-def buffer_s3response(s3response):
- """
- Read a streaming response (botocore.eventstream.EventStream) from s3 select
- into a BytesIO buffer
- """
- logger_ = logging.getLogger(LOGGER_NAME)
- response = io.BytesIO()
- end_event_received = False
- stats = None
- found_records = False
- for event in s3response['Payload']:
- if 'Records' in event:
- records = event['Records']['Payload']
- response.write(records)
- found_records = True
- elif 'Progress' in event:
- logger_.info("select progress: %s", event['Progress'].get('Details'))
- elif 'Stats' in event:
- logger_.info("select stats: %s", event['Stats'])
- elif 'End' in event:
- # End event indicates that the request finished successfully
- end_event_received = True
-
- if not end_event_received:
- raise IncompleteResultException("Error: Received an incomplete response from S3 Select.")
- response.seek(0)
- return response if found_records else None
-
-
-def query_manifest_content(
- s3_client: str,
- *,
- bucket: str,
- key: str,
- sql_stmt: str
-) -> io.BytesIO:
- """
- Call S3 Select to read only the logical keys from a
- package manifest that match the desired folder path
- prefix
- """
- logger_ = get_quilt_logger()
- logger_.debug("utils.py: manifest_select: %s", sql_stmt)
- response = s3_client.select_object_content(
- Bucket=bucket,
- Key=key,
- ExpressionType='SQL',
- Expression=sql_stmt,
- InputSerialization={
- 'JSON': {'Type': 'LINES'},
- 'CompressionType': 'NONE'
- },
- OutputSerialization={'JSON': {'RecordDelimiter': '\n'}}
- )
- return buffer_s3response(response)
diff --git a/lambdas/shared/tests/test_utils.py b/lambdas/shared/tests/test_utils.py
index b9a7a403db3..6cb0a4eea7d 100644
--- a/lambdas/shared/tests/test_utils.py
+++ b/lambdas/shared/tests/test_utils.py
@@ -7,16 +7,13 @@
from unittest import TestCase
from unittest.mock import patch
-import boto3
import pytest
from testfixtures import LogCapture
from t4_lambda_shared.utils import (
- IncompleteResultException,
get_available_memory,
get_default_origins,
make_json_response,
- query_manifest_content,
separated_env_to_iter,
)
@@ -143,76 +140,6 @@ def test_json_response(self):
assert json.loads(body) == {'foo': 'bar'}
assert headers == {'Content-Type': 'application/json', 'Content-Length': '123'}
- def test_call_s3select(self):
- """
- Test that parameters are correctly passed to
- S3 Select (without a prefix)
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
-
- expected_sql = "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s"
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': expected_sql,
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- return_value=self.s3response
- ) as patched:
- query_manifest_content(
- mock_s3,
- bucket=bucket,
- key=key,
- sql_stmt=expected_sql)
- patched.assert_called_once_with(**expected_args)
-
- def test_call_s3select_incomplete_response(self):
- """
- Test that an incomplete response from S3 Select is
- detected and an exception is raised.
- """
- bucket = "bucket"
- key = ".quilt/packages/manifest_hash"
-
- expected_sql = "SELECT SUBSTRING(s.logical_key, 1) AS logical_key FROM s3object s"
- expected_args = {
- 'Bucket': bucket,
- 'Key': key,
- 'Expression': expected_sql,
- 'ExpressionType': 'SQL',
- 'InputSerialization': {
- 'CompressionType': 'NONE',
- 'JSON': {'Type': 'LINES'}
- },
- 'OutputSerialization': {'JSON': {'RecordDelimiter': '\n'}},
- }
-
- mock_s3 = boto3.client('s3')
- with patch.object(
- mock_s3,
- 'select_object_content',
- return_value=self.s3response_incomplete
- ) as patched:
- with self.assertRaises(IncompleteResultException):
- query_manifest_content(
- mock_s3,
- bucket=bucket,
- key=key,
- sql_stmt=expected_sql
- )
- patched.assert_called_once_with(**expected_args)
-
@pytest.mark.parametrize(
"level, call, message, expected, name",