-
Notifications
You must be signed in to change notification settings - Fork 0
/
neo4j_dump.py
102 lines (95 loc) · 3.94 KB
/
neo4j_dump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import paramiko
import subprocess
import os
import io
import time
from scp import SCPClient
from bento.common.s3 import upload_log_file
from bento.common.utils import get_logger, get_time_stamp, LOG_PREFIX, APP_NAME
import signal
class TimeoutError(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutError("Code execution timed out")
def uplaod_s3(s3_bucket, s3_folder, log, file_key):
# Upload to s3 bucket
dest = f"s3://{s3_bucket}/{s3_folder}"
try:
upload_log_file(dest, file_key)
log.info(f'Uploading neo4j dump file {os.path.basename(file_key)} succeeded!')
except Exception as e:
log.error(e)
def wait_for_complete(log, channel, recv_timeout):
output_buffer = ""
signal.signal(signal.SIGALRM, timeout_handler)
while not (output_buffer.endswith("~]$ ") or output_buffer.endswith("]# ")):
try:
# Attempt to receive data from the channel
signal.alarm(recv_timeout)
recv_data = channel.recv(1024).decode()
log.info(recv_data)
signal.alarm(0)
except TimeoutError as e:
#log.info("ouput timeout")
recv_data = ""
finally:
signal.alarm(0)
output_buffer += recv_data
return output_buffer
def neo4j_dump(dump_file, neo4j_ip, neo4j_user, neo4j_key, s3_bucket, s3_folder):
dump_fail = False
is_shell = True
TMP = "/tmp/"
if LOG_PREFIX not in os.environ:
os.environ[LOG_PREFIX] = 'Neo4j_Dump_Generator'
os.environ[APP_NAME] = 'Neo4j_Dump_Generator'
log = get_logger('Neo4j Dump Generator')
file_key = os.path.join(TMP, dump_file)
#host = get_host(neo4j_ip)
host = neo4j_ip
command = f"sudo rm -f {file_key} && sudo neo4j-admin dump --database=neo4j --to={file_key}"
if host in ['localhost', '127.0.0.1']:
try:
subprocess.call(command, shell = is_shell)
except Exception as e:
dump_fail = True
log.error(e)
else:
#cmd_list = ["sudo su - commonsdocker","sudo -i", "systemctl stop neo4j", command, "systemctl start neo4j"]
cmd_list = ["sudo systemctl stop neo4j", command, f"sudo chmod 666 {file_key}", "sudo systemctl start neo4j"]
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pkey = paramiko.RSAKey.from_private_key(io.StringIO(neo4j_key))
log.info("Start connecting to remote neo4j server")
#client.connect(host, username=neo4j_user, password=neo4j_password, timeout=30)
client.connect(host, username=neo4j_user, pkey=pkey, timeout=30)
log.info("Connect to the remote server successfully")
channel = client.invoke_shell()
''''''
try:
for cmd in cmd_list:
channel.send(cmd + "\n")
while not channel.recv_ready():
time.sleep(0.1)
#set up timer because channel.recv() will stuck when there is no more output
recv_timeout = 3
output_buffer = wait_for_complete(log, channel, recv_timeout)
log.info(output_buffer)
except Exception as e:
dump_fail = True
log.error(e)
# Download the file
log.info(f"Start downloading from {file_key}")
#timestamp = get_time_stamp()
#local_file_key = os.path.join('tmp', dump_file.replace(os.path.splitext(dump_file)[1], "_" + timestamp + ".dump"))
local_file_key = os.path.join('tmp', dump_file)
if not dump_fail:
#download_cmd = f"scp {neo4j_ip}:{file_key} ."
scp = SCPClient(client.get_transport())
scp.get(file_key, local_file_key)
else:
log.error("Can not create neo4j dump file")
client.close()
if not dump_fail:
log.info(f"Start uploading file {local_file_key} to S3://{s3_bucket}/{s3_folder}")
uplaod_s3(s3_bucket, s3_folder, log, local_file_key)