Skip to content

Commit

Permalink
Enforce max_requests_in_parallel in profiles download
Browse files Browse the repository at this point in the history
  • Loading branch information
dostuffthatmatters committed Dec 26, 2023
1 parent 0da7278 commit 2203f45
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 26 deletions.
3 changes: 1 addition & 2 deletions config/config.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
"profiles": {
"server": {
"email": "...@...",
"max_requests_in_parallel": 20,
"max_requests_per_day": 100
"max_parallel_requests": 25
},
"scope": {
"from_date": "2022-01-01",
Expand Down
14 changes: 0 additions & 14 deletions src/profiles/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,6 @@ def dump(self) -> None:
with open(_CACHE_FILE, "w") as f:
f.write(self.model_dump_json(indent=4))

""" def get_already_requested_dates(
self,
location: profiles.generate_queries.ProfilesQueryLocation,
) -> set[datetime.date]:
already_requested_dates: set[datetime.date] = set()
for entry in self.root:
if entry.location == location:
already_requested_dates.update(
tum_esm_utils.time.date_range(
entry.from_date, entry.to_date
)
)
return already_requested_dates"""

def get_active_queries(
self,
atmospheric_profile_model: types.AtmosphericProfileModel,
Expand Down
6 changes: 6 additions & 0 deletions src/profiles/download_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ def download_data(
if atmospheric_profile_model == "GGG2020":
if len(tarballs_to_download) >= 1:
fulfilled_queries.append(query)
progress.print(f"Found!")
else:
progress.print(f"Not found!")
else:
if len(tarballs_to_download) >= 2:
fulfilled_queries.append(query)
progress.print(f"Found!")
else:
progress.print(f"Not found!")
for t in tarballs_to_download:
with io.BytesIO() as archive:
ftp.retrbinary(
Expand Down
24 changes: 22 additions & 2 deletions src/profiles/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ def run() -> None:
cache.dump()
print("Updated cache")

still_running_query_count = len(running_queries
) - len(fulfilled_queries)
open_query_count = config.profiles.server.max_parallel_requests - still_running_query_count

print(f"{still_running_query_count} queries are still running")
if open_query_count <= 0:
print(
"No open slots for new queries " +
"(config.profiles.server.max_parallel_requests = " +
f"{config.profiles.server.max_parallel_requests})"
)
continue
else:
print(f"{open_query_count} open slots for new queries")

# Generate daily sensor sets
missing_queries = profiles.generate_queries.generate_download_queries(
config, version
Expand All @@ -47,6 +62,8 @@ def run() -> None:
print("No data to request.")
continue

# queries might not be in cache anymore but still
# downloadable from the server
print(f"Trying to download {len(missing_queries)} queries")
fulfilled_queries = profiles.download_logic.download_data(
config, missing_queries, ftp, version
Expand All @@ -58,9 +75,12 @@ def run() -> None:
print(
f"Successfully downloaded {len(fulfilled_queries)} queries"
)
print(f"Requesting {len(missing_queries)} queries")
query_count = min(open_query_count, len(missing_queries))
print(
f"Requesting {query_count} out of {len(missing_queries)} queries"
)
profiles.upload_logic.upload_requests(
config, missing_queries, ftp, version
config, missing_queries[: query_count], ftp, version
)
print(
"Done. Run this script again (after waiting " +
Expand Down
10 changes: 2 additions & 8 deletions src/types/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,13 @@ class ProfilesServerConfig(pydantic.BaseModel):
min_length=3,
description="Email address to use to log in to the ccycle ftp server.",
)
max_requests_in_parallel: int = pydantic.Field(
max_parallel_requests: int = pydantic.Field(
...,
ge=1,
le=100,
le=200,
description=
"Maximum number of requests to put in the queue on the ccycle server at the same time. Only when a request is finished, a new one can enter the queue.",
)
max_requests_per_day: int = pydantic.Field(
...,
ge=1,
le=1000,
description="Maximum number of requests to the ccycle server per day.",
)


class ProfilesScopeConfig(pydantic.BaseModel):
Expand Down

0 comments on commit 2203f45

Please sign in to comment.