forked from aker-gateway/Aker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsession.py
76 lines (60 loc) · 1.97 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# -*- coding: utf-8 -*-
#
# Copyright 2016 Ahmed Nazmy
#
# Meta
__license__ = "AGPLv3"
__author__ = 'Ahmed Nazmy <[email protected]>'
import logging
import signal
import os
import time
import getpass
from SSHClient import SSHClient
class Session(object):
    """Common base for Aker sessions.

    Holds the connection parameters taken from the Aker core object and
    forwards sniffer/connect requests to ``self._client``.  The base class
    does not create ``_client`` itself; a subclass (e.g. ``SSHSession``)
    must assign it before the delegating methods are used.
    """

    def __init__(self, aker_core, host, uuid):
        """Record the target host and the settings read from ``aker_core``.

        :param aker_core: core object; ``user.name``, ``port``,
            ``config.src_port`` are read from it here and
            ``session_end_callback`` is called by ``close_session``.
        :param host: target host, handed to the client in ``connect``.
        :param uuid: identifier stored on the session as ``self.uuid``.
        """
        core = aker_core
        # Values passed straight through from the caller.
        self.aker = core
        self.host = host
        self.uuid = uuid
        # Values derived from the core object; the port is coerced to int.
        self.host_user = core.user.name
        self.host_port = int(core.port)
        self.src_port = core.config.src_port
        logging.debug("Session: Base Session created")

    def attach_sniffer(self, sniffer):
        """Hand ``sniffer`` to the underlying client."""
        client = self._client
        client.attach_sniffer(sniffer)

    def stop_sniffer(self):
        """Ask the underlying client to stop its sniffer."""
        client = self._client
        client.stop_sniffer()

    def connect(self, size):
        """Connect the client to ``host``:``host_port``, forwarding ``size``."""
        target_host, target_port = self.host, self.host_port
        self._client.connect(target_host, target_port, size)

    def start_session(self):
        """Start the session; concrete subclasses must override this."""
        raise NotImplementedError

    def close_session(self):
        """Notify the Aker core that this session has ended."""
        notify_end = self.aker.session_end_callback
        notify_end(self)

    def kill_session(self, signum, stack):
        """Log the end of the session and close it.

        ``signum`` and ``stack`` are accepted but unused.
        NOTE(review): the signature matches a ``signal.signal`` handler;
        presumably it is registered as one elsewhere -- confirm.
        """
        logging.debug("Session: Session ended")
        self.close_session()
class SSHSession(Session):
    """Session that talks to the target host through an ``SSHClient``.

    Every ``SSHSession`` instantiates its own new ``SSHClient``.
    """

    def __init__(self, aker_core, host, uuid):
        """Initialise the base session and create the SSH client.

        :param aker_core: core object, forwarded to ``Session.__init__``.
        :param host: target host, forwarded to ``Session.__init__``.
        :param uuid: session identifier, forwarded to ``Session.__init__``.
        """
        super(SSHSession, self).__init__(aker_core, host, uuid)
        # The client receives this session object itself.
        self._client = SSHClient(self)
        logging.debug("Session: SSHSession created")

    def start_session(self):
        """Authenticate and start the SSH session on the client.

        The user's private key is tried first.  If fetching it fails with
        the message ``'Core: Invalid Private Key'`` the user is prompted
        for a password instead; any other failure while fetching the key
        is re-raised unchanged.

        A failure of the client's ``start_session`` is logged (with its
        traceback) rather than raised, so this method returns ``None``
        either way.
        """
        try:
            auth_secret = self.aker.user.get_priv_key()
        except Exception as exc:
            # Currently a plain ``Exception`` is raised when no usable SSH
            # private key exists, so the case can only be recognised by its
            # message.  Anything else is a real error and must propagate.
            if str(exc) != 'Core: Invalid Private Key':
                raise
            auth_secret = getpass.getpass("Password: ")

        try:
            self._client.start_session(self.host_user, auth_secret)
        except Exception:
            # Deliberately best-effort, but only for ordinary errors:
            # KeyboardInterrupt/SystemExit are no longer swallowed, and the
            # traceback is kept so the failure can be diagnosed.
            logging.debug("Session: SSHSession failed", exc_info=True)