forked from david-m-m/SMA-EM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspeedwiredecoder.py
120 lines (112 loc) · 4.39 KB
/
speedwiredecoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
*
* by david-m-m 2019-Mar-17
*
* this software is released under GNU General Public License, version 2.
* This program is free software;
* you can redistribute it and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; version 2 of the License.
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
"""
import binascii
# Scaling factor per unit name. decode_speedwire multiplies the raw integer
# of a record by the factor of the unit listed for its channel.
sma_units = {
    "W": 0.1,
    "Wh": 1.0 / 3600.0,
    "A": 0.001,
    "V": 0.001,
    "cosphi": 0.001,
}
# Table of all defined SMA channels, keyed by channel number.
# Entry layout: (<name>, <emparts_name>, <unit_actual>[, <unit_total>])
# Channels without a total (power factor, current, voltage) are 3-tuples;
# all others carry a fourth element naming the unit of their counter value.
sma_channels = {
    # totals
    1: ('active_power_consumption', 'pregard', 'W', 'Wh'),
    2: ('active_power_supply', 'psurplus', 'W', 'Wh'),
    3: ('apparent_power_consumption', 'sregard', 'W', 'Wh'),
    4: ('apparent_power_supply', 'ssurplus', 'W', 'Wh'),
    9: ('reactive_power_consumption', 'qregard', 'W', 'Wh'),
    10: ('reactive_power_supply', 'qsurplus', 'W', 'Wh'),
    13: ('powerfactor', 'cosphi', 'cosphi'),

    # phase 1
    21: ('p1_active_power_consumption', 'p1regard', 'W', 'Wh'),
    22: ('p1_active_power_supply', 'p1surplus', 'W', 'Wh'),
    23: ('p1_apparent_power_consumption', 's1regard', 'W', 'Wh'),
    24: ('p1_apparent_power_supply', 's1surplus', 'W', 'Wh'),
    29: ('p1_reactive_power_consumption', 'q1regard', 'W', 'Wh'),
    30: ('p1_reactive_power_supply', 'q1surplus', 'W', 'Wh'),
    31: ('p1_current', 'thd1', 'A'),
    32: ('p1_voltage', 'v1', 'V'),
    33: ('p1_powerfactor', 'cosphi1', 'cosphi'),

    # phase 2
    41: ('p2_active_power_consumption', 'p2regard', 'W', 'Wh'),
    42: ('p2_active_power_supply', 'p2surplus', 'W', 'Wh'),
    43: ('p2_apparent_power_consumption', 's2regard', 'W', 'Wh'),
    44: ('p2_apparent_power_supply', 's2surplus', 'W', 'Wh'),
    49: ('p2_reactive_power_consumption', 'q2regard', 'W', 'Wh'),
    50: ('p2_reactive_power_supply', 'q2surplus', 'W', 'Wh'),
    51: ('p2_current', 'thd2', 'A'),
    52: ('p2_voltage', 'v2', 'V'),
    53: ('p2_powerfactor', 'cosphi2', 'cosphi'),

    # phase 3
    61: ('p3_active_power_consumption', 'p3regard', 'W', 'Wh'),
    62: ('p3_active_power_supply', 'p3surplus', 'W', 'Wh'),
    63: ('p3_apparent_power_consumption', 's3regard', 'W', 'Wh'),
    64: ('p3_apparent_power_supply', 's3surplus', 'W', 'Wh'),
    69: ('p3_reactive_power_consumption', 'q3regard', 'W', 'Wh'),
    70: ('p3_reactive_power_supply', 'q3surplus', 'W', 'Wh'),
    71: ('p3_current', 'thd3', 'A'),
    72: ('p3_voltage', 'v3', 'V'),
    73: ('p3_powerfactor', 'cosphi3', 'cosphi'),
}
def decode_OBIS(obis):
    """Split an OBIS record header into measurement number and datatype.

    Byte 1 of ``obis`` is read as the measurement number and byte 2 as the
    raw type code. Type code 4 maps to ``'actual'`` and 8 to ``'counter'``;
    any other code is reported on stdout and mapped to ``'unknown'``.

    Returns:
        A ``(measurement, datatype)`` tuple.
    """
    # One-byte slices (not indexing): a header that is too short decodes
    # to 0 instead of raising IndexError.
    channel = int.from_bytes(obis[1:2], byteorder='big')
    type_code = int.from_bytes(obis[2:3], byteorder='big')

    known_types = {4: 'actual', 8: 'counter'}
    kind = known_types.get(type_code)
    if kind is None:
        print("unknown datatype")
        kind = 'unknown'
    return (channel, kind)
def decode_speedwire(datagram):
    """Decode an SMA speedwire datagram into a dict of scaled readings.

    The datagram is only processed when it starts with ``b'SMA'``;
    otherwise an empty dict is returned. Fields are read big-endian from
    fixed offsets: bytes 12:14 hold the length field, bytes 20:24 the serial
    number, and the OBIS records start at byte 28. Each record is a 4-byte
    header (see ``decode_OBIS``) followed by a 4-byte value for ``'actual'``
    records or an 8-byte value for ``'counter'`` records. Records of unknown
    type are skipped as 8 bytes.

    Args:
        datagram: raw bytes of one received datagram.

    Returns:
        A dict with the key ``'serial'`` plus, for every record whose
        measurement number is listed in ``sma_channels``:
        ``<emparts_name>`` for actual values (raw value times the factor of
        the channel's actual unit) and ``<emparts_name>counter`` for counter
        values (raw value times the factor of the channel's total unit,
        times 0.001). Counter records for channels that define no total unit
        are ignored.
    """
    emparts = {}
    # process data only if the SMA header is present
    if datagram[0:3] != b'SMA':
        return emparts

    received = len(datagram)
    # End of the record area as announced by the length field, but never
    # beyond the bytes that were actually received.
    datalength = min(
        int.from_bytes(datagram[12:14], byteorder='big') + 8,
        received,
    )
    # serial number
    emparts['serial'] = int.from_bytes(datagram[20:24], byteorder='big')
    # Bytes 24:28 carry the timestamp; it is not part of the result.

    # decode OBIS data blocks, starting right after the header
    position = 28
    while position < datalength:
        (measurement, datatype) = decode_OBIS(datagram[position:position + 4])

        # Counters carry an 8-byte value; actual and unknown records are
        # stepped over as a 4-byte value.
        value_end = position + (12 if datatype == 'counter' else 8)
        if value_end > received:
            # Truncated record: decoding a partial value would yield a
            # bogus reading, so stop here.
            break
        value = int.from_bytes(datagram[position + 4:value_end], byteorder='big')
        position = value_end

        channel = sma_channels.get(measurement)
        if channel is None:
            continue
        if datatype == 'actual':
            emparts[channel[1]] = value * sma_units[channel[2]]
        elif datatype == 'counter' and len(channel) > 3:
            # Channels without a total unit (3-tuples) have no counter.
            emparts[channel[1] + 'counter'] = value * sma_units[channel[3]] * 0.001
    return emparts