itla.itla

  1import serial
  2import yaml
  3from pkg_resources import resource_filename
  4from serial.serialutil import SerialException
  5
  6from . import logger
  7from .itla_errors import (
  8    AEAException,
  9    CIEError,
 10    CIIError,
 11    CIPError,
 12    CPException,
 13    EREError,
 14    EROError,
 15    ExecutionError,
 16    EXFError,
 17    IVCError,
 18    NOPException,
 19    RNIError,
 20    RNWError,
 21    RVEError,
 22    VSEError,
 23)
 24from .utils import compute_checksum
 25
 26
 27class ITLABase:
 28    """
 29    A class that represents the an ITLA
 30    and exposes a user friendly API for controlling functionality.
 31
 32    Things to figure out
 33
 34      * What should be exposed to the user?
 35      * Can we abstract away the concept of registers and stuff
 36        in here so you don't have to deal with it.
 37
 38    There are some functions that could be implemented like set_fatalstatus.
 39    I think this is probably a bad idea even though it isnt write only.
 40
 41    set_frequency is my platonic ideal for the higher level functions.
 42    """
 43
 44    _nop_errors = {
 45        0x00: "OK: No errors.",
 46        0x01: RNIError("RNI: Register not implemented."),
 47        0x02: RNWError("RNW: Register not writable"),
 48        0x03: RVEError("RVE: Register Value range Error."),
 49        0x04: CIPError("CIP: Command Ignored due to Pending operation"),
 50        0x05: CIIError("CII: Command Ignored Initializing"),
 51        0x06: EREError("ERE: Extended address Range Error (address invalid)"),
 52        0x07: EROError("ERO: Extended address is read only"),
 53        0x08: EXFError("EXF: Execution general failure"),
 54        0x09: CIEError("CIE: Command ignored while module's optical output is enabled"),
 55        0x0A: IVCError("IVC: Invalid configuration command ignored."),
 56        0x0B: NOPException(),
 57        0x0C: NOPException(),
 58        0x0D: NOPException(),
 59        0x0E: NOPException(),
 60        0x0F: VSEError("VSE: Vendor specific error"),
 61    }
 62
 63    _response_status = {
 64        0x00: "OK",
 65        0x01: ExecutionError("Command returned execution error."),
 66        0x02: AEAException(
 67            "AEA: automatic extended addressing " + "being returned or ready to write."
 68        ),
 69        0x03: CPException("CP: Command not complete, pending."),
 70    }
 71
 72    def __init__(self, serial_port, baudrate, timeout=0.5, register_files=None):
 73        """TODO describe function
 74
 75        :param serial_port:
 76        :param baudrate:
 77        :param timeout:
 78        :returns:
 79
 80        """
 81        self._port = serial_port
 82        self._baudrate = baudrate
 83        self._timeout = timeout
 84        self._device = None
 85
 86        if register_files is None:
 87            register_files = []
 88
 89        # this function creates register functions
 90        def mkfn(*, fnname, register, description, readonly, signed, **_):
 91            # _ here to absorb unused things. This way the yaml
 92            # can contain more info without causing errors here.
 93            if readonly:
 94
 95                def reg_fun(self):
 96                    self.send_command(register, signed=signed)
 97                    return self.get_response(register)
 98
 99            else:
