From a168a16646ba1cffe8117614b88b56220177f78d Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 14 Dec 2024 23:27:12 -0500 Subject: [PATCH 01/12] return self._http_session from get_requests_session method in Store class --- earthaccess/store.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index 58ac9f59..b2808a6f 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -336,7 +336,10 @@ def get_requests_session(self, bearer_token: bool = True) -> requests.Session: Returns: requests Session """ - return self.auth.get_session() + if hasattr(self, "_http_session"): + return self._http_session + else: + raise AttributeError("The requests session hasn't been set up yet.") def open( self, From 36b95faaab4e300dbe74eddd1e41943d6815a93f Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 14 Dec 2024 23:30:28 -0500 Subject: [PATCH 02/12] get existing session instead of creating a new one when downloading the file --- earthaccess/store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index b2808a6f..60ee759e 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -672,7 +672,7 @@ def _download_file(self, url: str, directory: Path) -> str: path = directory / Path(local_filename) if not path.exists(): try: - session = self.auth.get_session() + session = self.get_requests_session() with session.get( url, stream=True, From 3c35c7a43e2c5648a40a8a9661bc35329d5c4ef6 Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Mon, 16 Dec 2024 16:05:11 -0500 Subject: [PATCH 03/12] set the initial requests session with bearer_token from store --- earthaccess/store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index 60ee759e..8f465578 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -127,7 +127,7 @@ def __init__(self, auth: Any, pre_authorize: bool = False) -> None: oauth_profile = f"https://{auth.system.edl_hostname}/profile" # sets the initial URS cookie self._requests_cookies: Dict[str, Any] = {} - self.set_requests_session(oauth_profile) + self.set_requests_session(oauth_profile, bearer_token=True) if pre_authorize: # collect cookies from other DAACs for url in DAAC_TEST_URLS: From 2a3850e6908a9802c5dd9aba150182a751a65da4 Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 21 Dec 2024 14:44:22 -0500 Subject: [PATCH 04/12] clone requests session from _download_file --- earthaccess/store.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index 8f465578..b9190e11 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -18,7 +18,7 @@ import earthaccess -from .auth import Auth +from .auth import Auth, SessionWithHeaderRedirection from .daac import DAAC_TEST_URLS, find_provider from .results import DataGranule from .search import DataCollections @@ -655,6 +655,13 @@ def _get_granules( data_links, local_path, pqdm_kwargs=pqdm_kwargs ) + def _clone_session(self, original_session: SessionWithHeaderRedirection) -> SessionWithHeaderRedirection: + new_session = SessionWithHeaderRedirection() + new_session.headers.update(original_session.headers) + new_session.cookies.update(original_session.cookies) + new_session.auth = original_session.auth + return new_session + def _download_file(self, url: str, directory: Path) -> str: """Download a single file from an on-prem location, a DAAC data center. @@ -672,7 +679,8 @@ def _download_file(self, url: str, directory: Path) -> str: path = directory / Path(local_filename) if not path.exists(): try: - session = self.get_requests_session() + original_session = self.get_requests_session() + session = self._clone_session(original_session) with session.get( url, stream=True, From 318c593b30053fa9544cbd30c473dfaab68abd53 Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 21 Dec 2024 14:44:41 -0500 Subject: [PATCH 05/12] black code --- earthaccess/store.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index b9190e11..17e27ceb 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -655,7 +655,9 @@ def _get_granules( data_links, local_path, pqdm_kwargs=pqdm_kwargs ) - def _clone_session(self, original_session: SessionWithHeaderRedirection) -> SessionWithHeaderRedirection: + def _clone_session( + self, original_session: SessionWithHeaderRedirection + ) -> SessionWithHeaderRedirection: new_session = SessionWithHeaderRedirection() new_session.headers.update(original_session.headers) new_session.cookies.update(original_session.cookies) From 40e610398f80ae40e85d070728a93bc3ae6e5a91 Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 21 Dec 2024 14:47:52 -0500 Subject: [PATCH 06/12] comments added for _clone_session method --- earthaccess/store.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/earthaccess/store.py b/earthaccess/store.py index 17e27ceb..84e98c64 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -658,6 +658,14 @@ def _get_granules( def _clone_session( self, original_session: SessionWithHeaderRedirection ) -> SessionWithHeaderRedirection: + """Creates a new session that replicates the settings of the original session. + + Parameters: + original_session: The original session object to be cloned + + Returns: + A new session that has the same headers, cookies, and auth as the original session. + """ new_session = SessionWithHeaderRedirection() new_session.headers.update(original_session.headers) new_session.cookies.update(original_session.cookies) From f807616893e95f784d834aa99ab9b583638e133d Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 21 Dec 2024 17:22:34 -0500 Subject: [PATCH 07/12] set up local thread session from _clone_session_in_local_thread --- earthaccess/store.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index 84e98c64..dd1027f9 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -8,6 +8,7 @@ from pickle import dumps, loads from typing import Any, Dict, List, Mapping, Optional, Tuple, Union from uuid import uuid4 +import threading import fsspec import requests @@ -119,6 +120,7 @@ def __init__(self, auth: Any, pre_authorize: bool = False) -> None: Parameters: auth: Auth instance to download and access data. """ + self.thread_locals = threading.local() if auth.authenticated is True: self.auth = auth self._s3_credentials: Dict[ @@ -655,7 +657,7 @@ def _get_granules( data_links, local_path, pqdm_kwargs=pqdm_kwargs ) - def _clone_session( + def _clone_session_in_local_thread( self, original_session: SessionWithHeaderRedirection ) -> SessionWithHeaderRedirection: """Creates a new session that replicates the settings of the original session. @@ -666,11 +668,12 @@ def _clone_session( Returns: A new session that has the same headers, cookies, and auth as the original session. """ - new_session = SessionWithHeaderRedirection() - new_session.headers.update(original_session.headers) - new_session.cookies.update(original_session.cookies) - new_session.auth = original_session.auth - return new_session + if not hasattr(self.thread_locals, 'local_thread_session'): + local_thread_session = SessionWithHeaderRedirection() + local_thread_session.headers.update(original_session.headers) + local_thread_session.cookies.update(original_session.cookies) + local_thread_session.auth = original_session.auth + self.thread_locals.local_thread_session = local_thread_session def _download_file(self, url: str, directory: Path) -> str: """Download a single file from an on-prem location, a DAAC data center. @@ -690,7 +693,8 @@ def _download_file(self, url: str, directory: Path) -> str: if not path.exists(): try: original_session = self.get_requests_session() - session = self._clone_session(original_session) + self._clone_session_in_local_thread(original_session) + session = self.thread_locals.local_thread_session with session.get( url, stream=True, From 5b486cd767d87d4f8f9a0a31e06317b5a901e375 Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 21 Dec 2024 17:22:48 -0500 Subject: [PATCH 08/12] black code --- earthaccess/store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index dd1027f9..1574e5c4 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -668,7 +668,7 @@ def _clone_session_in_local_thread( Returns: A new session that has the same headers, cookies, and auth as the original session. """ - if not hasattr(self.thread_locals, 'local_thread_session'): + if not hasattr(self.thread_locals, "local_thread_session"): local_thread_session = SessionWithHeaderRedirection() local_thread_session.headers.update(original_session.headers) local_thread_session.cookies.update(original_session.cookies) From 9930d8412593657348fb240952ca245e802a3736 Mon Sep 17 00:00:00 2001 From: Hailiang Zhang Date: Sat, 21 Dec 2024 17:26:31 -0500 Subject: [PATCH 09/12] comments adjusted --- earthaccess/store.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index 1574e5c4..2b7e23b2 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -659,14 +659,17 @@ def _get_granules( def _clone_session_in_local_thread( self, original_session: SessionWithHeaderRedirection - ) -> SessionWithHeaderRedirection: - """Creates a new session that replicates the settings of the original session. + ) -> None: + """Clone the original session and store it in the local thread context. + + This method creates a new session that replicates the headers, cookies, and authentication settings + from the provided original session. The new session is stored in a thread-local storage. Parameters: - original_session: The original session object to be cloned + original_session (SessionWithHeaderRedirection): The session to be cloned. Returns: - A new session that has the same headers, cookies, and auth as the original session. + None """ if not hasattr(self.thread_locals, "local_thread_session"): local_thread_session = SessionWithHeaderRedirection() From fee69588e5848831a3d14df29b7f452edc8dfd20 Mon Sep 17 00:00:00 2001 From: betolink Date: Fri, 17 Jan 2025 14:51:48 -0600 Subject: [PATCH 10/12] add unit test for the session cookie sharing --- tests/unit/test_store.py | 77 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_store.py b/tests/unit/test_store.py index f2be362b..9d25f91b 100644 --- a/tests/unit/test_store.py +++ b/tests/unit/test_store.py @@ -1,13 +1,18 @@ # package imports import os +import threading import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch import fsspec import pytest import responses import s3fs from earthaccess import Auth, Store +from earthaccess.auth import SessionWithHeaderRedirection from earthaccess.store import EarthAccessFile +from pqdm.threads import pqdm class TestStoreSessions(unittest.TestCase): @@ -128,6 +133,76 @@ def test_store_can_create_s3_fsspec_session(self): return None + @responses.activate + def test_session_cloning_and_file_download(self): + # Mock URLs and their responses + mock_creds = { + "accessKeyId": "sure", + "secretAccessKey": "correct", + "sessionToken": "whynot", + } + urls = [f"https://example.com/file{i}" for i in range(1, 11)] + for i, url in enumerate(urls): + responses.add( + responses.GET, url, body=f"Content of file {i + 1}", status=200 + ) + + # Mock authentication and store setup + mock_auth = MagicMock() + mock_auth.authenticated = True + mock_auth.system.edl_hostname = "urs.earthdata.nasa.gov" # Mock hostname + responses.add( + responses.GET, + "https://urs.earthdata.nasa.gov/profile", + json=mock_creds, + status=200, + ) + + original_session = SessionWithHeaderRedirection() + original_session.cookies.set("sessionid", "mocked-session-id") + mock_auth.get_session.return_value = original_session + + # Create the Store instance + store = Store(auth=mock_auth) + store.thread_locals = threading.local() # Use real thread-local storage + + # Track cloned sessions + cloned_sessions = set() + + def mock_clone_session_in_local_thread(original_session): + """Mock session cloning to track cloned sessions.""" + if not hasattr(store.thread_locals, "local_thread_session"): + session = SessionWithHeaderRedirection() + session.cookies.update(original_session.cookies) + cloned_sessions.add(id(session)) # Track unique sessions by ID + store.thread_locals.local_thread_session = session + + with patch.object( + store, + "_clone_session_in_local_thread", + side_effect=mock_clone_session_in_local_thread, + ): + mock_directory = Path("/mock/directory") + downloaded_files = [] + + def mock_download_file(url): + """Mock file download to track downloaded files.""" + # Ensure session cloning happens before downloading + store._clone_session_in_local_thread(original_session) + downloaded_files.append(url) + return mock_directory / f"{url.split('/')[-1]}" + + with patch.object(store, "_download_file", side_effect=mock_download_file): + # Test multi-threaded download with 2 threads + pqdm(urls, store._download_file, n_jobs=2) # type: ignore + + # Verify sessions cloned + self.assertEqual(len(cloned_sessions), 2) # 2 sessions, one per thread + + # Verify files downloaded + self.assertEqual(len(downloaded_files), 10) # 10 files downloaded + self.assertCountEqual(downloaded_files, urls) # All files accounted for + @pytest.mark.xfail( reason="Expected failure: Reproduces a bug (#610) that has not yet been fixed." @@ -135,7 +210,7 @@ def test_store_can_create_s3_fsspec_session(self): def test_earthaccess_file_getattr(): fs = fsspec.filesystem("memory") with fs.open("/foo", "wb") as f: - earthaccess_file = EarthAccessFile(f, granule="foo") + earthaccess_file = EarthAccessFile(f, granule="foo") # type: ignore assert f.tell() == earthaccess_file.tell() # cleanup fs.store.clear() From 5140afa73270c7fe8e2ac99a8f7f89e828ee22a3 Mon Sep 17 00:00:00 2001 From: betolink Date: Fri, 17 Jan 2025 20:31:42 -0600 Subject: [PATCH 11/12] update test, multithreading is not evenly disttributed --- tests/unit/test_store.py | 129 ++++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 62 deletions(-) diff --git a/tests/unit/test_store.py b/tests/unit/test_store.py index 9d25f91b..069cb030 100644 --- a/tests/unit/test_store.py +++ b/tests/unit/test_store.py @@ -134,74 +134,79 @@ def test_store_can_create_s3_fsspec_session(self): return None @responses.activate - def test_session_cloning_and_file_download(self): - # Mock URLs and their responses + def test_session_reuses_token_download(self): mock_creds = { "accessKeyId": "sure", "secretAccessKey": "correct", "sessionToken": "whynot", } - urls = [f"https://example.com/file{i}" for i in range(1, 11)] - for i, url in enumerate(urls): - responses.add( - responses.GET, url, body=f"Content of file {i + 1}", status=200 - ) - - # Mock authentication and store setup - mock_auth = MagicMock() - mock_auth.authenticated = True - mock_auth.system.edl_hostname = "urs.earthdata.nasa.gov" # Mock hostname - responses.add( - responses.GET, - "https://urs.earthdata.nasa.gov/profile", - json=mock_creds, - status=200, - ) + test_cases = [ + (2, 500), # 2 threads, 500 files + (4, 400), # 4 threads, 400 files + (8, 5000), # 8 threads, 5k files + ] + for n_threads, n_files in test_cases: + with self.subTest(n_threads=n_threads, n_files=n_files): + urls = [f"https://example.com/file{i}" for i in range(1, n_files + 1)] + for i, url in enumerate(urls): + responses.add( + responses.GET, url, body=f"Content of file {i + 1}", status=200 + ) + + mock_auth = MagicMock() + mock_auth.authenticated = True + mock_auth.system.edl_hostname = "urs.earthdata.nasa.gov" + responses.add( + responses.GET, + "https://urs.earthdata.nasa.gov/profile", + json=mock_creds, + status=200, + ) - original_session = SessionWithHeaderRedirection() - original_session.cookies.set("sessionid", "mocked-session-id") - mock_auth.get_session.return_value = original_session - - # Create the Store instance - store = Store(auth=mock_auth) - store.thread_locals = threading.local() # Use real thread-local storage - - # Track cloned sessions - cloned_sessions = set() - - def mock_clone_session_in_local_thread(original_session): - """Mock session cloning to track cloned sessions.""" - if not hasattr(store.thread_locals, "local_thread_session"): - session = SessionWithHeaderRedirection() - session.cookies.update(original_session.cookies) - cloned_sessions.add(id(session)) # Track unique sessions by ID - store.thread_locals.local_thread_session = session - - with patch.object( - store, - "_clone_session_in_local_thread", - side_effect=mock_clone_session_in_local_thread, - ): - mock_directory = Path("/mock/directory") - downloaded_files = [] - - def mock_download_file(url): - """Mock file download to track downloaded files.""" - # Ensure session cloning happens before downloading - store._clone_session_in_local_thread(original_session) - downloaded_files.append(url) - return mock_directory / f"{url.split('/')[-1]}" - - with patch.object(store, "_download_file", side_effect=mock_download_file): - # Test multi-threaded download with 2 threads - pqdm(urls, store._download_file, n_jobs=2) # type: ignore - - # Verify sessions cloned - self.assertEqual(len(cloned_sessions), 2) # 2 sessions, one per thread - - # Verify files downloaded - self.assertEqual(len(downloaded_files), 10) # 10 files downloaded - self.assertCountEqual(downloaded_files, urls) # All files accounted for + original_session = SessionWithHeaderRedirection() + original_session.cookies.set("sessionid", "mocked-session-cookie") + mock_auth.get_session.return_value = original_session + + store = Store(auth=mock_auth) + store.thread_locals = threading.local() # Use real thread-local storage + + # Track cloned sessions + cloned_sessions = set() + + def mock_clone_session_in_local_thread(original_session): + """Mock session cloning to track cloned sessions.""" + if not hasattr(store.thread_locals, "local_thread_session"): + session = SessionWithHeaderRedirection() + session.cookies.update(original_session.cookies) + cloned_sessions.add(id(session)) + store.thread_locals.local_thread_session = session + + with patch.object( + store, + "_clone_session_in_local_thread", + side_effect=mock_clone_session_in_local_thread, + ): + mock_directory = Path("/mock/directory") + downloaded_files = [] + + def mock_download_file(url): + """Mock file download to track downloaded files.""" + # Ensure session cloning happens before downloading + store._clone_session_in_local_thread(original_session) + downloaded_files.append(url) + return mock_directory / f"{url.split('/')[-1]}" + + with patch.object( + store, "_download_file", side_effect=mock_download_file + ): + # Test multi-threaded download + pqdm(urls, store._download_file, n_jobs=n_threads) # type: ignore + + # We make sure we reuse the token up to N threads + self.assertTrue(len(cloned_sessions) <= n_threads) + + self.assertEqual(len(downloaded_files), n_files) # 10 files downloaded + self.assertCountEqual(downloaded_files, urls) # All files accounted for @pytest.mark.xfail( From b628c71dd0e0485d14bff49e16c554c92c406806 Mon Sep 17 00:00:00 2001 From: betolink Date: Fri, 17 Jan 2025 20:40:56 -0600 Subject: [PATCH 12/12] fix docstring --- earthaccess/store.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index c1bb4742..35f369e0 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -331,9 +331,6 @@ def get_requests_session(self) -> SessionWithHeaderRedirection: This HTTPS session can be used to download granules if we want to use a direct, lower level API. - Parameters: - bearer_token: if true, will be used for authenticated queries on CMR - Returns: requests Session """