Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework as a library with examples #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ develop-eggs/
dist/
downloads/
eggs/
lib/
lib64/
parts/
sdist/
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ pyseek
======

Python interface for Seek Thermal device

See pyseek/examples for example usage
184 changes: 184 additions & 0 deletions pyseek/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
'''pyseek thermal camera library'''

'''
This is based on pyseek by Fry-kun, which has these credits:

"Many thanks to the folks at eevblog, especially (in no particular order)
miguelvp, marshallh, mikeselectricstuff, sgstair and many others
for the inspiration to figure this out"

'''

import usb.core
import usb.util
from PIL import Image
from scipy.misc import toimage
import numpy, sys

class PySeekError(IOError):
'''pyseek thermal camera error'''
pass

class PySeek:
'''pyseek thermal camera control'''

def __init__(self):
self.calibration = None
self.dev = None
self.debug = False

def send_msg(self, bmRequestType, bRequest, wValue=0, wIndex=0, data_or_wLength=None, timeout=None):
'''send a message to the camera'''
ret = self.dev.ctrl_transfer(bmRequestType, bRequest, wValue, wIndex, data_or_wLength, timeout)
if ret != len(data_or_wLength):
raise PySeekError()

def receive_msg(self, bRequest, wValue, wIndex, data_or_wLength):
'''receive a message from camera'''
return self.dev.ctrl_transfer(0xC1, bRequest, wValue, wIndex, data_or_wLength)

def deinit(self):
'''Deinit the device'''
msg = '\x00\x00'
for i in range(3):
self.send_msg(0x41, 0x3C, 0, 0, msg)

def open(self):
'''find and open the camera. Raise PySeekError on error'''
# find our Seek Thermal device 289d:0010
self.dev = usb.core.find(idVendor=0x289d, idProduct=0x0010)
if not self.dev:
raise PySeekError()

# set the active configuration. With no arguments, the first configuration will be the active one
self.dev.set_configuration()

# get an endpoint instance
cfg = self.dev.get_active_configuration()
intf = cfg[(0,0)]

custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
ep = usb.util.find_descriptor(intf, custom_match=custom_match) # match the first OUT endpoint
assert ep is not None

# Setup device
try:
msg = '\x01'
self.send_msg(0x41, 0x54, 0, 0, msg)
except Exception as e:
self.deinit()
msg = '\x01'
self.send_msg(0x41, 0x54, 0, 0, msg)

# Some day we will figure out what all this init stuff is and
# what the returned values mean.
self.send_msg(0x41, 0x3C, 0, 0, '\x00\x00')
ret1 = self.receive_msg(0x4E, 0, 0, 4)
ret2 = self.receive_msg(0x36, 0, 0, 12)

self.send_msg(0x41, 0x56, 0, 0, '\x20\x00\x30\x00\x00\x00')
ret3 = self.receive_msg(0x58, 0, 0, 0x40)

self.send_msg(0x41, 0x56, 0, 0, '\x20\x00\x50\x00\x00\x00')
ret4 = self.receive_msg(0x58, 0, 0, 0x40)

self.send_msg(0x41, 0x56, 0, 0, '\x0C\x00\x70\x00\x00\x00')
ret5 = self.receive_msg(0x58, 0, 0, 0x18)

self.send_msg(0x41, 0x56, 0, 0, '\x06\x00\x08\x00\x00\x00')
ret6 = self.receive_msg(0x58, 0, 0, 0x0C)

self.send_msg(0x41, 0x3E, 0, 0, '\x08\x00')
ret7 = self.receive_msg(0x3D, 0, 0, 2)

self.send_msg(0x41, 0x3E, 0, 0, '\x08\x00')
self.send_msg(0x41, 0x3C, 0, 0, '\x01\x00')
ret8 = self.receive_msg(0x3D, 0, 0, 2)

def cal_ok(self, x, y):
value = self.calibration[x][y]
return value != 0 and value < 15000

def get_array(self):
'''return next image from the camera as a numpy array. Raise PySeekError on error'''
tries = 100
while tries:
tries -= 1
# Send read frame request
self.send_msg(0x41, 0x53, 0, 0, '\xC0\x7E\x00\x00')
try:
ret9 = self.dev.read(0x81, 0x3F60, 1000)
ret9 += self.dev.read(0x81, 0x3F60, 1000)
ret9 += self.dev.read(0x81, 0x3F60, 1000)
ret9 += self.dev.read(0x81, 0x3F60, 1000)
except usb.USBError as e:
raise PySeekError()

