summaryrefslogtreecommitdiffstats
path: root/tango/Uca
blob: 47f4021e949e4d2edd6cf94b39cec724a20cf635 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python

import sys
import time
import numpy as np
import PyTango
from gi.repository import Uca, GObject
from PyTango import Attr, AttrWriteType, DevState
from PyTango.server import Device, DeviceMeta, device_property, command, server_run

# tifffile is an optional dependency: remember whether it could be imported
# so that Camera.Store can fall back to NumPy's own file format without it.
try:
    import tifffile
    HAVE_TIFFFILE = True
except ImportError:
    print("Could not import tifffile, consider to install it")
    HAVE_TIFFFILE = False


def get_tango_type(prop):
    """Look up the Tango argument type for a GObject value type.

    `prop` is the GObject type to translate (callers in this file pass
    ``prop.value_type``). Returns the matching ``PyTango.CmdArgType``
    member, or ``None`` when the type has no entry in the table.
    """
    tango = PyTango.CmdArgType

    # NOTE(review): the INT/UINT rows use the "short" Tango types; confirm
    # they are wide enough for the property values the camera reports.
    known_types = (
        (GObject.TYPE_BOOLEAN, tango.DevBoolean),
        (GObject.TYPE_CHAR, tango.DevUChar),
        (GObject.TYPE_UCHAR, tango.DevUChar),
        (GObject.TYPE_FLOAT, tango.DevFloat),
        (GObject.TYPE_INT, tango.DevShort),      # DevInt is invalid?
        (GObject.TYPE_UINT, tango.DevUShort),    # check
        (GObject.TYPE_LONG, tango.DevLong),
        (GObject.TYPE_DOUBLE, tango.DevDouble),
        (GObject.TYPE_STRING, tango.DevString),
    )

    return dict(known_types).get(prop)


def get_tango_write_type(prop):
    """Derive the Tango access mode from a GObject property's flags.

    Returns ``AttrWriteType.READ_WRITE``, ``WRITE`` or ``READ`` depending
    on which of the WRITABLE / READABLE flags are set on `prop`.

    Raises RuntimeError when neither flag is set.
    """
    flags = prop.flags
    writable = bool(flags & GObject.ParamFlags.WRITABLE)
    readable = bool(flags & GObject.ParamFlags.READABLE)

    if writable and readable:
        return AttrWriteType.READ_WRITE

    if writable:
        return AttrWriteType.WRITE

    if readable:
        return AttrWriteType.READ

    raise RuntimeError("{} has no valid param flag".format(prop.name))


def prop_to_attr_name(name):
    """Turn a property name into an attribute name.

    Every dash in `name` becomes an underscore, e.g. ``roi-width`` ->
    ``roi_width``; all other characters are kept as they are.
    """
    pieces = name.split('-')
    return '_'.join(pieces)


def attr_to_prop_name(name):
    """Turn an attribute name into a property name.

    Every underscore in `name` becomes a dash, e.g. ``roi_width`` ->
    ``roi-width``; all other characters are kept as they are.
    """
    pieces = name.split('_')
    return '-'.join(pieces)


class Camera(Device):
    # Tango device that wraps a camera obtained from the Uca plugin manager.
    # Camera properties with a known Tango type are published as dynamic
    # attributes; recording control and frame storage are Tango commands.
    __metaclass__ = DeviceMeta

    # Camera name handed to the Uca plugin manager (default: 'mock').
    camera = device_property(dtype=str, default_value='mock')

    def init_device(self):
        """Load the camera and register one attribute per supported property."""
        Device.init_device(self)
        self.set_state(DevState.ON)
        self.pm = Uca.PluginManager()
        self.device = self.pm.get_camerah(self.camera, None)
        self.attrs = {}

        for prop in self.device.props:
            tango_type = get_tango_type(prop.value_type)
            write_type = get_tango_write_type(prop)

            if tango_type:
                name = prop_to_attr_name(prop.name)
                attr = Attr(name, tango_type, write_type)
                attr_props = PyTango.UserDefaultAttrProp()
                attr_props.set_description(prop.blurb)
                attr.set_default_properties(attr_props)

                # Only attributes that can be written get a write handler.
                write_func = None

                if write_type in (AttrWriteType.WRITE, AttrWriteType.READ_WRITE):
                    write_func = self.do_write

                self.attrs[prop.name] = self.add_attribute(attr, self.do_read, write_func)

    def grab(self):
        """Grab a single frame and return it as a NumPy array.

        The array uses the shape and dtype stored by the Start command, so
        Start has to be called before the first grab.
        """
        array = np.empty(self.shape, dtype=self.dtype)
        # Pass the address of the array's buffer so the camera fills it in place.
        data = array.__array_interface__['data'][0]
        self.device.grab(data)
        return array

    def do_read(self, attr):
        """Read handler: copy the camera property value into `attr`."""
        name = attr_to_prop_name(attr.get_name())
        attr.set_value(self.device.get_property(name))

    def do_write(self, attr):
        """Write handler: push the value written to `attr` to the camera."""
        name = attr_to_prop_name(attr.get_name())
        self.device.set_property(name, attr.get_write_value())

    # Record the frame geometry and pixel type, then start recording.
    @command
    def Start(self):
        self.shape = (self.device.props.roi_height, self.device.props.roi_width)
        # More than 8 bits per pixel needs 16-bit storage.
        self.dtype = np.uint16 if self.device.props.sensor_bitdepth > 8 else np.uint8
        self.device.start_recording()

    # Stop recording.
    @command
    def Stop(self):
        self.device.stop_recording()

    # Grab one frame and write it to `path`: via tifffile when it is
    # available, otherwise with np.savez.
    @command(dtype_in=str, doc_in="Path and filename to store frame")
    def Store(self, path):
        frame = self.grab()

        if HAVE_TIFFFILE:
            tifffile.imsave(path, frame)
        else:
            # Use a context manager so the file is flushed and closed when
            # the command returns instead of leaking the handle.
            with open(path, 'wb') as stream:
                np.savez(stream, frame)


# When executed as a script, hand the Camera class to PyTango's server_run.
if __name__ == '__main__':
    server_run((Camera,))