Skip to content

Commit

Permalink
Do minor code cleanup suggested by pylint
Browse files Browse the repository at this point in the history
  • Loading branch information
linknum23 committed Jan 8, 2025
1 parent 5551650 commit 501453e
Showing 1 changed file with 50 additions and 46 deletions.
96 changes: 50 additions & 46 deletions pyamplipi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@
import asyncio
import sys
import os
import json
import yaml
import datetime
from dotenv import load_dotenv
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, Namespace, Action, ArgumentError
from typing import Optional, List, Callable, Sequence, Dict, Any
from typing import Optional, List, Callable, Sequence, Dict, Any, Union, Type
from textwrap import indent
import json
import yaml
from pydantic import BaseModel
from pydantic.fields import ModelField
from pydantic.main import ModelMetaclass
from dotenv import load_dotenv
from aiohttp import TCPConnector, ClientSession
from aiohttp.client_exceptions import ServerDisconnectedError
from tabulate import tabulate
from textwrap import indent
import validators
from .models import Status, Config, Info, Source, Zone, Group, Stream, Preset, Announcement, \
SourceUpdate, ZoneUpdate, MultiZoneUpdate, GroupUpdate, StreamUpdate, PresetUpdate
from .amplipi import AmpliPi
from .error import APIError

# f-strings are too awesome not to use them in logging
# pylint: disable=logging-fstring-interpolation

# constants
log = logging.getLogger(__name__) # central logging channel
Expand All @@ -30,12 +32,14 @@


# text formatters
# underline text for emphasis
def em(msg: str) -> str: return f'\033[4m{msg}\033[0m'
def em(msg: str) -> str:
"""Underline text for emphasis"""
return f'\033[4m{msg}\033[0m'


def table(d, h) -> str: return indent(tabulate(d, h,
tablefmt='rounded_outline'), ' ') # make a nice indented table
def table(d, h) -> str:
"""Make a nice indented table"""
return indent(tabulate(d, h, tablefmt='rounded_outline'), ' ')


# simple list json for List[BaseModel] constructs
Expand All @@ -62,15 +66,15 @@ def list_sources(sources: List[Source]):
print(em(f"Sources[{len(sources)}]"))
headers = ["ID", "Name", "Input", "Info", "State"]
data = [[s.id, s.name, s.input, s.info.name if s.info else None,
s.info.state if s.info else None] for s in sources]
s.info.state if s.info else None] for s in sources]
print(table(data, headers))


def list_zones(zones: List[Zone]):
print(em(f"Zones[{len(zones)}]"))
headers = ["ID", "Name", "Volume (dB)", "Volume%", "Range (dB)", "Muted"]
data = [[z.id, z.name, z.vol, z.vol_f,
f"{z.vol_min}..{z.vol_max}", z.mute] for z in zones]
f"{z.vol_min}..{z.vol_max}", z.mute] for z in zones]
print(table(data, headers))


