-
Notifications
You must be signed in to change notification settings - Fork 7
/
file.py
168 lines (140 loc) · 6.07 KB
/
file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import os, shutil, logging
from typing import Dict, List, Set
from typing import Optional, Any, Union
from datetime import datetime
import requests
import pandas as pd
import re
from .chrome import get_active_url_from_accessTree
logger = logging.getLogger("desktopenv.getters.file")
def get_content_from_vm_file(env, config: Dict[str, Any]) -> Any:
    """
    Fetch a file from the VM into the cache and extract part of its content.

    Config:
        path (str): absolute path on the VM to fetch
        file_type (str): type of the fetched file; only 'xlsx' is handled
        file_content (str): which part of the file to return; only
          'last_row' is handled

    Returns:
        For file_type 'xlsx' and file_content 'last_row': the last row of the
        sheet read by pandas, with every cell converted to str. For 'xlsx'
        with any other file_content: None.

    Raises:
        NotImplementedError: if file_type is not 'xlsx'.
    """
    vm_path = config["path"]
    # The cached copy keeps the base name of the VM path.
    local_copy = get_vm_file(
        env, {"path": vm_path, "dest": os.path.basename(vm_path)}
    )

    file_type = config['file_type']
    file_content = config['file_content']

    if file_type != 'xlsx':
        raise NotImplementedError(f"File type {file_type} not supported")

    if file_content != 'last_row':
        # No other extraction mode is implemented; nothing is returned.
        return None

    sheet = pd.read_excel(local_copy)
    return sheet.iloc[-1].astype(str).tolist()
def get_local_file(env, config: Dict[str, Any]) -> Any:
    """
    Copy a file or directory that is already on the local host machine into
    the cache dir.

    Config:
        path (str): absolute path on the local machine
        dest (str): file name to be copied into the cache dir, by default, use
          the base name of the path (a trailing path separator is ignored)

    Returns:
        The path of the copy inside env.cache_dir, or None if `path` does not
        exist (an error is logged in that case).
    """
    path = config["path"]
    # normpath drops a trailing separator, so "/data/dir/" still yields "dir".
    # Without it the default name would be "" and dest would be the cache
    # directory itself, which the directory branch below would delete.
    default_name = os.path.basename(os.path.normpath(path))
    dest = os.path.join(env.cache_dir, config.get('dest', default_name))

    if not os.path.exists(path):
        logger.error(f'[ERROR]: The specified file path {path} is not found on the local machine!')
        return None

    # `dest` may contain sub-directories that do not exist in the cache yet.
    parent = os.path.dirname(dest)
    if parent:
        os.makedirs(parent, exist_ok=True)

    if os.path.isdir(path):
        # Recursively copy the directory, replacing any previous copy so that
        # stale files from an earlier run do not survive.
        if os.path.isdir(dest):
            shutil.rmtree(dest)
        shutil.copytree(path, dest, dirs_exist_ok=True)
    else:
        # Directly copy the file (overwrite if exists).
        shutil.copyfile(path, dest)
    return dest
def get_cloud_file(env, config: Dict[str, Any]) -> Union[str, List[str]]:
    """
    Download one or more files from URLs into the cache dir.

    A file whose destination already exists in the cache dir is not
    downloaded again.

    Config:
        path (str|List[str]): the url to download from
        dest (str|List[str]): file name of the downloaded file
        multi (bool) : optional. if path and dest are lists providing
          information of multiple files. defaults to False
        gives (List[int]): optional. defaults to [0]. which files are directly
          returned to the metric. if len==1, str is returned; else, list is
          returned.
        timeout (float): optional. defaults to 60. seconds passed to
          requests.get as its timeout, so a stalled server cannot block the
          call forever.

    Returns:
        The cache path of the single file selected by `gives`, or a list of
        cache paths when `gives` selects zero or several files.

    Raises:
        Whatever requests raises for a failed connection, a timeout, or (via
        raise_for_status) an HTTP error status.
    """
    if config.get("multi", False):
        paths: List[str] = config["path"]
        dests: List[str] = config["dest"]
    else:
        paths = [config["path"]]
        dests = [config["dest"]]

    timeout = config.get("timeout", 60)
    gives: Set[int] = set(config.get("gives", [0]))
    cache_paths: List[str] = []

    for i, (url, dest) in enumerate(zip(paths, dests)):
        _path = os.path.join(env.cache_dir, dest)
        if i in gives:
            cache_paths.append(_path)

        if os.path.exists(_path):
            # Already cached; skip the download.
            continue

        # Download into a temporary sibling and rename only on success.
        # Otherwise an interrupted download would leave a truncated file at
        # _path, and the existence check above would treat it as a valid
        # cached file on every later call.
        partial_path = _path + ".part"
        try:
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, _path)
        finally:
            # After a successful rename there is nothing left to remove.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    return cache_paths[0] if len(cache_paths) == 1 else cache_paths
