forked from Jimbles/RPi_Photobooth
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdropboxsync.py
254 lines (223 loc) · 8.49 KB
/
dropboxsync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""Upload the contents of your Downloads folder to Dropbox.
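This is an example app for API v2.  In this project the defaults come
from the local ``config`` module (``config.backup_path`` is uploaded to
the ``photoboothbackup`` Dropbox folder) rather than ~/Downloads.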
"""
from __future__ import print_function
import argparse
import contextlib
import datetime
import os
import six
import sys
import time
import unicodedata
import config
# Python 2 compatibility: make input() behave like Python 3's input() by
# pointing it at raw_input(), which returns the typed text unevaluated.
if sys.version_info[0] == 2:
    input = raw_input
import dropbox
from dropbox.files import FileMetadata, FolderMetadata
# OAuth2 access token, read from the local config module.
# TODO: login etc.
TOKEN = config.dropbox_TOKEN
# Default Dropbox folder name (default of the 'folder' argument).
dbx_folder = 'photoboothbackup'
# Default local directory to upload (default of the 'rootdir' argument).
foldertobackup = config.backup_path

parser = argparse.ArgumentParser(description='Sync ~/Downloads to Dropbox')

# Optional positionals; each falls back to the module-level default above.
parser.add_argument(
    'folder', nargs='?', default=dbx_folder,
    help='Folder name in your Dropbox')
parser.add_argument(
    'rootdir', nargs='?', default=foldertobackup,
    help='Local directory to upload')
parser.add_argument(
    '--token', default=TOKEN,
    help='Access token (see https://www.dropbox.com/developers/apps)')

# Boolean switches that pre-answer the interactive yes/no questions.
for _flags, _help in (
        (('--yes', '-y'), 'Answer yes to all questions'),
        (('--no', '-n'), 'Answer no to all questions'),
        (('--default', '-d'), 'Take default answer on all questions')):
    parser.add_argument(*_flags, action='store_true', help=_help)
del _flags, _help
def main():
    """Main program.

    Parse command line, then iterate over files and directories under
    rootdir and upload all files.  Skips some temporary files and
    directories, and avoids duplicate uploads by comparing size and
    mtime with the server.

    Exits with status 2 when more than one of --yes/--no/--default is
    given or when no token is available, and with status 1 when rootdir
    does not exist or is not a directory.
    """
    args = parser.parse_args()
    if sum(bool(flag) for flag in (args.yes, args.no, args.default)) > 1:
        print('At most one of --yes, --no, --default is allowed')
        sys.exit(2)
    if not args.token:
        print('--token is mandatory')
        sys.exit(2)

    folder = args.folder
    rootdir = os.path.expanduser(args.rootdir)
    print('Dropbox folder name:', folder)
    print('Local directory:', rootdir)
    if not os.path.exists(rootdir):
        print(rootdir, 'does not exist on your filesystem')
        sys.exit(1)
    elif not os.path.isdir(rootdir):
        print(rootdir, 'is not a foldder on your filesystem')
        sys.exit(1)

    dbx = dropbox.Dropbox(args.token)

    for dn, dirs, files in os.walk(rootdir):
        # Path of the current directory relative to rootdir.
        subfolder = dn[len(rootdir):].strip(os.path.sep)
        listing = list_folder(dbx, folder, subfolder)
        print('Descending into', subfolder, '...')

        # First do all the files.
        for name in files:
            fullname = os.path.join(dn, name)
            if not isinstance(name, six.text_type):
                name = name.decode('utf-8')
            # The remote listing is looked up by the NFC-normalized name.
            nname = unicodedata.normalize('NFC', name)

            if name.startswith('.'):
                print('Skipping dot file:', name)
                continue
            if name.startswith('@') or name.endswith('~'):
                print('Skipping temporary file:', name)
                continue
            if name.endswith(('.pyc', '.pyo')):
                print('Skipping generated file:', name)
                continue

            if nname not in listing:
                # Not on the server yet: plain upload.
                if yesno('Upload %s' % name, True, args):
                    upload(dbx, fullname, folder, subfolder, name)
                continue

            md = listing[nname]
            mtime = os.path.getmtime(fullname)
            mtime_dt = datetime.datetime(*time.gmtime(mtime)[:6])
            size = os.path.getsize(fullname)
            if (isinstance(md, dropbox.files.FileMetadata) and
                    mtime_dt == md.client_modified and size == md.size):
                print(name, 'is already synced [stats match]')
                continue

            print(name, 'exists with different stats, downloading')
            res = download(dbx, folder, subfolder, name)
            # Read the local copy as bytes: download() returns bytes, so a
            # text-mode read could never compare equal on Python 3 and
            # would fail to decode binary files.
            with open(fullname, 'rb') as f:
                data = f.read()
            if res == data:
                print(name, 'is already synced [content match]')
            else:
                print(name, 'has changed since last sync')
                if yesno('Refresh %s' % name, False, args):
                    upload(dbx, fullname, folder, subfolder, name,
                           overwrite=True)

        # Then choose which subdirectories to traverse.
        keep = []
        for name in dirs:
            if name.startswith('.'):
                print('Skipping dot directory:', name)
            elif name.startswith('@') or name.endswith('~'):
                print('Skipping temporary directory:', name)
            elif name == '__pycache__':
                print('Skipping generated directory:', name)
            elif yesno('Descend into %s' % name, True, args):
                print('Keeping directory:', name)
                keep.append(name)
            else:
                print('OK, skipping directory:', name)
        # Mutate in place so os.walk only descends into the kept directories.
        dirs[:] = keep
def list_folder(dbx, folder, subfolder):
    """List a folder.

    Return a dict mapping unicode filenames to
    FileMetadata|FolderMetadata entries.

    All result pages are fetched: while the server reports ``has_more``
    the listing is continued from the returned cursor, so large folders
    are listed completely.  If the API call fails, the error is printed
    and an empty dict is returned.
    """
    path = '/%s/%s' % (folder, subfolder.replace(os.path.sep, '/'))
    # Collapse duplicate slashes (e.g. when subfolder is empty).
    while '//' in path:
        path = path.replace('//', '/')
    path = path.rstrip('/')

    entries = []
    try:
        with stopwatch('list_folder'):
            res = dbx.files_list_folder(path)
            entries.extend(res.entries)
            # A single call only returns one page of entries.
            while res.has_more:
                res = dbx.files_list_folder_continue(res.cursor)
                entries.extend(res.entries)
    except dropbox.exceptions.ApiError as err:
        print('Folder listing failed for', path, '-- assumped empty:', err)
        return {}
    return {entry.name: entry for entry in entries}
def download(dbx, folder, subfolder, name):
    """Download a file.

    Return the bytes of the file, or None if it doesn't exist or the
    request fails (the error is printed in that case).
    """
    path = '/%s/%s/%s' % (folder, subfolder.replace(os.path.sep, '/'), name)
    # Collapse duplicate slashes (e.g. when subfolder is empty).
    while '//' in path:
        path = path.replace('//', '/')
    with stopwatch('download'):
        try:
            md, res = dbx.files_download(path)
        except dropbox.exceptions.HttpError as err:
            print('*** HTTP error', err)
            return None
        except dropbox.exceptions.ApiError as err:
            # Path-level failures (such as a missing file) are reported as
            # ApiError; return None as the docstring promises.
            print('*** API error', err)
            return None
    data = res.content
    print(len(data), 'bytes; md:', md)
    return data
def upload(dbx, fullname, folder, subfolder, name, overwrite=False):
    """Upload a file.

    The local file ``fullname`` is read in full and sent to
    ``/<folder>/<subfolder>/<name>``, with its modification time passed
    as ``client_modified``.  With ``overwrite`` set, the write mode is
    ``overwrite``; otherwise it is ``add``.

    Return the request response, or None in case of error.
    """
    remote_subdir = subfolder.replace(os.path.sep, '/')
    path = '/%s/%s/%s' % (folder, remote_subdir, name)
    # Collapse duplicate slashes (e.g. when subfolder is empty).
    while '//' in path:
        path = path.replace('//', '/')

    if overwrite:
        mode = dropbox.files.WriteMode.overwrite
    else:
        mode = dropbox.files.WriteMode.add

    mtime = os.path.getmtime(fullname)
    with open(fullname, 'rb') as source:
        data = source.read()

    with stopwatch('upload %d bytes' % len(data)):
        # Build a datetime from the UTC fields of the file's mtime.
        modified = datetime.datetime(*time.gmtime(mtime)[:6])
        try:
            res = dbx.files_upload(
                data, path, mode, client_modified=modified, mute=True)
        except dropbox.exceptions.ApiError as err:
            print('*** API error', err)
            return None
    print('uploaded as', res.name.encode('utf8'))
    return res
def yesno(message, default, args):
    """Ask a yes/no question and return the answer as a bool.

    Command line arguments can force the answer without prompting:
    --default returns ``default``, --yes returns True and --no returns
    False (checked in that order).

    Otherwise the user is prompted.  A blank line returns the default,
    y/yes returns True and n/no returns False.  Any other answer is
    rejected and the question is asked again.

    Special answers:
    - q or quit exits the program
    - p or pdb invokes the debugger
    """
    if args.default:
        auto_answer = 'Y' if default else 'N'
        print(message + '? [auto]', auto_answer)
        return default
    if args.yes:
        print(message + '? [auto] YES')
        return True
    if args.no:
        print(message + '? [auto] NO')
        return False

    # The capital letter in the prompt marks the default choice.
    prompt = message + ('? [Y/n] ' if default else '? [N/y] ')
    verdicts = {'y': True, 'yes': True, 'n': False, 'no': False}
    while True:
        answer = input(prompt).strip().lower()
        if answer == '':
            return default
        if answer in verdicts:
            return verdicts[answer]
        if answer in ('q', 'quit'):
            print('Exit')
            raise SystemExit(0)
        if answer in ('p', 'pdb'):
            import pdb
            pdb.set_trace()
        print('Please answer YES or NO.')
@contextlib.contextmanager
def stopwatch(message):
    """Context manager that reports how long the enclosed block ran.

    On exit -- whether the block finished normally or raised -- prints
    'Total elapsed time for <message>: <seconds>' with the seconds shown
    to three decimal places.
    """
    started = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - started
        print('Total elapsed time for %s: %.3f' % (message, elapsed))
# Run the sync only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()