100
101                def reg_fun(self, data=None):
102                    self.send_command(register, data, signed=signed)
103                    return self.get_response(register)
104
105            reg_fun.__doc__ = description
106            reg_fun.__name__ = fnname
107            return reg_fun
108
109        for register_file in register_files:
110            register_file = resource_filename("itla", "registers/" + register_file)
111            with open(register_file, "r") as register_yaml:
112                register_spec = yaml.safe_load(register_yaml)
113
114                for register_name in register_spec:
115                    register_data = register_spec[register_name]
116                    setattr(
117                        ITLABase, "_" + register_data["fnname"], mkfn(**register_data)
118                    )
119
120    def __enter__(self):
121        """TODO describe function
122
123        :returns:
124
125        """
126        self.connect()
127
128    def __exit__(self, exc_type, exc_value, traceback):
129        """TODO describe function
130
131        :param exc_type:
132        :param exc_value:
133        :param traceback:
134        :returns:
135
136        """
137        self.disconnect()
138
139    def __del__(self):
140        """TODO describe function
141
142        :returns:
143
144        """
145        if self._device is not None:
146            self.disconnect()
147
148    def connect(self):
149        """Establishes a serial connection with the port provided
150
151        **For some reason on Linux opening the serial port causes some
152        power output from the laser before it has been activated. This behavior
153        does not occur on Windows.**
154
155        """
156        try:
157            self._device = serial.Serial(
158                self._port, self._baudrate, timeout=self._timeout
159            )
160        except SerialException:
161            raise SerialException("Connection to " + self._port + " unsuccessful.")
162
163    def disconnect(self, leave_on=False):
164        """Ends the serial connection to the laser
165
166        :param leave_on:
167        :returns:
168
169        """
170        if not self._device.is_open:
171            return
172
173        if not leave_on:
174            self.disable()
175
176        try:
177            self._device.close()
178        except AttributeError:
179            # When does this error occur?
180            # There are a few ways disconnect can be called.
181            # 1) It can be called purposefully.
182            # 2) It can be called by ending a `with` (ie __exit__)
183            # 3) It can be called by exiting a repl or a script ending (ie. __del__).
184            pass
185
186    def send_command(self, register, data=None, signed=False):
187        """Sends commands to a device.
188        This function takes the hexstring, turns it into a bytestring,
189        and writes it to the device.
190        This function should probably be hidden from the user.
191
192        :param device: Should be a Serial object that you can write to.
193        :param hexstring: a hexstring to send to the device
194        :returns: nothing
195        """
196
197        write = data is not None
198
199        # convert to register to a bytestring
200        register_bytes = register.to_bytes(1, "big")
201
202        # convert data to bytestring
203        if write:
204            data_bytes = data.to_bytes(2, "big", signed=signed)
205
206        else:
207            data_bytes = (0).to_bytes(2, "big")
208
209        # compute the checksum
210        checksum = compute_checksum(
211            (write.to_bytes(1, "big") + register_bytes + data_bytes).hex()
212        )
213
214        # compute and convery header to bytestring
215        header = checksum * 16 + write
216        header_bytes = header.to_bytes(1, "big")
217
218        # form full command and send.
219        command = header_bytes + register_bytes + data_bytes
220        self._device.write(command)
221
222    def get_response(self, register):
223        """This function should read from self._device. This should be called
224
225        :param register:
226        :returns: ???
227
228        """
229        # read four bytes
230        response = self._device.read(4)
231
232        logger.debug(f"response: {response.hex()}")
233
234        # get the checksum and ... check it.
235        checksum = int(response.hex()[0], 16)
236        computed_checksum = compute_checksum(response.hex())
237
238        if computed_checksum != checksum:
239            raise Exception(
240                f"Communication error expected {checksum} got " + f"{computed_checksum}"
241            )
242
243        status = int(f"{response[0]:08b}"[-2:], 2)
244        logger.debug(f"status: {status}")
245
246        try:
247            raise self._response_status[status]
248
249        except TypeError:
250            # a type error occurs if we try to raise "OK"
251            # in this situation we should just pass.
252            pass
253
254        except AEAException:
255            response = self.read_aea()
256            return response
257
258        if register != response[1]:
259            raise Exception(
260                "The returned register does not match "
261                + "the register sent to the device. "
262                + f"Got {response[1]} expected {register}."
263            )
264
265        return response[2:]
266
267    def upgrade_firmware(self, firmware_file):
268        """This function should update the firmware for the laser."""
269
270        # STEPS:
271        # 1) release
272        # 2) set baudrate to 115200
273        # 3) disconnect and reconnect at update baudrate
274        # 4) send dlconfig signal
275        # 5) ????
276        # 6) profit
277        raise Warning("this is not implemented yet.")
278
279
class ITLA:
    """
    User-facing entry point for creating ITLA laser objects.

    Instantiating this class does not yield an ``ITLA`` instance: ``__new__``
    hands construction over to the class registered for the requested
    version (presumably subclasses of ITLABase).
    """

    def __new__(cls, *args, version="1.3", **kwargs):
        """
        Build and return the laser object for ``version``.

        ``version`` selects the implementation ("1.3" -> ITLA13,
        "1.2" -> ITLA12); every other argument is forwarded unchanged to
        that class. An unknown version raises ``KeyError``. Since the
        returned object is not an instance of ``ITLA``, Python does not go
        on to call ``ITLA.__init__`` on it.

        Design note: ideally everything would subclass ITLA and the ITLA and
        ITLABase classes would be merged, which would make type checking
        simpler because every laser object would then be an ITLA. What
        blocks this appears to be a circular import, which is also why the
        imports below are done inside the method.
        """
        from .itla12 import ITLA12
        from .itla13 import ITLA13

        implementations = {"1.3": ITLA13, "1.2": ITLA12}
        laser_class = implementations[version]
        return laser_class(*args, **kwargs)
