-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbulk_updates.py
64 lines (51 loc) · 1.58 KB
/
bulk_updates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""This series of tests illustrates different ways to UPDATE a large number
of rows in bulk.
"""
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from profiler import Profiler
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# Module-level engine handle; starts unset and is assigned by setup_database().
engine = None
class Customer(Base):
    """ORM mapping for the ``customer`` table used by the bulk-update tests."""

    __tablename__ = "customer"

    # Integer primary key column.
    id = Column(Integer, primary_key=True)

    # Both text columns are declared with a maximum length of 255.
    name = Column(String(length=255))
    description = Column(String(length=255))
# Initialise the profiler for this suite under the name "bulk_updates".
# NOTE(review): num=100000 is presumably the default row count handed to the
# setup and test functions -- confirm against the profiler module.
Profiler.init("bulk_updates", num=100000)
@Profiler.setup
def setup_database(dburl, echo, num):
    """Recreate the schema and populate ``customer`` with ``num`` rows.

    Binds the module-level ``engine`` to a new engine for ``dburl``, drops
    and recreates every table known to ``Base.metadata``, then bulk-inserts
    the rows in batches and commits once at the end.

    :param dburl: database URL passed to ``create_engine``.
    :param echo: forwarded to ``create_engine(echo=...)``.
    :param num: total number of ``Customer`` rows to insert.
    """
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    batch_size = 10000
    s = Session(engine)
    try:
        for chunk in range(0, num, batch_size):
            # Clamp the final batch so exactly ``num`` rows are inserted even
            # when ``num`` is not a multiple of the batch size.
            stop = min(chunk + batch_size, num)
            s.bulk_insert_mappings(
                Customer,
                [
                    {
                        "name": "customer name %d" % i,
                        "description": "customer description %d" % i,
                    }
                    for i in range(chunk, stop)
                ],
            )
        s.commit()
    finally:
        # Always release the session's connection, including on failure.
        s.close()
@Profiler.profile
def test_orm_flush(n):
    """UPDATE statements via the ORM flush process.

    Loads ``Customer`` rows in windows of 1000 ids, appends ``"updated"`` to
    each row's ``description``, flushes after every window and commits once
    at the end.

    :param n: highest ``Customer.id`` to update.
    """
    batch_size = 1000
    session = Session(bind=engine)
    try:
        for chunk in range(0, n, batch_size):
            # BETWEEN is inclusive at both ends, so consecutive windows must
            # not share a boundary id or that row would be updated twice.
            # NOTE(review): assumes ids were assigned sequentially from 1 by
            # setup_database -- confirm for the target database.
            first_id = chunk + 1
            last_id = min(chunk + batch_size, n)
            customers = (
                session.query(Customer)
                .filter(Customer.id.between(first_id, last_id))
                .all()
            )
            for customer in customers:
                customer.description += "updated"
            session.flush()
        session.commit()
    finally:
        # Always release the session's connection, including on failure.
        session.close()