Skip to content

Commit

Permalink
Merge pull request #424 from uzlonewolf/misc
Browse files Browse the repository at this point in the history
Require pyca/cryptography >= 3.1, and add tools/fake-v35-device.py
  • Loading branch information
jasonacox authored Nov 16, 2023
2 parents 5591ec7 + cd92a7d commit d17d031
Show file tree
Hide file tree
Showing 5 changed files with 585 additions and 8 deletions.
7 changes: 7 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# RELEASE NOTES

## v1.13.1 - Cryptography Version

* PyPI 1.13.1
* Require pyca/cryptography>=3.1 or fallback to PyCryptodome
* Add `tools/fake-v35-device.py` script to tools
* Allow pyca/cryptography to GCM decrypt without the tag (makes it match PyCryptodome) by @uzlonewolf in https://github.com/jasonacox/tinytuya/pull/424

## v1.13.0 - Crypto Library Update

* PyPI 1.13.0
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#
cryptography # Encryption - AES can also be provided via PyCryptodome or pyaes or pyca/cryptography
cryptography>=3.1 # Encryption - AES can also be provided via PyCryptodome or pyaes or pyca/cryptography
requests # Used for Setup Wizard - Tuya IoT Platform calls
colorama # Makes ANSI escape character sequences work under MS Windows.
17 changes: 10 additions & 7 deletions tinytuya/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,13 @@
Crypto = AES = CRYPTOLIB = None
try:
if clib == 'pyca/cryptography': # https://cryptography.io/en/latest/
from cryptography import __version__ as Crypto_version
if (Crypto_version[:2] in ('0.', '1.', '2.')) or (Crypto_version == '3.0'):
# cryptography <= 3.0 requires a backend= parameter
continue
from cryptography.hazmat.primitives.ciphers import Cipher as Crypto
from cryptography.hazmat.primitives.ciphers import modes as Crypto_modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography import __version__ as Crypto_version
elif clib == 'PyCryptodomex': # https://pycryptodome.readthedocs.io/en/latest/
# PyCryptodome is installed as "Cryptodome" when installed by
# `apt install python3-pycryptodome` or `pip install pycryptodomex`
Expand All @@ -120,7 +123,7 @@
# Colorama terminal color capability for all platforms
init()

version_tuple = (1, 13, 0)
version_tuple = (1, 13, 1)
version = __version__ = "%d.%d.%d" % version_tuple
__author__ = "jasonacox"