# Let's see what type of frame it is
# 1 is a Normal frame, 3 is a Calibration frame
# 6 may be a pre-calibration frame
# 5, 10 other... who knows.
status = ret9[20]
if self.debug:
print ('%5d'*21 ) % tuple([ret9[x] for x in range(21)])
print(status, len(ret9))

if status == 1:
# Convert the raw calibration data to a string array
calimg = Image.fromstring("I", (208,156), ret9, "raw", "I;16")

# Convert the string array to an unsigned numpy int16 array
im2arr = numpy.asarray(calimg)
self.calibration = im2arr.astype('uint16')

if status == 3 and self.calibration is not None:
# Convert the raw image data to a string array
img = Image.fromstring("I", (208,156), ret9, "raw", "I;16")

# Convert the string array to an unsigned numpy int16 array
im1arr = numpy.asarray(img)
im1arrF = im1arr.astype('uint16')

if self.debug:
# Subtract the calibration array from the image array and add an offset
print("Calibration:")
for x in range(30):
for y in range(10):
sys.stdout.write("%4u " % self.calibration[x][y])
print("")
print("Data:")
for x in range(30):
for y in range(10):
sys.stdout.write("%4u " % im1arrF[x][y])
print("")


ret = (im1arrF-self.calibration) + 800

# for some strange reason there are blank lines and
# gaps. This is a rough attempt to fill those in.
# it still leaves some speckling
bad_cal1 = numpy.where(self.calibration == 0)
bad_cal2 = numpy.where(self.calibration > 15000)
xidx = list(bad_cal1[0]) + list(bad_cal2[0])
yidx = list(bad_cal1[1]) + list(bad_cal2[1])
for i in range(len(xidx)):
x = xidx[i]
y = yidx[i]
if x > 0 and self.cal_ok(x-1,y):
ret[x][y] = ret[x-1][y]
elif x < 155 and self.cal_ok(x+1,y):
ret[x][y] = ret[x+1][y]
elif y > 0 and self.cal_ok(x,y-1):
ret[x][y] = ret[x][y-1]
elif y < 207 and self.cal_ok(x,y+1):
ret[x][y] = ret[x][y+1]
return ret

raise PySeekError()

def get_image(self):
'''return next image from the camera as a scipy image. Raise PySeekError on error'''
a = self.get_array()
return toimage(a)

21 changes: 21 additions & 0 deletions pyseek/examples/pyseek_capture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python

'''pyseek thermal camera library - capture example'''

import pyseek
from pyseek.lib.PGM import PGM_write
from pyseek.lib.heatmap import thermal_map

seek = pyseek.PySeek()
seek.open()
for i in range(100):
img = seek.get_array()
colour = thermal_map(img)

filename = 'seek-%u.pgm' % i
PGM_write(filename, img)
print("Saved %s" % filename)

filename = 'seek-%u.png' % i
colour.save(filename)
print("Saved colour %s" % filename)
53 changes: 53 additions & 0 deletions pyseek/examples/pyseek_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python

'''pyseek thermal camera library - viewer example'''

import Tkinter
import pyseek
from pyseek.lib.PGM import PGM_write
from PIL import ImageTk
import sys, os, time

def show_frame(seek, first=False):
global fps_t, fps_f

from scipy.misc import toimage

arr = seek.get_array()
disp_img = toimage(arr)

if first:
root.geometry('%dx%d' % (disp_img.size[0], disp_img.size[1]))
tkpi = ImageTk.PhotoImage(disp_img)
label_image.imgtk = tkpi
label_image.configure(image=tkpi)
label_image.place(x=0, y=0, width=disp_img.size[0], height=disp_img.size[1])

now = int(time.time())
fps_f += 1
if fps_t == 0:
fps_t = now
elif fps_t < now:
print '\rFPS: %.2f' % (1.0 * fps_f / (now-fps_t)),
sys.stdout.flush()
fps_t = now
fps_f = 0

label_image.after(1, show_frame, seek) # after 1ms, run show_frame again


seek = pyseek.PySeek()
seek.open()

