From bebc3ed5a12116d31b218de4f085c0db5c23ca67 Mon Sep 17 00:00:00 2001 From: jacobi petrucciani Date: Thu, 29 Aug 2019 16:55:26 -0400 Subject: [PATCH] adding send_batch, fixing tests, removing __del__ from Job --- README.rst | 1 - qoo/queues.py | 82 +++++++++++++++++++++++++++++++++++++++++------ qoo/utils.py | 16 ++++++--- qoo/workers.py | 31 ------------------ setup.py | 2 +- tests/test_qoo.py | 68 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 152 insertions(+), 48 deletions(-) delete mode 100644 qoo/workers.py diff --git a/README.rst b/README.rst index bae2804..80ab926 100644 --- a/README.rst +++ b/README.rst @@ -75,7 +75,6 @@ Basic Usage job.user_id # the string "test_user" # delete the job from the SQS queue - del job job.delete() diff --git a/qoo/queues.py b/qoo/queues.py index 46cf305..2dd6a9d 100644 --- a/qoo/queues.py +++ b/qoo/queues.py @@ -5,8 +5,12 @@ import os import time import hashlib -from qoo.utils import jsond, jsonl -from typing import Dict, List, Optional +import json +from qoo.utils import chunk, jsond, jsonl, new_uuid +from typing import Dict, List, Optional, Union + + +MAX_MESSAGES = 10 class Job: @@ -18,7 +22,12 @@ def __init__(self, sqs_message: dict, queue: "Queue") -> None: self._data = sqs_message self._md5 = self._data["MD5OfBody"] self._id = self._data["MessageId"] - self._body = jsonl(self._data["Body"]) + try: + self._body = jsonl(self._data["Body"]) + for key in self._body: + setattr(self, key, self._body[key]) + except json.decoder.JSONDecodeError: + self._body = self._data["Body"] self._attributes = self._data["Attributes"] self._sent_at = float(self._attributes["SentTimestamp"]) / 1000 self._received_at = float(time.time()) @@ -26,10 +35,16 @@ def __init__(self, sqs_message: dict, queue: "Queue") -> None: self.approximate_receive_count = int( self._attributes["ApproximateReceiveCount"] ) - for key in self._body: - setattr(self, key, self._body[key]) self._handle = self._data["ReceiptHandle"] + def __contains__(self, key: str) -> bool: + """check if the given key exists in this job""" + return key in dir(self) + + def __eq__(self, other: "Job") -> bool: + """check if this Job is equal to another""" + return self._handle == other._handle + def __str__(self) -> str: """return a human-friendly object representation""" return "".format(self._id) @@ -38,10 +53,6 @@ def __repr__(self) -> str: """repr""" return self.__str__() - def __del__(self) -> Dict: - """del keyword for the job""" - return self.delete() - def delete(self) -> Dict: """delete this object""" return self._queue.delete_job(self._handle) @@ -56,6 +67,9 @@ def md5_matches(self) -> bool: class Queue: """sqs queue""" + SUCCESS = "Successful" + FAILED = "Failed" + def __init__( self, name: str, @@ -64,11 +78,13 @@ def __init__( aws_secret_access_key: str = "", max_messages: int = 1, wait_time: int = 10, + async_send: bool = False, ) -> None: """queue constructor""" self.name = name self._max_messages = max_messages self._wait_time = wait_time + self._async = async_send self._region_name = region_name or os.environ.get("AWS_DEFAULT_REGION") self._aws_access_key_id = aws_access_key_id or os.environ.get( "AWS_ACCESS_KEY_ID" @@ -144,6 +160,47 @@ def send_job(self, **attributes) -> str: ) return response["MessageId"] + def send_batch( + self, + raw_jobs: List[Union[Dict, str]], + delay_seconds: int = 0, + auto_metadata: bool = True, + ) -> List[Dict]: + """ + send a batch of jobs to the queue, chunked into 10s + + accepts: + a list of message bodies + a list of dicts to json.dumps into message bodies + """ + jobs = raw_jobs + successful = [] + failed = [] + + # if default, treat each list item as just the message body + if auto_metadata: + jobs = [ + { + "Id": new_uuid(), + "MessageBody": x if isinstance(x, str) else jsond(x), + "DelaySeconds": delay_seconds, + } + for x in raw_jobs + ] + + # send in batches of 10 + for job_batch in chunk(jobs, size=MAX_MESSAGES): + response = self._client.send_message_batch( + QueueUrl=self._queue_url, Entries=job_batch + ) + if Queue.SUCCESS in response: + successful.extend(response[Queue.SUCCESS]) + if Queue.FAILED in response: + failed.extend(response[Queue.FAILED]) + + # return the list of successful and failed jobs + return {Queue.SUCCESS: successful, Queue.FAILED: failed} + def receive_jobs( self, max_messages: int = None, @@ -151,9 +208,10 @@ def receive_jobs( attribute_names: str = "All", ) -> List[Job]: """receive a list of jobs from the queue""" + num_messages = max_messages if max_messages else self._max_messages jobs = self._client.receive_message( QueueUrl=self._queue_url, - MaxNumberOfMessages=max_messages if max_messages else self._max_messages, + MaxNumberOfMessages=num_messages, WaitTimeSeconds=wait_time if wait_time else self._wait_time, AttributeNames=[attribute_names], ) @@ -171,3 +229,7 @@ def delete_job(self, handle: str) -> Dict: return self._client.delete_message( QueueUrl=self._queue_url, ReceiptHandle=handle ) + + def purge(self) -> None: + """purge the queue""" + self._client.purge_queue(QueueUrl=self._queue_url) diff --git a/qoo/utils.py b/qoo/utils.py index 3f714ce..a6fa94d 100644 --- a/qoo/utils.py +++ b/qoo/utils.py @@ -4,7 +4,8 @@ import datetime import json import re -from typing import Any, Mapping +import uuid +from typing import Any, Iterator, List, Mapping FIRST_CAP = re.compile("(.)([A-Z][a-z]+)") @@ -16,6 +17,11 @@ ) +def new_uuid() -> str: + """returns a fresh uuid""" + return str(uuid.uuid4()) + + def jsonl(json_string: str) -> Mapping: """loads a dict from a given json string""" return json.loads(json_string) @@ -26,7 +32,7 @@ def jsond(obj: Mapping, **kwargs: Any) -> str: return json.dumps(obj, **kwargs) -def snakeify(text: str) -> str: - """camelCase to snake_case""" - first_string = FIRST_CAP.sub(r"\1_\2", text) - return ALL_CAP.sub(r"\1_\2", first_string).lower() +def chunk(items: List, size: int = 10) -> Iterator[List]: + """chunk a list into n lists of $size""" + for x in range(0, len(items), size): + yield items[x : x + size] diff --git a/qoo/workers.py b/qoo/workers.py deleted file mode 100644 index 5acda19..0000000 --- a/qoo/workers.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -worker class - not yet used -""" -from time import sleep - - -class Worker: - """worker class""" - - def __init__(self, queues): - """worker constructor""" - self.queues = queues - - def __str__(self) -> str: - """return a human-friendly object representation""" - return "".format(self.queues) - - def __repr__(self) -> str: - """repr""" - return self.__str__() - - def work(self, burst=False, wait_seconds=5): - """work method""" - while True: - for queue in self.queues: - for job in queue.jobs: - job.run() - queue.remove_job(job) - - if not burst: - sleep(wait_seconds) diff --git a/setup.py b/setup.py index ff7181c..70b952b 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="qoo", - version="0.0.2", + version="0.0.3", description=("A simple library for interacting with Amazon SQS."), long_description=LONG_DESCRIPTION, author="Jacobi Petrucciani", diff --git a/tests/test_qoo.py b/tests/test_qoo.py index c7ed23d..10224a9 100644 --- a/tests/test_qoo.py +++ b/tests/test_qoo.py @@ -67,6 +67,68 @@ def test_can_send_job(queue): assert len(queue) > 0 +def test_can_send_batch_jobs(queue): + """test that we can send many jobs into the queue""" + responses = queue.send_batch( + [ + {"job": 0, "message": "test 0"}, + {"job": 1, "message": "test 1"}, + {"job": 2, "message": "test 2"}, + {"job": 3, "message": "test 3"}, + {"job": 4, "message": "test 4"}, + {"job": 5, "message": "test 5"}, + {"job": 6, "message": "test 6"}, + {"job": 7, "message": "test 7"}, + {"job": 8, "message": "test 8"}, + {"job": 9, "message": "test 9"}, + {"job": 10, "message": "test 10"}, + {"job": 11, "message": "test 11"}, + {"job": 12, "message": "test 12"}, + {"job": 13, "message": "test 13"}, + {"job": 14, "message": "test 14"}, + {"job": 15, "message": "test 15"}, + {"job": 16, "message": "test 16"}, + {"job": 17, "message": "test 17"}, + {"job": 18, "message": "test 18"}, + {"job": 19, "message": "test 19"}, + {"job": 20, "message": "test 20"}, + ] + ) + assert len(queue) == 21 + assert len(responses["Successful"]) == 21 + + # send as list of strings + responses = queue.send_batch( + [ + "this", + "is", + "an", + "example", + "of", + "sending", + "batch", + "messages", + "as", + "a", + "list", + "of", + "strings", + ] + ) + assert len(queue) == 34 + assert len(responses["Successful"]) == 13 + jobs = queue.receive_jobs(max_messages=10) + assert len(jobs) == 10 + assert "message" in jobs[0] + assert jobs[0].job == 0 + jobs = queue.receive_jobs(max_messages=10) + assert len(jobs) == 10 + jobs = queue.receive_jobs(max_messages=10) + assert len(jobs) == 10 + jobs = queue.receive_jobs(max_messages=10) + assert len(jobs) == 4 + + def test_can_send_and_receive_job(queue): """test that we can send a job into the queue, and pull it back out""" queue.send(info="test_job") @@ -78,6 +140,12 @@ def test_can_send_and_receive_job(queue): assert job.elapsed > 0.0 +def test_can_purge_queue(queue_with_job): + """test that we can purge a queue""" + queue_with_job.purge() + assert len(queue_with_job) == 0 + + def test_can_delete_job(queue_with_job): """test that we can delete a job from the queue""" job = queue_with_job.receive()