class ITLABase:
 28class ITLABase:
 29    """
 30    A class that represents the an ITLA
 31    and exposes a user friendly API for controlling functionality.
 32
 33    Things to figure out
 34
 35      * What should be exposed to the user?
 36      * Can we abstract away the concept of registers and stuff
 37        in here so you don't have to deal with it.
 38
 39    There are some functions that could be implemented like set_fatalstatus.
 40    I think this is probably a bad idea even though it isnt write only.
 41
 42    set_frequency is my platonic ideal for the higher level functions.
 43    """
 44
 45    _nop_errors = {
 46        0x00: "OK: No errors.",
 47        0x01: RNIError("RNI: Register not implemented."),
 48        0x02: RNWError("RNW: Register not writable"),
 49        0x03: RVEError("RVE: Register Value range Error."),
 50        0x04: CIPError("CIP: Command Ignored due to Pending operation"),
 51        0x05: CIIError("CII: Command Ignored Initializing"),
 52        0x06: EREError("ERE: Extended address Range Error (address invalid)"),
 53        0x07: EROError("ERO: Extended address is read only"),
 54        0x08: EXFError("EXF: Execution general failure"),
 55        0x09: CIEError("CIE: Command ignored while module's optical output is enabled"),
 56        0x0A: IVCError("IVC: Invalid configuration command ignored."),
 57        0x0B: NOPException(),
 58        0x0C: NOPException(),
 59        0x0D: NOPException(),
 60        0x0E: NOPException(),
 61        0x0F: VSEError("VSE: Vendor specific error"),
 62    }
 63
 64    _response_status = {
 65        0x00: "OK",
 66        0x01: ExecutionError("Command returned execution error."),
 67        0x02: AEAException(
 68            "AEA: automatic extended addressing " + "being returned or ready to write."
 69        ),
 70        0x03: CPException("CP: Command not complete, pending."),
 71    }
 72
 73    def __init__(self, serial_port, baudrate, timeout=0.5, register_files=None):
 74        """TODO describe function
 75
 76        :param serial_port:
 77        :param baudrate:
 78        :param timeout:
 79        :returns:
 80
 81        """
 82        self._port = serial_port
 83        self._baudrate = baudrate
 84        self._timeout = timeout
 85        self._device = None
 86
 87        if register_files is None:
 88            register_files = []
 89
 90        # this function creates register functions
 91        def mkfn(*, fnname, register, description, readonly, signed, **_):
 92            # _ here to absorb unused things. This way the yaml
 93            # can contain more info without causing errors here.
 94            if readonly:
 95
 96                def reg_fun(self):
 97                    self.send_command(register, signed=signed)
 98                    return self.get_response(register)
 99
