forked from louisa-uno/dmarket_bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdmarketapi.py
368 lines (331 loc) · 16.4 KB
/
dmarketapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
import asyncio
import aiohttp
import requests
import json
from datetime import datetime
from asyncio import CancelledError
from typing import List, Union
from nacl.bindings import crypto_sign
from furl import furl
from config import API_URL, API_URL_TRADING, logger
from api.exceptions import *
from api.schemas import Balance, Games, LastSales, SalesHistory, MarketOffers, AggregatedTitle, \
UserTargets, ClosedTargets, Target, UserItems, CreateOffers, CreateOffersResponse, EditOffers, EditOffersResponse, \
DeleteOffers, CreateTargets, CumulativePrices, OfferDetails, OfferDetailsResponse, ClosedOffers
class DMarketApi:
def __init__(self, public_key: str, secret_key: str):
self.PUBLIC_KEY = public_key
self.SECRET_KEY = secret_key
self.SELL_FEE = 7
self.balance = 0
self.session = aiohttp.ClientSession()
async def close(self):
return await self.session.close()
def generate_headers(self, method: str, api_path: str, params: dict = None, body: dict = None) -> dict:
nonce = str(round(datetime.now().timestamp()))
string_to_sign = method + api_path
string_to_sign = str(furl(string_to_sign).add(params))
logger.debug(f"string_to_sign: {string_to_sign}")
if body:
string_to_sign += json.dumps(body)
string_to_sign += nonce
signature_prefix = "dmar ed25519 "
encoded = string_to_sign.encode('utf-8')
secret_bytes = bytes.fromhex(self.SECRET_KEY)
signature_bytes = crypto_sign(encoded, secret_bytes)
signature = signature_bytes[:64].hex()
headers = {
"X-Api-Key": self.PUBLIC_KEY,
"X-Request-Sign": signature_prefix + signature,
"X-Sign-Date": nonce
}
return headers
@staticmethod
def catch_exception(response_status: int, headers: dict, response_text: str):
if response_status == 400:
raise BadRequestError()
if response_status == 502 or response_status == 500:
raise BadGatewayError()
if response_status == 429:
raise TooManyRequests()
if response_status == 401:
raise BadAPIKeyException()
if response_status != 200 and 'application/json' not in headers['content-type']:
raise WrongResponseException(response_text)
async def validate_response(self, response: aiohttp.ClientResponse or requests.Response) -> dict:
"""
Checks the response for errors.
:param response: Received response.
:raises BadAPIKey: Bad api key used.
:return: JSON like dict from response.
"""
headers = dict(response.headers)
if 'RateLimit-Remaining' not in headers:
await asyncio.sleep(5)
if 'RateLimit-Remaining' in headers and headers['RateLimit-Remaining'] in ['1', '0']:
await asyncio.sleep(int(headers['RateLimit-Reset']))
if isinstance(response, requests.Response):
response_status = response.status_code
self.catch_exception(response_status, headers, response.text)
body = response.json()
else:
response_status = response.status
self.catch_exception(response_status, headers, response.text)
body = await response.json()
return body
async def api_call(self, url: str, method: str, headers: dict, params: dict = None, body: dict = None,
aio: bool = True) -> dict:
if not aio:
response = requests.get(url, params=params, headers=headers)
await asyncio.sleep(0.001)
return await self.validate_response(response)
if method == 'GET':
async with self.session.get(url, params=params, headers=headers) as response:
data = await self.validate_response(response)
return data
elif method == 'DELETE':
async with self.session.delete(url, params=params, json=body, headers=headers) as response:
data = await self.validate_response(response)
return data
else:
async with self.session.post(url, params=params, json=body, headers=headers) as response:
data = await self.validate_response(response)
return data
# ACCOUNT
# ----------------------------------------------------------------
async def user(self):
method = 'GET'
url_path = '/account/v1/user'
headers = self.generate_headers(method, url_path)
url = API_URL + url_path
response = await self.api_call(url, method, headers)
return response
async def get_balance(self):
method = 'GET'
url_path = '/account/v1/balance'
headers = self.generate_headers(method, url_path)
url = API_URL + url_path
response = await self.api_call(url, method, headers=headers)
if 'usd' in response:
self.balance = Balance(**response).usd
logger.debug(f'BALANCE: {self.balance}')
return self.balance
else:
logger.debug(f'{response}')
async def get_money_loop(self) -> None:
while True:
try:
logger.debug('get money_loop')
await self.get_balance()
logger.debug('get money_loop sleep')
await asyncio.sleep(60*5)
except KeyboardInterrupt:
break
except CancelledError:
break
except Exception as e:
logger.error(f' Failed to get balance: {e}. Sleep for 5 seconds.')
await asyncio.sleep(5)
continue
return
# MARKET METHODS
# ------------------------------------------------------------------
async def last_sales(self, item_name: str, game: Games = Games.RUST) -> LastSales:
"""Method for receiving and processing a response for recent sales."""
method = 'GET'
params = {'gameId': game.value, 'title': item_name}
url_path = '/trade-aggregator/v1/last-sales'
headers = self.generate_headers(method, url_path, params)
url = API_URL_TRADING + url_path
response = await self.api_call(url, method, headers, params)
return LastSales(**response)
async def sales_history(self, item_name: str, game: Games = Games.RUST, currency: str = 'USD',
period: str = '1M') -> SalesHistory:
"""Method for receiving and processing a response for recent sales."""
method = 'GET'
params = {'GameID': game.value, 'Title': item_name, 'Currency': currency, 'Period': period}
url_path = '/marketplace-api/v1/sales-history'
headers = self.generate_headers(method, url_path, params)
url = API_URL_TRADING + url_path
response = await self.api_call(url, method, headers, params)
return SalesHistory(**response)
async def market_offers(self, game: Games = Games.RUST, name: str = '', limit: int = 100, offset: int = 0,
orderby: str = 'price', orderdir: str = 'asc', tree_filters: str = '',
currency: str = 'USD', price_from: int = 0, price_to: int = 0, types: str = 'dmarket',
cursor: str = '') -> MarketOffers:
method = 'GET'
url_path = '/exchange/v1/market/items'
params = {'gameId': game.value, 'title': name, 'limit': limit, 'orderBy': orderby, 'currency': currency,
'offset': offset, 'orderDir': orderdir, 'treeFilters': tree_filters, 'priceFrom': price_from,
'priceTo': price_to, 'types': types, 'cursor': cursor}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params)
return MarketOffers(**response)
async def agregated_prices(self, names: List[str], limit: int = 100, offset: str = None) -> List[AggregatedTitle]:
method = "GET"
url_path = '/price-aggregator/v1/aggregated-prices'
if len(names) > 100:
addiction_items = await self.agregated_prices(names[100:])
else:
addiction_items = []
params = {'Titles': names[:100], 'Limit': limit}
if offset:
params['Offset'] = offset
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params, aio=False)
return [AggregatedTitle(**i) for i in response['AggregatedTitles']] + addiction_items
async def offers_by_title(self, name: str, limit: int = 100, cursor: str = '') -> MarketOffers:
method = 'GET'
url_path = '/exchange/v1/offers-by-title'
params = {'Title': name, 'Limit': limit, 'Cursor': cursor}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params)
return MarketOffers(**response)
async def user_targets(self, game: Games = Games.RUST, price_from: float = None, price_to: float = None,
title: str = None, target_id: str = None, status: str = 'TargetStatusActive',
limit: str = '100', cursor: str = '', currency: str = 'USD'):
method = 'GET'
url_path = '/marketplace-api/v1/user-targets'
params = {'BasicFilters.Status': status, 'GameId': game.value, 'BasicFilters.Currency': currency,
'Limit': limit, 'SortType': 'UserTargetsSortTypeDefault'}
if price_from:
params['BasicFilters.PriceFrom'] = price_from
if price_to:
params['BasicFilters.PriceTo'] = price_to
if title:
params['BasicFilters.Title'] = title
if target_id:
params['BasicFilters.TargetID'] = target_id
if cursor:
params['Cursor'] = cursor
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params)
logger.debug(f"user_targets() count: {len(response['Items'])}")
logger.debug(f"{response['Items'][0].items()}")
return UserTargets(**response)
async def closed_targets(self, limit: str = '100', order_dir: str = 'desc') -> ClosedTargets:
logger.debug(f"closed_targets()")
method = 'GET'
url_path = '/marketplace-api/v1/user-targets/closed'
params = {'Limit': limit, 'OrderDir': order_dir}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
logger.debug(f"{url}")
logger.debug(f"{params}")
response = await self.api_call(url, method, headers, params)
# logger.debug(f"{response}")
logger.debug(f"closed_targets() count: {len(response['Trades'])}")
logger.debug(f"{response['Trades'][0].items()}")
return ClosedTargets(**response)
async def create_target(self, body: CreateTargets):
method = 'POST'
url_path = '/marketplace-api/v1/user-targets/create'
headers = self.generate_headers(method, url_path, body=body.dict())
url = API_URL + url_path
response = await self.api_call(url, method, headers, body=body.dict())
return response
async def delete_target(self, targets: List[Target]):
method = 'POST'
url_path = '/marketplace-api/v1/user-targets/delete'
if len(targets) > 150:
addiction_items = await self.delete_target(targets[150:])
else:
addiction_items = []
targets = [{'TargetID': i.TargetID} for i in targets[:150]]
body = {"Targets": targets}
headers = self.generate_headers(method, url_path, body=body)
url = API_URL + url_path
# logger.write(f'delete_targets() {url}', 'debug')
response = await self.api_call(url, method, headers, body=body)
return response['Result'] + addiction_items
async def cumulative_price(self, name: str, game: str):
method = 'GET'
url_path = '/marketplace-api/v1/cumulative-price-levels'
params = {'Title': name, 'GameID': game}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params)
return CumulativePrices(**response)
# SELL ITEMS
# ---------------------------------------------
async def user_inventory(self, game: Games = Games.RUST, in_market: bool = 'true', limit: str = '100') -> UserItems:
method = 'GET'
url_path = '/marketplace-api/v1/user-inventory'
params = {'GameID': game.value, 'BasicFilters.InMarket': in_market, 'Limit': limit}
headers = self.generate_headers(method, url_path, params=params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params=params)
return UserItems(**response)
async def user_items(self, game: Games = Games.RUST) -> MarketOffers:
method = 'GET'
url_path = '/exchange/v1/user/items'
params = {'GameId': game.value, 'currency': 'USD', 'limit': '50'}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params=params)
# logger.debug(response)
return MarketOffers(**response)
async def user_offers(self, game: Games = Games.RUST, status: str = 'OfferStatusDefault',
sort_type: str = 'UserOffersSortTypeDateNewestFirst', limit: str = '20'):
method = 'GET'
url_path = '/marketplace-api/v1/user-offers'
params = {'GameId': game.value, 'Status': status, 'Limit': limit, 'SortType': sort_type}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params=params)
return UserItems(**response)
async def user_offers_closed(self, game: Games = Games.RUST, limit: str = '20'):
method = 'GET'
url_path = '/marketplace-api/v1/user-offers/closed'
params = {'GameId': game.value, 'Limit': limit, 'OrderDir': "desc"}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params=params)
return ClosedOffers(**response)
async def user_offers_closed(self, game: Games = Games.RUST, limit: str = '20'):
method = 'GET'
url_path = '/marketplace-api/v1/user-offers/closed'
params = {'GameId': game.value, 'Limit': limit, 'OrderDir': "desc"}
headers = self.generate_headers(method, url_path, params)
url = API_URL + url_path
response = await self.api_call(url, method, headers, params=params)
return ClosedOffers(**response)
async def user_offers_create(self, body: CreateOffers):
method = 'POST'
url_path = '/marketplace-api/v1/user-offers/create'
body = body.model_dump()
headers = self.generate_headers(method, url_path, body=body)
logger.debug(f'user_offers_create() body: {body}')
url = API_URL + url_path
response = await self.api_call(url, method, headers, body=body)
logger.debug(f'user_offers_create() response: {response}')
return CreateOffersResponse(**response)
async def user_offers_edit(self, body: EditOffers):
method = 'POST'
url_path = '/marketplace-api/v1/user-offers/edit'
body = body.model_dump()
headers = self.generate_headers(method, url_path, body=body)
url = API_URL + url_path
response = await self.api_call(url, method, headers, body=body)
return EditOffersResponse(**response)
async def user_offers_delete(self, body: DeleteOffers):
method = 'DELETE'
url_path = '/exchange/v1/offers'
body = body.dict()
headers = self.generate_headers(method, url_path, body=body)
url = API_URL + url_path
response = await self.api_call(url, method, headers, body=body)
return response
async def user_offers_details(self, body: OfferDetails):
method = 'POST'
url_path = '/exchange/v1/offers/details'
body = body.dict()
headers = self.generate_headers(method, url_path, body=body)
url = API_URL + url_path
response = await self.api_call(url, method, headers, body=body)
return OfferDetailsResponse(**response)
#return response