Initial commit (efa8df0)

boxysean committed Nov 11, 2020

Showing 24 changed files with 1,407 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
site/
build/
dist/
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,14 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [1.0.0] - 2020-11-11
### Added
- Initial release of pylateral.

[Unreleased]: https://github.com/boxysean/pylateral/compare/v1.0.0...HEAD
[1.0.0]: https://github.com/boxysean/pylateral/releases/tag/v1.0.0

21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 Sean McIntyre

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
3 changes: 3 additions & 0 deletions Makefile
@@ -0,0 +1,3 @@
.PHONY: test
test: ## Run the tests
	pytest tests/
28 changes: 28 additions & 0 deletions README.md
@@ -0,0 +1,28 @@
pylateral
=========

Intuitive multi-threaded task processing in python.

## Example

```python
import urllib.request

import pylateral

@pylateral.task
def request_and_print(url):
    response = urllib.request.urlopen(url)
    print(response.read())

URLS = [
    "https://www.nytimes.com/",
    "https://www.cnn.com/",
    "https://europe.wsj.com/",
    "https://www.bbc.co.uk/",
    "https://some-made-up-domain.com/",
]

with pylateral.task_pool():
    for url in URLS:
        request_and_print(url)

print("Complete!")
```

201 changes: 201 additions & 0 deletions docs/comparison.md
@@ -0,0 +1,201 @@
Comparison with other python libraries
======================================

There are lots of ways to skin the threading cat!

### When to use *pylateral*

- Your workload is network-bound and/or IO-bound (e.g., API calls, database queries, read/write to FTP, read/write to files).

- Your workload can be run [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel).

- You are writing a script or prototype that isn't very large or complex.

### When not to use *pylateral*

- Your workload is CPU-bound and blocked by the [Global Interpreter Lock](https://en.wikipedia.org/wiki/CPython#Design). *python* threading will not speed up such workloads; consider using [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) or [concurrent.futures.ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) instead.

- The complexity of your program would benefit from thinking about it in terms of [futures and promises](https://en.wikipedia.org/wiki/Futures_and_promises). Consider using [asyncio](https://docs.python.org/3/library/asyncio.html) or [concurrent.futures.ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html) instead.

- You want tighter control over the lifecycle of your threads. Consider using [threading](https://docs.python.org/3/library/threading.html) instead.

- For larger workloads, consider using [dask.distributed](https://distributed.dask.org/en/latest/#), [Airflow](https://airflow.apache.org/), [Dagster](https://github.com/dagster-io/dagster/) or [Prefect](https://www.prefect.io/) to perform work across many nodes.

- You would benefit from a web UI for viewing and interacting with your tasks. For that, consider using [Airflow](https://airflow.apache.org/) or [Prefect](https://www.prefect.io/).
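To make the CPU-bound point above concrete, here is a minimal sketch (an assumed example, not part of *pylateral*) of handing CPU-heavy work to a process pool, which side-steps the GIL because each worker is a separate interpreter process:

```python
# Assumed example, not part of pylateral: CPU-bound work dispatched to
# worker processes with ProcessPoolExecutor. Threads would not help here
# because the GIL serializes pure-python computation.
import concurrent.futures

def sum_of_squares(n):
    # Deliberately CPU-heavy pure-python loop.
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # executor.map preserves input order, so results line up with inputs.
        results = list(executor.map(sum_of_squares, [10, 100, 1000]))
    print(results)
```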

Feature comparison
------------------

| Feature | pylateral | [asyncio](https://docs.python.org/3/library/asyncio.html) | [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) | [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) | [threading](https://docs.python.org/3/library/threading.html) |
| ---------------------------------- | --------- | ------- | ------------------------ | --------------- | --------- |
| Easy to adapt single-threaded code | ✅ | ❌ | ❌ | ❌ | ❌ |
| [Simple nested tasks](usage.md#working-with-nested-tasks) | ✅ | ❌ | ❌ | ❌ | ❌ |
| Concurrent IO-bound workloads | ✅ | ✅ | ✅ | ✅ | ✅ |
| Concurrent CPU-bound workloads | ❌ | ❌ | ✅ (Process Pool) | ✅ | ❌ |
| Flexibility in using return values | ❌ | ✅ | ✅ | ✅ | ❌ |

Code comparison
----------

[PEP-3148 -- futures - execute computations asynchronously](https://www.python.org/dev/peps/pep-3148/#id13) introduces `concurrent.futures` and illustrates it by example. Here I show that example in *pylateral*, stacked up against the main threading libraries offered in python.

### `asyncio`

```python
import aiohttp
import asyncio
import sqlite3

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]

async def extract_and_load(url, timeout=30):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout) as response:
                web_result = await response.text()
        print(f"{url} is {len(web_result)} bytes")

        with sqlite3.connect('example.db') as conn:
            conn.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            conn.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True

async def main():
    succeeded = await asyncio.gather(*[
        extract_and_load(url)
        for url in URLS
    ])

    print(f"Successfully completed {sum(1 for result in succeeded if result)}")

asyncio.run(main())
```

### `concurrent.futures.ThreadPoolExecutor`

```python
import concurrent.futures
import requests
import sqlite3

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]

def extract_and_load(url, timeout=30):
    try:
        web_result = requests.get(url, timeout=timeout).text
        print(f"{url} is {len(web_result)} bytes")

        with sqlite3.connect('example.db') as conn:
            conn.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            conn.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True

succeeded = []

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_url = dict(
        (executor.submit(extract_and_load, url), url)
        for url in URLS
    )

    for future in concurrent.futures.as_completed(future_to_url):
        succeeded.append(future.result())

print(f"Successfully completed {sum(1 for result in succeeded if result)}")
```

### `pylateral`

```python
import requests
import sqlite3

import pylateral

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]

@pylateral.task(has_return_value=True)
def extract_and_load(url, timeout=30):
    try:
        web_result = requests.get(url, timeout=timeout).text
        print(f"{url} is {len(web_result)} bytes")

        with sqlite3.connect('example.db') as conn:
            conn.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            conn.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True

with pylateral.task_pool() as pool:
    for url in URLS:
        extract_and_load(url)

succeeded = pool.results

print(f"Successfully completed {sum(1 for result in succeeded if result)}")
```

### Unthreaded

```python
import requests
import sqlite3

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]

def extract_and_load(url, timeout=30):
    try:
        web_result = requests.get(url, timeout=timeout).text
        print(f"{url} is {len(web_result)} bytes")

        with sqlite3.connect('example.db') as conn:
            conn.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            conn.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True

succeeded = [
    extract_and_load(url)
    for url in URLS
]

print(f"Successfully completed {sum(1 for result in succeeded if result)}")
```
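For completeness, since the feature table above also compares against the raw [threading](https://docs.python.org/3/library/threading.html) module, here is a sketch of the thread bookkeeping that approach requires. The `run_parallel` helper is an illustrative name invented for this example, not a real API; with the `extract_and_load` function above, you would call `run_parallel(extract_and_load, URLS)`.

```python
# Sketch of the same fan-out using the raw `threading` module. The
# run_parallel helper is invented for illustration: one thread per
# argument, results collected positionally so ordering matches the input.
import threading

def run_parallel(func, args):
    results = [None] * len(args)

    def worker(index, arg):
        results[index] = func(arg)  # each slot is written by exactly one thread

    threads = [
        threading.Thread(target=worker, args=(i, arg))
        for i, arg in enumerate(args)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # block until every thread finishes
    return results

# With the extract_and_load function above:
#     succeeded = run_parallel(extract_and_load, URLS)
```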
69 changes: 69 additions & 0 deletions docs/index.md
@@ -0,0 +1,69 @@
pylateral
=========

**Simple multi-threaded task processing in python**

Example
-------

```python
import urllib.request

import pylateral

@pylateral.task
def request_and_print(url):
    response = urllib.request.urlopen(url)
    print(response.read())

URLS = [
    "https://www.nytimes.com/",
    "https://www.cnn.com/",
    "https://europe.wsj.com/",
    "https://www.bbc.co.uk/",
    "https://some-made-up-domain.com/",
]

with pylateral.task_pool():
    for url in URLS:
        request_and_print(url)

print("Complete!")
```

### What's going on here

- `def request_and_print(url)` is a *pylateral* task that, when called, is run on a task pool thread rather than on the main thread.

- `with pylateral.task_pool()` allocates threads and a task pool. The context manager exits only once all of its tasks have completed.

- Each call to `request_and_print(url)` adds that task to the task pool. Meanwhile, the main thread continues execution.

- The `Complete!` statement is printed once all of the `request_and_print()` task invocations have been completed by the pool threads.

To learn more about the features of *pylateral*, check out the [usage](usage.md) section.

Background
----------

A couple of years ago, I inherited my company's codebase for getting data into our data warehouse using an ELT approach (extract-and-loads done in python, transforms done in [dbt](https://www.getdbt.com/)/SQL). The codebase has dozens of python scripts that integrate first-party and third-party data from databases, FTPs, and APIs, and that run on a scheduler (typically daily or hourly). The scripts I inherited were single-threaded procedural scripts that looked like glue code and spent most of their time in network I/O. This got my company pretty far!

As my team and I added more and more integrations with more and more data, we wanted to have faster and faster scripts to reduce our dev cycles and reduce our multi-hour nightly jobs to minutes. Because our scripts were network-bound, multi-threading was a good way to accomplish this, and so I looked into `concurrent.futures` and `asyncio`, but I decided against these options because:

1. It wasn't immediately apparent how to adapt my codebase to use these libraries without fundamental changes to our execution platform, reworking our scripts from the ground up, or adding significant multi-threading code to each script.

2. I believe the procedural-style glue code we have is easy to comprehend, which helps us support a wide variety of programs at scale.

And so, I designed *pylateral*, a simple interface to `concurrent.futures.ThreadPoolExecutor` for extract-and-load workloads. The design considerations of this interface include:

- The usage is minimally-invasive to the original un-threaded approach of my company's codebase. (And so, teaching the library has been fairly straightforward despite the multi-threaded paradigm shift.)

- The `@pylateral.task` decorator should be used to encapsulate a homogeneous method accepting different parameters. The contents of the method should be primarily I/O to achieve the concurrency gains of python multi-threading.

- If no `pylateral.task_pool` context manager has been entered, or if it has been disabled by an environment variable, the `@pylateral.task` decorator does nothing (and the code runs serially).

- While it's possible to return a value from a `@pylateral.task` method, I encourage my team to use the decorator to start-and-complete work; think of writing "embarrassingly parallel" methods that can be "mapped".
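To make these design considerations concrete, here is a rough sketch of how such an interface *could* be built on top of `concurrent.futures.ThreadPoolExecutor`. This is an illustration of the idea only, not *pylateral*'s actual source; the names `_active_pool` and `_Pool` are invented for the example.

```python
# Rough sketch of the idea, not pylateral's actual implementation.
# A decorator submits calls to a shared ThreadPoolExecutor while a pool is
# active, and simply runs the function inline (serially) otherwise.
import concurrent.futures
import contextlib
import functools

_active_pool = None  # the currently-open pool, if any (invented name)

class _Pool:
    def __init__(self, max_workers=None):
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers)
        self._futures = []

    def submit(self, fn, *args, **kwargs):
        self._futures.append(self._executor.submit(fn, *args, **kwargs))

    @property
    def results(self):
        # Blocks until every submitted task has finished.
        return [future.result() for future in self._futures]

@contextlib.contextmanager
def task_pool(max_workers=None):
    global _active_pool
    _active_pool = _Pool(max_workers)
    try:
        yield _active_pool
        _active_pool._executor.shutdown(wait=True)  # wait for remaining tasks
    finally:
        _active_pool = None

def task(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        if _active_pool is None:
            return fn(*args, **kwargs)  # no pool: degrade to serial execution
        _active_pool.submit(fn, *args, **kwargs)
    return wrapper
```

Used like the examples above: decorate a function with `@task`, call it inside `with task_pool() as pool:`, and read `pool.results` afterwards.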

### Why not other libraries?

I think *pylateral* fills an unmet need in python's concurrency ecosystem: a simple way to gain the benefits of multi-threading without radically transforming either mindset or codebase.

That said, I don't think *pylateral* is a [silver bullet](https://en.wikipedia.org/wiki/No_Silver_Bullet). See my [comparison](comparison.md) of *pylateral* against other concurrency offerings.
6 changes: 6 additions & 0 deletions docs/requirements.txt
@@ -0,0 +1,6 @@
mkdocs==1.0.4
markdown==3.2
mkdocs-exclude==1.0.2
mkdocs-material==4.6.2
markdown-include==0.5.1
pygments==2.5.2