100            else:
101
102                def reg_fun(self, data=None):
103                    self.send_command(register, data, signed=signed)
104                    return self.get_response(register)
105
106            reg_fun.__doc__ = description
107            reg_fun.__name__ = fnname
108            return reg_fun
109
110        for register_file in register_files:
111            register_file = resource_filename("itla", "registers/" + register_file)
112            with open(register_file, "r") as register_yaml:
113                register_spec = yaml.safe_load(register_yaml)
114
115                for register_name in register_spec:
116                    register_data = register_spec[register_name]
117                    setattr(
118                        ITLABase, "_" + register_data["fnname"], mkfn(**register_data)
119                    )
120
121    def __enter__(self):
122        """TODO describe function
123
124        :returns:
125
126        """
127        self.connect()
128
129    def __exit__(self, exc_type, exc_value, traceback):
130        """TODO describe function
131
132        :param exc_type:
133        :param exc_value:
134        :param traceback:
135        :returns:
136
137        """
138        self.disconnect()
139
140    def __del__(self):
141        """TODO describe function
142
143        :returns:
144
145        """
146        if self._device is not None:
147            self.disconnect()
148
149    def connect(self):
150        """Establishes a serial connection with the port provided
151
152        **For some reason on Linux opening the serial port causes some
153        power output from the laser before it has been activated. This behavior
154        does not occur on Windows.**
155
156        """
157        try:
158            self._device = serial.Serial(
159                self._port, self._baudrate, timeout=self._timeout
160            )
161        except SerialException:
162            raise SerialException("Connection to " + self._port + " unsuccessful.")
163
164    def disconnect(self, leave_on=False):
165        """Ends the serial connection to the laser
166
167        :param leave_on:
168        :returns:
169
170        """
171        if not self._device.is_open:
172            return
173
174        if not leave_on:
175            self.disable()
176
177        try:
178            self._device.close()
179        except AttributeError:
180            # When does this error occur?
181            # There are a few ways disconnect can be called.
182            # 1) It can be called purposefully.
183            # 2) It can be called by ending a `with` (ie __exit__)
184            # 3) It can be called by exiting a repl or a script ending (ie. __del__).
185            pass
186
187    def send_command(self, register, data=None, signed=False):
188        """Sends commands to a device.
189        This function takes the hexstring, turns it into a bytestring,
190        and writes it to the device.
191        This function should probably be hidden from the user.
192
193        :param device: Should be a Serial object that you can write to.
194        :param hexstring: a hexstring to send to the device
195        :returns: nothing
196        """
197
198        write = data is not None
199
200        # convert to register to a bytestring
201        register_bytes = register.to_bytes(1, "big")
202
203        # convert data to bytestring
204        if write:
205            data_bytes = data.to_bytes(2, "big", signed=signed)
206
207        else:
208            data_bytes = (0).to_bytes(2, "big")
209
210        # compute the checksum
211        checksum = compute_checksum(
212            (write.to_bytes(1, "big") + register_bytes + data_bytes).hex()
213        )
214
215        # compute and convery header to bytestring
216        header = checksum * 16 + write
217        header_bytes = header.to_bytes(1, "big")
218
219        # form full command and send.
220        command = header_bytes + register_bytes + data_bytes
221        self._device.write(command)
222
223    def get_response(self, register):
224        """This function should read from self._device. This should be called
225
226        :param register:
227        :returns: ???
228
229        """
230        # read four bytes
231        response = self._device.read(4)
232
233        logger.debug(f"response: {response.hex()}")
234
235        # get the checksum and ... check it.
236        checksum = int(response.hex()[0], 16)
237        computed_checksum = compute_checksum(response.hex())
238
239        if computed_checksum != checksum:
240            raise Exception(
241                f"Communication error expected {checksum} got " + f"{computed_checksum}"
242            )
243
244        status = int(f"{response[0]:08b}"[-2:], 2)
245        logger.debug(f"status: {status}")
246
247        try:
248            raise self._response_status[status]
249
250        except TypeError:
251            # a type error occurs if we try to raise "OK"
252            # in this situation we should just pass.
253            pass
254
255        except AEAException:
256            response = self.read_aea()
257            return response
258
259        if register != response[1]:
260            raise Exception(
261                "The returned register does not match "
262                + "the register sent to the device. "
263                + f"Got {response[1]} expected {register}."
264            )
265
266        return response[2:]
267
268    def upgrade_firmware(self, firmware_file):
269        """This function should update the firmware for the laser."""
270
271        # STEPS:
272        # 1) release
273        # 2) set baudrate to 115200
274        # 3) disconnect and reconnect at update baudrate
275        # 4) send dlconfig signal
276        # 5) ????
277        # 6) profit
278        raise Warning("this is not implemented yet.")

A class that represents an ITLA and exposes a user-friendly API for controlling functionality.

Things to figure out

  • What should be exposed to the user?
  • Can we abstract away the concept of registers and stuff in here so you don't have to deal with it.

There are some functions that could be implemented like set_fatalstatus. I think this is probably a bad idea even though it isn't write only.

set_frequency is my platonic ideal for the higher level functions.

