forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
constants.py
128 lines (98 loc) · 3.4 KB
/
constants.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2018 Snowflake Computing Inc. All right reserved.
#
"""
Various constants
"""
from collections import defaultdict
from enum import Enum
from six import PY2
from collections import namedtuple
DBAPI_TYPE_STRING = 0
DBAPI_TYPE_BINARY = 1
DBAPI_TYPE_NUMBER = 2
DBAPI_TYPE_TIMESTAMP = 3
FIELD_TYPES = [
{'name': 'FIXED', 'dbapi_type': [DBAPI_TYPE_NUMBER]},
{'name': 'REAL', 'dbapi_type': [DBAPI_TYPE_NUMBER]},
{'name': 'TEXT', 'dbapi_type': [DBAPI_TYPE_STRING]},
{'name': 'DATE', 'dbapi_type': [DBAPI_TYPE_TIMESTAMP]},
{'name': 'TIMESTAMP', 'dbapi_type': [DBAPI_TYPE_TIMESTAMP]},
{'name': 'VARIANT', 'dbapi_type': [DBAPI_TYPE_BINARY]},
{'name': 'TIMESTAMP_LTZ', 'dbapi_type': [DBAPI_TYPE_TIMESTAMP]},
{'name': 'TIMESTAMP_TZ', 'dbapi_type': [DBAPI_TYPE_TIMESTAMP]},
{'name': 'TIMESTAMP_NTZ', 'dbapi_type': [DBAPI_TYPE_TIMESTAMP]},
{'name': 'OBJECT', 'dbapi_type': [DBAPI_TYPE_BINARY]},
{'name': 'ARRAY', 'dbapi_type': [DBAPI_TYPE_BINARY]},
{'name': 'BINARY', 'dbapi_type': [DBAPI_TYPE_BINARY]},
{'name': 'TIME', 'dbapi_type': [DBAPI_TYPE_TIMESTAMP]},
{'name': 'BOOLEAN', 'dbapi_type': []},
]
FIELD_NAME_TO_ID = defaultdict(int)
FIELD_ID_TO_NAME = defaultdict(unicode if PY2 else str) # noqa: F821
__binary_types = []
__binary_type_names = []
__string_types = []
__string_type_names = []
__number_types = []
__number_type_names = []
__timestamp_types = []
__timestamp_type_names = []
for idx, type in enumerate(FIELD_TYPES):
FIELD_ID_TO_NAME[idx] = type['name']
FIELD_NAME_TO_ID[type['name']] = idx
dbapi_types = type['dbapi_type']
for dbapi_type in dbapi_types:
if dbapi_type == DBAPI_TYPE_BINARY:
__binary_types.append(idx)
__binary_type_names.append(type['name'])
elif dbapi_type == DBAPI_TYPE_TIMESTAMP:
__timestamp_types.append(idx)
__timestamp_type_names.append(type['name'])
elif dbapi_type == DBAPI_TYPE_NUMBER:
__number_types.append(idx)
__number_type_names.append(type['name'])
elif dbapi_type == DBAPI_TYPE_STRING:
__string_types.append(idx)
__string_type_names.append(type['name'])
def get_binary_types():
return __binary_types
def is_binary_type_name(type_name):
return type_name in __binary_type_names
def get_string_types():
return __string_types
def is_string_type_name(type_name):
return type_name in __string_type_names
def get_number_types():
return __number_types
def is_number_type_name(type_name):
return type_name in __number_type_names
def get_timestamp_types():
return __timestamp_types
def is_timestamp_type_name(type_name):
return type_name in __timestamp_type_names
# Log format
LOG_FORMAT = (u'%(asctime)s - %(filename)s:%(lineno)d - '
u'%(funcName)s() - %(levelname)s - %(message)s')
# String literals
UTF8 = u'utf-8'
SHA256_DIGEST = u'sha256_digest'
class ResultStatus(Enum):
ERROR = u'ERROR'
UPLOADED = u'UPLOADED'
DOWNLOADED = u'DOWNLOADED'
COLLISION = u'COLLISION'
SKIPPED = u'SKIPPED'
RENEW_TOKEN = u'RENEW_TOKEN'
NOT_FOUND_FILE = u'NOT_FOUND_FILE'
NEED_RETRY = u'NEED_RETRY'
NEED_RETRY_WITH_LOWER_CONCURRENCY = u'NEED_RETRY_WITH_LOWER_CONCURRENCY'
FileHeader = namedtuple(
"FileReader", [
"digest",
"content_length",
"encryption_metadata"
]
)