-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathconftest.py
341 lines (279 loc) · 10.6 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import os
from pathlib import Path
import mock
import pytest
import vcr
from shared.config import ConfigHelper
from shared.storage.memory import MemoryStorageService
from shared.torngit import Github as GithubHandler
from sqlalchemy import event
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session
from sqlalchemy_utils import create_database, database_exists
from celery_config import initialize_logging
from database.base import Base
from database.engine import json_dumps
from helpers.environment import _get_cached_current_env
# @pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    """
    Pin the process to a local/dev environment before tests run.

    pytest invokes this hook for every plugin and initial conftest file
    once the command line options have been parsed. ``config`` is the
    pytest config object; it is not used here.
    """
    os.environ.update({"CURRENT_ENVIRONMENT": "local", "RUN_ENV": "DEV"})
    # Clear the cache after the variables are set (presumably so a previously
    # cached environment value is not reused -- confirm against the helper).
    _get_cached_current_env.cache_clear()
    initialize_logging()
def pytest_itemcollected(item):
    """Collection-time hook: tag every test that requests ``codecov_vcr``."""
    uses_vcr = "codecov_vcr" in item.fixturenames
    if not uses_vcr:
        return
    # Tests with codecov_vcr fixtures are automatically 'integration'
    item.add_marker("integration")
@pytest.fixture(scope="session")
def engine(request, sqlalchemy_connect_url, app_config):
    """Engine configuration.

    See http://docs.sqlalchemy.org/en/latest/core/engines.html
    for more details.

    :sqlalchemy_connect_url: Connection URL to the database. E.g
        postgresql://scott:tiger@localhost:5432/mydatabase
    :app_config: Path to a ini config file containing the sqlalchemy.url
        config variable in the DEFAULT section.
    :returns: Engine instance
    :raises RuntimeError: if neither ``app_config`` nor
        ``sqlalchemy_connect_url`` is provided.
    """
    # Import both factories up front: the xdist override below needs
    # create_engine regardless of which branch built the first engine.
    from sqlalchemy import engine_from_config
    from sqlalchemy.engine import create_engine

    if app_config:
        engine = engine_from_config(app_config)
    elif sqlalchemy_connect_url:
        engine = create_engine(sqlalchemy_connect_url, json_serializer=json_dumps)
    else:
        raise RuntimeError("Can not establish a connection to the database")

    # Put a suffix like _gw0, _gw1 etc on xdist processes.
    # Newer pytest-xdist exposes ``workerinput``/``workerid``; older releases
    # used ``slaveinput``/``slaveid``. Accept either.
    worker_input = getattr(request.config, "workerinput", None) or getattr(
        request.config, "slaveinput", {}
    )
    xdist_suffix = worker_input.get("workerid", worker_input.get("slaveid"))
    if engine.url.database != ":memory:" and xdist_suffix is not None:
        suffixed_name = "{}_{}".format(engine.url.database, xdist_suffix)
        url = engine.url
        if hasattr(url, "set"):
            # SQLAlchemy 1.4+: URL objects are immutable, build a new one.
            url = url.set(database=suffixed_name)
        else:
            # Older SQLAlchemy: URL attributes are plain mutable attributes.
            url.database = suffixed_name
        # Release the engine being replaced and keep the JSON serializer
        # on the per-worker engine.
        engine.dispose()
        engine = create_engine(url, json_serializer=json_dumps)  # override engine

    def fin():
        print("Disposing engine")
        engine.dispose()

    request.addfinalizer(fin)
    return engine
@pytest.fixture(scope="session")
def db(engine, sqlalchemy_connect_url):
    """Create the test database if needed and rebuild its ``public`` schema.

    The whole session is skipped when the database server cannot be reached
    (``OperationalError`` while checking for / creating the database).
    After the schema is reset, every table registered on ``Base.metadata``
    is created through ``engine``.
    """
    database_url = sqlalchemy_connect_url
    # NOTE(review): existence is checked against sqlalchemy_connect_url, while
    # the statements below run through ``engine`` -- confirm both always point
    # at the same database.
    try:
        if not database_exists(database_url):
            create_database(database_url)
    except OperationalError:
        pytest.skip("No available db")
    # Use the connection as a context manager so it is closed once the schema
    # has been reset instead of staying open for the whole session.
    with engine.connect() as connection:
        connection.execute("DROP SCHEMA IF EXISTS public CASCADE;")
        connection.execute("CREATE SCHEMA public;")
    Base.metadata.create_all(engine)
@pytest.fixture
def dbsession(db, engine):
    """Yield a Session bound to a connection whose transaction is rolled back.

    The session runs inside a SAVEPOINT that is reopened whenever it ends,
    and the enclosing connection-level transaction is rolled back on
    teardown.
    """
    conn = engine.connect()
    outer_transaction = conn.begin()

    # bind an individual Session to the connection
    test_session = Session(bind=conn)

    # start the session in a SAVEPOINT...
    test_session.begin_nested()

    # ...then each time that SAVEPOINT ends, reopen it
    def restart_savepoint(session, transaction):
        ended_top_savepoint = transaction.nested and not transaction._parent.nested
        if ended_top_savepoint:
            # ensure that state is expired the way
            # session.commit() at the top level normally does
            # (optional step)
            session.expire_all()
            session.begin_nested()

    event.listen(test_session, "after_transaction_end", restart_savepoint)

    yield test_session

    test_session.close()
    outer_transaction.rollback()
    conn.close()
@pytest.fixture
def mock_configuration(mocker):
    """Patch ``shared.config._get_config_instance`` to return a preset config.

    Returns the ``ConfigHelper`` instance, loaded with fixed bitbucket,
    services (minio, redis, smtp) and setup parameters.
    """
    patched_getter = mocker.patch("shared.config._get_config_instance")
    config = ConfigHelper()
    patched_getter.return_value = config

    minio_params = {
        "access_key_id": "codecov-default-key",
        "bucket": "archive",
        "hash_key": "88f572f4726e4971827415efa8867978",
        "periodic_callback_ms": False,
        "secret_access_key": "codecov-default-secret",
        "verify_ssl": False,
    }
    smtp_params = {
        "host": "mailhog",
        "port": 1025,
        "username": "username",
        "password": "password",
    }
    services_params = {
        "minio": minio_params,
        "redis_url": "redis://redis:@localhost:6379/",
        "smtp": smtp_params,
    }
    setup_params = {
        "codecov_url": "https://codecov.io",
        "encryption_secret": "zp^P9*i8aR3",
        "telemetry": {
            "endpoint_override": "abcde",
        },
    }

    config.set_params(
        {
            "bitbucket": {"bot": {"username": "codecov-io"}},
            "services": services_params,
            "setup": setup_params,
        }
    )
    return config
@pytest.fixture
def empty_configuration(mocker):
    """Patch ``shared.config._get_config_instance`` to return a fresh,
    unparameterized ``ConfigHelper`` and hand that instance to the test."""
    patched_getter = mocker.patch("shared.config._get_config_instance")
    blank_config = ConfigHelper()
    patched_getter.return_value = blank_config
    return blank_config
@pytest.fixture
def codecov_vcr(request):
    """Run the test inside a VCR cassette derived from the test's location.

    The cassette lives next to the test module, under
    ``cassetes/<module name>/[<class name>/]<test name>.yaml``.
    """
    test_file = Path(request.node.fspath)
    # Path segments below the "cassetes" directory, built up in order.
    segments = [test_file.name.replace(".py", "")]
    if request.node.cls:
        segments.append(request.node.cls.__name__)
    segments.append(f"{request.node.name}.yaml")
    cassette_location = str(test_file.parent.joinpath("cassetes", *segments))

    recorder = vcr.use_cassette(
        cassette_location,
        record_mode="once",
        filter_headers=["authorization"],
        match_on=["method", "scheme", "host", "port", "path"],
    )
    with recorder as cassette:
        yield cassette
@pytest.fixture
def mock_redis(mocker):
    """Replace ``services.redis._get_redis_instance_from_url`` so it returns
    a ``MagicMock``; the mock is yielded to the test."""
    fake_redis = mocker.MagicMock()
    mocker.patch(
        "services.redis._get_redis_instance_from_url", return_value=fake_redis
    )
    yield fake_redis
@pytest.fixture
def mock_storage(mocker):
    """Replace ``services.storage._cached_get_storage_client`` so it returns
    a ``MemoryStorageService``; that instance is yielded to the test."""
    storage_patch = mocker.patch("services.storage._cached_get_storage_client")
    in_memory_storage = MemoryStorageService({})
    storage_patch.return_value = in_memory_storage
    yield in_memory_storage
@pytest.fixture
def mock_smtp(mocker):
    """Patch ``services.smtp.SMTPService`` so calling it returns a
    ``MagicMock``; the mock is yielded to the test."""
    fake_smtp = mocker.MagicMock()
    mocker.patch("services.smtp.SMTPService", return_value=fake_smtp)
    yield fake_smtp
@pytest.fixture
def mock_repo_provider(mocker):
    """Patch ``services.repository._get_repo_provider_service_instance`` to
    return a ``GithubHandler``-spec'd mock with canned async responses.

    ``get_commit_diff`` resolves to an empty dict and
    ``get_distance_in_commits`` reports zero commits behind.
    """
    provider_patch = mocker.patch(
        "services.repository._get_repo_provider_service_instance"
    )
    commit_diff = mock.AsyncMock(return_value={})
    commit_distance = mock.AsyncMock(
        return_value={"behind_by": 0, "behind_by_commit": None}
    )
    fake_provider = mocker.MagicMock(
        spec=GithubHandler,
        data={},
        get_commit_diff=commit_diff,
        get_distance_in_commits=commit_distance,
    )
    provider_patch.return_value = fake_provider
    yield fake_provider
@pytest.fixture
def mock_owner_provider(mocker):
    """Patch ``services.owner._get_owner_provider_service_instance`` with a
    ``GithubHandler``-spec'd mock.

    Each call to the patched factory stores the keyword arguments it was
    given on the mock's ``data`` attribute and returns the mock.
    """
    fake_provider = mocker.MagicMock(spec=GithubHandler)

    def _record_kwargs(*args, **kwargs):
        fake_provider.data = dict(kwargs)
        return fake_provider

    mocker.patch(
        "services.owner._get_owner_provider_service_instance",
        side_effect=_record_kwargs,
    )
    yield fake_provider
@pytest.fixture
def with_sql_functions(dbsession):
    """Install the SQL helper functions on the test session.

    Executes, in order, the definitions of ``array_append_unique``,
    ``try_to_auto_activate`` and ``get_gitlab_root_group``, then flushes
    the session.
    """
    function_definitions = (
        """CREATE FUNCTION array_append_unique(anyarray, anyelement) RETURNS anyarray