ITLABase(serial_port, baudrate, timeout=0.5, register_files=None)
 73    def __init__(self, serial_port, baudrate, timeout=0.5, register_files=None):
 74        """TODO describe function
 75
 76        :param serial_port:
 77        :param baudrate:
 78        :param timeout:
 79        :returns:
 80
 81        """
 82        self._port = serial_port
 83        self._baudrate = baudrate
 84        self._timeout = timeout
 85        self._device = None
 86
 87        if register_files is None:
 88            register_files = []
 89
 90        # this function creates register functions
 91        def mkfn(*, fnname, register, description, readonly, signed, **_):
 92            # _ here to absorb unused things. This way the yaml
 93            # can contain more info without causing errors here.
 94            if readonly:
 95
 96                def reg_fun(self):
 97                    self.send_command(register, signed=signed)
 98                    return self.get_response(register)
 99
100            else:
101
102                def reg_fun(self, data=None):
103                    self.send_command(register, data, signed=signed)
104                    return self.get_response(register)
105
106            reg_fun.__doc__ = description
107            reg_fun.__name__ = fnname
108            return reg_fun
109
110        for register_file in register_files:
111            register_file = resource_filename("itla", "registers/" + register_file)
112            with open(register_file, "r") as register_yaml:
113                register_spec = yaml.safe_load(register_yaml)
114
115                for register_name in register_spec:
116                    register_data = register_spec[register_name]
117                    setattr(
118                        ITLABase, "_" + register_data["fnname"], mkfn(**register_data)
119                    )

TODO describe function

Parameters
  • serial_port:
  • baudrate:
  • timeout: :returns:
def connect(self):
149    def connect(self):
150        """Establishes a serial connection with the port provided
151
152        **For some reason on Linux opening the serial port causes some
153        power output from the laser before it has been activated. This behavior
154        does not occur on Windows.**
155
156        """
157        try:
158            self._device = serial.Serial(
159                self._port, self._baudrate, timeout=self._timeout
160            )
161        except SerialException:
162            raise SerialException("Connection to " + self._port + " unsuccessful.")

Establishes a serial connection with the port provided

For some reason on Linux opening the serial port causes some power output from the laser before it has been activated. This behavior does not occur on Windows.

def disconnect(self, leave_on=False):
164    def disconnect(self, leave_on=False):
165        """Ends the serial connection to the laser
166
167        :param leave_on:
168        :returns:
169
170        """
171        if not self._device.is_open:
172            return
173
174        if not leave_on:
175            self.disable()
176
177        try:
178            self._device.close()
179        except AttributeError:
180            # When does this error occur?
181            # There are a few ways disconnect can be called.
182            # 1) It can be called purposefully.
183            # 2) It can be called by ending a `with` (ie __exit__)
184            # 3) It can be called by exiting a repl or a script ending (ie. __del__).
185            pass

Ends the serial connection to the laser

Parameters
  • leave_on: :returns:
def send_command(self, register, data=None, signed=False):
187    def send_command(self, register, data=None, signed=False):
188        """Sends commands to a device.
189        This function takes the hexstring, turns it into a bytestring,
190        and writes it to the device.
191        This function should probably be hidden from the user.
192
193        :param device: Should be a Serial object that you can write to.
194        :param hexstring: a hexstring to send to the device
195        :returns: nothing
196        """
197
198        write = data is not None
199
200        # convert to register to a bytestring
201        register_bytes = register.to_bytes(1, "big")
202
203        # convert data to bytestring
204        if write:
205            data_bytes = data.to_bytes(2, "big", signed=signed)
206
207        else:
208            data_bytes = (0).to_bytes(2, "big")
209
210        # compute the checksum
211        checksum = compute_checksum(
212            (write.to_bytes(1, "big") + register_bytes + data_bytes).hex()
213        )
214
215        # compute and convery header to bytestring
216        header = checksum * 16 + write
217        header_bytes = header.to_bytes(1, "big")
218
219        # form full command and send.
220        command = header_bytes + register_bytes + data_bytes
221        self._device.write(command)

Sends commands to a device. This function builds a four-byte command from the register number and the data, and writes it to the device. This function should probably be hidden from the user.

Parameters
  • register: the number of the register to read or write.
  • data: the value to write; when omitted (None), the command is a read.
  • signed: whether data is encoded as a signed integer.

