Skip to content

Commit

Permalink
adding send_batch, fixing tests, removing __del__ from Job
Browse files Browse the repository at this point in the history
  • Loading branch information
jpetrucciani committed Aug 29, 2019
1 parent 298252b commit bebc3ed
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 48 deletions.
1 change: 0 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ Basic Usage
job.user_id # the string "test_user"
# delete the job from the SQS queue
del job
job.delete()
Expand Down
82 changes: 72 additions & 10 deletions qoo/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import os
import time
import hashlib
from qoo.utils import jsond, jsonl
from typing import Dict, List, Optional
import json
from qoo.utils import chunk, jsond, jsonl, new_uuid
from typing import Dict, List, Optional, Union


MAX_MESSAGES = 10


class Job:
Expand All @@ -18,18 +22,29 @@ def __init__(self, sqs_message: dict, queue: "Queue") -> None:
self._data = sqs_message
self._md5 = self._data["MD5OfBody"]
self._id = self._data["MessageId"]
self._body = jsonl(self._data["Body"])
try:
self._body = jsonl(self._data["Body"])
for key in self._body:
setattr(self, key, self._body[key])
except json.decoder.JSONDecodeError:
self._body = self._data["Body"]
self._attributes = self._data["Attributes"]
self._sent_at = float(self._attributes["SentTimestamp"]) / 1000
self._received_at = float(time.time())
self.elapsed = self._received_at - self._sent_at
self.approximate_receive_count = int(
self._attributes["ApproximateReceiveCount"]
)
for key in self._body:
setattr(self, key, self._body[key])
self._handle = self._data["ReceiptHandle"]

def __contains__(self, key: str) -> bool:
"""check if the given key exists in this job"""
return key in dir(self)

def __eq__(self, other: "Job") -> bool:
"""check if this Job is equal to another"""
return self._handle == other._handle

def __str__(self) -> str:
"""return a human-friendly object representation"""
return "<Job[{}]>".format(self._id)
Expand All @@ -38,10 +53,6 @@ def __repr__(self) -> str:
"""repr"""
return self.__str__()

def __del__(self) -> Dict:
"""del keyword for the job"""
return self.delete()

def delete(self) -> Dict:
"""delete this object"""
return self._queue.delete_job(self._handle)
Expand All @@ -56,6 +67,9 @@ def md5_matches(self) -> bool:
class Queue:
"""sqs queue"""

SUCCESS = "Successful"
FAILED = "Failed"

def __init__(
self,
name: str,
Expand All @@ -64,11 +78,13 @@ def __init__(
aws_secret_access_key: str = "",
max_messages: int = 1,
wait_time: int = 10,
async_send: bool = False,
) -> None:
"""queue constructor"""
self.name = name
self._max_messages = max_messages
self._wait_time = wait_time
self._async = async_send
self._region_name = region_name or os.environ.get("AWS_DEFAULT_REGION")
self._aws_access_key_id = aws_access_key_id or os.environ.get(
"AWS_ACCESS_KEY_ID"
Expand Down Expand Up @@ -144,16 +160,58 @@ def send_job(self, **attributes) -> str:
)
return response["MessageId"]

def send_batch(
self,
raw_jobs: List[Union[Dict, str]],
delay_seconds: int = 0,
auto_metadata: bool = True,
) -> List[Dict]:
"""
send a batch of jobs to the queue, chunked into 10s
accepts:
a list of message bodies
a list of dicts to json.dumps into message bodies
"""
jobs = raw_jobs
successful = []
failed = []

# if default, treat each list item as just the message body
if auto_metadata:
jobs = [
{
"Id": new_uuid(),
"MessageBody": x if isinstance(x, str) else jsond(x),
"DelaySeconds": delay_seconds,
}
for x in raw_jobs
]

# send in batches of 10
for job_batch in chunk(jobs, size=MAX_MESSAGES):
response = self._client.send_message_batch(
QueueUrl=self._queue_url, Entries=job_batch
)
if Queue.SUCCESS in response:
successful.extend(response[Queue.SUCCESS])
if Queue.FAILED in response:
failed.extend(response[Queue.FAILED])

# return the list of successful and failed jobs
return {Queue.SUCCESS: successful, Queue.FAILED: failed}

def receive_jobs(
self,
max_messages: int = None,
wait_time: int = None,
attribute_names: str = "All",
) -> List[Job]:
"""receive a list of jobs from the queue"""
num_messages = max_messages if max_messages else self._max_messages
jobs = self._client.receive_message(
QueueUrl=self._queue_url,
MaxNumberOfMessages=max_messages if max_messages else self._max_messages,
MaxNumberOfMessages=num_messages,
WaitTimeSeconds=wait_time if wait_time else self._wait_time,
AttributeNames=[attribute_names],
)
Expand All @@ -171,3 +229,7 @@ def delete_job(self, handle: str) -> Dict:
return self._client.delete_message(
QueueUrl=self._queue_url, ReceiptHandle=handle
)

def purge(self) -> None:
"""purge the queue"""
self._client.purge_queue(QueueUrl=self._queue_url)
16 changes: 11 additions & 5 deletions qoo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import datetime
import json
import re
from typing import Any, Mapping
import uuid
from typing import Any, Iterator, List, Mapping


FIRST_CAP = re.compile("(.)([A-Z][a-z]+)")
Expand All @@ -16,6 +17,11 @@
)


