diff --git a/README.rst b/README.rst index 05139388..251e7726 100644 --- a/README.rst +++ b/README.rst @@ -1,8 +1,8 @@ uap-python ========== -A python implementation of the UA Parser (https://github.com/ua-parser, -formerly https://github.com/tobie/ua-parser) +Official python implementation of the `User Agent String +Parser <https://github.com/ua-parser>`_ project. Build Status ------------ @@ -10,110 +10,118 @@ Build Status .. image:: https://github.com/ua-parser/uap-python/actions/workflows/ci.yml/badge.svg :alt: CI on the master branch - Installing ---------- -Install via pip -~~~~~~~~~~~~~~~ - -Just run: +Just add ``ua-parser`` to your project's dependencies, or run .. code-block:: sh $ pip install ua-parser -Manual install -~~~~~~~~~~~~~~ - -In the top-level directory run: - -.. code-block:: sh - - $ python setup.py install - -Change Log ---------------- -Because this repo is mostly a python wrapper for the User Agent String Parser repo (https://github.com/ua-parser/uap-core), the changes made to this repo are best described by the update diffs in that project. Please see the diffs for this submodule (https://github.com/ua-parser/uap-core/releases) for a list of what has changed between versions of this package. +to install in the current environment. Getting Started --------------- -Retrieve data on a user-agent string -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Retrieve all data on a user-agent string +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. 
code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.Parse(ua_string) - >>> pp.pprint(parsed_string) - { 'device': {'brand': 'Apple', 'family': 'Mac', 'model': 'Mac'}, - 'os': { 'family': 'Mac OS X', - 'major': '10', - 'minor': '9', - 'patch': '4', - 'patch_minor': None}, - 'string': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 ' - 'Safari/537.36', - 'user_agent': { 'family': 'Chrome', - 'major': '41', - 'minor': '0', - 'patch': '2272'}} - -Extract browser data from user-agent string -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + >>> parse(ua_string) # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS + ParseResult(user_agent=UserAgent(family='Chrome', + major='41', + minor='0', + patch='2272', + patch_minor='104'), + os=OS(family='Mac OS X', + major='10', + minor='9', + patch='4', + patch_minor=None), + device=Device(family='Mac', + brand='Apple', + model='Mac'), + string='Mozilla/5.0 (Macintosh; Intel Mac OS... + +Any datum not found in the user agent string is set to ``None``:: + + >>> parse("") + ParseResult(user_agent=None, os=None, device=None, string='') + +Extract only browser data from user-agent string +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. 
code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse_user_agent >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.ParseUserAgent(ua_string) - >>> pp.pprint(parsed_string) - {'family': 'Chrome', 'major': '41', 'minor': '0', 'patch': '2272'} + >>> parse_user_agent(ua_string) + UserAgent(family='Chrome', major='41', minor='0', patch='2272', patch_minor='104') -.. +For specific domains, a match failure just returns ``None``:: - ⚠️Before 0.15, the convenience parsers (``ParseUserAgent``, - ``ParseOs``, and ``ParseDevice``) were not cached, which could - result in degraded performances when parsing large amounts of - identical user-agents (which might occur for real-world datasets). - - For these versions (up to 0.10 included), prefer using ``Parse`` - and extracting the sub-component you need from the resulting - dictionary. + >>> parse_user_agent("") Extract OS information from user-agent string ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse_os >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.ParseOS(ua_string) - >>> pp.pprint(parsed_string) - { 'family': 'Mac OS X', - 'major': '10', - 'minor': '9', - 'patch': '4', - 'patch_minor': None} - -Extract Device information from user-agent string + >>> parse_os(ua_string) + OS(family='Mac OS X', major='10', minor='9', patch='4', patch_minor=None) + +Extract device information from user-agent string ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. 
code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse_device >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.ParseDevice(ua_string) - >>> pp.pprint(parsed_string) - {'brand': 'Apple', 'family': 'Mac', 'model': 'Mac'} + >>> parse_device(ua_string) + Device(family='Mac', brand='Apple', model='Mac') + +Parser +~~~~~~ + +Parsers expose the same functions (``parse``, ``parse_user_agent``, +``parse_os``, and ``parse_device``) as the top-level of the package, +however these are all *utility* methods. + +The actual protocol of parsers, and the one method which must be +implemented / overridden is:: + + def __call__(self, str, Domain, /) -> PartialParseResult: + +It's similar to but more flexible than ``parse``: + +- The ``str`` is the user agent string. +- The ``Domain`` is a hint, through which the caller requests the + domain (component) they are looking for, any combination of + ``Domain.USER_AGENT``, ``Domain.OS``, and + ``Domain.DEVICE``. ``Domain.ALL`` exists as a convenience alias + for the combination of all three. + + The parser *must* return at least the requested information, but if + that's more convenient or no more expensive it *can* return more. +- The ``PartialParseResult`` is similar to ``ParseResult``, except + it additionally has a ``domains: + Domain`` attribute which specifies whether a component was never + requested (its value for the user agent string is unknown) or it has + been requested but could not be resolved (no match was found for the + user agent). + + ``PartialParseResult.complete()`` converts to a ``ParseResult`` if + all the domains have been resolved, and raises an exception otherwise. 
If + some of the components are set to ``None``, they are kept as-is; call + ``with_defaults()`` on the result to swap them for default values. + +Calling the parser directly is part of the public API. One of the +advantages is that it does not return default values; as such it allows +more easily differentiating between a non-match (= ``None``) and a +default fallback (``family = "Other"``). diff --git a/pyproject.toml b/pyproject.toml index 48b6aa91..b42d432e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "ua-parser" description = "Python port of Browserscope's user agent parser" -version = "1.0.0a" +version = "1.0.0a1" readme = "README.rst" requires-python = ">=3.8" dependencies = [] diff --git a/setup.py b/setup.py index 0e14118c..d33bc820 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # flake8: noqa +import io from contextlib import suppress from os import fspath from pathlib import Path @@ -51,6 +52,13 @@ def run(self) -> None: f"Unable to find regexes.yaml, should be at {yaml_src!r}" ) + def write_matcher(f, typ: str, fields: List[Optional[object]]): + f.write(f" {typ}(".encode()) + while len(fields) > 1 and fields[-1] is None: + fields = fields[:-1] + f.write(", ".join(map(repr, fields)).encode()) + f.write(b"),\n") + def write_params(fields): # strip trailing None values while len(fields) > 1 and fields[-1] is None: @@ -70,10 +78,20 @@ def write_params(fields): outdir = dist_dir / self.pkg_name outdir.mkdir(parents=True, exist_ok=True) - dest = outdir / "_regexes.py" + dest = outdir / "_matchers.py" + dest_legacy = outdir / "_regexes.py" - with dest.open("wb") as fp: + with dest.open("wb") as f, dest_legacy.open("wb") as fp: # fmt: off + f.write(b"""\ +######################################################## +# NOTICE: this file is autogenerated from regexes.yaml # +######################################################## + +from .core import Matchers, UserAgentMatcher, OSMatcher, DeviceMatcher + +MATCHERS: 
Matchers = ([ +""") fp.write(b"# -*- coding: utf-8 -*-\n") fp.write(b"########################################################\n") fp.write(b"# NOTICE: This file is autogenerated from regexes.yaml #\n") @@ -87,31 +105,35 @@ def write_params(fields): fp.write(b"\n") fp.write(b"USER_AGENT_PARSERS = [\n") for device_parser in regexes["user_agent_parsers"]: - fp.write(b" UserAgentParser(\n") - write_params([ + write_matcher(f, "UserAgentMatcher", [ device_parser["regex"], device_parser.get("family_replacement"), device_parser.get("v1_replacement"), device_parser.get("v2_replacement"), ]) - fp.write(b" ),\n") - fp.write(b"]\n") - fp.write(b"\n") - fp.write(b"DEVICE_PARSERS = [\n") - for device_parser in regexes["device_parsers"]: - fp.write(b" DeviceParser(\n") + + fp.write(b" UserAgentParser(\n") write_params([ device_parser["regex"], - device_parser.get("regex_flag"), - device_parser.get("device_replacement"), - device_parser.get("brand_replacement"), - device_parser.get("model_replacement"), + device_parser.get("family_replacement"), + device_parser.get("v1_replacement"), + device_parser.get("v2_replacement"), ]) fp.write(b" ),\n") - fp.write(b"]\n") - fp.write(b"\n") + f.write(b" ], [\n") + fp.write(b"]\n\n") + fp.write(b"OS_PARSERS = [\n") for device_parser in regexes["os_parsers"]: + write_matcher(f, "OSMatcher", [ + device_parser["regex"], + device_parser.get("os_replacement"), + device_parser.get("os_v1_replacement"), + device_parser.get("os_v2_replacement"), + device_parser.get("os_v3_replacement"), + device_parser.get("os_v4_replacement"), + ]) + fp.write(b" OSParser(\n") write_params([ device_parser["regex"], @@ -122,6 +144,29 @@ def write_params(fields): device_parser.get("os_v4_replacement"), ]) fp.write(b" ),\n") + f.write(b" ], [\n") + fp.write(b"]\n\n") + + fp.write(b"DEVICE_PARSERS = [\n") + for device_parser in regexes["device_parsers"]: + write_matcher(f, "DeviceMatcher", [ + device_parser["regex"], + device_parser.get("regex_flag"), + 
device_parser.get("device_replacement"), + device_parser.get("brand_replacement"), + device_parser.get("model_replacement"), + ]) + + fp.write(b" DeviceParser(\n") + write_params([ + device_parser["regex"], + device_parser.get("regex_flag"), + device_parser.get("device_replacement"), + device_parser.get("brand_replacement"), + device_parser.get("model_replacement"), + ]) + fp.write(b" ),\n") + f.write(b"])\n") fp.write(b"]\n") # fmt: on diff --git a/src/ua_parser/__init__.py b/src/ua_parser/__init__.py index f1c0a2a2..f6c5327b 100644 --- a/src/ua_parser/__init__.py +++ b/src/ua_parser/__init__.py @@ -1 +1,99 @@ -VERSION = (0, 16, 1) +"""The package provides top-level helpers which use a lazily initialised +default parser. These are convenience functions, for more control it +is perfectly acceptable to instantiate and call parsers directly. + +The default parser does use a cache keyed on the user-agent string, +but its exact behaviour is unspecified, if you require a consistent +behaviour or specific algorithm, set up your own parser (global or +not). + +For convenience, direct aliases are also provided for: + +- :mod:`core types <.types>` +- :mod:`caching utilities <.caching>` +- :mod:`ua_parser.basic.Parser` as :class:`BasicParser` + +This way importing anything but the top-level package should not be +necessary unless you want to *implement* a parser. 
+""" + +VERSION = (1, 0, 0) + +from typing import Optional +from .core import ( + DefaultedParseResult, + Device, + DeviceMatcher, + Domain, + Matchers, + OS, + OSMatcher, + ParseResult, + Parser, + PartialParseResult, + UserAgent, + UserAgentMatcher, +) +from .basic import Parser as BasicParser +from .caching import CachingParser, Clearing, LRU +from .loaders import load_builtins, load_data, load_yaml + + +parser: Parser + + +def __getattr__(name): + global parser + if name == "parser": + parser = CachingParser( + BasicParser(load_builtins()), + LRU(200), + ) + return parser + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + +def parse(ua: str) -> ParseResult: + """Parses the :class:`.UserAgent`, :class:`.OS`, and :class:`.Device` + information using the :func:`global parser `. + + Equivalent to calling each of :func:`parse_user_agent`, + :func:`parse_os`, and :func:`parse_device` but *may* be more + efficient than calling them separately depending on the underlying + parser. + + Even in the best case, prefer the domain-specific helpers if + you're not going to use *all* of them. + """ + # import required to trigger __getattr__ and initialise the + # parser, a `global` access fails to and we get a NameError + from . import parser + + return parser.parse(ua) + + +def parse_user_agent(ua: str) -> Optional[UserAgent]: + """Parses the :class:`browser <.UserAgent>` information using the + :func:`global parser `. + """ + from . import parser + + return parser.parse_user_agent(ua) + + +def parse_os(ua: str) -> Optional[OS]: + """Parses the :class:`.OS` information using the :func:`global parser + `. + """ + from . import parser + + return parser.parse_os(ua) + + +def parse_device(ua: str) -> Optional[Device]: + """Parses the :class:`.Device` information using the :func:`global + parser `. + """ + from . 
import parser + + return parser.parse_device(ua) diff --git a/src/ua_parser/_matchers.pyi b/src/ua_parser/_matchers.pyi new file mode 100644 index 00000000..a27227fd --- /dev/null +++ b/src/ua_parser/_matchers.pyi @@ -0,0 +1,3 @@ +from .core import Matchers + +MATCHERS: Matchers diff --git a/src/ua_parser/_regexes.pyi b/src/ua_parser/_regexes.pyi new file mode 100644 index 00000000..9050f8b0 --- /dev/null +++ b/src/ua_parser/_regexes.pyi @@ -0,0 +1,6 @@ +from typing import List +from .user_agent_parser import UserAgentParser, OSParser, DeviceParser + +USER_AGENT_PARSERS: List[UserAgentParser] +OS_PARSERS: List[OSParser] +DEVICE_PARSERS: List[DeviceParser] diff --git a/src/ua_parser/basic.py b/src/ua_parser/basic.py new file mode 100644 index 00000000..ecc8b406 --- /dev/null +++ b/src/ua_parser/basic.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import io +import os +from itertools import starmap +from operator import methodcaller +from typing import List + +from .core import ( + Parser as AbstractParser, + PartialParseResult, + Domain, + UserAgent, + OS, + Device, + Matchers, + UserAgentMatcher, + OSMatcher, + DeviceMatcher, +) + + +class Parser(AbstractParser): + """A simple pure-python parser based around trying a number of regular + expressions in sequence for each domain, and returning a result + when one matches. 
+ """ + + user_agent_parsers: List[UserAgentMatcher] + os_parsers: List[OSMatcher] + device_parsers: List[DeviceMatcher] + + def __init__( + self, + matchers: Matchers, + ) -> None: + self.user_agent_parsers = matchers[0] + self.os_parsers = matchers[1] + self.device_parsers = matchers[2] + + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + parse = methodcaller("__call__", ua) + return PartialParseResult( + domains=domains, + string=ua, + user_agent=( + next( + filter(None, map(parse, self.user_agent_parsers)), + None, + ) + if Domain.USER_AGENT in domains + else None + ), + os=( + next( + filter(None, map(parse, self.os_parsers)), + None, + ) + if Domain.OS in domains + else None + ), + device=( + next( + filter(None, map(parse, self.device_parsers)), + None, + ) + if Domain.DEVICE in domains + else None + ), + ) diff --git a/src/ua_parser/caching.py b/src/ua_parser/caching.py new file mode 100644 index 00000000..4f34b540 --- /dev/null +++ b/src/ua_parser/caching.py @@ -0,0 +1,128 @@ +import abc +from collections import OrderedDict +import threading +from typing import Callable, ContextManager, Dict, Optional, MutableMapping + +from .core import Parser, Domain, PartialParseResult + + +__all__ = [ + "CachingParser", + "Cache", + "Clearing", + "LRU", +] + + +class Cache(abc.ABC): + """Cache abstract protocol. The :class:`CachingParser` will look + values up, merge what was returned (possibly nothing) with what it + got from its actual parser, and *re-set the result*. + + A :class:`Cache` is responsible for its own replacement policy. + """ + + @abc.abstractmethod + def __setitem__(self, key: str, value: PartialParseResult): + """Adds or replace ``value`` to the cache at key ``key``.""" + ... + + @abc.abstractmethod + def __getitem__(self, key: str) -> Optional[PartialParseResult]: + """Returns a partial result for ``key`` if there is any.""" + ... 
+ + +class Clearing(Cache): + """A clearing cache, if the cache is full, just remove all the entries + and re-fill from scratch. + + This can also be used as a cheap permanent cache by setting the + ``maxsize`` to infinity (or at least some very large value), + however this is probably a bad idea as it *will* lead to an + ever-growing memory allocation, until every possible user agent + string has been seen. + """ + + def __init__(self, maxsize: int): + self.maxsize = maxsize + self.cache: Dict[str, PartialParseResult] = {} + + def __getitem__(self, key: str) -> Optional[PartialParseResult]: + return self.cache.get(key) + + def __setitem__(self, key: str, value: PartialParseResult): + if key not in self.cache and len(self.cache) >= self.maxsize: + self.cache.clear() + + self.cache[key] = value + + +class LRU(Cache): + """Cache following a least-recently used replacement policy: when + there is no more room in the cache, whichever entry was last seen + the least recently is removed. + + Note that the cache size is adjusted after inserting the new + entry, so the cache will temporarily contain ``maxsize + 1`` + items. + """ + + def __init__(self, maxsize: int): + self.maxsize = maxsize + self.cache: OrderedDict[str, PartialParseResult] = OrderedDict() + + def __getitem__(self, key: str) -> Optional[PartialParseResult]: + e = self.cache.get(key) + if e: + self.cache.move_to_end(key) + return e + + def __setitem__(self, key: str, value: PartialParseResult): + self.cache[key] = value + self.cache.move_to_end(key) + while len(self.cache) > self.maxsize: + self.cache.popitem(last=False) + + +Lock = Callable[[], ContextManager] + + +class CachingParser(Parser): + """A wrapping parser which takes an underlying concrete :class:`Cache` + for the actual caching and cache strategy. + + The :class:`CachingParser` only interacts with the :class:`Cache` + and delegates to the wrapped parser in case of lookup failure. 
+ + :class:`CachingParser` will set entries back in the cache when + filling them up, it does not update results in place (and can't + really, they're immutable). + """ + + def __init__(self, parser: Parser, cache: Cache, lock: Lock = threading.Lock): + self.parser: Parser = parser + self.cache: Cache = cache + self.lock: ContextManager = lock() + + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + with self.lock: + entry = self.cache[ua] + if entry: + if domains in entry.domains: + return entry + + domains &= ~entry.domains + + r = self.parser(ua, domains) + if entry: + r = PartialParseResult( + string=ua, + domains=entry.domains | r.domains, + user_agent=entry.user_agent or r.user_agent, + os=entry.os or r.os, + device=entry.device or r.device, + ) + with self.lock: + self.cache[ua] = r + return r diff --git a/src/ua_parser/core.py b/src/ua_parser/core.py new file mode 100644 index 00000000..885bf45d --- /dev/null +++ b/src/ua_parser/core.py @@ -0,0 +1,365 @@ +import abc +import re +from dataclasses import dataclass, fields +from enum import Flag, auto +from typing import Literal, Optional, Tuple, List + +__all__ = [ + "DefaultedParseResult", + "Device", + "DeviceMatcher", + "Domain", + "Matchers", + "OS", + "OSMatcher", + "ParseResult", + "Parser", + "PartialParseResult", + "UserAgent", + "UserAgentMatcher", +] + + +@dataclass(frozen=True) +class UserAgent: + """Browser ("user agent" aka the software responsible for the request) + information parsed from the user agent string. 
+ """ + + family: str = "Other" + major: Optional[str] = None + minor: Optional[str] = None + patch: Optional[str] = None + patch_minor: Optional[str] = None + + +@dataclass(frozen=True) +class OS: + """OS information parsed from the user agent string.""" + + family: str = "Other" + major: Optional[str] = None + minor: Optional[str] = None + patch: Optional[str] = None + patch_minor: Optional[str] = None + + +@dataclass(frozen=True) +class Device: + """Device information parsed from the user agent string.""" + + family: str = "Other" + brand: Optional[str] = None + model: Optional[str] = None + + +class Domain(Flag): + """Hint for selecting which domains are requested when asking for a + :class:`ParseResult`. + """ + + #: browser (user agent) domain + USER_AGENT = auto() + #: os domain + OS = auto() + #: device domain + DEVICE = auto() + #: shortcut for all three domains + ALL = USER_AGENT | OS | DEVICE + + +@dataclass(frozen=True) +class DefaultedParseResult: + """Variant of :class:`.ParseResult` where attributes are set + to a default value if the parse fails. + + For all domains, the default value has ``family`` set to + ``"Other"`` and every other attribute set to ``None``. + """ + + user_agent: UserAgent + os: OS + device: Device + string: str + + +@dataclass(frozen=True) +class ParseResult: + """Complete parser result. + + For each attribute (and domain), either the parse was a success (a + match was found) and the corresponding data is set, or it was a + failure and the value is `None`. + """ + + user_agent: Optional[UserAgent] + os: Optional[OS] + device: Optional[Device] + string: str + + def with_defaults(self): + return DefaultedParseResult( + user_agent=self.user_agent or UserAgent(), + os=self.os or OS(), + device=self.device or Device(), + string=self.string, + ) + + +@dataclass(frozen=True) +class PartialParseResult: + """Potentially partial (incomplete) parser result. 
+ + Domain fields (``user_agent``, ``os``, and ``device``) can be: + + - unset if not parsed yet + - set to a parsing failure + - set to a parsing success + + The `domains` flags specify which is which: if a `Domain` + flag is set, the corresponding attribute was looked up and is + either ``None`` for a parsing failure (no match was found) or a + value for a parsing success. + + If the flag is unset, the field has not been looked up yet. + """ + + domains: Domain + user_agent: Optional[UserAgent] + os: Optional[OS] + device: Optional[Device] + string: str + + def complete(self) -> ParseResult: + """Requires that the result be fully resolved (every attribute is set, + even if to a lookup failure). + + Lookup failures are kept as ``None``, they are not replaced by defaults. + """ + if self.domains != Domain.ALL: + raise ValueError("Only a result with all attributes set can be completed") + + return ParseResult( + user_agent=self.user_agent, + os=self.os, + device=self.device, + string=self.string, + ) + + +class Parser(abc.ABC): + @abc.abstractmethod + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + """Parses the ``ua`` string, returning a parse result with *at least* + the requested :class:`domains <Domain>` resolved (whether to success or + failure). + + A parser may resolve more :class:`domains <Domain>` than + requested, but it *must not* resolve fewer. + """ + ... + + def parse(self, ua: str) -> ParseResult: + """Convenience method for parsing all domains, leaving each failed + domain set to ``None``. + """ + return self(ua, Domain.ALL).complete() + + def parse_user_agent(self, ua: str) -> Optional[UserAgent]: + """Convenience method for parsing the :class:`UserAgent` domain, + returning ``None`` in case of failure. + """ + return self(ua, Domain.USER_AGENT).user_agent + + def parse_os(self, ua: str) -> Optional[OS]: + """Convenience method for parsing the :class:`OS` domain, returning + ``None`` in case of failure. 
+ """ + return self(ua, Domain.OS).os + + def parse_device(self, ua: str) -> Optional[Device]: + """Convenience method for parsing the :class:`Device` domain, returning + ``None`` in case of failure. + """ + return self(ua, Domain.DEVICE).device + + +def _get(m: re.Match, idx: int) -> Optional[str]: + return (m[idx] or None) if 0 < idx <= m.re.groups else None + + +def _replacer(repl: str, m: re.Match) -> Optional[str]: + """The replacement rules are frustratingly subtle and inimical to + standard fallbacks: + + - if there is a non-null replacement pattern, then it must be used with + match groups as template parameters (at indices 1+) + - the result is stripped + - if it is an empty string, then it's replaced by a null + - otherwise fall back to a (possibly optional) match group + - or null (device brand has no fallback) + + Replacement rules only apply to OS and Device matchers, the UA + matcher has bespoke replacement semantics for the family (just + $1), and no replacement for the other fields, either there is a + static replacement or it falls back to the corresponding + (optional) match group. 
+ + """ + if not repl: + return None + + return re.sub(r"\$(\d)", lambda n: _get(m, int(n[1])) or "", repl).strip() or None + + +class UserAgentMatcher: + regex: re.Pattern + family: str + major: Optional[str] + minor: Optional[str] + patch: Optional[str] + patch_minor: Optional[str] + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major + self.minor = minor + self.patch = patch + self.patch_minor = patch_minor + + def __call__(self, ua: str) -> Optional[UserAgent]: + if m := self.regex.search(ua): + return UserAgent( + family=( + self.family.replace("$1", m[1]) + if "$1" in self.family + else self.family + ), + major=self.major or _get(m, 2), + minor=self.minor or _get(m, 3), + patch=self.patch or _get(m, 4), + patch_minor=self.patch_minor or _get(m, 5), + ) + return None + + def __repr__(self): + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major), + ("minor", self.minor), + ("patch", self.patch), + ("patch_minor", self.patch_minor), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"UserAgentMatcher({self.regex.pattern!r}{args})" + + +class OSMatcher: + regex: re.Pattern + family: str + major: str + minor: str + patch: str + patch_minor: str + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major or "$2" + self.minor = minor or "$3" + self.patch = patch or "$4" + self.patch_minor = patch_minor or "$5" + + def __call__(self, ua: str) -> Optional[OS]: + if m := self.regex.search(ua): + family = 
_replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find OS family in {ua}") + return OS( + family=family, + major=_replacer(self.major, m), + minor=_replacer(self.minor, m), + patch=_replacer(self.patch, m), + patch_minor=_replacer(self.patch_minor, m), + ) + return None + + def __repr__(self): + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major if self.major != "$2" else None), + ("minor", self.minor if self.minor != "$3" else None), + ("patch", self.patch if self.patch != "$4" else None), + ("patch_minor", self.patch_minor if self.patch_minor != "$5" else None), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"OSMatcher({self.regex.pattern!r}{args})" + + +class DeviceMatcher: + regex: re.Pattern + family: str + brand: str + model: str + + def __init__( + self, + regex: str, + regex_flag: Optional[Literal["i"]] = None, + family: Optional[str] = None, + brand: Optional[str] = None, + model: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex, flags=re.IGNORECASE if regex_flag == "i" else 0) + self.family = family or "$1" + self.brand = brand or "" + self.model = model or "$1" + + def __call__(self, ua: str) -> Optional[Device]: + if m := self.regex.search(ua): + family = _replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find device family in {ua}") + return Device( + family=family, + brand=_replacer(self.brand, m), + model=_replacer(self.model, m), + ) + return None + + def __repr__(self): + fields = [ + ("family", self.family if self.family != "$1" else None), + ("brand", self.brand or None), + ("model", self.model if self.model != "$1" else None), + ] + iflag = ', "i"' if self.regex.flags & re.IGNORECASE else "" + args = iflag + "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"DeviceMatcher({self.regex.pattern!r}{args})" + + +Matchers = Tuple[ + List[UserAgentMatcher], + List[OSMatcher], + 
List[DeviceMatcher], +] diff --git a/src/ua_parser/loaders.py b/src/ua_parser/loaders.py new file mode 100644 index 00000000..23510eeb --- /dev/null +++ b/src/ua_parser/loaders.py @@ -0,0 +1,139 @@ +__all__ = [ + "load_builtins", + "load_data", + "load_yaml", + "MatchersData", + "UserAgentDict", + "OSDict", + "DeviceDict", +] + +import io +import json +import os +from typing import Callable, List, Optional, Tuple, Type, Union, TypedDict, Literal + +from .core import Matchers, UserAgentMatcher, OSMatcher, DeviceMatcher + +PathOrFile = Union[str, os.PathLike, io.IOBase] +load: Optional[Callable] +SafeLoader: Optional[Type] +try: + from yaml import load, CSafeLoader as SafeLoader +except ImportError: + try: + from yaml import load, SafeLoader + except ImportError: + load = SafeLoader = None + + +def load_builtins() -> Matchers: + from ._matchers import MATCHERS + + return MATCHERS + + +# superclass needed to mix required & optional typed dict entries +# before 3.11 (and Required/NotRequired) +class _RegexDict(TypedDict): + regex: str + + +class UserAgentDict(_RegexDict, total=False): + family_replacement: str + v1_replacement: str + v2_replacement: str + v3_replacement: str + v4_replacement: str + + +class OSDict(_RegexDict, total=False): + os_replacement: str + os_v1_replacement: str + os_v2_replacement: str + os_v3_replacement: str + os_v4_replacement: str + + +class DeviceDict(_RegexDict, total=False): + regex_flag: Literal["i"] + device_replacement: str + brand_replacement: str + model_replacement: str + + +MatchersData = Tuple[List[UserAgentDict], List[OSDict], List[DeviceDict]] + + +def load_data(d: MatchersData) -> Matchers: + return ( + [ + UserAgentMatcher( + p["regex"], + p.get("family_replacement"), + p.get("v1_replacement"), + p.get("v2_replacement"), + p.get("v3_replacement"), + p.get("v4_replacement"), + ) + for p in d[0] + ], + [ + OSMatcher( + p["regex"], + p.get("os_replacement"), + p.get("os_v1_replacement"), + p.get("os_v2_replacement"), + 
p.get("os_v3_replacement"), + p.get("os_v4_replacement"), + ) + for p in d[1] + ], + [ + DeviceMatcher( + p["regex"], + p.get("regex_flag"), + p.get("device_replacement"), + p.get("brand_replacement"), + p.get("model_replacement"), + ) + for p in d[2] + ], + ) + + +def load_json(f: PathOrFile) -> Matchers: + if isinstance(f, (str, os.PathLike)): + with open(f) as fp: + regexes = json.load(fp) + else: + regexes = json.load(f) + + return load_data( + ( + regexes["user_agent_parsers"], + regexes["os_parsers"], + regexes["device_parsers"], + ) + ) + + +load_yaml: Optional[Callable[[PathOrFile], Matchers]] +if load is None: + load_yaml = None +else: + + def load_yaml(path: PathOrFile) -> Matchers: + if isinstance(path, (str, os.PathLike)): + with open(path) as fp: + regexes = load(fp, Loader=SafeLoader) # type: ignore + else: + regexes = load(path, Loader=SafeLoader) # type: ignore + + return load_data( + ( + regexes["user_agent_parsers"], + regexes["os_parsers"], + regexes["device_parsers"], + ) + ) diff --git a/tests/test_caches.py b/tests/test_caches.py new file mode 100644 index 00000000..3fb078c9 --- /dev/null +++ b/tests/test_caches.py @@ -0,0 +1,98 @@ +from collections import OrderedDict + +from ua_parser import ( + BasicParser, + PartialParseResult, + Domain, + UserAgent, + OS, + Device, + CachingParser, + Clearing, + LRU, + UserAgentMatcher, + OSMatcher, + DeviceMatcher, +) + + +def test_clearing(): + """Tests that the cache correctly gets cleared to make room for new + entries. + """ + cache = Clearing(2) + p = CachingParser(BasicParser(([], [], [])), cache) + + p.parse("a") + p.parse("b") + + assert cache.cache == { + "a": PartialParseResult(Domain.ALL, None, None, None, "a"), + "b": PartialParseResult(Domain.ALL, None, None, None, "b"), + } + + p.parse("c") + assert cache.cache == { + "c": PartialParseResult(Domain.ALL, None, None, None, "c"), + } + + +def test_lru(): + """Tests that the cache entries do get moved when accessed, and are + popped LRU-first. 
+ """ + cache = LRU(2) + p = CachingParser(BasicParser(([], [], [])), cache) + + p.parse("a") + p.parse("b") + + assert cache.cache == OrderedDict( + [ + ("a", PartialParseResult(Domain.ALL, None, None, None, "a")), + ("b", PartialParseResult(Domain.ALL, None, None, None, "b")), + ] + ) + + p.parse("a") + p.parse("c") + assert cache.cache == OrderedDict( + [ + ("a", PartialParseResult(Domain.ALL, None, None, None, "a")), + ("c", PartialParseResult(Domain.ALL, None, None, None, "c")), + ] + ) + + +def test_backfill(): + """Tests that caches handle partial parsing correctly, by updating the + existing entry when new parts get parsed. + """ + cache = Clearing(2) + p = CachingParser( + BasicParser( + ( + [UserAgentMatcher("(a)")], + [OSMatcher("(a)")], + [DeviceMatcher("(a)")], + ) + ), + cache, + ) + + p.parse_user_agent("a") + assert cache.cache == { + "a": PartialParseResult(Domain.USER_AGENT, UserAgent("a"), None, None, "a"), + } + p("a", Domain.OS) + assert cache.cache == { + "a": PartialParseResult( + Domain.USER_AGENT | Domain.OS, UserAgent("a"), OS("a"), None, "a" + ), + } + p.parse("a") + assert cache.cache == { + "a": PartialParseResult( + Domain.ALL, UserAgent("a"), OS("a"), Device("a", None, "a"), "a" + ), + } diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 00000000..af03667b --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,146 @@ +"""Tests UAP-Python using the UAP-core test suite +""" + +import contextlib +import dataclasses +import logging +import pathlib +import platform +from operator import attrgetter + +import pytest # type: ignore + +if platform.python_implementation() == "PyPy": + from yaml import load, SafeLoader +else: + try: + from yaml import load, CSafeLoader as SafeLoader # type: ignore + except ImportError: + logging.getLogger(__name__).warning( + "PyYaml C extension not available to run tests, this will result " + "in dramatic tests slowdown." 
+ ) + from yaml import load, SafeLoader + +from ua_parser import ( + BasicParser, + UserAgent, + OS, + Device, + ParseResult, + UserAgentMatcher, + load_builtins, + caching, +) + +CORE_DIR = (pathlib.Path(__name__).parent.parent / "uap-core").resolve() + + +PARSERS = [ + pytest.param(BasicParser(load_builtins()), id="basic"), + pytest.param( + caching.CachingParser( + BasicParser(load_builtins()), + caching.Clearing(10), + ), + id="clearing", + ), + pytest.param( + caching.CachingParser( + BasicParser(load_builtins()), + caching.LRU(10), + lock=contextlib.nullcontext, + ), + id="lru", + ), +] + + +UA_FIELDS = {f.name for f in dataclasses.fields(UserAgent)} + + +@pytest.mark.parametrize("parser", PARSERS) +@pytest.mark.parametrize( + "test_file", + [ + CORE_DIR / "tests" / "test_ua.yaml", + CORE_DIR / "test_resources" / "firefox_user_agent_strings.yaml", + CORE_DIR / "test_resources" / "pgts_browser_list.yaml", + ], + ids=attrgetter("name"), +) +def test_ua(parser, test_file): + with test_file.open("rb") as f: + contents = load(f, Loader=SafeLoader) + + for test_case in contents["test_cases"]: + res = {k: v for k, v in test_case.items() if k in UA_FIELDS} + # there seems to be broken test cases which have a patch_minor + # of null where it's not, as well as the reverse, so we can't + # test patch_minor (ua-parser/uap-core#562) + res.pop("patch_minor", None) + r = parser.parse_user_agent(test_case["user_agent_string"]) or UserAgent() + assert dataclasses.asdict(r).items() >= res.items() + + +OS_FIELDS = {f.name for f in dataclasses.fields(OS)} + + +@pytest.mark.parametrize("parser", PARSERS) +@pytest.mark.parametrize( + "test_file", + [ + CORE_DIR / "tests" / "test_os.yaml", + CORE_DIR / "test_resources" / "additional_os_tests.yaml", + ], + ids=attrgetter("name"), +) +def test_os(parser, test_file): + with test_file.open("rb") as f: + contents = load(f, Loader=SafeLoader) + + for test_case in contents["test_cases"]: + res = {k: v for k, v in test_case.items() if k in 
OS_FIELDS} + r = parser.parse_os(test_case["user_agent_string"]) or OS() + assert dataclasses.asdict(r) == res + + +DEVICE_FIELDS = {f.name for f in dataclasses.fields(Device)} + + +@pytest.mark.parametrize("parser", PARSERS) +@pytest.mark.parametrize( + "test_file", + [ + CORE_DIR / "tests" / "test_device.yaml", + ], + ids=attrgetter("name"), +) +def test_devices(parser, test_file): + with test_file.open("rb") as f: + contents = load(f, Loader=SafeLoader) + + for test_case in contents["test_cases"]: + res = {k: v for k, v in test_case.items() if k in DEVICE_FIELDS} + r = parser.parse_device(test_case["user_agent_string"]) or Device() + assert dataclasses.asdict(r) == res + + +def test_results(): + p = BasicParser(([UserAgentMatcher("(x)")], [], [])) + + assert p.parse_user_agent("x") == UserAgent("x") + assert p.parse_user_agent("y") is None + + assert p.parse("x") == ParseResult( + user_agent=UserAgent("x"), + os=None, + device=None, + string="x", + ) + assert p.parse("y") == ParseResult( + user_agent=None, + os=None, + device=None, + string="y", + ) diff --git a/tests/test_legacy.py b/tests/test_legacy.py index 03feeda3..7ada17c5 100644 --- a/tests/test_legacy.py +++ b/tests/test_legacy.py @@ -1,17 +1,15 @@ import logging import os import platform -import sys -import warnings -import pytest +import pytest # type: ignore import yaml if platform.python_implementation() == "PyPy": from yaml import SafeLoader else: try: - from yaml import CSafeLoader as SafeLoader + from yaml import CSafeLoader as SafeLoader # type: ignore except ImportError: logging.getLogger(__name__).warning( "PyYaml C extension not available to run tests, this will result " diff --git a/tests/test_parsers_basics.py b/tests/test_parsers_basics.py new file mode 100644 index 00000000..af38c7e7 --- /dev/null +++ b/tests/test_parsers_basics.py @@ -0,0 +1,78 @@ +import io +from ua_parser import ( + BasicParser, + PartialParseResult, + Domain, + UserAgent, + load_yaml, + UserAgentMatcher, +) + + +def 
test_trivial_matching(): + p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + + assert p("x", Domain.ALL) == PartialParseResult( + string="x", + domains=Domain.ALL, + user_agent=None, + os=None, + device=None, + ) + + assert p("a", Domain.ALL) == PartialParseResult( + string="a", + domains=Domain.ALL, + user_agent=UserAgent("a"), + os=None, + device=None, + ) + + +def test_partial(): + p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + + assert p("x", Domain.USER_AGENT) == PartialParseResult( + string="x", + domains=Domain.USER_AGENT, + user_agent=None, + os=None, + device=None, + ) + + assert p("a", Domain.USER_AGENT) == PartialParseResult( + string="a", + domains=Domain.USER_AGENT, + user_agent=UserAgent("a"), + os=None, + device=None, + ) + + +def test_init_yaml(): + assert load_yaml + f = io.BytesIO( + b"""\ +user_agent_parsers: +- regex: (a) +os_parsers: [] +device_parsers: [] +""" + ) + p = BasicParser(load_yaml(f)) + + assert p("x", Domain.USER_AGENT) == PartialParseResult( + string="x", + domains=Domain.USER_AGENT, + user_agent=None, + os=None, + device=None, + ) + + assert p("a", Domain.USER_AGENT) == PartialParseResult( + string="a", + domains=Domain.USER_AGENT, + user_agent=UserAgent("a"), + os=None, + device=None, + ) diff --git a/tox.ini b/tox.ini index 57b17f90..36ac52da 100644 --- a/tox.ini +++ b/tox.ini @@ -2,12 +2,12 @@ min_version = 4.0 env_list = py3{8,9,10,11,12} pypy3.{8,9,10} - flake8, black + flake8, black, typecheck labels = test = py3{8,9,10,11,12},pypy3.{8,9,10} cpy = py3{8,9,10,11,12} pypy = pypy3.{8,9,10} - check = flake8, black + check = flake8, black, typecheck [testenv] # wheel install @@ -30,7 +30,14 @@ commands = flake8 {posargs} [testenv:black] package = skip deps = black -commands = black --check --diff . 
+commands = black --check --diff {posargs:.} + +[testenv:typecheck] +package = skip +deps = + mypy + types-PyYaml +commands = mypy --check-untyped-defs --no-implicit-optional {posargs:src tests} [flake8] max_line_length = 88