Returns: nothing
def get_response(self, register):
223    def get_response(self, register):
224        """This function should read from self._device. This should be called
225
226        :param register:
227        :returns: ???
228
229        """
230        # read four bytes
231        response = self._device.read(4)
232
233        logger.debug(f"response: {response.hex()}")
234
235        # get the checksum and ... check it.
236        checksum = int(response.hex()[0], 16)
237        computed_checksum = compute_checksum(response.hex())
238
239        if computed_checksum != checksum:
240            raise Exception(
241                f"Communication error expected {checksum} got " + f"{computed_checksum}"
242            )
243
244        status = int(f"{response[0]:08b}"[-2:], 2)
245        logger.debug(f"status: {status}")
246
247        try:
248            raise self._response_status[status]
249
250        except TypeError:
251            # a type error occurs if we try to raise "OK"
252            # in this situation we should just pass.
253            pass
254
255        except AEAException:
256            response = self.read_aea()
257            return response
258
259        if register != response[1]:
260            raise Exception(
261                "The returned register does not match "
262                + "the register sent to the device. "
263                + f"Got {response[1]} expected {register}."
264            )
265
266        return response[2:]

Reads a four-byte response from self._device. This should be called after send_command.

Parameters
  • register: :returns: ???
def upgrade_firmware(self, firmware_file):
    def upgrade_firmware(self, firmware_file):
        """This function should update the firmware for the laser.

        Not implemented: it always raises.

        :param firmware_file: currently unused
        :raises Warning: always
        """

        # STEPS:
        # 1) release
        # 2) set baudrate to 115200
        # 3) disconnect and reconnect at update baudrate
        # 4) send dlconfig signal
        # 5) ????
        # 6) profit
        # NOTE(review): NotImplementedError would be the conventional
        # exception for an unimplemented method; confirm before changing,
        # since callers may rely on the current type.
        raise Warning("this is not implemented yet.")

This function should update the firmware for the laser.

class ITLA:
class ITLA:
    """
    User-facing entry point for creating ITLA laser objects.

    Instantiating this class does not yield an ``ITLA`` instance: ``__new__``
    hands construction over to the class registered for the requested
    version (presumably subclasses of ITLABase).
    """

    def __new__(cls, *args, version="1.3", **kwargs):
        """
        Build and return the laser object for ``version``.

        ``version`` selects the implementation ("1.3" -> ITLA13,
        "1.2" -> ITLA12); every other argument is forwarded unchanged to
        that class. An unknown version raises ``KeyError``. Since the
        returned object is not an instance of ``ITLA``, Python does not go
        on to call ``ITLA.__init__`` on it.

        Design note: ideally everything would subclass ITLA and the ITLA and
        ITLABase classes would be merged, which would make type checking
        simpler because every laser object would then be an ITLA. What
        blocks this appears to be a circular import, which is also why the
        imports below are done inside the method.
        """
        from .itla12 import ITLA12
        from .itla13 import ITLA13

        implementations = {"1.3": ITLA13, "1.2": ITLA12}
        laser_class = implementations[version]
        return laser_class(*args, **kwargs)

The class that users should interface and create ITLA objects from. Basically just used to select the appropriate subclass of ITLABase.

ITLA(*args, version='1.3', **kwargs)
287    def __new__(cls, *args, version="1.3", **kwargs):
288        """
289        this function is executed on initialization of an object before
290        __init__ here we divert the initialization process to initialize
291        one of the subclasses of ITLABase.
292
293        I would love it if there were a way to make everything a
294        subclass of ITLA and merge the ITLA and ITLABase classes.
295        This would streamline some things and make it easier to do type
296        checking since it only makes sense that all ITLA laser objects would
297        be subclasses of ITLA not some random arbitrary ITLABase.
298
299        The current issue in doing this is that we end up with a circular
300        import I think.
301        """
302        from .itla12 import ITLA12
303        from .itla13 import ITLA13
304
305        class_dict = {
306            "1.3": ITLA13,
307            "1.2": ITLA12,
308        }
309
310        return class_dict[version](*args, **kwargs)

this function is executed on initialization of an object before __init__ here we divert the initialization process to initialize one of the subclasses of ITLABase.

I would love it if there were a way to make everything a subclass of ITLA and merge the ITLA and ITLABase classes. This would streamline some things and make it easier to do type checking since it only makes sense that all ITLA laser objects would be subclasses of ITLA not some random arbitrary ITLABase.

The current issue in doing this is that we end up with a circular import I think.