Skip to content

Commit

Permalink
Avoid order race conditions (#8)
Browse files Browse the repository at this point in the history
* Minor refactors

* Fix bug during the order payment retry

* Optimal way to build docker image with app

* Upgrade tests to support multiple clients

* Ensure base for testing concurrent payments

* Fix payment test comments

* Test payment trigger over empty carts

* Sync concurrent orders from the impacted stock

* Use product-based locks when ordering concurrently

* Mock Redis usage during testing for locking
  • Loading branch information
cmin764 authored Dec 17, 2024
1 parent 9387fde commit 02e1bcd
Show file tree
Hide file tree
Showing 18 changed files with 452 additions and 111 deletions.
2 changes: 1 addition & 1 deletion .env.template
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
LOG_LEVEL=INFO
DEBUG=false

# Postgres
POSTGRES_SERVER=localhost
POSTGRES_PORT=5432
POSTGRES_USER=deep
POSTGRES_PASSWORD=icecream
POSTGRES_DB=deep_ice
Expand Down
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ WORKDIR /app
# Install uv deps with pip.
RUN pip install uv
COPY pyproject.toml .
RUN uv export --no-dev >requirements.txt && pip install -Ur requirements.txt
RUN uv pip install --system -Ur pyproject.toml

# Copy the rest of the application code.
# Copy the rest of the application code and install the project too.
COPY . .
RUN uv pip install --system -e .

# Run the FastAPI app using uvicorn on default port.
EXPOSE 80

# Command to run the FastAPI app using uvicorn.
CMD ["fastapi", "run", "deep_ice", "--port", "80"]
2 changes: 1 addition & 1 deletion deep_ice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def lifespan(fast_app: FastAPI):
class TaskQueue:
functions = [payment_service.make_payment_task]
redis_settings = redis_settings
max_tries = settings.TASK_MAX_RETRIES
max_tries = settings.TASK_MAX_TRIES
retry_delay = settings.TASK_RETRY_DELAY


Expand Down
6 changes: 3 additions & 3 deletions deep_ice/api/routes/cart.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ async def get_cart_items(current_user: CurrentUserDep, cart_service: CartService
return cart


@router.post("/items", response_model=RetrieveCartItem)
@router.post(
"/items", response_model=RetrieveCartItem, status_code=status.HTTP_201_CREATED
)
async def add_item_to_cart(
session: SessionDep,
current_user: CurrentUserDep,
cart_service: CartServiceDep,
item: Annotated[CreateCartItem, Body()],
response: Response,
):
cart = await cart_service.ensure_cart(cast(int, current_user.id))
cart_item = CartItem(cart_id=cart.id, **item.model_dump())
Expand All @@ -65,7 +66,6 @@ async def add_item_to_cart(
else:
cart_item.icecream = icecream

response.status_code = status.HTTP_201_CREATED
return cart_item


Expand Down
95 changes: 61 additions & 34 deletions deep_ice/api/routes/payments.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,30 @@
from typing import Annotated, cast

import sentry_sdk
from aioredlock import LockError
from fastapi import APIRouter, Body, HTTPException, Request, Response, status
from fastapi.responses import RedirectResponse
from sqlalchemy.exc import SQLAlchemyError
from sqlmodel.ext.asyncio.session import AsyncSession

from deep_ice.core import logger
from deep_ice.core.dependencies import CurrentUserDep, SessionDep
from deep_ice.models import PaymentMethod, PaymentStatus, RetrievePayment
from deep_ice.services.cart import CartService
from deep_ice.core.dependencies import (
CartServiceDep,
CurrentUserDep,
RedlockDep,
SessionDep,
)
from deep_ice.models import Cart, Payment, PaymentMethod, PaymentStatus, RetrievePayment
from deep_ice.services.order import OrderService
from deep_ice.services.payment import PaymentError, PaymentService, payment_stub
from deep_ice.services.stats import stats_service

router = APIRouter()


@router.post("", response_model=RetrievePayment)
async def make_payment(
session: SessionDep,
current_user: CurrentUserDep,
method: Annotated[PaymentMethod, Body(embed=True)],
request: Request,
response: Response,
):
# FIXME(cmin764): Check if we need an async Lock primitive here in order to allow
# only one user to submit an order at a time. (based on available stock check)
cart_service = CartService(session)
cart = await cart_service.get_cart(cast(int, current_user.id))
if not cart or not cart.items:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="There are no items in the cart",
)

cart_ok = await cart_service.check_items_against_stock(cart)
if not cart_ok:
# Redirect back to the cart so we get aware of the new state based on the
# available stock. And let the user decide if it continues with a payment.
return RedirectResponse(url=request.url_for("get_cart_items"))

async def _make_payment(
session: AsyncSession, *, cart: Cart, method: PaymentMethod, response: Response
) -> Payment:
# Items are available and ready to be sold, make the order and pay for it.
order_service = OrderService(session, stats_service=stats_service)
payment_service = PaymentService(
Expand All @@ -62,14 +47,56 @@ async def make_payment(
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Payment failed"
)
else:
await session.commit()
response.status_code = (
status.HTTP_202_ACCEPTED
if payment.status == PaymentStatus.PENDING
else status.HTTP_201_CREATED

await session.commit()
response.status_code = (
status.HTTP_202_ACCEPTED
if payment.status == PaymentStatus.PENDING
else status.HTTP_201_CREATED
)
return payment


@router.post("", response_model=RetrievePayment)
async def make_payment(
session: SessionDep,
current_user: CurrentUserDep,
cart_service: CartServiceDep,
redlock: RedlockDep,
method: Annotated[PaymentMethod, Body(embed=True)],
request: Request,
response: Response,
):
cart = await cart_service.get_cart(cast(int, current_user.id))
if not cart or not cart.items:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="There are no items in the cart",
)
return payment

lock_keys = [f"ice-lock:{item.icecream_id}" for item in cart.items]
locks = []
try:
for lock_key in lock_keys:
lock = await redlock.lock(lock_key)
locks.append(lock)

cart_ok = await cart_service.check_items_against_stock(cart)
if not cart_ok:
# Redirect back to the cart so we get aware of the new state based on
# the available stock. And let the user decide if it continues with a
# payment.
return RedirectResponse(url=request.url_for("get_cart_items"))

return await _make_payment(
session, cart=cart, method=method, response=response
)
except LockError as exc:
logger.exception("Payment lock error with key %r: %s", lock_key, exc)
sentry_sdk.capture_exception(exc)
finally:
for lock in locks:
await redlock.unlock(lock)


@router.get("", response_model=list[RetrievePayment])
Expand Down
9 changes: 6 additions & 3 deletions deep_ice/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ class Settings(BaseSettings):
model_config = SettingsConfigDict(
# Use the top level .env file (one level above ./deep_ice/).
env_file=".env",
env_ignore_empty=True,
env_ignore_empty=False,
extra="ignore",
)

LOG_LEVEL: str = "INFO"
DEBUG: bool = False
PROJECT_NAME: str = "Deep Ice"
API_V1_STR: str = "/v1"

Expand All @@ -30,8 +31,10 @@ class Settings(BaseSettings):
POSTGRES_DB: str

REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDLOCK_TTL: int = 30 # seconds for the lock to persists in Redis

TASK_MAX_RETRIES: int = 3
TASK_MAX_TRIES: int = 3
TASK_RETRY_DELAY: int = 1 # seconds between retries
TASK_BACKOFF_FACTOR: int = 5 # seconds to wait based on the job try counter

Expand All @@ -52,4 +55,4 @@ def SQLALCHEMY_DATABASE_URI(self) -> PostgresDsn:


settings = Settings() # type: ignore
redis_settings = RedisSettings(host=settings.REDIS_HOST)
redis_settings = RedisSettings(host=settings.REDIS_HOST, port=settings.REDIS_PORT)
2 changes: 1 addition & 1 deletion deep_ice/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from deep_ice.core.config import settings

async_engine = create_async_engine(
str(settings.SQLALCHEMY_DATABASE_URI), echo=True, future=True
str(settings.SQLALCHEMY_DATABASE_URI), echo=settings.DEBUG, future=True
)


Expand Down
15 changes: 13 additions & 2 deletions deep_ice/core/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from typing import Annotated
from typing import Annotated, AsyncGenerator

import jwt
import sentry_sdk
from aioredlock import Aioredlock
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from pydantic import ValidationError
from sqlmodel.ext.asyncio.session import AsyncSession

from deep_ice.core import logger, security
from deep_ice.core.config import settings
from deep_ice.core.config import redis_settings, settings
from deep_ice.core.database import get_async_session
from deep_ice.models import TokenPayload, User
from deep_ice.services.cart import CartService
Expand Down Expand Up @@ -51,5 +52,15 @@ async def get_cart_service(session: SessionDep) -> CartService:
return CartService(session)


async def get_lock_manager() -> AsyncGenerator[Aioredlock, None]:
lock_manager = Aioredlock(
[{"host": redis_settings.host, "port": redis_settings.port}],
internal_lock_timeout=settings.REDLOCK_TTL,
)
yield lock_manager
await lock_manager.destroy()


CurrentUserDep = Annotated[User, Depends(get_current_user)]
CartServiceDep = Annotated[CartService, Depends(get_cart_service)]
RedlockDep = Annotated[Aioredlock, Depends(get_lock_manager)]
2 changes: 1 addition & 1 deletion deep_ice/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BaseIceCream(SQLModel):
class IceCream(BaseIceCream, FetchMixin, table=True):
id: Annotated[int | None, Field(primary_key=True)] = None
stock: int
blocked_quantity: int = 0 # reserved for payments only
blocked_quantity: int = 0 # reserved during payments
is_active: bool = True

cart_items: list["CartItem"] = Relationship(
Expand Down
8 changes: 8 additions & 0 deletions deep_ice/services/cart.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,16 @@ async def ensure_cart(self, user_id: int) -> Cart:

return cart

async def _refresh_icecream_stock(self, cart: Cart) -> None:
await self._session.refresh(cart)
for cart_item in cart.items:
await self._session.refresh(cart_item)
cart_item.icecream = await cart_item.awaitable_attrs.icecream
await self._session.refresh(cart_item.icecream)

async def check_items_against_stock(self, cart: Cart) -> bool:
# Ensure once again that we still have on stock the items we intend to buy.
await self._refresh_icecream_stock(cart)
cart_ok = True
for item in cart.items:
if item.quantity > item.icecream.available_stock:
Expand Down
5 changes: 2 additions & 3 deletions deep_ice/services/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ async def confirm_order(self, order_id: int):

icecream.stock -= item.quantity
icecream.blocked_quantity -= item.quantity
self._session.add(icecream)
await self._stats_service.acknowledge_icecream_demand(
cast(int, icecream.id), name=icecream.name, quantity=item.quantity
)

self._session.add_all(order.items)

async def cancel_order(self, order_id: int):
order = await self._get_order(order_id)
order.status = OrderStatus.CANCELLED
Expand All @@ -62,7 +61,7 @@ async def cancel_order(self, order_id: int):
continue

icecream.blocked_quantity -= item.quantity
self._session.add_all(order.items)
self._session.add(icecream)

async def make_order_from_cart(self, cart: Cart) -> Order:
# Creates and saves an order out of the current cart and returns it for later
Expand Down
12 changes: 8 additions & 4 deletions deep_ice/services/payment.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def make_payment_task(
msg = f"{method.value} payment for order #{order_id} failed, retrying..."
logger.warning(msg)
sentry_sdk.capture_message(msg, level="warning")
raise Retry(defer=attempts * settings.TASK_BACKOFF_FACTOR)
raise Retry(defer=attempts * settings.TASK_BACKOFF_FACTOR)

async for session in get_async_session():
order_service = OrderService(session, stats_service=stats_service)
Expand Down Expand Up @@ -82,7 +82,9 @@ class PaymentStub(PaymentInterface):

min_delay: int
max_delay: int
allow_failures: bool = False # enable failures or not
# Enable failures (or not) and at what rate.
allow_failures: bool = False
failure_rate: float = 0.2

async def make_payment(
self,
Expand Down Expand Up @@ -124,7 +126,9 @@ async def make_payment(
if self.allow_failures:
# Simulate payment result: 80% chance of success, 20% chance of failure.
payment_result = random.choices(
[PaymentStatus.SUCCESS, PaymentStatus.FAILED], weights=[80, 20], k=1
[PaymentStatus.SUCCESS, PaymentStatus.FAILED],
weights=[1 - self.failure_rate, self.failure_rate],
k=1,
)[0]
else:
payment_result = PaymentStatus.SUCCESS
Expand Down Expand Up @@ -196,4 +200,4 @@ async def set_order_payment_status(self, order_id: int, status: PaymentStatus):
self._session.add(payment)


payment_stub = PaymentStub(1, 3, allow_failures=True)
payment_stub = PaymentStub(1, 3, allow_failures=True, failure_rate=0.2)
6 changes: 4 additions & 2 deletions deep_ice/services/stats.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from collections import OrderedDict

import redis.asyncio as redis
import redis.asyncio as aioredis

from deep_ice.core.config import redis_settings

Expand All @@ -22,7 +22,9 @@ class StatsService(StatsInterface):
POPULARITY_KEY = "POPULAR_ICECREAM"

def __init__(self):
self._client = redis.Redis(host=redis_settings.host)
self._client = aioredis.Redis(
host=redis_settings.host, port=redis_settings.port
)

@staticmethod
def _get_product_key(*args: int | str) -> str:
Expand Down
Loading

0 comments on commit 02e1bcd

Please sign in to comment.