Skip to content

Commit

Permalink
typehint & loglevel
Browse files Browse the repository at this point in the history
  • Loading branch information
nferc committed Mar 4, 2024
1 parent a0c41e2 commit 2d5ccf0
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
4 changes: 0 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,3 @@ RUN chmod u+x /app/startup.sh
ENTRYPOINT ["/app/startup.sh"]

EXPOSE 443

# DEBUG
RUN mkdir /app/dump
RUN chown www-data:www-data /app/dump
35 changes: 18 additions & 17 deletions avgate/avgate.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import ssl
import types
from email.message import EmailMessage
from typing import List, cast
from typing import Callable, Generator, List, cast
from urllib.parse import unquote, urlparse

import lxml.etree as ET
Expand Down Expand Up @@ -168,7 +168,7 @@ def check():
return Response(res, mimetype="text/plain", status=503 if err_count else 200)


def check_clamav():
def check_clamav() -> str:
clamd_path = config["config"].get("clamd_socket")
if clamd_path:
test = clamav_sock.ping()
Expand All @@ -177,7 +177,7 @@ def check_clamav():
return "clamav: no ping\n"


def check_icap():
def check_icap() -> str:
icap_host = config["config"].get("icap_host")
if icap_host:
try:
Expand All @@ -187,7 +187,7 @@ def check_icap():
return "icap: failed\n"


def phr_service():
def phr_service() -> Response:
"""Scan AV on xop documents for retrieveDocumentSetRequest"""
client_config = get_client_config()
with request_upstream(client_config) as upstream:
Expand All @@ -199,7 +199,7 @@ def phr_service():

# debugging only - remove after testing
if EICAR in data:
fn = f"/app/dump/{request.path.replace('/', '_')}.xml"
fn = f"/tmp/{request.path.replace('/', '_')}.xml"
with open(fn, "wb") as f:
f.write(data)
logger.error(f"found EICAR signature - see content in file {fn}")
Expand All @@ -209,7 +209,7 @@ def phr_service():
return response


def other():
def other() -> Response:
"""Streamed forward without scan"""
client_config = get_client_config()
upstream = request_upstream(client_config, stream=True)
Expand All @@ -223,7 +223,7 @@ def generate():
return response


def request_upstream(client_config, warn=True, stream=False):
def request_upstream(client_config, warn=True, stream=False) -> Response:
"""Request to real Konnektor"""

konn = client_config["Konnektor"]
Expand Down Expand Up @@ -272,7 +272,7 @@ def request_upstream(client_config, warn=True, stream=False):
abort(502)


def get_client_config():
def get_client_config() -> configparser.ConfigParser:
request_ip = request.headers.get("X-real-ip", request.host.split(":")[0])
port = request.host.split(":")[1] if ":" in request.host else "443"

Expand Down Expand Up @@ -444,7 +444,7 @@ def handle_attachment(
msg.attach(att)


def get_malicious_content_ids(msg: EmailMessage):
def get_malicious_content_ids(msg: EmailMessage) -> Generator[str, None, None]:
"""Extracting content_ids of malicious attachments"""
for att in msg.iter_attachments():
att = cast(EmailMessage, att)
Expand Down Expand Up @@ -524,7 +524,7 @@ def fix_status(xml_resp, xml_errlist, xml_ns, msg):

def build_payload(
msg: EmailMessage, malicious_content_ids: List[str], res: requests.Response
):
) -> bytes:
"create payload based on original response with replacing only payoad for malicious_content_ids"

content_type = res.headers["Content-Type"]
Expand Down Expand Up @@ -576,7 +576,7 @@ def get_content_id(content: bytes):
}


def get_replacement(mimetype):
def get_replacement(mimetype) -> bytes:
"""get content for replacements"""
filename = replacement_files.get(mimetype) or replacement_files.get("text/plain")
with open(filename, "rb") as f:
Expand All @@ -590,7 +590,7 @@ def dump(dict):
# File Scanning


def get_file_scanner():
def get_file_scanner() -> Callable[[bytes], List[str | None]]:
clamd_path = config["config"].get("clamd_socket")
icap_host = config["config"].get("icap_host")

Expand All @@ -611,13 +611,13 @@ def get_file_scanner():
return scan_file_icap


def scan_file_clamav(content):
def scan_file_clamav(content: bytes) -> List[str | None]:
"return scan result, do use clamav socket"
scan_res = clamav_sock.instream(io.BytesIO(content))["stream"]
return scan_res


def scan_file_icap(content):
def scan_file_icap(content: bytes) -> List[str | None]:
"return scan result, do use icap"
icap_service = config["config"]["icap_service"]
icap_host = config["config"]["icap_host"]
Expand Down Expand Up @@ -650,9 +650,10 @@ def scan_file_icap(content):
first_line = first_block.partition(b"\r\n")[0]
http_response_code = second_block.partition(b"\r\n")[0]

logger.debug(f"check icar {content[:30]} - {EICAR in content}")
if EICAR in content:
logger.debug(f"Eicar ICAP REQ \n{req.encode()}{content}{footer.encode()}")
logger.debug(f"RESP\n{first_block}{second_block[:500]}")
logger.debug(f"Eicar ICAP REQ \n{req.encode() + content + footer.encode()}")
logger.debug(f"RESP\n{first_block+second_block[:500]}")

# debug
return ["OK", None]
Expand All @@ -677,7 +678,7 @@ def scan_file_icap(content):
return ["FOUND", "unknown"]


def _open_sock(host, port, tls):
def _open_sock(host: str, port: int, tls: bool) -> socket:
if tls:
with socket.create_connection((host, port)) as sock:
context = ssl.create_default_context()
Expand Down
1 change: 1 addition & 0 deletions docs/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ AV-Gate als Docker
-e ICAP_SERVICE=icap://av_server/avscan \
-e KONNEKTOR=https://host.docker.internal:5000 \
-e SSL_VERIFY=false \
-e LOG_LEVEL=DEBUG \
--mount type=bind,source="$(pwd)"/cert,target=/app/cert,readonly \
avgate
```
Expand Down

0 comments on commit 2d5ccf0

Please sign in to comment.