Skip to content

Commit

Permalink
Merge pull request #951 from Johann-PLW/main
Browse files Browse the repository at this point in the history
New `get_sqlite_db_records(path, query)` function
  • Loading branch information
Johann-PLW authored Nov 24, 2024
2 parents 61e4149 + e056cf8 commit b234253
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 18 deletions.
35 changes: 17 additions & 18 deletions scripts/artifacts/subscriberInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,39 @@
}


from scripts.ilapfuncs import artifact_processor, open_sqlite_db_readonly, convert_ts_human_to_timezone_offset, device_info
from scripts.ilapfuncs import artifact_processor, get_sqlite_db_records, convert_cocoa_core_data_ts_to_utc, device_info

@artifact_processor
def subscriberInfo(files_found, report_folder, seeker, wrap_text, timezone_offset):
data_list = []
db_file = ''
db_records = []

query = '''
SELECT
last_update_time,
slot_id,
subscriber_id,
subscriber_mdn
FROM subscriber_info
'''

for file_found in files_found:
if file_found.endswith('CellularUsage.db'):
db_file = file_found
db_records = get_sqlite_db_records(db_file, query)
break

with open_sqlite_db_readonly(db_file) as db:
cursor = db.cursor()
cursor.execute('''
SELECT
datetime(last_update_time+978307200,'unixepoch'),
slot_id,
subscriber_id,
subscriber_mdn
FROM subscriber_info
''')

all_rows = cursor.fetchall()

for row in all_rows:
last_update_time = convert_ts_human_to_timezone_offset(row[0], timezone_offset)
data_list.append((last_update_time, row[1], row[2], row[3]))
device_info("Cellular", "SIM Cards", f"Slot {row[1]} -> ICCID: {row[2]} | MSISDN: {row[3]} | Last Update: {last_update_time}", db_file)
for record in db_records:
last_update_time = convert_cocoa_core_data_ts_to_utc(record[0])
data_list.append((last_update_time, record[1], record[2], record[3]))
device_info("Cellular", "SIM Cards", f"Slot {record[1]} -> ICCID: {record[2]} | MSISDN: {record[3]} | Last Update: {last_update_time}", db_file)

data_headers = (
('Last update time', 'datetime'),
'Slot ID',
'ICCID',
'MSISDN',
)

return data_headers, data_list, db_file
26 changes: 26 additions & 0 deletions scripts/ilapfuncs.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ def convert_ts_int_to_timezone(time, time_offset):
#return the converted value
return timezone_time

def convert_cocoa_core_data_ts_to_utc(cocoa_core_data_ts):
unix_timestamp = cocoa_core_data_ts + 978307200
finaltime = datetime.fromtimestamp(unix_timestamp, tz=timezone.utc)
return(finaltime)

def webkit_timestampsconv(webkittime):
unix_timestamp = webkittime + 978307200
finaltime = datetime.fromtimestamp(unix_timestamp, tz=timezone.utc)
Expand Down Expand Up @@ -357,6 +362,27 @@ def open_sqlite_db_readonly(path):
path = "%5C%5C%3F%5C" + path
return sqlite3.connect(f"file:{path}?mode=ro", uri=True)

def get_sqlite_db_records(path, query):
'''Opens an sqlite db in read-only mode, so original db (and -wal/journal are intact)'''
if is_platform_windows():
if path.startswith('\\\\?\\UNC\\'): # UNC long path
path = "%5C%5C%3F%5C" + path[4:]
elif path.startswith('\\\\?\\'): # normal long path
path = "%5C%5C%3F%5C" + path[4:]
elif path.startswith('\\\\'): # UNC path
path = "%5C%5C%3F%5C\\UNC" + path[1:]
else: # normal path
path = "%5C%5C%3F%5C" + path
try:
with sqlite3.connect(f"file:{path}?mode=ro", uri=True) as db:
cursor = db.cursor()
cursor.execute(query)
records = cursor.fetchall()
return records
except sqlite3.OperationalError as e:
logfunc(f"Error with {path}:")
logfunc(f" - {str(e)}")
return []

def does_column_exist_in_db(db, table_name, col_name):
'''Checks if a specific col exists'''
Expand Down

0 comments on commit b234253

Please sign in to comment.