def get_vm_file(env, config: Dict[str, Any]) -> Union[Optional[str], List[Optional[str]]]:
    """
    Fetch one or more files from the VM and store them in the cache dir.

    Config:
        path (str): absolute path on the VM to fetch
        dest (str): file name of the downloaded file
        multi (bool) : optional. if path and dest are lists providing
          information of multiple files. defaults to False
        gives (List[int]): optional. defaults to [0]. which files are directly
          returned to the metric. if len==1, str is returned; else, list is
          returned.
      only support for single file now:
        time_suffix(bool): optional. defaults to False. if True, append the
          current time in required format, inserted before the file
          extension of both path and dest. names without an extension are
          left unchanged.
        time_format(str): optional. defaults to "%Y_%m_%d". format of the time
          suffix.

    Returns:
        The cache path of the single file selected by `gives`, or a list of
        cache paths when `gives` selects zero or several files. A selected
        file for which env.controller.get_file returned None is reported as
        None instead of a path.
    """
    if config.get("multi", False):
        paths: List[str] = config["path"]
        dests: List[str] = config["dest"]
    else:
        paths = [config["path"]]
        dests = [config["dest"]]
        if config.get("time_suffix"):
            # Format the timestamp once so that path and dest always receive
            # the same suffix, even if the clock rolls over between the two.
            stamp = datetime.now().strftime(config.get("time_format", "%Y_%m_%d"))
            paths = [_insert_before_extension(p, stamp) for p in paths]
            dests = [_insert_before_extension(d, stamp) for d in dests]

    cache_paths: List[Optional[str]] = []
    gives: Set[int] = set(config.get("gives", [0]))

    for i, (p, d) in enumerate(zip(paths, dests)):
        _path = os.path.join(env.cache_dir, d)
        file = env.controller.get_file(p)
        if file is None:
            # Missing on the VM: report None for this slot instead of raising,
            # so the remaining files are still fetched.
            if i in gives:
                cache_paths.append(None)
            continue

        if i in gives:
            cache_paths.append(_path)
        with open(_path, "wb") as f:
            f.write(file)

    return cache_paths[0] if len(cache_paths) == 1 else cache_paths


def _insert_before_extension(name: str, suffix: str) -> str:
    """Insert `suffix` before the final extension of `name`; return `name` unchanged if it has none."""
    # splitext only looks at the last path component and splits on its final
    # dot, so "a.b/report.v2.xlsx" becomes "a.b/report.v2<suffix>.xlsx".
    root, ext = os.path.splitext(name)
    return f"{root}{suffix}{ext}" if ext else name
def get_cache_file(env, config: Dict[str, str]) -> str:
    """
    Resolve a path relative to the cache dir and check that it exists.

    Config:
        path (str): relative path in cache dir

    Returns:
        The path joined onto env.cache_dir.

    Raises:
        FileNotFoundError: if the resolved path does not exist.
    """
    cached_path = os.path.join(env.cache_dir, config["path"])
    # An explicit raise rather than `assert`: asserts are stripped when Python
    # runs with -O, which would let a nonexistent path be returned silently.
    if not os.path.exists(cached_path):
        raise FileNotFoundError(f"Cache file not found: {cached_path}")
    return cached_path
def get_googlesheet_active_file(env, config: Dict[str, Any]) -> Union[str, List[str]]:
    """
    Download the Google Sheet found via the accessibility tree as an xlsx file.

    The URL is obtained from get_active_url_from_accessTree with the Google
    Sheets prefix, rewritten into the sheet's xlsx export URL, and downloaded
    with get_cloud_file.

    Config:
        dest (str|List[str]): file name of the downloaded file

    Any other keys in `config` are forwarded to get_cloud_file unchanged. The
    caller's dict is not modified.

    Returns:
        Whatever get_cloud_file returns for the export URL.

    Raises:
        ValueError: if no URL was obtained or it does not contain a
          "d/<id>/edit" segment to take the spreadsheet id from.
    """
    url_config = {"goto_prefix": "https://docs.google.com/spreadsheets"}
    active_url = get_active_url_from_accessTree(env, url_config)

    # Guard both failure modes explicitly: a non-string result would make
    # re.search raise TypeError, and a non-matching URL would make
    # `.group(1)` fail on None with an unhelpful AttributeError.
    match = re.search('d/(.*?)/edit', active_url) if isinstance(active_url, str) else None
    if match is None:
        raise ValueError(
            f"Could not extract a spreadsheet id from the active URL: {active_url!r}"
        )

    export_url = f"https://docs.google.com/spreadsheets/d/{match.group(1)}/export?format=xlsx"
    # Pass a copy so the caller's config is left untouched.
    return get_cloud_file(env, {**config, "path": export_url})