Expand Down Expand Up @@ -158,15 +162,15 @@ async def do_status_list(args: Namespace, amplipi: AmpliPi, shell: bool, **kwarg


async def do_status_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Status json represenatation
""" Gets Status json representation
"""
log.debug("status.get()")
status: Status = await amplipi.get_status()
write_out(status.json(**json_ser_kwargs), args.outfile)


async def do_config_load(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Sets Config json represenatation
""" Sets Config json representation
"""
log.debug(f"config.load(«stdin») forced = {args.force}")
# Be sure to consume stdin before entering interactive dialogue
Expand Down Expand Up @@ -219,7 +223,7 @@ async def do_system_shutdown(args: Namespace, amplipi: AmpliPi, shell: bool, **k


async def do_info_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Status-Info json represenatation
""" Gets Status-Info json representation
"""
log.debug("status.info()")
info: Info = await amplipi.get_info()
Expand All @@ -236,7 +240,7 @@ async def do_source_list(args: Namespace, amplipi: AmpliPi, shell: bool, **kwarg


async def do_source_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Source(id) json represenatation by source_id
""" Gets Source(id) json representation by source_id
"""
log.debug(f"source.get({args.sourceid})")
assert 0 <= args.sourceid <= 3, "source id must be in range 0..3"
Expand All @@ -245,7 +249,7 @@ async def do_source_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs


async def do_source_getall(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Sources json represenatation
""" Gets Sources json representation
"""
log.debug("source.getall()")
sources: List[Source] = await amplipi.get_sources()
Expand All @@ -259,9 +263,9 @@ async def do_source_set(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs
f"source.set({args.sourceid}, input={args.input if args.input is not None else '«stdin»'})")
assert 0 <= args.sourceid <= 3, "source id must be in range 0..3"

def validate(input: dict):
log.debug(f"validating source_update kwargs: {input}")
assert any(input.values()), "no actual source values to be set"
def validate(_input: dict):
log.debug(f"validating source_update kwargs: {_input}")
assert any(_input.values()), "no actual source values to be set"
src_update: SourceUpdate = instantiate_model(
SourceUpdate, args.infile, args.input, validate)
# ignoring status return value
Expand All @@ -288,7 +292,7 @@ async def do_zone_list(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs)


async def do_zone_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Zone json represenatation by zone_id
""" Gets Zone json representation by zone_id
"""
log.debug(f"zone.get({args.zoneid})")
assert 0 <= args.zoneid <= 35, "zone id must be in range 0..35"
Expand All @@ -297,7 +301,7 @@ async def do_zone_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):


async def do_zone_getall(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Zones json represenatation
""" Gets Zones json representation
"""
log.debug("zone.getall()")
zones: List[Zone] = await amplipi.get_zones()
Expand All @@ -311,9 +315,9 @@ async def do_zone_set(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
f"zone.set({args.zoneid}, input={args.input if args.input is not None else '«stdin»'})")
assert 0 <= args.zoneid <= 35, "zone id must be in range 0..35"

def validate(input: dict):
log.debug(f"validating zone_update kwargs: {input}")
assert len(input.keys()) > 0, "no actual zone values to be set"
def validate(_input: dict):
log.debug(f"validating zone_update kwargs: {_input}")
assert len(_input.keys()) > 0, "no actual zone values to be set"
zone_update: ZoneUpdate = instantiate_model(
ZoneUpdate, args.infile, args.input, validate)
# ignoring status return value
Expand All @@ -339,7 +343,7 @@ async def do_group_list(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs


async def do_group_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Group json represenatation by group_id
""" Gets Group json representation by group_id
"""
log.debug(f"group.get({args.groupid})")
assert 0 <= args.groupid, "group id must be > 0"
Expand All @@ -348,7 +352,7 @@ async def do_group_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs)


async def do_group_getall(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Groups json represenatation
""" Gets Groups json representation
"""
log.debug("group.getall()")
groups: List[Group] = await amplipi.get_groups()
Expand All @@ -362,9 +366,9 @@ async def do_group_set(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs)
f"group.set({args.groupid}, input={args.input if args.input is not None else '«stdin»'})")
assert 0 <= args.groupid, "group id must be > 0"

def validate(input: dict):
log.debug(f"validating group_update kwargs: {input}")
assert len(input.keys()) > 0, "no actual group values to be set"
def validate(_input: dict):
log.debug(f"validating group_update kwargs: {_input}")
assert len(_input.keys()) > 0, "no actual group values to be set"
group_update: GroupUpdate = instantiate_model(
GroupUpdate, args.infile, args.input, validate)
# ignoring status return value
Expand All @@ -377,11 +381,11 @@ async def do_group_new(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs)
log.debug(
f"group.create(input={args.input if args.input is not None else '«stdin»'})")

def validate(input: dict):
log.debug(f"validating group_update kwargs: {input}")
assert all((input.get('name'), input.get('zones'))
def validate(_input: dict):
log.debug(f"validating group_update kwargs: {_input}")
assert all((_input.get('name'), _input.get('zones'))
), "group needs a name and list of zones"
assert all([bool(0 <= zid <= 35) for zid in input['zones']]
assert all([bool(0 <= zid <= 35) for zid in _input['zones']]
), "zone ids must be in range 0..35"
group: Group = instantiate_model(Group, args.infile, args.input, validate)
await amplipi.create_group(group) # ignoring status return value
Expand All @@ -405,7 +409,7 @@ async def do_stream_list(args: Namespace, amplipi: AmpliPi, shell: bool, **kwarg


async def do_stream_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Stream json represenatation by stream_id
""" Gets Stream json representation by stream_id
"""
log.debug(f"stream.get({args.streamid})")
assert 0 <= args.streamid, "stream id must be > 0"
Expand All @@ -414,7 +418,7 @@ async def do_stream_get(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs


async def do_stream_getall(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Gets Streams json represenatation
""" Gets Streams json representation
"""
log.debug("stream.getall()")
streams: List[Stream] = await amplipi.get_streams()
Expand All @@ -428,9 +432,9 @@ async def do_stream_set(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs
f"stream.set({args.streamid}, input={args.input if args.input is not None else '«stdin»'})")
assert 0 <= args.streamid, "stream id must be > 0"

def validate(input: dict):
log.debug(f"validating stream_update kwargs: {input}")
assert len(input.keys()) > 0, "no actual stream values to be set"
def validate(_input: dict):
log.debug(f"validating stream_update kwargs: {_input}")
assert len(_input.keys()) > 0, "no actual stream values to be set"
stream_update: StreamUpdate = instantiate_model(
StreamUpdate, args.infile, args.input, validate)
# ignoring status return value
Expand All @@ -443,9 +447,9 @@ async def do_stream_new(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs
log.debug(
f"stream.create(input={args.input if args.input is not None else '«stdin»'})")

def validate(input: dict):
log.debug(f"validating stream_update kwargs: {input}")
assert all((input.get('name'), input.get('type'))
def validate(_input: dict):
log.debug(f"validating stream_update kwargs: {_input}")
assert all((_input.get('name'), _input.get('type'))
), "stream needs a name and a type"
stream: Stream = instantiate_model(
Stream, args.infile, args.input, validate)
Expand Down Expand Up @@ -576,10 +580,10 @@ async def do_preset_load(args: Namespace, amplipi: AmpliPi, shell: bool, **kwarg
async def do_announce(args: Namespace, amplipi: AmpliPi, shell: bool, **kwargs):
""" Plays announcement
"""
def validate(input: dict):
log.debug(f"validating announcement kwargs: {input}")
assert validators.url(input['media']), "media_url must be a valid URL"
assert 'vol_f' not in input or 0.0 <= input['vol_f'] <= 1.0, "vol_f must be in range 0.0..1.0"
def validate(_input: dict):
log.debug(f"validating announcement kwargs: {_input}")
assert validators.url(_input['media']), "media_url must be a valid URL"
assert 'vol_f' not in _input or 0.0 <= _input['vol_f'] <= 1.0, "vol_f must be in range 0.0..1.0"

log.debug(
f"announce(input={args.input if args.input is not None else '«stdin»'})")
Expand Down Expand Up @@ -1179,8 +1183,8 @@ def make_amplipi(args: Namespace) -> AmpliPi:
return AmpliPi(endpoint, timeout=timeout, http_session=http_session)


# main script entrypoint
def main():
"""Main script entrypoint"""
exitcode = 0 # assuming all will be well
load_dotenv()
ap = get_arg_parser()
Expand Down

0 comments on commit 501453e

Please sign in to comment.