summaryrefslogtreecommitdiffstats
path: root/tango/Uca
diff options
context:
space:
mode:
authorMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2015-03-17 09:00:11 +0100
committerMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2015-03-17 09:03:30 +0100
commit76479e9860ee7875b46de1ab47c5062179cffdc4 (patch)
tree7d61cb0e679857ae727a2769cc42a1816288cb8e /tango/Uca
parent36cf88b4b8a0cb149922c8276adc6010acb76dac (diff)
downloaduca-76479e9860ee7875b46de1ab47c5062179cffdc4.tar.gz
uca-76479e9860ee7875b46de1ab47c5062179cffdc4.tar.bz2
uca-76479e9860ee7875b46de1ab47c5062179cffdc4.tar.xz
uca-76479e9860ee7875b46de1ab47c5062179cffdc4.zip
Add TANGO server
Diffstat (limited to 'tango/Uca')
-rwxr-xr-xtango/Uca121
1 files changed, 121 insertions, 0 deletions
diff --git a/tango/Uca b/tango/Uca
new file mode 100755
index 0000000..47f4021
--- /dev/null
+++ b/tango/Uca
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+
+import sys
+import time
+import numpy as np
+import PyTango
+from gi.repository import Uca, GObject
+from PyTango import Attr, AttrWriteType, DevState
+from PyTango.server import Device, DeviceMeta, device_property, command, server_run
+
+try:
+ import tifffile
+ HAVE_TIFFFILE = True
+except ImportError:
+ print("Could not import tifffile, consider to install it")
+ HAVE_TIFFFILE = False
+
+
+def get_tango_type(prop):
+ mapping = {
+ GObject.TYPE_BOOLEAN: PyTango.CmdArgType.DevBoolean,
+ GObject.TYPE_CHAR: PyTango.CmdArgType.DevUChar,
+ GObject.TYPE_UCHAR: PyTango.CmdArgType.DevUChar,
+ GObject.TYPE_FLOAT: PyTango.CmdArgType.DevFloat,
+ GObject.TYPE_INT: PyTango.CmdArgType.DevShort, # DevInt is invalid?
+ GObject.TYPE_UINT: PyTango.CmdArgType.DevUShort, # check
+ GObject.TYPE_LONG: PyTango.CmdArgType.DevLong,
+ GObject.TYPE_DOUBLE: PyTango.CmdArgType.DevDouble,
+ GObject.TYPE_STRING: PyTango.CmdArgType.DevString,
+ }
+
+ return mapping.get(prop, None)
+
+
+def get_tango_write_type(prop):
+ if prop.flags & GObject.ParamFlags.WRITABLE:
+ if prop.flags & GObject.ParamFlags.READABLE:
+ return AttrWriteType.READ_WRITE
+
+ return AttrWriteType.WRITE
+
+ if prop.flags & GObject.ParamFlags.READABLE:
+ return AttrWriteType.READ
+
+ raise RuntimeError("{} has no valid param flag".format(prop.name))
+
+
+def prop_to_attr_name(name):
+ return name.replace('-', '_')
+
+
+def attr_to_prop_name(name):
+ return name.replace('_', '-')
+
+
+class Camera(Device):
+ __metaclass__ = DeviceMeta
+
+ camera = device_property(dtype=str, default_value='mock')
+
+ def init_device(self):
+ Device.init_device(self)
+ self.set_state(DevState.ON)
+ self.pm = Uca.PluginManager()
+ self.device = self.pm.get_camerah(self.camera, None)
+ self.attrs = {}
+
+ for prop in self.device.props:
+ tango_type = get_tango_type(prop.value_type)
+ write_type = get_tango_write_type(prop)
+
+ if tango_type:
+ name = prop_to_attr_name(prop.name)
+ attr = Attr(name, tango_type, write_type)
+ attr_props = PyTango.UserDefaultAttrProp()
+ attr_props.set_description(prop.blurb)
+ attr.set_default_properties(attr_props)
+
+ write_func = None
+
+ if write_type in (AttrWriteType.WRITE, AttrWriteType.READ_WRITE):
+ write_func = self.do_write
+
+ self.attrs[prop.name] = self.add_attribute(attr, self.do_read, write_func)
+
+ def grab(self):
+ array = np.empty(self.shape, dtype=self.dtype)
+ data = array.__array_interface__['data'][0]
+ self.device.grab(data)
+ return array
+
+ def do_read(self, attr):
+ name = attr_to_prop_name(attr.get_name())
+ attr.set_value(self.device.get_property(name))
+
+ def do_write(self, attr):
+ name = attr_to_prop_name(attr.get_name())
+ self.device.set_property(name, attr.get_write_value())
+
+ @command
+ def Start(self):
+ self.shape = (self.device.props.roi_height, self.device.props.roi_width)
+ self.dtype = np.uint16 if self.device.props.sensor_bitdepth > 8 else np.uint8
+ self.device.start_recording()
+
+ @command
+ def Stop(self):
+ self.device.stop_recording()
+
+ @command(dtype_in=str, doc_in="Path and filename to store frame")
+ def Store(self, path):
+ frame = self.grab()
+
+ if HAVE_TIFFFILE:
+ tifffile.imsave(path, frame)
+ else:
+ np.savez(open(path, 'wb'), frame)
+
+
+if __name__ == '__main__':
+ server_run((Camera,))