root = Tkinter.Tk()
root.title('Seek Thermal camera')
root.bind("<Escape>", lambda e: root.quit())

label_image = Tkinter.Label(root)
label_image.pack()

fps_t = 0
fps_f = 0

show_frame(seek, first=True)
root.mainloop() # UI has control until user presses <<Escape>>
74 changes: 74 additions & 0 deletions pyseek/lib/PGM.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'''16 bit PGM read/write code'''

import numpy

class PGMError(Exception):
'''PGMLink error class'''
def __init__(self, msg):
Exception.__init__(self, msg)

def PGM_read(filename):
'''read a 8/16 bit PGM image, returning a numpy array'''
f = open(filename, mode='rb')
fmt = f.readline()
if fmt.strip() != 'P5':
raise PGMError('Expected P5 image in %s' % filename)
dims = f.readline()
dims = dims.split(' ')
width = int(dims[0])
height = int(dims[1])
line = f.readline()
if line[0] == '#':
# discard comment
line = f.readline()
line = line.strip()
if line == "65535":
eightbit = False
elif line == "255":
eightbit = True
else:
raise PGMError('Expected 8/16 bit image image in %s - got %s' % (filename, line))
if eightbit:
rawdata = numpy.fromfile(f, dtype='uint8')
rawdata = numpy.reshape(rawdata, (height,width))
else:
rawdata = numpy.fromfile(f, dtype='uint16')
rawdata = rawdata.byteswap(True)
rawdata = numpy.reshape(rawdata, (height, width))
f.close()
return rawdata


def PGM_write(filename, rawdata):
'''write a 8/16 bit PGM image given a numpy array'''
if rawdata.dtype == numpy.dtype('uint8'):
numvalues = 255
elif rawdata.dtype == numpy.dtype('uint16'):
numvalues = 65535
else:
raise PGMError("Invalid array data type '%s'" % rawdata.dtype)
shape = rawdata.shape
if len(shape) != 2:
raise PGMError("Invalid array shape '%s'" % shape)
height = shape[0]
width = shape[1]

f = open(filename, mode='wb')
f.write('''P5
%u %u
%u
''' % (width, height, numvalues))

rawdata = rawdata.byteswap(False)
rawdata = rawdata.tofile(f)
f.close()

if __name__ == "__main__":
import sys
filename = sys.argv[1]
print("Reading %s" % filename)
a = PGM_read(filename)

filename = filename + ".test"
print("Writing %s" % filename)
PGM_write(filename, a)
1 change: 1 addition & 0 deletions pyseek/lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
'''pyseek thermal camera library - library code'''
45 changes: 45 additions & 0 deletions pyseek/lib/heatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'''map a greyscale thermal image to a color heatmap'''

import numpy, PIL

def thermal_map(rawimage, green_threshold=0.4, blue_threshold=0.75, clip_high=None, clip_low=None):
'''take a greyscale thermal image and return a colour heatmap'''

(width,height) = rawimage.shape
minv = numpy.amin(rawimage)
maxv = numpy.amax(rawimage)

if clip_high is None:
clip_high = maxv

# remove the bottom part of the range to reduce noise
if clip_low is None:
clip_low = minv + (clip_high-minv)/20.0

# create blank RGB image
rgb = numpy.empty((width,height,3), dtype=numpy.uint8)

# clip input to range
clipped = numpy.clip(rawimage, clip_low, clip_high)

# scale to 0..1
scaled = (clipped - clip_low) / (clip_high - clip_low)

# red component linear with input
r = scaled

# green as triangle function about green_threshold
g = numpy.where((scaled>green_threshold), 1.0 - (scaled - green_threshold)/(1.0-green_threshold), 0)
g += numpy.where((scaled<=green_threshold), 1.0 - (green_threshold-scaled)/green_threshold, 0)

# blue as triangle function about blue_threshold
b = numpy.where((scaled>blue_threshold), 1.0 - (scaled - blue_threshold)/(1.0-blue_threshold), 0)
b += numpy.where((scaled<=blue_threshold), 1.0 - (blue_threshold-scaled)/blue_threshold, 0)

# fill in RGB array
rgb[...,0] = r*255
rgb[...,1] = g*255
rgb[...,2] = b*255

ret = PIL.Image.fromarray(rgb, 'RGB')
return ret
Loading