-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
31 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
Base classes for FPDS XML elements. | ||
author: [email protected] | ||
last_updated: 01/20/2024 | ||
last_updated: 07/13/2024 | ||
""" | ||
|
||
import asyncio | ||
|
@@ -17,6 +17,7 @@ | |
|
||
from aiohttp import ClientSession | ||
|
||
from fpds.core import FPDS_ENTRY | ||
from fpds.core.mixins import fpdsMixin | ||
from fpds.core.xml import fpdsXML | ||
from fpds.utilities import validate_kwarg | ||
|
@@ -74,9 +75,6 @@ def __init__( | |
for kwarg, value in self.kwargs.items(): | ||
self.kwargs[kwarg] = validate_kwarg(kwarg=kwarg, string=value) | ||
|
||
def __call__(self) -> List[Dict[str, Union[str, float]]]: | ||
return self.process_records() | ||
|
||
def __str__(self) -> str: # pragma: no cover | ||
"""String representation of `fpdsRequest`.""" | ||
kwargs_str = " ".join([f"{key}={value}" for key, value in self.kwargs.items()]) | ||
|
@@ -93,7 +91,7 @@ def search_params(self) -> str: | |
return " ".join(_params) | ||
|
||
@property | ||
def max_pages(self) -> int: | ||
def page_count(self) -> int: | ||
"""Total number of FPDS pages contained in request.""" | ||
return len(self.links) | ||
|
||
|
@@ -119,7 +117,7 @@ async def convert(self, session: ClientSession, link: str) -> fpdsXML: | |
xml = fpdsXML(content=self.convert_to_lxml_tree(content)) | ||
return xml | ||
|
||
async def fetch(self): | ||
async def fetch(self) -> List[fpdsXML]: | ||
self.create_request_links() | ||
semaphore = Semaphore(self.thread_count) | ||
|
||
|
@@ -128,9 +126,11 @@ async def fetch(self): | |
tasks = [self.convert(session, link) for link in self.links] | ||
return await asyncio.gather(*tasks) | ||
|
||
def page_index(self): | ||
def page_index(self) -> Optional[int]: | ||
"""Converts `page` to index integer.""" | ||
idx = 0 if self.page == 1 else self.page - 1 | ||
idx = None | ||
if self.page: | ||
idx = 0 if self.page == 1 else self.page - 1 | ||
return idx | ||
|
||
def create_request_links(self) -> None: | ||
|
@@ -146,22 +146,23 @@ def create_request_links(self) -> None: | |
self.links = links | ||
|
||
if self.page: | ||
if self.page > self.max_pages: | ||
raise ValueError(f"Max response page count is {self.max_pages}!") | ||
self.links = [links[self.page_index()]] | ||
idx = self.page_index() | ||
if idx: | ||
if self.page > self.page_count: | ||
raise ValueError(f"Max response page count is {self.page_count}!") | ||
self.links = [links[idx]] | ||
|
||
@staticmethod | ||
def _jsonify(entry): | ||
def _jsonify(entry) -> List[FPDS_ENTRY]: | ||
"""Wrapper around `jsonify` method for avoiding pickle issue.""" | ||
return entry.jsonify() | ||
|
||
async def data(self) -> List[Dict[str, Union[str, float]]]: | ||
async def data(self) -> List[FPDS_ENTRY]: | ||
num_processes = multiprocessing.cpu_count() | ||
data = await self.fetch() | ||
|
||
# for parallel processing | ||
with ProcessPoolExecutor(max_workers=num_processes) as pool: | ||
results = pool.map(self._jsonify, data) | ||
results = list(pool.map(self._jsonify, data)) | ||
|
||
data = list(chain.from_iterable(results)) | ||
return data | ||
return list(chain.from_iterable(results)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
XML classes for parsing FPDS content. | ||
author: [email protected] | ||
last_updated: 06/05/2024 | ||
last_updated: 07/13/2024 | ||
""" | ||
|
||
import re | ||
|
@@ -109,8 +109,10 @@ def namespace_dict(self) -> Dict[str, str]: | |
return namespace_dict | ||
|
||
@property | ||
def total_record_count(self) -> int: | ||
"""Total number of records across all pagination links.""" | ||
def lower_limit(self) -> int: | ||
"""Lower limit of record count (i.e. if 40, it means there is a total of | ||
40-49 records). | ||
""" | ||
last_link = self.tree.find(".//ns0:link[@rel='last']", self.namespace_dict) | ||
if isinstance(last_link, Element): | ||
# length of last_link should always be 1 | ||
|
@@ -126,8 +128,8 @@ def pagination_links(self, params: str) -> List[str]: | |
total record count value. | ||
""" | ||
resp_size = self.response_size | ||
offset = 0 if self.total_record_count < 10 else resp_size | ||
page_range = list(range(0, self.total_record_count + offset, resp_size)) | ||
offset = 0 if self.lower_limit < 10 else resp_size | ||
page_range = list(range(0, self.lower_limit + offset, resp_size)) | ||
page_links = [] | ||
for num in page_range: | ||
link = f"{self.url_base}&q={params}&start={num}" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters