-
-
Notifications
You must be signed in to change notification settings - Fork 11
/
linger_counter.py
executable file
·190 lines (166 loc) · 6.56 KB
/
linger_counter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/usr/bin/env python
import argparse, os, platform, sys, time, logging, json, logging.config, signal
from argparse import RawTextHelpFormatter
import sqlite3 as lite
LOGLEVEL = logging.WARNING


def resolve_log_level(value, default):
    """Translate a configured log level into a numeric ``logging`` level.

    value   -- level taken from the config file: either a level name in any
               case ('debug', 'INFO', ...) or an already numeric level.
    default -- level returned when ``value`` cannot be interpreted.
    """
    if hasattr(value, 'upper'):
        # Level names are available as upper-case integer constants on the
        # logging module (logging.DEBUG, logging.INFO, ...). Checking for
        # .upper() accepts both str and Python 2 unicode values.
        value = getattr(logging, value.upper(), None)
    if isinstance(value, bool) or not isinstance(value, int):
        return default
    return value


# Load our config file. The file is optional: when it is missing or does not
# contain valid JSON we keep running with the defaults defined above.
try:
    with open('config.json') as f:
        config = json.load(f)
except (IOError, OSError, ValueError) as e:
    sys.stderr.write("Could not load config.json, using defaults: {}\n".format(e))
    config = {}
if not isinstance(config, dict):
    # Valid JSON, but not an object we can look settings up in
    config = {}

# Only an explicit false value for 'run_rx' stops the script; a missing key
# leaves it running.
if config.get('run_rx', True) == False:
    sys.exit()

# Only record the requested level here; logging itself is configured with
# LOGLEVEL once, further down in the file.
LOGLEVEL = resolve_log_level(config.get('log_level'), LOGLEVEL)
#==============================================================================
# Configure logging. The log file lives in /var/log, which is normally only
# writable by root. When the file cannot be opened we fall back to logging on
# stderr, so the script (and its -h / --help output) still works without sudo.
logging_config = {
    'filename': '/var/log/linger_counter.log',
    'format': '%(asctime)s [%(levelname)s] %(message)s',
    'level': LOGLEVEL
}
try:
    logging.basicConfig(**logging_config)
except (IOError, OSError):
    logging.basicConfig(format=logging_config['format'],
                        level=logging_config['level'])
    logging.warning('Cannot write to %s, logging to stderr instead',
                    logging_config['filename'])
#==============================================================================
# lingerSettings.py is an optional module; presumably it defines lingerPath
# for installs outside the default Raspberry Pi location (verify against the
# settings file). Only a missing module triggers the fallback, so real errors
# inside the settings file are not silently hidden.
try:
    from lingerSettings import *
except ImportError:
    # if no alternative path has been set, use the RPi path
    lingerPath = "/home/pi/linger/"
#==============================================================================
# Handle arguments
#==============================================================================
PARSER = argparse.ArgumentParser(prog='linger', description=
'''This script checks the amount of devives saved in the
database and displays the number on a 7 segment display.
For more info on what Linger
does see README.md''',
formatter_class=RawTextHelpFormatter)
PARSER.add_argument('-db', default='probes', dest='db_name', metavar='filename',\
help='Database name. Defaults to probes.', action='store')
PARSER.add_argument('-v', dest='verbose', action='count',\
help='Verbose; can be used up to 3 times to set the verbose level.')
PARSER.add_argument('--version', action='version', version='%(prog)s version 0.1.0',\
help='Show program\'s version number and exit.')
ARGS = PARSER.parse_args()
#==============================================================================
# The script can also be run on a non-RPi machine for testing. The machine
# type tells us whether we are on RPi hardware; the display driver is only
# imported when we are.
onPi = platform.machine() in ("armv7l", "armv6l")
if onPi:
    if ARGS.verbose > 2:
        print("onPi: True")
    import tm1637
else:
    if ARGS.verbose > 2:
        print("onPi: False")
    logging.info('Not a RPi, so running in limited mode')
#==============================================================================
# Stop script if not running as root. Doing this after the argparse so you can still
# read the help info without sudo (using -h / --help flag)
if onPi and os.geteuid() != 0:
    sys.exit('Script must be run as root because of GPIO access')
#==============================================================================
# Add .sqlite to our database name if needed
if not ARGS.db_name.endswith(".sqlite"):
    ARGS.db_name += ".sqlite"
db_path = lingerPath + ARGS.db_name
# Debug output only: show the resolved path at the highest verbosity level,
# like the other diagnostic prints in this script.
if (ARGS.verbose or 0) > 2:
    print("dbpath: {}".format(db_path))
# Load our display so we can use it globally. Off the Pi there is no display
# driver; bind the name to None so that it is always defined.
if onPi:
    Display = tm1637.TM1637(23, 24, tm1637.BRIGHT_TYPICAL)
else:
    Display = None
# Helpers for catching a kill signal, so the script gets the chance to
# clean up before it stops
def set_exit_handler(func):
    """Install ``func`` as the handler that runs when SIGTERM is received."""
    kill_signal = signal.SIGTERM
    signal.signal(kill_signal, func)
def on_exit(sig, func=None):
    """Signal handler: blank the display and stop the script.

    sig  -- number of the signal that was received.
    func -- second argument supplied by the signal module (the interrupted
            stack frame); not used here.

    Always ends by raising SystemExit with exit status 1.
    """
    if ARGS.verbose:
        print("Received kill signal. Stop")
    # The display object is only created on RPi hardware; touching it
    # elsewhere would raise instead of letting the script exit cleanly.
    if onPi:
        Display.Clear()
        Display.SetBrightness(0)
    sys.exit(1)
#=======================================================
# Get the number of unique devices stored in the database
def get_device_amount(con):
    """Return the number of distinct ``mac`` values in the ``entries`` table.

    con -- open sqlite3 connection to query.

    Returns 0 when the query fails, so the caller can simply keep running;
    the problem is written to the log.
    """
    with con:
        try:
            cur = con.cursor()
            cur.execute("SELECT COUNT(DISTINCT mac) AS amount FROM entries")
            return cur.fetchone()[0]
        except Exception as e:  # simply return 0 if there was a problem
            if ARGS.verbose:
                print("Encountered a problem getting the device amount")
                print(e)
            # Include the error itself, otherwise the log gives no clue
            # about what went wrong.
            logging.warning('Problem getting amount of devices from database: %s', e)
            return 0
#===========================================================
# Main program
#===========================================================
def _wait_for_database(path, retry_seconds=10):
    """Block until the database file at ``path`` exists, polling for it."""
    first_error = True
    while not os.path.isfile(path):
        if first_error:
            # Report the missing file once, not on every retry
            first_error = False
            logging.warning('Database file does not exist: {}'.format(path))
            logging.info('Retry in {} seconds...'.format(retry_seconds))
            if ARGS.verbose:
                print("Database {} does not exist".format(path))
                print("Retry in {} seconds...".format(retry_seconds))
        time.sleep(retry_seconds)


def main():
    """Show the number of devices in the database on the display, forever.

    The amount is refreshed every 5 seconds. The loop never ends by itself;
    the script stops when the process is terminated (SIGTERM is routed to
    on_exit) or when the database cannot be opened, in which case it exits
    with status 1.
    """
    set_exit_handler(on_exit)
    verbose = ARGS.verbose or 0

    if onPi:
        if verbose > 2:
            print("On Pi: initiate display")
        Display.Clear()
        Display.SetBrightness(3)
        Display.ShowInt(0)

    if verbose > 1:
        print("Using database {}".format(db_path))
    logging.info('Using database: {}'.format(db_path))

    # Check if the database file exists, if not, retry every 10
    # seconds, as the other scripts might need some time to
    # create the file
    _wait_for_database(db_path)

    # Create a database connection. Without a connection there is nothing
    # to display, so stop instead of continuing with an unbound name.
    try:
        con = lite.connect(db_path)
    except lite.Error as e:
        logging.error('Error connecting to database: %s', e)
        if verbose > 0:
            print("Error connecting to database...")
        sys.exit(1)

    try:
        while True:
            # Never pass more than 9999 to the display
            amount = min(get_device_amount(con), 9999)
            if onPi:
                Display.ShowInt(amount)
            time.sleep(5)
    finally:
        # Also runs when the exit handler raises SystemExit inside the loop
        con.close()


if __name__ == "__main__":
    main()