def new_uuid() -> str:
"""returns a fresh uuid"""
return str(uuid.uuid4())


def jsonl(json_string: str) -> Mapping:
"""loads a dict from a given json string"""
return json.loads(json_string)
Expand All @@ -26,7 +32,7 @@ def jsond(obj: Mapping, **kwargs: Any) -> str:
return json.dumps(obj, **kwargs)


def snakeify(text: str) -> str:
"""camelCase to snake_case"""
first_string = FIRST_CAP.sub(r"\1_\2", text)
return ALL_CAP.sub(r"\1_\2", first_string).lower()
def chunk(items: List, size: int = 10) -> Iterator[List]:
"""chunk a list into n lists of $size"""
for x in range(0, len(items), size):
yield items[x : x + size]
31 changes: 0 additions & 31 deletions qoo/workers.py

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name="qoo",
version="0.0.2",
version="0.0.3",
description=("A simple library for interacting with Amazon SQS."),
long_description=LONG_DESCRIPTION,
author="Jacobi Petrucciani",
Expand Down
68 changes: 68 additions & 0 deletions tests/test_qoo.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,68 @@ def test_can_send_job(queue):
assert len(queue) > 0


def test_can_send_batch_jobs(queue):
"""test that we can send many jobs into the queue"""
responses = queue.send_batch(
[
{"job": 0, "message": "test 0"},
{"job": 1, "message": "test 1"},
{"job": 2, "message": "test 2"},
{"job": 3, "message": "test 3"},
{"job": 4, "message": "test 4"},
{"job": 5, "message": "test 5"},
{"job": 6, "message": "test 6"},
{"job": 7, "message": "test 7"},
{"job": 8, "message": "test 8"},
{"job": 9, "message": "test 9"},
{"job": 10, "message": "test 10"},
{"job": 11, "message": "test 11"},
{"job": 12, "message": "test 12"},
{"job": 13, "message": "test 13"},
{"job": 14, "message": "test 14"},
{"job": 15, "message": "test 15"},
{"job": 16, "message": "test 16"},
{"job": 17, "message": "test 17"},
{"job": 18, "message": "test 18"},
{"job": 19, "message": "test 19"},
{"job": 20, "message": "test 20"},
]
)
assert len(queue) == 21
assert len(responses["Successful"]) == 21

# send as list of strings
responses = queue.send_batch(
[
"this",
"is",
"an",
"example",
"of",
"sending",
"batch",
"messages",
"as",
"a",
"list",
"of",
"strings",
]
)
assert len(queue) == 34
assert len(responses["Successful"]) == 13
jobs = queue.receive_jobs(max_messages=10)
assert len(jobs) == 10
assert "message" in jobs[0]
assert jobs[0].job == 0
jobs = queue.receive_jobs(max_messages=10)
assert len(jobs) == 10
jobs = queue.receive_jobs(max_messages=10)
assert len(jobs) == 10
jobs = queue.receive_jobs(max_messages=10)
assert len(jobs) == 4


def test_can_send_and_receive_job(queue):
"""test that we can send a job into the queue, and pull it back out"""
queue.send(info="test_job")
Expand All @@ -78,6 +140,12 @@ def test_can_send_and_receive_job(queue):
assert job.elapsed > 0.0


def test_can_purge_queue(queue_with_job):
"""test that we can purge a queue"""
queue_with_job.purge()
assert len(queue_with_job) == 0


def test_can_delete_job(queue_with_job):
"""test that we can delete a job from the queue"""
job = queue_with_job.receive()
Expand Down

0 comments on commit bebc3ed

Please sign in to comment.