Skip to content

Commit

Permalink
Python SDK: Refactoring response from the server and adding warning l…
Browse files Browse the repository at this point in the history
…ogs (#215)
  • Loading branch information
maksimu authored Feb 2, 2022
1 parent ce7be60 commit a0b1bda
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 12 deletions.
45 changes: 34 additions & 11 deletions sdk/python/core/keeper_secrets_manager_core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from keeper_secrets_manager_core import utils, helpers
from keeper_secrets_manager_core.configkeys import ConfigKeys
from keeper_secrets_manager_core.crypto import CryptoUtils
from keeper_secrets_manager_core.dto.dtos import Folder, Record, RecordCreate
from keeper_secrets_manager_core.dto.dtos import Folder, Record, RecordCreate, SecretsManagerResponse, AppData
from keeper_secrets_manager_core.dto.payload import GetPayload, UpdatePayload, TransmissionKey, \
EncryptedPayload, KSMHttpResponse, CreatePayload
from keeper_secrets_manager_core.exceptions import KeeperError
Expand Down Expand Up @@ -494,11 +494,31 @@ def fetch_and_decrypt_secrets(self, record_filter=None):

self.logger.debug("Total record count: {}".format(len(records)))

return {
'records': records,
'folders': shared_folders,
'justBound': just_bound
}
sm_response = SecretsManagerResponse()

if 'appData' in decrypted_response_dict:
app_data_json = CryptoUtils.decrypt_aes(
url_safe_str_to_bytes(decrypted_response_dict['appData']),
base64_to_bytes(self.config.get(ConfigKeys.KEY_APP_KEY))
)

app_data_dict = utils.json_to_dict(app_data_json)

sm_response.appData = AppData(title=app_data_dict['title'], app_type=app_data_dict['type'])
else:
sm_response.appData = AppData()

if 'expiresOn' in decrypted_response_dict:
sm_response.expiresOn = decrypted_response_dict.get('expiresOn')

if 'warnings' in decrypted_response_dict:
sm_response.warnings = decrypted_response_dict.get('warnings')

sm_response.records = records
sm_response.folders = shared_folders
sm_response.justBound = just_bound

return sm_response

def get_secrets(self, uids=None, full_response=False):
"""
Expand All @@ -510,15 +530,19 @@ def get_secrets(self, uids=None, full_response=False):

records_resp = self.fetch_and_decrypt_secrets(uids)

just_bound = records_resp.get('justBound')

if just_bound:
if records_resp.justBound:
records_resp = self.fetch_and_decrypt_secrets(uids)

# Log warnings we got from the server
# Will only be displayed if logging is enabled:
if records_resp.warnings:
for warning in records_resp.warnings:
self.logger.warning(warning)

if full_response:
return records_resp
else:
records = records_resp.get('records') or []
records = records_resp.records or []

return records

Expand Down Expand Up @@ -562,7 +586,6 @@ def create_secret(self, folder_uid, record_data):
'folder folder that you know exists, make sure that at least one record is present in '
'the prior to adding a record to the folder.')


payload = SecretsManager.prepare_create_payload(self.config, folder_uid, record_data_json_str, found_folder.key)

self._post_query('create_secret', payload)
Expand Down
33 changes: 33 additions & 0 deletions sdk/python/core/keeper_secrets_manager_core/dto/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# Contact: [email protected]
import json
import os
from datetime import datetime

import requests
from keeper_secrets_manager_core.crypto import CryptoUtils
Expand Down Expand Up @@ -411,3 +412,35 @@ def to_json(self):

json_object = json.dumps(self.to_dict(), indent=4)
return json_object


class AppData:
"""
Application info
"""
def __init__(self, title="", app_type=""):
self.title = title
self.app_type = app_type


class SecretsManagerResponse:

"""
Server response contained details about the application and the records
that were requested to be returned
"""
def __init__(self):
self.appData = None
# self.encryptedAppKey = None
# self.appOwnerPublicKey = None
self.folders = None
self.records = None
self.expiresOn = None
self.warnings = None
self.justBound = False

def expires_on_str(self, date_format='%Y-%m-%d %H:%M:%S'):
"""
Retrieve string formatted expiration date
"""
return datetime.fromtimestamp(self.expiresOn/1000).strftime(date_format)
2 changes: 1 addition & 1 deletion sdk/python/core/keeper_secrets_manager_core/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def obj_to_dict(obj):

def get_folder_key(folder_uid, secrets_and_folders):

folders = secrets_and_folders.get('folders')
folders = secrets_and_folders.folders

for f in folders:
if f.uid == folder_uid:
Expand Down

0 comments on commit a0b1bda

Please sign in to comment.