Skip to content

Commit

Permalink
KSM-495 Added query option to ksm secret list command (#569)
Browse files Browse the repository at this point in the history
  • Loading branch information
idimov-keeper authored Apr 12, 2024
1 parent 53a1906 commit b838826
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,11 @@ def secret_command(ctx):
@click.option('--uid', '-u', type=str, multiple=True, help='List specific records by Record UID', cls=Mutex, not_required_if=[('folder',)])
@click.option('--folder', '-f', type=str, help='List only records in specified folder UID')
@click.option('--recursive', '-r', is_flag=True, help='List recursively all records including subfolders of the folder UID')
@click.option('--query', '-q', type=str, help='List records matching the JSONPath query')
@click.option('--show-value', '-v', is_flag=True, help='Print matching value instead of record title')
@click.option('--json', is_flag=True, help='Return secret as JSON')
@click.pass_context
def secret_list_command(ctx, uid, folder, recursive, json):
def secret_list_command(ctx, uid, folder, recursive, query, show_value, json):
"""List all secrets"""

output = "text"
Expand All @@ -546,6 +548,8 @@ def secret_list_command(ctx, uid, folder, recursive, json):
uids=uid,
folder=folder,
recursive=recursive,
query=query,
show_value=show_value,
output_format=output,
use_color=ctx.obj["cli"].use_color
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def _record_to_dict(self, record, load_references=False, unmask=False, use_color
"uid": record.uid,
"title": record.title,
"type": record.type,
"notes": record.dict.get("notes", ""),
"fields": standard_fields,
"custom_fields": custom_fields,
"files": [{
Expand Down Expand Up @@ -359,6 +360,20 @@ def _query_jsonpath(self, jsonpath_query, records, force_array, raw):
except Exception as err:
raise KsmCliException("JSONPath failed: {}".format(err))

def _query_jsonpath_list(self, jsonpath_query, records):
record_list = Secret._adjust_records(records, True)
try:
results = []
jpe = parse(jsonpath_query)
for rec in record_list:
val = jpe.find(rec)
if val and val[0].value:
rec["query_result"] = val[0].value[0] if isinstance(val[0].value, list) else val[0].value
results.append(rec)
return results
except Exception as err:
raise KsmCliException(f"JSONPath failed: {err}") from err

def query(self, uids=None, folder=None, recursive=False, titles=None, field=None,
output_format='json', jsonpath_query=None, force_array=False,
load_references=False, unmask=False, use_color=None, inflate=True, raw=False):
Expand Down Expand Up @@ -439,27 +454,50 @@ def query(self, uids=None, folder=None, recursive=False, titles=None, field=None
use_color=use_color)

@staticmethod
def _format_list(record_dict, use_color=True):
def _format_list(record_dict, use_color=True, columns=None):
table = Table(use_color=use_color)
table.add_column("UID", data_color=Fore.GREEN)
table.add_column("Record Type")
table.add_column("Title", data_color=Fore.YELLOW)
for record in record_dict:
table.add_row([record["uid"], record["type"], record["title"]])
if columns:
for col in columns:
table.add_column(col[1], data_color=col[2])
for record in record_dict:
table.add_row([record.get(x[0], "") for x in columns])
else:
table.add_column("UID", data_color=Fore.GREEN)
table.add_column("Record Type")
table.add_column("Title", data_color=Fore.YELLOW)
for record in record_dict:
table.add_row([record["uid"], record["type"], record["title"]])
return "\n" + table.get_string() + "\n"

def secret_list(self, uids=None, folder=None, recursive=False, output_format='json', use_color=None):
def secret_list(self, uids=None, folder=None, recursive=False, query=None, show_value=False, output_format='json', use_color=None):

if use_color is None:
use_color = self.cli.user_color

record_dict = self.query(uids=uids, folder=folder, recursive=recursive, output_format='dict', unmask=True, use_color=use_color)
if output_format == 'text':
self.cli.output(self._format_list(record_dict, use_color=use_color))
elif output_format == 'json':
records = [{"uid": x.get("uid"), "title": x.get("title"), "record_type": x.get("type")}
for x in record_dict]
self.cli.output(json.dumps(records, indent=4))
loadrefs = True if query else False # to load fields[] and custom[]
record_dict = self.query(uids=uids, folder=folder, recursive=recursive, output_format='dict', load_references=loadrefs, unmask=True, use_color=use_color)
if query:
items = self._query_jsonpath_list(query, record_dict)
if output_format == 'text':
columns = [("uid", "UID", Fore.GREEN), ("type", "Record Type", Style.RESET_ALL), ("query_result", "Value", Fore.YELLOW)] if show_value else []
self.cli.output(self._format_list(items, use_color=use_color, columns=columns))
elif output_format == 'json':
records = [{
"uid": x.get("uid", ""),
"record_type": x.get("type", ""),
"title": x.get("title", ""),
"value": x.get("query_result", "")}
for x in items]
if not show_value:
records = [{k: v for k, v in x.items() if k != "value"} for x in records]
self.cli.output(json.dumps(records, indent=4))
else:
if output_format == 'text':
self.cli.output(self._format_list(record_dict, use_color=use_color))
elif output_format == 'json':
records = [{"uid": x.get("uid"), "title": x.get("title"), "record_type": x.get("type")}
for x in record_dict]
self.cli.output(json.dumps(records, indent=4))

def download(self, uid, name, file_uid, file_output, create_folders=False):

Expand Down

0 comments on commit b838826

Please sign in to comment.