forked from benjamin-lowry/steamarchiver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdepot_extractor.py
169 lines (162 loc) · 7.92 KB
/
depot_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
from argparse import ArgumentParser
from binascii import hexlify
from datetime import datetime
from fnmatch import fnmatch
from glob import glob
from hashlib import sha1
from io import BytesIO
from os import makedirs, remove
from os.path import dirname, exists
from pathlib import Path
from struct import unpack
from sys import argv
from zipfile import ZipFile
import lzma
if __name__ == "__main__": # exit before we import our shit if the args are wrong
parser = ArgumentParser(description='Extract downloaded depots.')
parser.add_argument('depotid', type=int)
parser.add_argument('manifestid', type=int)
parser.add_argument('depotkey', type=str, nargs='?')
parser.add_argument('-d', dest="dry_run", help="dry run: verify chunks without extracting", action="store_true")
parser.add_argument('-f', dest="files", help="List files to extract (can be used multiple times); if ommitted, all files will be extracted. Glob matching supported.", action="append")
parser.add_argument('-b', dest="backup", help="Path to a .csd backup file to extract (the manifest must also be present in the depots folder)", nargs='?')
parser.add_argument('--dest', help="directory to place extracted files in", type=str, default="extract")
args = parser.parse_args()
from steam.core.manifest import DepotManifest
from steam.core.crypto import symmetric_decrypt
from chunkstore import Chunkstore
if __name__ == "__main__":
path = "./depots/%s/" % args.depotid
keyfile = "./keys/%s.depotkey" % args.depotid
manifest = None
with open(path + "%s.zip" % args.manifestid, "rb") as f:
manifest = DepotManifest(f.read())
if args.depotkey:
args.depotkey = bytes.fromhex(args.depotkey)
if manifest.filenames_encrypted:
manifest.decrypt_filenames(args.depotkey)
elif manifest.filenames_encrypted:
## Using No-Intro's DepotKey format, which is
## a 32-byte/256-bit binary file.
## Examples require login to No-Intro to view.
if exists(keyfile):
with open(keyfile, "rb") as f:
args.depotkey = f.read()
manifest.decrypt_filenames(args.depotkey)
## If depotkey is not found, locate depot_keys.txt
## and check if key is located in there.
elif exists("./depot_keys.txt"):
with open("./depot_keys.txt", "r", encoding="utf-8") as f:
for line in f.read().split("\n"):
line = line.split("\t")
try:
if int(line[0]) == args.depotid:
args.depotkey = bytes.fromhex(line[2])
manifest.decrypt_filenames(args.depotkey)
break
except ValueError:
pass
if not args.depotkey:
print("ERROR: manifest has encrypted filenames, but no depot key was specified and no key for this depot exists in depot_keys.txt")
exit(1)
else:
print("ERROR: manifest has encrypted filenames, but no depot key was specified and no depot_keys.txt exists")
exit(1)
def is_match(file):
for pattern in args.files:
if fnmatch(file.filename, pattern): return True
return False
if args.backup:
chunkstores = {}
chunks_by_store = {}
for csm in glob(args.backup.replace("_1.csm","").replace("_1.csd","") + "_*.csm"):
chunkstore = Chunkstore(csm)
chunkstore.unpack()
for chunk, _ in chunkstore.chunks.items():
chunks_by_store[chunk] = csm
chunkstores[csm] = chunkstore
for file in manifest.iter_files():
if args.files and not is_match(file): continue
target = args.dest + "/" + dirname(file.filename)
if not args.dry_run:
try:
makedirs(target, exist_ok=True)
except FileExistsError:
remove(target)
makedirs(target, exist_ok=True)
except NotADirectoryError:
# bruh
while True:
try:
remove(Path(target).parent)
except IsADirectoryError:
pass
try:
makedirs(target, exist_ok=True)
except NotADirectoryError or FileExistsError:
continue
break
try:
for chunk in sorted(file.chunks, key = lambda chunk: chunk.offset):
chunkhex = hexlify(chunk.sha).decode()
if args.backup:
chunk_data = None
is_encrypted = False
try:
chunkstore = chunkstores[chunks_by_store[chunk.sha]]
chunk_data = chunkstore.get_chunk(chunk.sha)
is_encrypted = chunkstore.is_encrypted
except:
print("missing chunk " + hexlify(chunk.sha).decode())
breakpoint()
continue
if is_encrypted:
if args.depotkey:
decrypted = symmetric_decrypt(chunk_data, args.depotkey)
else:
print("ERROR: chunk %s is encrypted, but no depot key was specified" % chunkhex)
exit(1)
else:
decrypted = chunk_data
chunk_data = None
else:
if exists(path + chunkhex):
with open(path + chunkhex, "rb") as chunkfile:
if args.depotkey:
decrypted = symmetric_decrypt(chunkfile.read(), args.depotkey)
else:
print("ERROR: chunk %s is encrypted, but no depot key was specified" % chunkhex)
exit(1)
elif exists(path + chunkhex + "_decrypted"):
with open(path + chunkhex + "_decrypted", "rb") as chunkfile:
decrypted = chunkfile.read()
else:
print("missing chunk " + chunkhex)
continue
decompressed = None
if decrypted[:2] == b'VZ': # LZMA
if args.dry_run:
print("Testing", file.filename, "(LZMA) from chunk", chunkhex)
else:
print("Extracting", file.filename, "(LZMA) from chunk", chunkhex)
decompressed = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[lzma._decode_filter_properties(lzma.FILTER_LZMA1, decrypted[7:12])]).decompress(decrypted[12:-9])[:chunk.cb_original]
elif decrypted[:2] == b'PK': # Zip
if args.dry_run:
print("Testing", file.filename, "(Zip) from chunk", chunkhex)
else:
print("Extracting", file.filename, "(Zip) from chunk", chunkhex)
zipfile = ZipFile(BytesIO(decrypted))
decompressed = zipfile.read(zipfile.filelist[0])
else:
print("ERROR: unknown archive type", decrypted[:2].decode())
exit(1)
sha = sha1(decompressed)
if sha.digest() != chunk.sha:
print("ERROR: sha1 checksum mismatch (expected %s, got %s)" % (hexlify(chunk.sha).decode(), sha.hexdigest()))
if not args.dry_run:
with open(args.dest + "/" + file.filename, "ab") as f:
f.seek(chunk.offset)
f.write(decompressed)
except IsADirectoryError:
pass