Expand Down Expand Up @@ -309,12 +312,12 @@ def decrypt(self, enc, use_base64=True, decode_text=True, verify_padding=False,
raise ValueError("invalid length")
if iv:
iv, enc = self.get_decryption_iv( iv, enc )
decryptor = Crypto( AES(self.key), Crypto_modes.GCM(iv, tag) ).decryptor()
if header:
if tag is None:
decryptor = Crypto( AES(self.key), Crypto_modes.CTR(iv + b'\x00\x00\x00\x02') ).decryptor()
else:
decryptor = Crypto( AES(self.key), Crypto_modes.GCM(iv, tag) ).decryptor()
if header and (tag is not None):
decryptor.authenticate_additional_data( header )
#if tag is None:
# raw = decryptor.update( enc )
#else:
raw = decryptor.update( enc ) + decryptor.finalize()
else:
decryptor = Crypto( AES(self.key), Crypto_modes.ECB() ).decryptor()
Expand Down
117 changes: 117 additions & 0 deletions tools/fake-v35-device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@

import ttcorefunc as tinytuya
import socket
import select
import time
import json
from hashlib import md5, sha256
import hmac

bind_host = ''
bind_port = 6668

# can also be set to the address of a hub/gateway device or phone running SmartLife
bcast_to = '127.0.0.1'

bcast_data = b'{"ip":"127.0.0.1","gwId":"eb0123456789abcdefghij","active":2,"ablilty":0,"encrypt":true,"productKey":"keydeadbeef12345","version":"3.5","token":true,"wf_cfg":true}'
real_key = b'thisisarealkey00'
local_nonce = str(time.time() * 1000000)[:16].encode('utf8') #b'0123456789abcdef'

msg = tinytuya.TuyaMessage(1, tinytuya.UDP_NEW, 0, bcast_data, 0, True, tinytuya.PREFIX_6699_VALUE, True)
bcast_data = tinytuya.pack_message(msg,hmac_key=tinytuya.udpkey)
print("broadcast encrypted=%r" % bcast_data.hex() )


srv = socket.socket( socket.AF_INET6, socket.SOCK_STREAM )
srv.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
srv.bind( (bind_host, bind_port) )
srv.listen( 1 )

bsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
bsock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

client = None

bcast_time = 0

while True:
r = [srv]
if client: r.append( client )
w = []
x = []

r, w, x = select.select( r, w, x, 1 )
#print('select')

if( bcast_time < time.time() ):
bcast_time = time.time() + 8
#print( 'bcast' )
bsock.sendto( bcast_data, (bcast_to, 6667) )

for sock in r:
if sock is srv:
if client:
client.close()
client = None
client, addr = sock.accept()
client.setblocking( False )
tmp_key = real_key
seqno = 1
print( 'new client connected:', addr )
continue

if sock is not client:
print('not:', sock)
continue

data = sock.recv( 4096 )
#print( 'client data: %r' % data )
if not data:
client.close()
client = None
continue

print('')
print('client sent:', data)
#print(data.hex())
m = tinytuya.unpack_message(data,hmac_key=tmp_key, no_retcode=True)
#print('payload len:', len(m.payload), 'tuya message:', m)
print('decoded message:', m)

if m.cmd == tinytuya.SESS_KEY_NEG_START:
tmp_key = real_key
payload = m.payload
remote_nonce = payload
miv = remote_nonce[:12]
hmac_check = hmac.new(real_key, remote_nonce, sha256).digest()
msg = tinytuya.TuyaMessage(seqno, tinytuya.SESS_KEY_NEG_RESP, 0, local_nonce+hmac_check, 0, True, tinytuya.PREFIX_6699_VALUE, True)
seqno += 1
data = tinytuya.pack_message(msg, hmac_key=tmp_key)
print( 'session neg start:', msg )
client.sendall( data )
elif m.cmd == tinytuya.SESS_KEY_NEG_FINISH:
rkey_hmac = hmac.new(real_key, local_nonce, sha256).digest()
print('neg fin. success:', rkey_hmac == m.payload)
print('want hmac:', rkey_hmac.hex())
print('got hmac: ', m.payload.hex())
tmp_key = bytes( [ a^b for (a,b) in zip(remote_nonce,local_nonce) ] )
print( 'sess nonce:', tmp_key.hex() )
cipher = tinytuya.AESCipher( real_key )
print( 'sess iv:', m.iv.hex() )
tmp_key = cipher.encrypt( tmp_key, use_base64=False, pad=False, iv=miv )[12:28]
print( 'sess key:', tmp_key.hex(), tmp_key)
elif m.cmd == tinytuya.DP_QUERY_NEW:
print('got status request')
resp = {'protocol': 4, 't': int(time.time()), 'data': {'dps': {'20': True, '21': 'white', '22': 946, '23': 3, '24': '014a03e803a9', '25': '04464602007803e803e800000000464602007803e8000a00000000', '26': 0, '34': False}} }
msg = tinytuya.TuyaMessage(seqno, 16, 0, json.dumps(resp).encode('ascii'), 0, True, tinytuya.PREFIX_6699_VALUE, True)
seqno += 1
data = tinytuya.pack_message(msg, hmac_key=tmp_key)
client.sendall( data )
else:
print('unhandled command', m.cmd)
msg = tinytuya.TuyaMessage(seqno, 16, 0, b'json obj data unvalid', 0, True, tinytuya.PREFIX_6699_VALUE, True)
seqno += 1
data = tinytuya.pack_message(msg, hmac_key=tmp_key)
client.sendall( data )


Loading

0 comments on commit d17d031

Please sign in to comment.