From fa95884beb5bc2bd9bed3dbe00bc3e1b4ec91476 Mon Sep 17 00:00:00 2001 From: swwgames <95527278+swwgames@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:34:30 +0100 Subject: [PATCH] C1200 get_vpn_status and set_vpn (#57) * Update package_enum.py VPN lowecase version * Update c1200.py Added get_vpn_status and set_vpn for the c1200 router. * Update c1200.py * Update test_client_c1200.py Added vpn tests * Update test_client_c1200.py * Update test_client_c1200.py * Update c1200.py * Update test_client_c1200.py * Update test_client_c1200.py * Update c1200.py --- test/test_client_c1200.py | 195 ++++++++++++++++++++----- tplinkrouterc6u/client/c1200.py | 36 +++++ tplinkrouterc6u/common/package_enum.py | 5 + 3 files changed, 197 insertions(+), 39 deletions(-) diff --git a/test/test_client_c1200.py b/test/test_client_c1200.py index 39cf855..17b076c 100644 --- a/test/test_client_c1200.py +++ b/test/test_client_c1200.py @@ -3,41 +3,45 @@ from tplinkrouterc6u import ( TplinkC1200Router, Connection, - ClientException + ClientException, ) +from tplinkrouterc6u.common.package_enum import VPN class TestTPLinkC1200Client(TestCase): - def test_set_led_on(self) -> None: - - response_led_general_read = ''' + response_led_general_read = """ { "enable": "off", "time_set": "yes", "ledpm_support": "yes" } - ''' + """ - response_led_general_write = ''' + response_led_general_write = """ { "enable": "on", "time_set": "yes", "ledpm_support": "yes" } - ''' + """ class TPLinkRouterTest(TplinkC1200Router): - def request(self, path: str, data: str, - ignore_response: bool = False, ignore_errors: bool = False) -> dict | None: - if path == 'admin/ledgeneral?form=setting&operation=read': + def request( + self, + path: str, + data: str, + ignore_response: bool = False, + ignore_errors: bool = False, + ) -> dict | None: + if path == "admin/ledgeneral?form=setting&operation=read": return loads(response_led_general_read) - if path == 'admin/ledgeneral?form=setting&operation=write': + if path == "admin/ledgeneral?form=setting&operation=write": self.captured_path = path return loads(response_led_general_write) raise ClientException() - client = TPLinkRouterTest('', '') + client = TPLinkRouterTest("", "") client.set_led(True) @@ -46,34 +50,38 @@ def request(self, path: str, data: str, self.assertEqual(client.captured_path, expected_path) def test_set_led_off(self) -> None: - - response_led_general_read = ''' + response_led_general_read = """ { "enable": "on", "time_set": "yes", "ledpm_support": "yes" } - ''' + """ - response_led_general_write = ''' + response_led_general_write = """ { "enable": "off", "time_set": "yes", "ledpm_support": "yes" } - ''' + """ class TPLinkRouterTest(TplinkC1200Router): - def request(self, path: str, data: str, - ignore_response: bool = False, ignore_errors: bool = False) -> dict | None: - if path == 'admin/ledgeneral?form=setting&operation=read': + def request( + self, + path: str, + data: str, + ignore_response: bool = False, + ignore_errors: bool = False, + ) -> dict | None: + if path == "admin/ledgeneral?form=setting&operation=read": return loads(response_led_general_read) - elif path == 'admin/ledgeneral?form=setting&operation=write': + if path == "admin/ledgeneral?form=setting&operation=write": self.captured_path = path return loads(response_led_general_write) raise ClientException() - client = TPLinkRouterTest('', '') + client = TPLinkRouterTest("", "") client.set_led(False) @@ -82,37 +90,44 @@ def request(self, path: str, data: str, self.assertEqual(client.captured_path, expected_path) def test_led_status(self) -> None: - - response_led_general_read = ''' + response_led_general_read = """ { "enable": "on", "time_set": "yes", "ledpm_support": "yes" } - ''' + """ class TPLinkRouterTest(TplinkC1200Router): - def request(self, path: str, data: str, - ignore_response: bool = False, ignore_errors: bool = False) -> dict | None: - if path == 'admin/ledgeneral?form=setting&operation=read': + def request( + self, + path: str, + data: str, + ignore_response: bool = False, + ignore_errors: bool = False, + ) -> dict | None: + if path == "admin/ledgeneral?form=setting&operation=read": return loads(response_led_general_read) raise ClientException() - client = TPLinkRouterTest('', '') + client = TPLinkRouterTest("", "") led_status = client.get_led() self.assertTrue(led_status) def test_set_wifi(self) -> None: - class TPLinkRouterTest(TplinkC1200Router): - def request(self, path: str, data: str, - ignore_response: bool = False, ignore_errors: bool = False) -> dict | None: - + def request( + self, + path: str, + data: str, + ignore_response: bool = False, + ignore_errors: bool = False, + ) -> dict | None: self.captured_path = path self.captured_data = data - client = TPLinkRouterTest('', '') + client = TPLinkRouterTest("", "") client.set_wifi( Connection.HOST_5G, enable=True, @@ -126,17 +141,119 @@ def request(self, path: str, data: str, htmode="VHT20", channel=36, txpower="20", - disabled_all="no" + disabled_all="no", ) - expected_data = ("operation=write&enable=on&ssid=TestSSID&hidden=no&encryption=WPA3-PSK&" - "psk_version=2&psk_cipher=AES&psk_key=testkey123&hwmode=11ac&" - "htmode=VHT20&channel=36&txpower=20&disabled_all=no") + expected_data = ( + "operation=write&enable=on&ssid=TestSSID&hidden=no&encryption=WPA3-PSK&" + "psk_version=2&psk_cipher=AES&psk_key=testkey123&hwmode=11ac&" + "htmode=VHT20&channel=36&txpower=20&disabled_all=no" + ) expected_path = f"admin/wireless?form=wireless_5g&{expected_data}" self.assertEqual(client.captured_path, expected_path) self.assertEqual(client.captured_data, expected_data) + def test_vpn_status(self) -> None: + response_openvpn_read = """ + { + "enabled": "on", + "proto": "udp", + "access": "home", + "cert_exist": true, + "mask": "255.255.255.0", + "port": "1194", + "serverip": "10.8.0.0" + } + """ + + response_pptp_read = """ + { + "enabled": "off", + "unencrypted_access": "on", + "samba_access": "on", + "netbios_pass": "on", + "remoteip": "10.0.0.11-20" + } + """ + + respone_vpnconn_openvpn = """[ + {"username": "admin", "remote_ip": "192.168.0.200", "ipaddr": "10.0.0.11", + "extra": "7450", "vpntype": "openvpn", "key": "7450"}, + {"username": "admin", "remote_ip": "192.168.0.200", "ipaddr": "10.0.0.11", + "extra": "7450", "vpntype": "openvpn", "key": "7450"} + ]""" + + respone_vpnconn_pptpvpn = """[ + {"username": "admin", "remote_ip": "192.168.0.200", "ipaddr": "10.0.0.11", + "extra": "7450", "vpntype": "pptp", "key": "7450"}, + {"username": "admin", "remote_ip": "192.168.0.200", "ipaddr": "10.0.0.11", + "extra": "7450", "vpntype": "pptp", "key": "7450"}, + {"username": "admin", "remote_ip": "192.168.0.200", "ipaddr": "10.0.0.11", + "extra": "7450", "vpntype": "pptp", "key": "7450"} + ]""" + + class TPLinkRouterTest(TplinkC1200Router): + def request( + self, + path: str, + data: str, + ignore_response: bool = False, + ignore_errors: bool = False, + ) -> dict | None: + if path == "/admin/openvpn?form=config&operation=read": + return loads(response_openvpn_read) + if path == "/admin/pptpd?form=config&operation=read": + return loads(response_pptp_read) + if path == "/admin/vpnconn?form=config&operation=list&vpntype=openvpn": + return loads(respone_vpnconn_openvpn) + if path == "/admin/vpnconn?form=config&operation=list&vpntype=pptp": + return loads(respone_vpnconn_pptpvpn) + raise ClientException() + + client = TPLinkRouterTest("", "") + + vpn_status = client.get_vpn_status() + self.assertTrue(vpn_status.openvpn_enable) + self.assertFalse(vpn_status.pptpvpn_enable) + self.assertEqual(vpn_status.openvpn_clients_total, 2) + self.assertEqual(vpn_status.pptpvpn_clients_total, 3) + + def test_set_vpn(self) -> None: + response_openvpn_read = """ + { + "enabled": "on", + "proto": "udp", + "access": "home", + "cert_exist": true, + "mask": "255.255.255.0", + "port": "1194", + "serverip": "10.8.0.0" + } + """ + + class TPLinkRouterTest(TplinkC1200Router): + def request( + self, + path: str, + data: str, + ignore_response: bool = False, + ignore_errors: bool = False, + ) -> dict | None: + if path == "/admin/openvpn?form=config&operation=read": + return loads(response_openvpn_read) + self.captured_path = path + + client = TPLinkRouterTest("", "") + client.set_vpn(VPN.OPEN_VPN, True) + + expected_path = ( + "/admin/openvpn?form=config&operation=write&enabled=on" + "&proto=udp&access=home&cert_exist=True" + "&mask=255.255.255.0&port=1194&serverip=10.8.0.0" + ) + self.assertEqual(client.captured_path, expected_path) + -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/tplinkrouterc6u/client/c1200.py b/tplinkrouterc6u/client/c1200.py index 04f9ba9..35641a1 100644 --- a/tplinkrouterc6u/client/c1200.py +++ b/tplinkrouterc6u/client/c1200.py @@ -1,8 +1,11 @@ from re import search from requests import post, Response +from urllib.parse import urlencode from tplinkrouterc6u.common.encryption import EncryptionWrapper from tplinkrouterc6u.common.exception import ClientException, AuthorizeError from tplinkrouterc6u.client.c5400x import TplinkC5400XRouter +from tplinkrouterc6u.common.dataclass import VPNStatus +from tplinkrouterc6u.common.package_enum import VPN class TplinkC1200Router(TplinkC5400XRouter): @@ -100,3 +103,36 @@ def _try_login(self) -> Response: @staticmethod def _get_login_data(crypted_pwd: str) -> str: return 'operation=login&password={}'.format(crypted_pwd) + + def get_vpn_status(self) -> VPNStatus: + status = VPNStatus() + + values = [ + self.request("/admin/openvpn?form=config&operation=read", "operation=read"), + self.request("/admin/pptpd?form=config&operation=read", "operation=read"), + self.request("/admin/vpnconn?form=config&operation=list&vpntype=openvpn", + "operation=list&operation=list&vpntype=openvpn"), + self.request("/admin/vpnconn?form=config&operation=list&vpntype=pptp", + "operation=list&operation=list&vpntype=pptp"), + ] + + status.openvpn_enable = values[0]['enabled'] == 'on' + status.pptpvpn_enable = values[1]['enabled'] == 'on' + + if isinstance(values[2], list): + status.openvpn_clients_total = len(values[2]) + status.pptpvpn_clients_total = len(values[3]) + else: + status.openvpn_clients_total = 0 + status.pptpvpn_clients_total = 0 + + return status + + def set_vpn(self, vpn: VPN, enable: bool) -> None: + path = "/admin/{}?form=config&operation=read".format(vpn.lowercase) + current_config = self.request(path, "operation=read") + current_config['enabled'] = "on" if enable else "off" + data = urlencode(current_config) + data = "&operation=write&{}".format(data) + path = "/admin/{}?form=config{}".format(vpn.lowercase, data) + self.request(path, data) diff --git a/tplinkrouterc6u/common/package_enum.py b/tplinkrouterc6u/common/package_enum.py index e19592f..b2f42d5 100644 --- a/tplinkrouterc6u/common/package_enum.py +++ b/tplinkrouterc6u/common/package_enum.py @@ -49,3 +49,8 @@ def get_type(self) -> str | None: class VPN(Enum): OPEN_VPN = 'OPENVPN' PPTP_VPN = 'PPTPVPN' + + @property + def lowercase(self) -> str: + """Returns the lowercase version of the enum value. Needed for the c1200 router.""" + return self.value.lower()