From a644fc4940a8a76cd67b7f31e073605787d3df24 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Mon, 2 Dec 2024 11:39:57 +0900 Subject: [PATCH] refactor(backport v3.8.x): ota_proxy: refine request handling flow (#443) This PR simplifies the request handling flow, instead of implementing the guard conditions checks in retrieve_file API, now the guard conditions checks are implemented within each _retrieve_file handlers. --- src/ota_proxy/ota_cache.py | 181 +++++++++++++++++++++---------------- 1 file changed, 102 insertions(+), 79 deletions(-) diff --git a/src/ota_proxy/ota_cache.py b/src/ota_proxy/ota_cache.py index 641f381c8..5a7e927dc 100644 --- a/src/ota_proxy/ota_cache.py +++ b/src/ota_proxy/ota_cache.py @@ -22,7 +22,7 @@ import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path -from typing import AsyncIterator, Dict, List, Mapping, Optional, Tuple +from typing import AsyncIterator, Mapping, Optional from urllib.parse import SplitResult, quote, urlsplit import aiohttp @@ -276,7 +276,7 @@ def _background_check_free_space(self): ) time.sleep(cfg.DISK_USE_PULL_INTERVAL) - def _cache_entries_cleanup(self, entry_hashes: List[str]): + def _cache_entries_cleanup(self, entry_hashes: list[str]) -> None: """Cleanup entries indicated by entry_hashes list.""" for entry_hash in entry_hashes: # remove cache entry @@ -368,7 +368,7 @@ async def _retrieve_file_by_downloading( raw_url: str, *, headers: Mapping[str, str], - ) -> Tuple[AsyncIterator[bytes], CIMultiDictProxy[str]]: + ) -> tuple[AsyncIterator[bytes], CIMultiDictProxy[str]]: async def _do_request() -> AsyncIterator[bytes]: async with self._session.get( self._process_raw_url(raw_url), @@ -392,14 +392,25 @@ async def _do_request() -> AsyncIterator[bytes]: resp_headers: CIMultiDictProxy[str] = await (_remote_fd := _do_request()).__anext__() # type: ignore return _remote_fd, resp_headers - async def _retrieve_file_by_cache( - self, cache_identifier: str, *, retry_cache: bool + async def _retrieve_file_by_cache_lookup( + self, *, raw_url: str, cache_policy: OTAFileCacheControl ) -> tuple[AsyncIterator[bytes], CIMultiDict[str]] | None: """ Returns: A tuple of bytes iterator and headers dict for back to client. """ - # cache file available, lookup the db for metadata + if ( + not self._cache_enabled + or cache_policy.no_cache + or cache_policy.retry_caching + ): + return + + cache_identifier = cache_policy.file_sha256 + if not cache_identifier: + # fallback to use URL based hash, and clear compression_alg for such case + cache_identifier = url_based_hash(raw_url) + meta_db_entry = await self._lru_helper.lookup_entry(cache_identifier) if not meta_db_entry: return @@ -407,13 +418,7 @@ async def _retrieve_file_by_cache( # NOTE: db_entry.file_sha256 can be either # 1. valid sha256 value for corresponding plain uncompressed OTA file # 2. URL based sha256 value for corresponding requested URL - # otaclient indicates that this cache entry is invalid, cleanup and exit cache_file = self._base_dir / cache_identifier - if retry_cache: - logger.debug(f"requested with retry_cache: {meta_db_entry=}..") - await self._lru_helper.remove_entry(cache_identifier) - cache_file.unlink(missing_ok=True) - return # check if cache file exists # NOTE(20240729): there is an edge condition that the finished cached file is not yet renamed, @@ -423,7 +428,6 @@ async def _retrieve_file_by_cache( for _retry_count in range(_retry_count_max): if cache_file.is_file(): break - await asyncio.sleep(get_backoff(_retry_count, _factor, _backoff_max)) if not cache_file.is_file(): @@ -445,7 +449,12 @@ async def _retrieve_file_by_external_cache( self, client_cache_policy: OTAFileCacheControl ) -> tuple[AsyncIterator[bytes], CIMultiDict[str]] | None: # skip if not external cache or otaclient doesn't sent valid file_sha256 - if not self._external_cache or not client_cache_policy.file_sha256: + if ( + not self._external_cache + or client_cache_policy.no_cache + or client_cache_policy.retry_caching + or not client_cache_policy.file_sha256 + ): return cache_identifier = client_cache_policy.file_sha256 @@ -473,83 +482,34 @@ async def _retrieve_file_by_external_cache( ) return read_file(cache_file, executor=self._executor), _header - # exposed API - - async def retrieve_file( + async def _retrieve_file_by_new_caching( self, + *, raw_url: str, - headers_from_client: Dict[str, str], - ) -> tuple[AsyncIterator[bytes], CIMultiDict[str] | CIMultiDictProxy[str]] | None: - """Retrieve a file descriptor for the requested . - - This method retrieves a file descriptor for incoming client request. - Upper uvicorn app can use this file descriptor to yield chunks of data, - and send chunks to the on-calling ota_client. - - NOTE: use raw_url in all operations, except opening remote file. - - Args: - raw_url: unquoted raw url received from uvicorn - headers_from_client: headers come from client's request, which will be - passthrough to upper otaproxy and/or remote OTA image server. - - Returns: - A tuple contains an asyncio generator for upper server app to yield data chunks from - and headers dict that should be sent back to client in resp. - """ - if self._closed: - raise BaseOTACacheError("ota cache pool is closed") - - cache_policy = OTAFileCacheControl.parse_header( - headers_from_client.get(HEADER_OTA_FILE_CACHE_CONTROL, "") - ) - if cache_policy.no_cache: - logger.info(f"client indicates that do not cache for {raw_url=}") - - if not self._upper_proxy: - headers_from_client.pop(HEADER_OTA_FILE_CACHE_CONTROL, None) - - # --- case 1: not using cache, directly download file --- # - if ( - not self._cache_enabled # ota_proxy is configured to not cache anything - or cache_policy.no_cache # ota_client send request with no_cache policy - or not self._storage_below_hard_limit_event.is_set() # disable cache if space hardlimit is reached - ): - logger.debug( - f"not use cache({self._cache_enabled=}, {cache_policy=}, " - f"{self._storage_below_hard_limit_event.is_set()=}): {raw_url=}" - ) - return await self._retrieve_file_by_downloading( - raw_url, headers=headers_from_client - ) - - # --- case 2: if externel cache source available, try to use it --- # - # NOTE: if client requsts with retry_caching directive, it may indicate cache corrupted - # in external cache storage, in such case we should skip the use of external cache. + cache_policy: OTAFileCacheControl, + headers_from_client: dict[str, str], + ) -> tuple[AsyncIterator[bytes], CIMultiDictProxy[str] | CIMultiDict[str]] | None: + # NOTE(20241202): no new cache on hard limit being reached if ( - self._external_cache - and not cache_policy.retry_caching - and (_res := await self._retrieve_file_by_external_cache(cache_policy)) + not self._cache_enabled + or cache_policy.no_cache + or not self._storage_below_hard_limit_event.is_set() ): - return _res + return - # pre-calculated cache_identifier and corresponding compression_alg cache_identifier = cache_policy.file_sha256 compression_alg = cache_policy.file_compression_alg - - # fallback to use URL based hash, and clear compression_alg if not cache_identifier: + # fallback to use URL based hash, and clear compression_alg for such case cache_identifier = url_based_hash(raw_url) compression_alg = "" - # --- case 3: try to use local cache --- # - if _res := await self._retrieve_file_by_cache( - cache_identifier, retry_cache=cache_policy.retry_caching - ): - return _res + # if set, cleanup any previous cache file before starting new cache + if cache_policy.retry_caching: + logger.debug(f"requested with retry_cache for {raw_url=} ...") + await self._lru_helper.remove_entry(cache_identifier) + (self._base_dir / cache_identifier).unlink(missing_ok=True) - # --- case 4: no cache available, streaming remote file and cache --- # - # a online tracker is available for this requrest if (tracker := self._on_going_caching.get_tracker(cache_identifier)) and ( subscription := await tracker.subscribe_tracker() ): @@ -588,3 +548,66 @@ async def retrieve_file( raise finally: tracker = None # remove ref + + # exposed API + + async def retrieve_file( + self, raw_url: str, headers_from_client: dict[str, str] + ) -> tuple[AsyncIterator[bytes], CIMultiDict[str] | CIMultiDictProxy[str]] | None: + """Retrieve a file descriptor for the requested . + + This method retrieves a file descriptor for incoming client request. + Upper uvicorn app can use this file descriptor to yield chunks of data, + and send chunks to the on-calling ota_client. + + NOTE: use raw_url in all operations, except opening remote file. + + Args: + raw_url: unquoted raw url received from uvicorn + headers_from_client: headers come from client's request, which will be + passthrough to upper otaproxy and/or remote OTA image server. + + Returns: + A tuple contains an asyncio generator for upper server app to yield data chunks from + and headers dict that should be sent back to client in resp. + """ + if self._closed: + raise BaseOTACacheError("ota cache pool is closed") + + cache_policy = OTAFileCacheControl.parse_header( + headers_from_client.get(HEADER_OTA_FILE_CACHE_CONTROL, "") + ) + if cache_policy.no_cache: + logger.info(f"client indicates that do not cache for {raw_url=}") + + # when there is no upper_proxy, do not passthrough the OTA_FILE_CACHE_CONTROL header. + if not self._upper_proxy: + headers_from_client.pop(HEADER_OTA_FILE_CACHE_CONTROL, None) + + # a fastpath when cache is not enabled or client requires so + if not self._cache_enabled or cache_policy.no_cache: + return await self._retrieve_file_by_downloading( + raw_url, headers=headers_from_client + ) + + # NOTE(20241202): behavior changed: even if _cache_enabled is False, if external_cache is configured + # and loaded, still try to use external cache source. + if _res := await self._retrieve_file_by_external_cache(cache_policy): + return _res + + if _res := await self._retrieve_file_by_cache_lookup( + raw_url=raw_url, cache_policy=cache_policy + ): + return _res + + if _res := await self._retrieve_file_by_new_caching( + raw_url=raw_url, + cache_policy=cache_policy, + headers_from_client=headers_from_client, + ): + return _res + + # as last resort, finally try to handle the request by directly downloading + return await self._retrieve_file_by_downloading( + raw_url, headers=headers_from_client + )