diff --git a/.gitignore b/.gitignore index 10ed083..65fd507 100644 --- a/.gitignore +++ b/.gitignore @@ -79,7 +79,6 @@ coverage.xml .scrapy # Sphinx documentation -docs/_build/ sphinx/_build/ # PyBuilder @@ -169,3 +168,4 @@ Temporary Items .nfs* .ruff_cache +.vscode \ No newline at end of file diff --git a/docs/.buildinfo b/docs/.buildinfo deleted file mode 100644 index f2fc22d..0000000 --- a/docs/.buildinfo +++ /dev/null @@ -1,4 +0,0 @@ -# Sphinx build info version 1 -# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. -config: fd5a3bc78e368d40015d5dc5d40a4f56 -tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/docs/.doctrees/environment.pickle b/docs/.doctrees/environment.pickle deleted file mode 100644 index 4d938e3..0000000 Binary files a/docs/.doctrees/environment.pickle and /dev/null differ diff --git a/docs/.doctrees/index.doctree b/docs/.doctrees/index.doctree deleted file mode 100644 index c38c9b5..0000000 Binary files a/docs/.doctrees/index.doctree and /dev/null differ diff --git a/docs/.nojekyll b/docs/.nojekyll deleted file mode 100644 index e69de29..0000000 diff --git a/docs/_modules/index.html b/docs/_modules/index.html deleted file mode 100644 index 40d5ebc..0000000 --- a/docs/_modules/index.html +++ /dev/null @@ -1,123 +0,0 @@ - - - - -
- - -
-# (c) 2022, Cantab Research Ltd.
-"""
-Wrapper library to interface with Speechmatics ASR batch v2 API.
-"""
-
-import json
-import logging
-import os
-from pathlib import Path
-import time
-from typing import Any, Dict, Iterable, List, Tuple, Union
-
-import httpx
-from polling2 import poll
-from tenacity import retry, retry_if_exception_type, stop_after_attempt
-
-from speechmatics.exceptions import JobNotFoundException, TranscriptionError
-from speechmatics.helpers import get_version
-from speechmatics.models import BatchTranscriptionConfig, ConnectionSettings, UsageMode
-
-LOGGER = logging.getLogger(__name__)
-
-# If the logging level is set to DEBUG then websockets logs very verbosely,
-# including a hex dump of every message being sent. Setting the websockets
-# logger at INFO level specifically prevents this spam.
-logging.getLogger("websockets.protocol").setLevel(logging.INFO)
-
-POLLING_DURATION = 15
-
-# This is a reasonable default for when multiple audio files are submitted for
-# transcription in one go, in submit_jobs.
-#
-# Customers are free to increase this to any kind of maximum they are
-# comfortable with, but bear in mind there will be rate-limitting at the API
-# end for over-use.
-CONCURRENCY_DEFAULT = 5
-CONCURRENCY_MAXIMUM = 50
-
-
-class _ForceMultipartDict(dict):
- """Creates a dictionary that evaluates to True, even if empty.
- Used in submit_job() to force proper multipart encoding when fetch_data is used.
- See https://github.com/encode/httpx/discussions/2399 (link to psf/requests#1081) for details.
- """
-
- def __bool__(self):
- return True
-
-
-[docs]class HttpClient(httpx.Client):
- """Wrapper class around httpx.Client that adds the sm-sdk query parameter to request urls"""
-
- def __init__(self, *args, **kwargs):
- self._from_cli = False
- if "from_cli" in kwargs:
- self._from_cli = kwargs["from_cli"]
- kwargs.pop("from_cli")
- super().__init__(*args, **kwargs)
-
-[docs] def build_request(self, method: str, url, *args, **kwargs):
- cli = "-cli" if self._from_cli is True else ""
- version = get_version()
- url = httpx.URL(url)
- url = url.copy_merge_params({"sm-sdk": f"python{cli}-{version}"})
- return super().build_request(method, url, *args, **kwargs)
-
-
-[docs]class BatchClient:
- """Client class for Speechmatics Batch ASR REST API.
-
- This client may be used directly but must be closed afterwards, e.g.::
-
- client = BatchClient(auth_token)
- client.connect()
- list_of_jobs = client.list_jobs()
- client.close()
-
- It may also be used as a context manager, which handles opening and
- closing the connection for you, e.g.::
-
- with BatchClient(settings) as client:
- list_of_jobs = client.list_jobs()
-
- """
-
- def __init__(
- self,
- connection_settings_or_auth_token: Union[str, ConnectionSettings, None] = None,
- from_cli=False,
- ):
- """
- Args:
- connection_settings_or_auth_token (Union[str, ConnectionSettings, None], optional)
- If `str`,, assumes auth_token passed and default URL being used
- If `None`, attempts using auth_token from config.
- Defaults to `None`
- from_clie (bool)
- """
- if not isinstance(connection_settings_or_auth_token, ConnectionSettings):
- self.connection_settings = ConnectionSettings.create(
- UsageMode.Batch, connection_settings_or_auth_token
- )
- else:
- self.connection_settings = connection_settings_or_auth_token
- self.connection_settings.set_missing_values_from_config(UsageMode.Batch)
- if self.connection_settings.url[-1] == "/":
- self.connection_settings.url = self.connection_settings.url[:-1]
- if not self.connection_settings.url.endswith("/v2"):
- self.connection_settings.url = "/".join(
- [self.connection_settings.url, "v2"]
- )
-
- self.connection_settings = self.connection_settings
- self.transcription_config = None
-
- self.default_headers = {
- "Authorization": f"Bearer {self.connection_settings.auth_token}",
- "Accept-Charset": "utf-8",
- }
- self.api_client = None
- self._from_cli = from_cli
-
-[docs] def connect(self):
- """Create a connection to a Speechmatics Transcription REST endpoint"""
- self.api_client = HttpClient(
- base_url=self.connection_settings.url,
- timeout=None,
- headers=self.default_headers,
- http2=True,
- verify=self.connection_settings.ssl_context,
- from_cli=self._from_cli,
- )
- return self
-
- def __enter__(self):
- return self.connect()
-
- def __exit__(self, exc_type, exc_value, traceback):
- # pylint: disable=redefined-outer-name
- self.close()
-
-[docs] def close(self) -> None:
- """
- Clean up/close client connection pool.
-
- This is required when using the client directly, but not required when
- using the client as a context manager.
-
- :rtype: None
- """
- self.api_client.close()
-
-[docs] def send_request(self, method: str, path: str, **kwargs) -> httpx.Response:
- """
- Send a request using httpx.Client()
-
- :param method: HTTP request method
- :type method: str
-
- :param path: Configuration for the transcription.
- :type path: str
-
- :param **kwargs: Any valid kwarg of httpx.Client
-
- :returns: httpx Response object
- :rtype: httpx.Response
-
- :raises httpx.HTTPError: When a request fails, raises an HTTPError
- """
-
- # pylint: disable=no-member
- @retry(
- stop=stop_after_attempt(2),
- retry=retry_if_exception_type(httpx.RemoteProtocolError),
- )
- def send():
- with self.api_client.stream(method, path, **kwargs) as response:
- response.read()
- response.raise_for_status()
- return response
-
- return send()
-
-[docs] def list_jobs(self) -> List[Dict[str, Any]]:
- """
- Lists last 100 jobs within 7 days associated with auth_token for the SaaS
- or all of the jobs for the batch appliance.
-
- :returns: List of jobs
- :rtype: List[Dict[str, Any]]
- """
- return self.send_request("GET", "jobs").json()["jobs"]
-
-[docs] def submit_job(
- self,
- audio: Union[Tuple[str, bytes], str, os.PathLike, None],
- transcription_config: Union[
- Dict[str, Any], BatchTranscriptionConfig, str, os.PathLike
- ],
- ) -> str:
- """
- Submits audio and config for transcription.
-
- :param audio: Audio file path or tuple of filename and bytes, or None if using fetch_url
- NOTE: You must expliticly pass audio=None if providing a fetch_url in the config
- :type audio: os.Pathlike | str | Tuple[str, bytes] | None
-
- :param transcription_config: Configuration for the transcription.
- :type transcription_config:
- Dict[str, Any] | speechmatics.models.BatchTranscriptionConfig | str
-
- :returns: Job ID
- :rtype: str
-
- :raises httpx.HTTPError: For any request errors, httpx exceptions are raised.
- """
-
- # Handle getting config into a dict
- if isinstance(transcription_config, (str or os.PathLike)):
- with Path(transcription_config).expanduser().open(
- mode="rt", encoding="utf-8"
- ) as file:
- config_dict = json.load(file)
- elif isinstance(transcription_config, BatchTranscriptionConfig):
- config_dict = json.loads(transcription_config.as_config())
- elif isinstance(transcription_config, dict):
- config_dict = transcription_config
- else:
- raise ValueError(
- """Job configuration must be a BatchTranscriptionConfig object,
- a filepath as a string or Path object, or a dict"""
- )
-
- # If audio=None, fetch_data must be specified
- file_object = None
- try:
- if audio and "fetch_data" in config_dict:
- raise ValueError("Only one of audio or fetch_data can be set at a time")
- if not audio and "fetch_data" in config_dict:
- audio_data = None
- elif isinstance(audio, (str, os.PathLike)):
- # httpx performance is better when using a file-like object
- # compared to passing the file contents as bytes.
- file_object = Path(audio).expanduser().open("rb")
- audio_data = os.path.basename(file_object.name), file_object
- elif isinstance(audio, tuple) and "fetch_data" not in config_dict:
- audio_data = audio
- else:
- raise ValueError(
- "Audio must be a filepath or a tuple of (filename, bytes)"
- )
-
- # httpx seems to expect an un-nested json, throws a type error otherwise.
- config_data = {"config": json.dumps(config_dict, ensure_ascii=False)}
-
- if audio_data:
- audio_file = {"data_file": audio_data}
- else:
- audio_file = _ForceMultipartDict()
-
- response = self.send_request(
- "POST", "jobs", data=config_data, files=audio_file
- )
- return response.json()["id"]
- finally:
- if file_object:
- file_object.close()
-
- def submit_jobs(
- self,
- audio_paths: Iterable[Union[str, os.PathLike]],
- transcription_config: Any,
- concurrency=CONCURRENCY_DEFAULT,
- ):
- if concurrency > CONCURRENCY_MAXIMUM:
- raise Exception(
- f"concurrency={concurrency} is too high, choose a value <= {CONCURRENCY_MAXIMUM}!"
- )
- pool = {}
-
- def wait():
- while True:
- for job_id in list(pool):
- path = pool[job_id]
- status = self.check_job_status(job_id)["job"]["status"]
- LOGGER.debug("%s for %s is %s", job_id, path, status)
- if status == "running":
- continue
- del pool[job_id]
- return path, job_id
- time.sleep(POLLING_DURATION)
-
- for audio_path in audio_paths:
- if len(pool) >= concurrency:
- yield wait()
- try:
- job_id = self.submit_job(audio_path, transcription_config)
- LOGGER.debug("%s submitted as job %s", audio_path, job_id)
- pool[job_id] = audio_path
- except httpx.HTTPStatusError as exc:
- LOGGER.warning("%s submit failed with %s", audio_path, exc)
-
- while pool:
- yield wait()
-
-[docs] def get_job_result(
- self,
- job_id: str,
- transcription_format: str = "json-v2",
- ) -> Union[bool, str, Dict[str, Any]]:
- """
- Request results of a transcription job.
-
- :param job_id: ID of previously submitted job.
- :type job_id: str
-
- :param transcription_format: Format of transcript. Defaults to json.
- Valid options are json-v2, txt, srt. json is accepted as an
- alias for json-v2.
- :type format: str
-
- :returns: False if job is still running or does not exist, or
- transcription in requested format
- :rtype: bool | str | Dict[str, Any]
-
- :raises JobNotFoundException : When a job_id is not found.
- :raises httpx.HTTPError: For any request other than 404, httpx exceptions are raised.
- :raises TranscriptionError: When the transcription format is invalid.
- """
- transcription_format = transcription_format.lower()
- if transcription_format not in ["json-v2", "json_v2", "json", "txt", "srt"]:
- raise TranscriptionError(
- 'Invalid transcription format. Valid formats are : "json-v2",'
- '"json_v2", "json", "txt", "srt "'
- )
-
- if transcription_format in ["json-v2", "json", "json_v2"]:
- transcription_format = "json-v2"
- try:
- response = self.send_request(
- "GET",
- "/".join(["jobs", job_id, "transcript"]),
- params={"format": transcription_format},
- )
- except httpx.HTTPStatusError as exc:
- if exc.response.status_code == 404:
- raise JobNotFoundException(f"Could not find job {job_id}") from exc
- raise exc
-
- if transcription_format == "json-v2":
- return response.json()
- return response.text
-
-[docs] def delete_job(self, job_id: str, force: bool = False) -> str:
- """
- Delete a job. Must pass force=True to cancel a running job.
-
- :param job_id: ID of previously submitted job.
- :type job_id: str
-
- :param force: When set, a running job will be force terminated. When
- unset (default), a running job will not be terminated and we will
- return False.
- :type format: bool
-
- :return: Deletion status
- :rtype: str
- """
-
- try:
- response = self.send_request(
- "DELETE",
- "/".join(["jobs", job_id]),
- params={"force": str(force).lower()},
- )
- return (
- f"Job {job_id} deleted"
- if (response.json())["job"]["status"] == "deleted"
- else f"Job {job_id} was not deleted. Error {response.json()}"
- )
- except httpx.HTTPStatusError as exc:
- if exc.response.status_code == 404:
- raise JobNotFoundException(f"Could not find job {job_id}") from exc
- raise exc
- except KeyError:
- return False
-
-[docs] def check_job_status(self, job_id: str) -> Dict[str, Any]:
- """
- Check the status of a job.
-
- :param job_id: ID of previously submitted job.
- :type job_id: str
-
- :return: Job status
- :rtype: Dict[str, Any]
-
- :raises JobNotFoundException: When a job_id is not found.
- :raises httpx.HTTPError: For any request other than 404, httpx exceptions are raised.
- """
- try:
- response = self.send_request("GET", "/".join(["jobs", job_id]))
- except httpx.HTTPStatusError as error:
- if error.response.status_code == 404:
- raise JobNotFoundException(f"Job {job_id} not found") from error
- raise error
- return response.json()
-
-[docs] def wait_for_completion(
- self, job_id: str, transcription_format: str = "txt"
- ) -> Union[str, Dict[str, Any]]:
- """
- Blocks until job is complete, returning a transcript in
- the requested format.
-
- :param job_id: ID of previously submitted job.
- :type job_id: str
-
- :param transcription_format: Format of transcript. Defaults to txt.
- Valid options are json-v2, txt, srt. json is accepted as an
- alias for json-v2.
- :type format: str
-
- :return: Transcription in requested format
- :rtype: Union[str, Dict[str, Any]]
-
- :raises JobNotFoundException : When a job_id is not found.
- :raises httpx.HTTPError: For any request other than 404, httpx exceptions are raised.
- """
-
- def _poll_for_status() -> bool:
- job_status = self.check_job_status(job_id)["job"]["status"]
- if job_status == "done":
- return True
- if job_status == "running":
- LOGGER.info(
- "Job ID %s still running, polling again in %s seconds.",
- job_id,
- POLLING_DURATION,
- )
- return False
- raise TranscriptionError(f"{job_id} status {job_status}")
-
- status = self.check_job_status(job_id)
-
- if status["job"]["status"] == "done":
- return self.get_job_result(job_id, transcription_format)
-
- min_rtf = 0.10
- duration = status["job"]["duration"]
- LOGGER.info(
- "Waiting %i sec to begin polling for completion.", round(duration * min_rtf)
- )
- # Wait until the min. processing time has passed before polling.
- time.sleep(duration * min_rtf)
-
- LOGGER.info("Starting poll.")
- poll(_poll_for_status, step=POLLING_DURATION, timeout=3600)
- return self.get_job_result(job_id, transcription_format)
-
-# (c) 2020, Cantab Research Ltd.
-"""
-Wrapper library to interface with Real-time ASR v2 API.
-Based on http://asyncio.readthedocs.io/en/latest/producer_consumer.html
-"""
-
-import asyncio
-import copy
-import json
-import logging
-import os
-from typing import Union
-from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
-
-import httpx
-import websockets
-
-from speechmatics.exceptions import (
- EndOfTranscriptException,
- ForceEndSession,
- TranscriptionError,
-)
-from speechmatics.helpers import get_version, json_utf8, read_in_chunks
-from speechmatics.models import (
- AudioSettings,
- ClientMessageType,
- ConnectionSettings,
- ServerMessageType,
- TranscriptionConfig,
- UsageMode,
-)
-
-LOGGER = logging.getLogger(__name__)
-
-# If the logging level is set to DEBUG then websockets logs very verbosely,
-# including a hex dump of every message being sent. Setting the websockets
-# logger at INFO level specifically prevents this spam.
-logging.getLogger("websockets.protocol").setLevel(logging.INFO)
-
-
-[docs]class WebsocketClient:
- """
- Manage a transcription session with the server.
-
- The best way to interact with this library is to instantiate this client
- and then add a set of handlers to it. Handlers respond to particular types
- of messages received from the server.
-
- :param connection_settings: Settings for the WebSocket connection,
- including the URL of the server.
- :type connection_settings: speechmatics.models.ConnectionSettings
- """
-
- # pylint: disable=too-many-instance-attributes
-
- def __init__(
- self,
- connection_settings_or_auth_token: Union[str, ConnectionSettings, None] = None,
- ):
- """
- Args:
- connection_settings_or_auth_token (Union[str, ConnectionSettings, None], optional): Defaults to None.
- If `str`,, assumes auth_token passed and default URL being used
- If `None`, attempts using auth_token from config.
- """
- if not isinstance(connection_settings_or_auth_token, ConnectionSettings):
- self.connection_settings = ConnectionSettings.create(
- UsageMode.RealTime, connection_settings_or_auth_token
- )
- else:
- self.connection_settings = connection_settings_or_auth_token
- self.connection_settings.set_missing_values_from_config(UsageMode.RealTime)
- self.websocket = None
- self.transcription_config = None
- self.translation_config = None
-
- self.event_handlers = {x: [] for x in ServerMessageType}
- self.middlewares = {x: [] for x in ClientMessageType}
-
- self.seq_no = 0
- self.session_running = False
- self._language_pack_info = None
- self._transcription_config_needs_update = False
- self._session_needs_closing = False
-
- # The following asyncio fields are fully instantiated in
- # _init_synchronization_primitives
- self._recognition_started = asyncio.Event
- # Semaphore used to ensure that we don't send too much audio data to
- # the server too quickly and burst any buffers downstream.
- self._buffer_semaphore = asyncio.BoundedSemaphore
-
- async def _init_synchronization_primitives(self):
- """
- Used to initialise synchronization primitives that require
- an event loop
- """
- self._recognition_started = asyncio.Event()
- self._buffer_semaphore = asyncio.BoundedSemaphore(
- self.connection_settings.message_buffer_size
- )
-
- def _flag_recognition_started(self):
- """
- Handle a
- :py:attr:`speechmatics.models.ClientMessageType.SetRecognitionConfig`
- message from the server.
- This updates an internal flag to mark the recognition session
- as started meaning, AddAudio is now allowed.
- """
- self._recognition_started.set()
-
- def _set_language_pack_info(self, language_pack_info: dict):
- """
- Update the `language_pack_info` which is a subset of information from the
- manifest in the language pack which we expose to end users via the
- RecognitionStarted message.
- """
- self._language_pack_info = language_pack_info
-
-[docs] def get_language_pack_info(self) -> dict:
- """
- Get the `language_pack_info` which is a subset of information from the
- manifest in the language pack which we expose to end users.
-
- Can be None if this field has not yet been set - i.e. if the RecognitionStarted
- message has not been received yet.
- """
- return self._language_pack_info
-
- @json_utf8
- def _set_recognition_config(self):
- """
- Constructs a
- :py:attr:`speechmatics.models.ClientMessageType.SetRecognitionConfig`
- message.
- """
- msg = {
- "message": ClientMessageType.SetRecognitionConfig,
- "transcription_config": self.transcription_config.as_config(),
- }
- if self.translation_config is not None:
- msg["translation_config"] = self.translation_config.asdict()
- self._call_middleware(ClientMessageType.SetRecognitionConfig, msg, False)
- return msg
-
- @json_utf8
- def _start_recognition(self, audio_settings):
- """
- Constructs a
- :py:attr:`speechmatics.models.ClientMessageType.StartRecognition`
- message.
- This initiates the recognition session.
-
- :param audio_settings: Audio settings to use.
- :type audio_settings: speechmatics.models.AudioSettings
- """
- msg = {
- "message": ClientMessageType.StartRecognition,
- "audio_format": audio_settings.asdict(),
- "transcription_config": self.transcription_config.as_config(),
- }
- if self.translation_config is not None:
- msg["translation_config"] = self.translation_config.asdict()
- self.session_running = True
- self._call_middleware(ClientMessageType.StartRecognition, msg, False)
- LOGGER.debug(msg)
- return msg
-
- @json_utf8
- def _end_of_stream(self):
- """
- Constructs an
- :py:attr:`speechmatics.models.ClientMessageType.EndOfStream`
- message.
- """
- msg = {"message": ClientMessageType.EndOfStream, "last_seq_no": self.seq_no}
- self._call_middleware(ClientMessageType.EndOfStream, msg, False)
- LOGGER.debug(msg)
- return msg
-
- def _consumer(self, message):
- """
- Consumes messages and acts on them.
-
- :param message: Message received from the server.
- :type message: str
-
- :raises TranscriptionError: on an error message received from the
- server after the Session started.
- :raises EndOfTranscriptException: on EndOfTranscription message.
- :raises ForceEndSession: If this was raised by the user's event
- handler.
- """
- LOGGER.debug(message)
- message = json.loads(message)
- message_type = message["message"]
-
- for handler in self.event_handlers[message_type]:
- try:
- handler(copy.deepcopy(message))
- except ForceEndSession:
- LOGGER.warning("Session was ended forcefully by an event handler")
- raise
-
- if message_type == ServerMessageType.RecognitionStarted:
- self._flag_recognition_started()
- if "language_pack_info" in message:
- self._set_language_pack_info(message["language_pack_info"])
- elif message_type == ServerMessageType.AudioAdded:
- self._buffer_semaphore.release()
- elif message_type == ServerMessageType.EndOfTranscript:
- raise EndOfTranscriptException()
- elif message_type == ServerMessageType.Warning:
- LOGGER.warning(message["reason"])
- elif message_type == ServerMessageType.Error:
- raise TranscriptionError(message["reason"])
-
- async def _producer(self, stream, audio_chunk_size):
- """
- Yields messages to send to the server.
-
- :param stream: File-like object which an audio stream can be read from.
- :type stream: io.IOBase
-
- :param audio_chunk_size: Size of audio chunks to send.
- :type audio_chunk_size: int
- """
- async for audio_chunk in read_in_chunks(stream, audio_chunk_size):
- if self._session_needs_closing:
- break
-
- if self._transcription_config_needs_update:
- yield self._set_recognition_config()
- self._transcription_config_needs_update = False
-
- await asyncio.wait_for(
- self._buffer_semaphore.acquire(),
- timeout=self.connection_settings.semaphore_timeout_seconds,
- )
- self.seq_no += 1
- self._call_middleware(ClientMessageType.AddAudio, audio_chunk, True)
- yield audio_chunk
-
- yield self._end_of_stream()
-
- async def _consumer_handler(self):
- """
- Controls the consumer loop for handling messages from the server.
-
- raises: ConnectionClosedError when the upstream closes unexpectedly
- """
- while self.session_running:
- try:
- message = await self.websocket.recv()
- except websockets.exceptions.ConnectionClosedOK:
- # Can occur if a timeout has closed the connection.
- LOGGER.info("Cannot receive from closed websocket.")
- return
- except websockets.exceptions.ConnectionClosedError as ex:
- LOGGER.info("Disconnected while waiting for recv().")
- raise ex
- self._consumer(message)
-
- async def _producer_handler(self, stream, audio_chunk_size):
- """
- Controls the producer loop for sending messages to the server.
- """
- await self._recognition_started.wait()
- async for message in self._producer(stream, audio_chunk_size):
- try:
- await self.websocket.send(message)
- except websockets.exceptions.ConnectionClosedOK:
- # Can occur if a timeout has closed the connection.
- LOGGER.info("Cannot send from a closed websocket.")
- return
- except websockets.exceptions.ConnectionClosedError:
- LOGGER.info("Disconnected while sending a message().")
- return
-
- def _call_middleware(self, event_name, *args):
- """
- Call the middlewares attached to the client for the given event name.
-
- :raises ForceEndSession: If this was raised by the user's middleware.
- """
- for middleware in self.middlewares[event_name]:
- try:
- middleware(*args)
- except ForceEndSession:
- LOGGER.warning("Session was ended forcefully by a middleware")
- raise
-
-[docs] def update_transcription_config(self, new_transcription_config):
- """
- Updates the transcription config used for the session.
- This results in a SetRecognitionConfig message sent to the server.
-
- :param new_transcription_config: The new config object.
- :type new_transcription_config: speechmatics.models.TranscriptionConfig
- """
- if new_transcription_config != self.transcription_config:
- self.transcription_config = new_transcription_config
- self._transcription_config_needs_update = True
-
-[docs] def add_event_handler(self, event_name, event_handler):
- """
- Add an event handler (callback function) to handle an incoming
- message from the server. Event handlers are passed a copy of the
- incoming message from the server. If `event_name` is set to 'all' then
- the handler will be added for every event.
-
- For example, a simple handler that just prints out the
- :py:attr:`speechmatics.models.ServerMessageType.AddTranscript`
- messages received:
-
- >>> client = WebsocketClient(
- ConnectionSettings(url="wss://localhost:9000"))
- >>> handler = lambda msg: print(msg)
- >>> client.add_event_handler(ServerMessageType.AddTranscript, handler)
-
- :param event_name: The name of the message for which a handler is
- being added. Refer to
- :py:class:`speechmatics.models.ServerMessageType` for a list
- of the possible message types.
- :type event_name: str
-
- :param event_handler: A function to be called when a message of the
- given type is received.
- :type event_handler: Callable[[dict], None]
-
- :raises ValueError: If the given event name is not valid.
- """
- if event_name == "all":
- for name in self.event_handlers.keys():
- self.event_handlers[name].append(event_handler)
- elif event_name not in self.event_handlers:
- raise ValueError(
- f"Unknown event name: {event_name!r}, expected to be "
- f"'all' or one of {list(self.event_handlers.keys())}."
- )
- else:
- self.event_handlers[event_name].append(event_handler)
-
-[docs] def add_middleware(self, event_name, middleware):
- """
- Add a middleware to handle outgoing messages sent to the server.
- Middlewares are passed a reference to the outgoing message, which
- they may alter.
- If `event_name` is set to 'all' then the handler will be added for
- every event.
-
- :param event_name: The name of the message for which a middleware is
- being added. Refer to the V2 API docs for a list of the possible
- message types.
- :type event_name: str
-
- :param middleware: A function to be called to process an outgoing
- message of the given type. The function receives the message as
- the first argument and a second, boolean argument indicating
- whether or not the message is binary data (which implies it is an
- AddAudio message).
- :type middleware: Callable[[dict, bool], None]
-
- :raises ValueError: If the given event name is not valid.
- """
- if event_name == "all":
- for name in self.middlewares.keys():
- self.middlewares[name].append(middleware)
- elif event_name not in self.middlewares:
- raise ValueError(
- (
- f"Unknown event name: {event_name}, expected to be 'all'"
- f"or one of {list(self.middlewares.keys())}."
- )
- )
- else:
- self.middlewares[event_name].append(middleware)
-
- async def _communicate(self, stream, audio_settings):
- """
- Create a producer/consumer for transcription messages and
- communicate with the server.
- Internal method called from _run.
- """
- try:
- start_recognition_msg = self._start_recognition(audio_settings)
- except ForceEndSession:
- return
- await self.websocket.send(start_recognition_msg)
-
- consumer_task = asyncio.create_task(self._consumer_handler())
- producer_task = asyncio.create_task(
- self._producer_handler(stream, audio_settings.chunk_size)
- )
- (done, pending) = await asyncio.wait(
- [consumer_task, producer_task], return_when=asyncio.FIRST_EXCEPTION
- )
-
- # If a task is pending the other one threw an exception, so tidy up
- for task in pending:
- task.cancel()
-
- for task in done:
- exc = task.exception()
- if exc and not isinstance(exc, (EndOfTranscriptException, ForceEndSession)):
- raise exc
-
-[docs] async def run(
- self,
- stream,
- transcription_config: TranscriptionConfig,
- audio_settings=AudioSettings(),
- from_cli=False,
- ):
- """
- Begin a new recognition session.
- This will run asynchronously. Most callers may prefer to use
- :py:meth:`run_synchronously` which will block until the session is
- finished.
-
- :param stream: File-like object which an audio stream can be read from.
- :type stream: io.IOBase
-
- :param transcription_config: Configuration for the transcription.
- :type transcription_config: speechmatics.models.TranscriptionConfig
-
- :param audio_settings: Configuration for the audio stream.
- :type audio_settings: speechmatics.models.AudioSettings
-
- :param from_cli: Indicates whether the caller is the command-line interface or not.
- :type from_cli: bool
-
- :raises Exception: Can raise any exception returned by the
- consumer/producer tasks.
- """
- self.transcription_config = transcription_config
- self.translation_config = transcription_config.translation_config
- self.seq_no = 0
- self._language_pack_info = None
- await self._init_synchronization_primitives()
- extra_headers = {}
- if (
- not self.connection_settings.generate_temp_token
- and self.connection_settings.auth_token is not None
- ):
- token = f"Bearer {self.connection_settings.auth_token}"
- extra_headers["Authorization"] = token
-
- if (
- self.connection_settings.generate_temp_token
- and self.connection_settings.auth_token is not None
- ):
- temp_token = await _get_temp_token(self.connection_settings.auth_token)
- token = f"Bearer {temp_token}"
- extra_headers["Authorization"] = token
-
- url = self.connection_settings.url
- if not url.endswith(self.transcription_config.language.strip()):
- if url.endswith("/"):
- url += self.transcription_config.language.strip()
- else:
- url += f"/{self.transcription_config.language.strip()}"
-
- # Extend connection url with sdk version information
- cli = "-cli" if from_cli is True else ""
- version = get_version()
- parsed_url = urlparse(url)
- query_params = dict(parse_qsl(parsed_url.query))
- query_params["sm-sdk"] = f"python{cli}-{version}"
- updated_query = urlencode(query_params)
- updated_url = urlunparse(parsed_url._replace(query=updated_query))
-
- try:
- async with websockets.connect( # pylint: disable=no-member
- updated_url,
- ssl=self.connection_settings.ssl_context,
- ping_timeout=self.connection_settings.ping_timeout_seconds,
- # Don't limit the max. size of incoming messages
- max_size=None,
- extra_headers=extra_headers,
- ) as self.websocket:
- await self._communicate(stream, audio_settings)
- finally:
- self.session_running = False
- self._session_needs_closing = False
- self.websocket = None
-
-[docs] def stop(self):
- """
- Indicates that the recognition session should be forcefully stopped.
- Only used in conjunction with `run`.
- You probably don't need to call this if you're running the client via
- :py:meth:`run_synchronously`.
- """
- self._session_needs_closing = True
-
-[docs] def run_synchronously(self, *args, timeout=None, **kwargs):
- """
- Run the transcription synchronously.
- :raises asyncio.TimeoutError: If the given timeout is exceeded.
- """
- # pylint: disable=no-value-for-parameter
- asyncio.run(asyncio.wait_for(self.run(*args, **kwargs), timeout=timeout))
-
-
-async def _get_temp_token(api_key):
- """
- Used to get a temporary token from management platform api for SaaS users
- """
- version = get_version()
- mp_api_url = os.getenv("SM_MANAGEMENT_PLATFORM_URL", "https://mp.speechmatics.com")
- endpoint = mp_api_url + "/v1/api_keys"
- params = {"type": "rt", "sm-sdk": f"python-{version}"}
- body = {"ttl": 60}
- headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
- # pylint: disable=no-member
- response = httpx.post(endpoint, json=body, params=params, headers=headers)
- response.raise_for_status()
- response.read()
- key_object = response.json()
- return key_object["key_value"]
-
-# (c) 2020, Cantab Research Ltd.
-"""
-Exceptions and errors used by the library.
-"""
-
-
-
-
-
-[docs]class EndOfTranscriptException(Exception):
- """
- Indicates that the transcription session has finished.
- """
-
-
-[docs]class ForceEndSession(Exception):
- """
- Can be raised by the user from a middleware or event handler
- in order to force the transcription session to end early.
- """
-
-
-
-
-# (c) 2020, Cantab Research Ltd.
-"""
-Helper functions used by the library.
-"""
-
-import asyncio
-import concurrent.futures
-import inspect
-import json
-import os
-import sys
-
-import pkg_resources
-
-
-[docs]def del_none(dictionary):
- """
- Recursively delete from the dictionary all entries which values are None.
- This function changes the input parameter in place.
-
- :param dictionary: input dictionary
- :type dictionary: dict
-
- :return: output dictionary
- :rtype: dict
- """
- for key, value in list(dictionary.items()):
- if value is None:
- del dictionary[key]
- elif isinstance(value, dict):
- del_none(value)
- return dictionary
-
-
-[docs]def json_utf8(func):
- """A decorator to turn a function's return value into JSON"""
-
- def wrapper(*args, **kwargs):
- """wrapper"""
- return json.dumps(func(*args, **kwargs))
-
- return wrapper
-
-
-[docs]async def read_in_chunks(stream, chunk_size):
- """
- Utility method for reading in and yielding chunks
-
- :param stream: file-like object to read audio from
- :type stream: io.IOBase
-
- :param chunk_size: maximum chunk size in bytes
- :type chunk_size: int
-
- :raises ValueError: if no data was read from the stream
-
- :return: a sequence of chunks of data where the length in bytes of each
- chunk is <= max_sample_size and a multiple of max_sample_size
- :rtype: collections.AsyncIterable
-
- """
- while True:
- # Work with both async and synchronous file readers.
- if inspect.iscoroutinefunction(stream.read):
- audio_chunk = await stream.read(chunk_size)
- else:
- # Run the read() operation in a separate thread to avoid blocking the event loop.
- with concurrent.futures.ThreadPoolExecutor() as executor:
- audio_chunk = await asyncio.get_event_loop().run_in_executor(
- executor, stream.read, chunk_size
- )
-
- if not audio_chunk:
- break
- yield audio_chunk
-
-
-[docs]def get_version() -> str:
- """
- Reads the version number from the package or from VERSION file in case
- the package information is not found.
-
- :return: the library version
- :rtype: str
- """
- try:
- version = pkg_resources.get_distribution("speechmatics-python").version
- except pkg_resources.DistributionNotFound:
- # The library is not running from the distributed package
- # Get the version from the VERSION file
- base_path = os.path.abspath(os.path.dirname(__file__))
- version_path = os.path.join(base_path, "..", "VERSION")
- with open(version_path, "r", encoding="utf-8") as version_file:
- version = version_file.read().strip()
-
- return version
-
-
-def _process_status_errors(error):
- """
- Takes an httpx.HTTPSStatusError and prints in a useful format for CLI
-
- :param error: the status error produced by the server for a request
- :type error: httpx.HTTPStatusError
-
- :raises SystemExit: for all cases
- """
-
- error_string = f"{error.request.method} request to {error.request.url} returned {error.response.status_code}"
- if error.response.status_code == 401:
- sys.exit(
- f"Unauthorized: {error_string}. \n Make sure you're using a valid API key or JWT."
- )
- if error.response.status_code == 404:
- sys.exit(
- f"NotFound: {error_string}. Make sure the url and resource id are correct."
- )
- if error.response.status_code == 429:
- sys.exit(
- f"TooManyRequests: {error_string}. "
- + "In order to ensure a good service to all our users, we rate limit requests. "
- + "Consider redesigning your code to reduce the number of requests or spread your requests over time."
- )
- if error.response.status_code in [400, 422]:
- sys.exit(
- f"BadOrUnprocessableRequest: {error_string}.\n\nresponse: {error.response.text}\n"
- + "Make sure the config you've submitted has a valid structure, and that the values are allowed.\n"
- + "(e.g. --lang abc is invalid)."
- )
- sys.exit(f"httpx.HTTPStatusError: {error}")
-
-# (c) 2020, Cantab Research Ltd.
-"""
-Data models and message types used by the library.
-"""
-
-import json
-import ssl
-import sys
-from dataclasses import asdict, dataclass, field, fields
-from enum import Enum
-from typing import Any, Dict, List, Optional
-
-from speechmatics.config import CONFIG_PATH, read_config_from_home
-from speechmatics.constants import BATCH_SELF_SERVICE_URL, RT_SELF_SERVICE_URL
-
-if sys.version_info >= (3, 8):
- from typing import Literal
-else:
- from typing_extensions import Literal # pragma: no cover
-
-
-SummaryContentType = Literal["informative", "conversational", "auto"]
-SummaryLength = Literal["brief", "detailed"]
-SummaryType = Literal["paragraphs", "bullets"]
-
-
-[docs]@dataclass
-class FetchData:
- """Batch: Optional configuration for fetching file for transcription."""
-
- url: str
- """URL to fetch"""
-
- auth_headers: str = None
- """
- A list of additional headers to be added to the input fetch request
- when using http or https. This is intended to support authentication or
- authorization, for example by supplying an OAuth2 bearer token
- """
-
-
-[docs]@dataclass
-class NotificationConfig:
- """Batch: Optional configuration for callback notification."""
-
- url: str
- """URL for notification. The `id` and `status` query parameters will be added."""
-
- contents: List[str] = None
- """
- Specifies a list of items to be attached to the notification message.
- When multiple items are requested, they are included as named file
- attachments.
- """
-
- method: str = "post"
- """The HTTP(S) method to be used. Only `post` and `put` are supported."""
-
- auth_headers: List[str] = None
- """
- A list of additional headers to be added to the notification request
- when using http or https. This is intended to support authentication or
- authorization, for example by supplying an OAuth2 bearer token
- """
-
-
-[docs]@dataclass
-class SRTOverrides:
- """Batch: Optional configuration for SRT output."""
-
- max_line_length: int = 37
- """Maximum count of characters per subtitle line including white space"""
-
- max_lines: int = 2
- """Sets maximum count of lines in a subtitle section"""
-
-
-[docs]@dataclass
-class _TranscriptionConfig: # pylint: disable=too-many-instance-attributes
- """Base model for defining transcription parameters."""
-
- def __init__(self, language=None, **kwargs):
- """
- Ignores values which are not dataclass members when initalising.
- This allows **kwargs to contain fields which are not in the model,
- which is useful for reusing code to build RT and batch configs.
- See cli.get_transcription_config() for an example.
- """
- super().__init__()
- # the language attribute is a special case, as it's a positional parameter
- if language is not None:
- self.language = language
-
- # pylint: disable=consider-using-set-comprehension
- names = set([f.name for f in fields(self)])
- for key, value in kwargs.items():
- if key in names:
- setattr(self, key, value)
-
-[docs] def asdict(self) -> Dict[Any, Any]:
- """Returns model as a dict while excluding None values recursively."""
- return asdict(
- self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None}
- )
-
- language: str = "en"
- """ISO 639-1 language code. eg. `en`"""
-
- operating_point: str = None
- """Specifies which acoustic model to use."""
-
- output_locale: str = None
- """RFC-5646 language code for transcript output. eg. `en-AU`"""
-
- diarization: str = None
- """Indicates type of diarization to use, if any."""
-
- additional_vocab: dict = None
- """Additional vocabulary that is not part of the standard language."""
-
- punctuation_overrides: dict = None
- """Permitted puctuation marks for advanced punctuation."""
-
- domain: str = None
- """Optionally request a language pack optimized for a specific domain,
- e.g. 'finance'"""
-
- enable_entities: bool = None
- """Indicates if inverse text normalization entity output is enabled."""
-
-
-[docs]@dataclass
-class RTSpeakerDiarizationConfig:
- """Real-time mode: Speaker diarization config."""
-
- max_speakers: int = None
- """This enforces the maximum number of speakers allowed in a single audio stream."""
-
-
-[docs]@dataclass
-class TranslationConfig:
- """Translation config."""
-
- target_languages: List[str] = None
- """Target languages for which translation should be produced."""
-
- def asdict(self):
- return asdict(self)
-
-
-[docs]@dataclass
-class RTTranslationConfig(TranslationConfig):
- """Real-time mode: Translation config."""
-
- enable_partials: bool = False
- """Indicates if partial translation, where sentences are produced
- immediately, is enabled."""
-
-
-[docs]@dataclass
-class BatchSpeakerDiarizationConfig:
- """Batch mode: Speaker diarization config."""
-
- speaker_sensitivity: float = None
- """The sensitivity of the speaker detection.
- This is a number between 0 and 1, where 0 means least sensitive and 1 means
- most sensitive."""
-
-
-[docs]@dataclass
-class BatchTranslationConfig(TranslationConfig):
- """Batch mode: Translation config."""
-
-
-[docs]@dataclass
-class BatchLanguageIdentificationConfig:
- """Batch mode: Language identification config."""
-
- expected_languages: List[str] = None
- """Expected languages for language identification"""
-
-
-[docs]@dataclass
-class SummarizationConfig:
- """Defines summarization parameters."""
-
- content_type: SummaryContentType = "auto"
- """Optional summarization content_type parameter."""
-
- summary_length: SummaryLength = "brief"
- """Optional summarization summary_length parameter."""
-
- summary_type: SummaryType = "bullets"
- """Optional summarization summary_type parameter."""
-
-
-
-
-
-[docs]@dataclass
-class TopicDetectionConfig:
- """Defines topic detection parameters."""
-
- topics: List[str] = None
- """Optional list of topics for topic detection."""
-
-
-
-
-
-[docs]@dataclass(init=False)
-class TranscriptionConfig(_TranscriptionConfig):
- # pylint: disable=too-many-instance-attributes
- """
- Real-time: Defines transcription parameters.
- The `.as_config()` method removes translation_config and returns it wrapped into a Speechmatics json config.
- """
-
- max_delay: float = None
- """Maximum acceptable delay."""
-
- max_delay_mode: str = None
- """Determines whether the threshold specified in max_delay can be exceeded
- if a potential entity is detected. Flexible means if a potential entity
- is detected, then the max_delay can be overriden until the end of that
- entity. Fixed means that max_delay specified ignores any potential
- entity that would not be completed within that threshold."""
-
- speaker_diarization_config: RTSpeakerDiarizationConfig = None
- """Configuration for speaker diarization."""
-
- speaker_change_sensitivity: float = None
- """Sensitivity level for speaker change."""
-
- streaming_mode: bool = None
- """Indicates if we run the engine in streaming mode, or regular RT mode."""
-
- ctrl: dict = None
- """Internal Speechmatics flag that allows to give special commands to the engine."""
-
- enable_partials: bool = None
- """Indicates if partials for both transcripts and translation, where words are produced
- immediately, is enabled."""
-
- enable_transcription_partials: bool = None
- """Indicates if partial transcripts, where words are produced
- immediately, is enabled."""
-
- enable_translation_partials: bool = None
- """Indicates if partial translation, where words are produced
- immediately, is enabled."""
-
- translation_config: TranslationConfig = None
- """Optional configuration for translation."""
-
- def as_config(self):
- dictionary = self.asdict()
- dictionary.pop("translation_config", None)
- dictionary.pop("enable_translation_partials", None)
- enable_transcription_partials = dictionary.pop(
- "enable_transcription_partials", False
- )
- # set enable_partials to True if either one is True
- if dictionary.get("enable_partials") is True or enable_transcription_partials:
- dictionary["enable_partials"] = True
-
- return dictionary
-
-
-[docs]@dataclass(init=False)
-class BatchTranscriptionConfig(_TranscriptionConfig):
- # pylint: disable=too-many-instance-attributes
- """Batch: Defines transcription parameters for batch requests.
- The `.as_config()` method will return it wrapped into a Speechmatics json config."""
-
- fetch_data: FetchData = None
- """Optional configuration for fetching file for transcription."""
-
- notification_config: NotificationConfig = None
- """Optional configuration for callback notification."""
-
- language_identification_config: BatchLanguageIdentificationConfig = None
- """Optional configuration for language identification."""
-
- translation_config: TranslationConfig = None
- """Optional configuration for translation."""
-
- srt_overrides: SRTOverrides = None
- """Optional configuration for SRT output."""
-
- speaker_diarization_config: BatchSpeakerDiarizationConfig = None
- """The sensitivity of the speaker detection."""
-
- channel_diarization_labels: List[str] = None
- """Add your own speaker or channel labels to the transcript"""
-
- summarization_config: SummarizationConfig = None
- """Optional configuration for transcript summarization."""
-
- sentiment_analysis_config: Optional[SentimentAnalysisConfig] = None
- """Optional configuration for sentiment analysis of the transcript"""
-
- topic_detection_config: Optional[TopicDetectionConfig] = None
- """Optional configuration for detecting topics of the transcript"""
-
- auto_chapters_config: Optional[AutoChaptersConfig] = None
- """Optional configuration for detecting chapters of the transcript"""
-
- def as_config(self):
- dictionary = self.asdict()
-
- fetch_data = dictionary.pop("fetch_data", None)
- notification_config = dictionary.pop("notification_config", None)
- language_identification_config = dictionary.pop(
- "language_identification_config", None
- )
- translation_config = dictionary.pop("translation_config", None)
- srt_overrides = dictionary.pop("srt_overrides", None)
- summarization_config = dictionary.pop("summarization_config", None)
- sentiment_analysis_config = dictionary.pop("sentiment_analysis_config", None)
- topic_detection_config = dictionary.pop("topic_detection_config", None)
- auto_chapters_config = dictionary.pop("auto_chapters_config", None)
- config = {"type": "transcription", "transcription_config": dictionary}
-
- if fetch_data:
- config["fetch_data"] = fetch_data
-
- if notification_config:
- if isinstance(notification_config, dict):
- notification_config = [notification_config]
- config["notification_config"] = notification_config
-
- if language_identification_config:
- config["language_identification_config"] = language_identification_config
-
- if translation_config:
- config["translation_config"] = translation_config
-
- if srt_overrides:
- config["output_config"] = {"srt_overrides": srt_overrides}
-
- if summarization_config:
- config["summarization_config"] = summarization_config
-
- if sentiment_analysis_config is not None:
- config["sentiment_analysis_config"] = sentiment_analysis_config
-
- if topic_detection_config:
- config["topic_detection_config"] = topic_detection_config
-
- if auto_chapters_config is not None:
- config["auto_chapters_config"] = auto_chapters_config
-
- return json.dumps(config)
-
-
-[docs]@dataclass
-class AudioSettings:
- """Real-time: Defines audio parameters."""
-
- encoding: str = None
- """Encoding format when raw audio is used. Allowed values are
- `pcm_f32le`, `pcm_s16le` and `mulaw`."""
-
- sample_rate: int = 44100
- """Sampling rate in hertz."""
-
- chunk_size: int = 1024 * 4
- """Chunk size."""
-
- def asdict(self):
- if not self.encoding:
- return {"type": "file"}
-
- return {
- "type": "raw",
- "encoding": self.encoding,
- "sample_rate": self.sample_rate,
- }
-
-
-[docs]class UsageMode(str, Enum):
- # pylint: disable=invalid-name
- Batch = "batch"
- RealTime = "rt"
-
-
-[docs]@dataclass
-class ConnectionSettings:
- """Defines connection parameters."""
-
- url: str
- """Websocket server endpoint."""
-
- message_buffer_size: int = 512
- """Message buffer size in bytes."""
-
- ssl_context: ssl.SSLContext = field(default_factory=ssl.create_default_context)
- """SSL context."""
-
- semaphore_timeout_seconds: float = 120
- """Semaphore timeout in seconds."""
-
- ping_timeout_seconds: float = 60
- """Ping-pong timeout in seconds."""
-
- auth_token: Optional[str] = None
- """auth token to authenticate a customer."""
-
- generate_temp_token: Optional[bool] = False
- """Automatically generate a temporary token for authentication.
- Enterprise customers should set this to False."""
-
- def set_missing_values_from_config(self, mode: UsageMode):
- stored_config = read_config_from_home()
- if self.url is None or self.url == "":
- url_key = "realtime_url" if mode == UsageMode.RealTime else "batch_url"
- if stored_config and url_key in stored_config:
- self.url = stored_config[url_key]
- else:
- raise ValueError(f"No URL provided or set in {CONFIG_PATH}")
- if self.auth_token is None or self.auth_token == "":
- if stored_config and stored_config.get("auth_token"):
- self.auth_token = stored_config["auth_token"]
-
- @classmethod
- def create(cls, mode: UsageMode, auth_token: Optional[str] = None):
- stored_config = read_config_from_home()
- default_url = (
- RT_SELF_SERVICE_URL
- if mode == UsageMode.RealTime
- else BATCH_SELF_SERVICE_URL
- )
- url_key = "realtime_url" if mode == UsageMode.RealTime else "batch_url"
- if stored_config and url_key in stored_config:
- url = stored_config[url_key]
- else:
- url = default_url
- if auth_token is not None:
- return ConnectionSettings(
- url=url,
- auth_token=auth_token,
- generate_temp_token=True,
- )
- if stored_config and stored_config.get("auth_token"):
- url = stored_config.get(url_key, default_url)
- return ConnectionSettings(
- url,
- auth_token=stored_config["auth_token"],
- generate_temp_token=stored_config.get("generate_temp_token", True),
- )
- raise ValueError(f"No acces token provided or set in {CONFIG_PATH}")
-
-
-[docs]@dataclass
-class RTConnectionSettings(ConnectionSettings):
- url = f"{RT_SELF_SERVICE_URL}/en"
-
-
-
-
-
-[docs]class ClientMessageType(str, Enum):
- # pylint: disable=invalid-name
- """Real-time: Defines various messages sent from client to server."""
-
- StartRecognition = "StartRecognition"
- """Initiates a recognition job based on configuration set previously."""
-
- AddAudio = "AddAudio"
- """Adds more audio data to the recognition job. The server confirms
- receipt by sending an :py:attr:`ServerMessageType.AudioAdded` message."""
-
- EndOfStream = "EndOfStream"
- """Indicates that the client has no more audio to send."""
-
- SetRecognitionConfig = "SetRecognitionConfig"
- """Allows the client to re-configure the recognition session."""
-
-
-[docs]class ServerMessageType(str, Enum):
- # pylint: disable=invalid-name
- """Real-time: Defines various message types sent from server to client."""
-
- RecognitionStarted = "RecognitionStarted"
- """Server response to :py:attr:`ClientMessageType.StartRecognition`,
- acknowledging that a recognition session has started."""
-
- AudioAdded = "AudioAdded"
- """Server response to :py:attr:`ClientMessageType.AddAudio`, indicating
- that audio has been added successfully."""
-
- AddPartialTranscript = "AddPartialTranscript"
- """Indicates a partial transcript, which is an incomplete transcript that
- is immediately produced and may change as more context becomes available.
- """
-
- AddTranscript = "AddTranscript"
- """Indicates the final transcript of a part of the audio."""
-
- AddPartialTranslation = "AddPartialTranslation"
- """Indicates a partial translation, which is an incomplete translation that
- is immediately produced and may change as more context becomes available.
- """
-
- AddTranslation = "AddTranslation"
- """Indicates the final translation of a part of the audio."""
-
- EndOfTranscript = "EndOfTranscript"
- """Server response to :py:attr:`ClientMessageType.EndOfStream`,
- after the server has finished sending all :py:attr:`AddTranscript`
- messages."""
-
- Info = "Info"
- """Indicates a generic info message."""
-
- Warning = "Warning"
- """Indicates a generic warning message."""
-
- Error = "Error"
- """Indicates n generic error message."""
-