LANGUAGE sql IMMUTABLE
AS $_$
select case when $2 is null
then $1
else array_remove($1, $2) || array[$2]
end;
$_$;""",
        """create or replace function try_to_auto_activate(int, int) returns boolean as $$
update owners
set plan_activated_users = (
case when coalesce(array_length(plan_activated_users, 1), 0) < plan_user_count -- we have credits
then array_append_unique(plan_activated_users, $2) -- add user
else plan_activated_users
end)
where ownerid=$1
returning (plan_activated_users @> array[$2]);
$$ language sql volatile strict;""",
        """create or replace function get_gitlab_root_group(int) returns jsonb as $$
/* get root group by following parent_service_id to highest level */
with recursive tree as (
select o.service_id,
o.parent_service_id,
o.ownerid,
1 as depth
from owners o
where o.ownerid = $1
and o.service = 'gitlab'
and o.parent_service_id is not null
union all
select o.service_id,
o.parent_service_id,
o.ownerid,
depth + 1 as depth
from tree t
join owners o
on o.service_id = t.parent_service_id
/* avoid infinite loop in case of cycling (2 > 5 > 3 > 2 > 5...) up to Gitlab max subgroup depth of 20 */
where depth <= 20
), data as (
select t.ownerid,
t.service_id
from tree t
where t.parent_service_id is null
)
select to_jsonb(data) from data limit 1;
$$ language sql stable strict;""",
    )
    # Order matters: try_to_auto_activate calls array_append_unique.
    for definition in function_definitions:
        dbsession.execute(definition)
    dbsession.flush()
# We don't want any tests submitting checkpoint logs to Sentry for real
@pytest.fixture(autouse=True)
def mock_checkpoint_submit(mocker, request):
    """Replace ``CheckpointLogger.submit_subflow`` with a do-nothing mock.

    Tests carrying the ``real_checkpoint_logger`` marker are left unpatched
    and the fixture yields ``None`` for them; otherwise the value returned
    by ``mocker.patch.object`` is provided.
    """
    # We mock sentry differently in the tests for CheckpointLogger
    if request.node.get_closest_marker("real_checkpoint_logger"):
        return None

    def _discard_submission(metric, start, end):
        return None

    from helpers.checkpoint_logger import CheckpointLogger

    submit_stub = mocker.Mock(side_effect=_discard_submission)
    return mocker.patch.object(CheckpointLogger, "submit_subflow", submit_stub)
@pytest.fixture(autouse=True)
def mock_metric_context(mocker, request):
    """Swap ``MetricContext.populate`` for a stub that only sets
    ``self.populated = True``.

    Tests carrying the ``real_metric_context`` marker are left unpatched.
    """
    keep_real = request.node.get_closest_marker("real_metric_context")
    if keep_real:
        return None

    from helpers.telemetry import MetricContext

    def populate(self):
        self.populated = True

    return mocker.patch.object(MetricContext, "populate", new=populate)
@pytest.fixture(autouse=True)
def mock_feature(mocker, request):
    """Swap ``Feature.check_value`` for a stub that always returns the
    caller-supplied ``default``.

    Tests carrying the ``real_feature`` marker are left unpatched.
    """
    keep_real = request.node.get_closest_marker("real_feature")
    if keep_real:
        return None

    from shared.rollouts import Feature

    def check_value(self, identifier, default=False):
        return default

    return mocker.patch.object(Feature, "check_value", new=check_value)