-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from gardenlinux/initial-wsgi
Add initial web server
- Loading branch information
Showing
6 changed files
with
165 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
# SPDX-License-Identifier: MIT | ||
|
||
import collections.abc | ||
import contextlib | ||
|
||
from quart import Quart | ||
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine | ||
|
||
|
||
class QuartDb: | ||
engine: AsyncEngine | ||
|
||
def __init__(self, app: Quart) -> None: | ||
# TODO: Use config | ||
self.engine = create_async_engine( | ||
"postgresql+asyncpg:///", | ||
echo=True, | ||
pool_size=50, | ||
max_overflow=0, | ||
) | ||
|
||
setattr(app, 'db_begin', self) | ||
|
||
@contextlib.asynccontextmanager | ||
async def __call__(self) -> collections.abc.AsyncGenerator[AsyncConnection, None]: | ||
''' | ||
Provides an asynchronous SQLAlchemy connection to application. | ||
All transactions are rolled back, as we don't support writing | ||
through the application. | ||
''' | ||
async with self.engine.begin() as conn: | ||
yield conn | ||
await conn.rollback() | ||
|
||
|
||
def create_app(): | ||
app = Quart(__name__) | ||
|
||
# TODO: Read config | ||
|
||
QuartDb(app) | ||
|
||
from .nvd import bp as bp_nvd | ||
app.register_blueprint(bp_nvd) | ||
|
||
return app |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# SPDX-License-Identifier: MIT | ||
|
||
from quart import Blueprint, current_app, request | ||
from sqlalchemy import select | ||
|
||
from ..database import NvdCve | ||
|
||
|
||
bp = Blueprint('nvd', __name__) | ||
|
||
|
||
@bp.route('/rest/json/cves/2.0+deb') | ||
async def nvd_cve_deb(): | ||
# XXX: Replace with view | ||
stmt = select(NvdCve.data) | ||
|
||
if cve_id := request.args.get('cveId', type=str): | ||
stmt = stmt.where(NvdCve.cve_id == cve_id) | ||
|
||
async with current_app.db_begin() as conn: | ||
result = await conn.execute(stmt) | ||
|
||
return { | ||
'format': 'NVD_CVE', | ||
'version': '2.0+deb', | ||
'vulnerabilities': [{'cve': i[0]} for i in result], | ||
}, 200 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
# SPDX-License-Identifier: MIT | ||
|
||
import pytest | ||
|
||
import contextlib | ||
|
||
from glvd.web import create_app | ||
|
||
|
||
@pytest.fixture() | ||
def app(db_conn): | ||
app = create_app() | ||
|
||
@contextlib.asynccontextmanager | ||
async def db_begin(): | ||
yield db_conn | ||
setattr(app, 'db_begin', db_begin) | ||
|
||
yield app | ||
|
||
|
||
@pytest.fixture() | ||
def client(app): | ||
return app.test_client() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
# SPDX-License-Identifier: MIT | ||
|
||
import pytest | ||
|
||
from datetime import datetime | ||
|
||
from sqlalchemy import insert | ||
|
||
from glvd.database import NvdCve | ||
|
||
|
||
class TestNvdCve: | ||
@pytest.fixture(autouse=True) | ||
async def setup_example(self, db_conn): | ||
for i in range(2): | ||
insert_stmt = insert(NvdCve).values( | ||
cve_id=f'TEST-{i}', | ||
last_mod=datetime.fromisoformat('2019-04-01T00:00:00'), | ||
data={ | ||
'id': f'TEST-{i}', | ||
}, | ||
) | ||
await db_conn.execute(insert_stmt) | ||
|
||
async def test_deb_cveid_simple(self, client, db_conn): | ||
resp = await client.get( | ||
'/rest/json/cves/2.0+deb', | ||
query_string={ | ||
'cveId': 'TEST-0', | ||
}, | ||
) | ||
|
||
assert resp.status_code == 200 | ||
assert (await resp.json) == { | ||
'format': 'NVD_CVE', | ||
'version': '2.0+deb', | ||
'vulnerabilities': [ | ||
{ | ||
'cve': { | ||
'id': 'TEST-0', | ||
}, | ||
}, | ||
], | ||
} | ||
|
||
async def test_deb_cveid_nonexist(self, client, db_conn): | ||
resp = await client.get( | ||
'/rest/json/cves/2.0+deb', | ||
query_string={ | ||
'cveId': 'TEST-NONEXIST', | ||
}, | ||
) | ||
|
||
assert resp.status_code == 200 | ||
assert (await resp.json) == { | ||
'format': 'NVD_CVE', | ||
'version': '2.0+deb', | ||
'vulnerabilities': [] | ||
} |