itla.itla
import serial
import yaml
from pkg_resources import resource_filename
from serial.serialutil import SerialException

from . import logger
from .itla_errors import (
    AEAException,
    CIEError,
    CIIError,
    CIPError,
    CPException,
    EREError,
    EROError,
    ExecutionError,
    EXFError,
    IVCError,
    NOPException,
    RNIError,
    RNWError,
    RVEError,
    VSEError,
)
from .utils import compute_checksum


class ITLABase:
    """
    Represent an ITLA laser and expose a user-friendly API for controlling it.

    Things to figure out

    * What should be exposed to the user?
    * Can we abstract away the concept of registers and stuff
      in here so you don't have to deal with it.

    There are some functions that could be implemented like set_fatalstatus.
    I think this is probably a bad idea even though it isn't write only.

    set_frequency is my platonic ideal for the higher level functions.
    """

    # Error codes mapped to the exception instance describing them.
    # 0x00 maps to a plain string because it is not an error.
    _nop_errors = {
        0x00: "OK: No errors.",
        0x01: RNIError("RNI: Register not implemented."),
        0x02: RNWError("RNW: Register not writable"),
        0x03: RVEError("RVE: Register Value range Error."),
        0x04: CIPError("CIP: Command Ignored due to Pending operation"),
        0x05: CIIError("CII: Command Ignored Initializing"),
        0x06: EREError("ERE: Extended address Range Error (address invalid)"),
        0x07: EROError("ERO: Extended address is read only"),
        0x08: EXFError("EXF: Execution general failure"),
        0x09: CIEError("CIE: Command ignored while module's optical output is enabled"),
        0x0A: IVCError("IVC: Invalid configuration command ignored."),
        0x0B: NOPException(),
        0x0C: NOPException(),
        0x0D: NOPException(),
        0x0E: NOPException(),
        0x0F: VSEError("VSE: Vendor specific error"),
    }

    # Keyed by the two low bits of the first response byte (see get_response).
    _response_status = {
        0x00: "OK",
        0x01: ExecutionError("Command returned execution error."),
        0x02: AEAException(
            "AEA: automatic extended addressing being returned or ready to write."
        ),
        0x03: CPException("CP: Command not complete, pending."),
    }

    def __init__(self, serial_port, baudrate, timeout=0.5, register_files=None):
        """Store the connection settings and generate the register methods.

        No serial connection is opened here; call :meth:`connect` (or use the
        object as a context manager) to open it.

        :param serial_port: port passed to ``serial.Serial`` by :meth:`connect`.
        :param baudrate: baudrate passed to ``serial.Serial``.
        :param timeout: timeout passed to ``serial.Serial``.
        :param register_files: names of YAML files inside the ``registers/``
            directory of the ``itla`` package. Every entry of each file
            becomes a method named ``_<fnname>`` on ``ITLABase``.
        """
        self._port = serial_port
        self._baudrate = baudrate
        self._timeout = timeout
        self._device = None

        if register_files is None:
            register_files = []

        # this function creates register functions
        def mkfn(*, fnname, register, description, readonly, signed, **_):
            # _ here to absorb unused things. This way the yaml
            # can contain more info without causing errors here.
            if readonly:

                def reg_fun(self):
                    self.send_command(register, signed=signed)
                    return self.get_response(register)

            else:

                def reg_fun(self, data=None):
                    self.send_command(register, data, signed=signed)
                    return self.get_response(register)

            reg_fun.__doc__ = description
            reg_fun.__name__ = fnname
            return reg_fun

        for register_file in register_files:
            register_path = resource_filename("itla", "registers/" + register_file)
            # Explicit encoding so parsing does not depend on the platform locale.
            with open(register_path, "r", encoding="utf-8") as register_yaml:
                # safe_load returns None for an empty document.
                register_spec = yaml.safe_load(register_yaml) or {}

            for register_data in register_spec.values():
                # NOTE: the methods are attached to ITLABase itself, so they
                # are shared by every subclass and instance.
                setattr(
                    ITLABase, "_" + register_data["fnname"], mkfn(**register_data)
                )

    def __enter__(self):
        """Open the serial connection and return this object.

        :returns: ``self``, so ``with ITLA(...) as laser:`` binds the laser.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial connection when the ``with`` block ends.

        :param exc_type: unused.
        :param exc_value: unused.
        :param traceback: unused.
        :returns: None, so exceptions from the ``with`` body propagate.
        """
        self.disconnect()

    def __del__(self):
        """Disconnect on garbage collection if a device was ever opened."""
        # getattr: __init__ may have raised before _device was assigned.
        if getattr(self, "_device", None) is not None:
            self.disconnect()

    def connect(self):
        """Establishes a serial connection with the port provided

        **For some reason on Linux opening the serial port causes some
        power output from the laser before it has been activated. This behavior
        does not occur on Windows.**

        :raises SerialException: if the port cannot be opened.
        """
        try:
            self._device = serial.Serial(
                self._port, self._baudrate, timeout=self._timeout
            )
        except SerialException as err:
            # f-string: the port is not required to be a str.
            raise SerialException(
                f"Connection to {self._port} unsuccessful."
            ) from err

    def disconnect(self, leave_on=False):
        """Ends the serial connection to the laser

        Does nothing if no connection was ever opened or it is already closed.

        :param leave_on: when false (the default) ``self.disable()`` is called
            before the port is closed.
        :returns: None
        """
        device = self._device
        if device is None or not device.is_open:
            return

        try:
            if not leave_on:
                self.disable()
        finally:
            # Close the port even if disable() raised.
            try:
                device.close()
            except AttributeError:
                # Best-effort, kept from the original code. Presumably seen
                # when called from __del__ at interpreter exit - unconfirmed.
                pass

    def send_command(self, register, data=None, signed=False):
        """Build a four-byte command frame and write it to the device.

        The frame is ``header + register + data`` where the header byte holds
        the checksum in its high nibble and the write flag in its lowest bit.

        :param register: register number, encoded as one byte.
        :param data: value to write, encoded as two big-endian bytes. ``None``
            sends a read command with a zero data field.
        :param signed: whether ``data`` is encoded as a signed integer.
        :returns: nothing
        """
        is_write = data is not None
        register_bytes = register.to_bytes(1, "big")
        payload = data.to_bytes(2, "big", signed=signed) if is_write else bytes(2)

        # The checksum is computed with a checksum nibble of zero.
        checksum = compute_checksum(
            (bytes([is_write]) + register_bytes + payload).hex()
        )
        header_bytes = (checksum * 16 + is_write).to_bytes(1, "big")

        self._device.write(header_bytes + register_bytes + payload)

    def get_response(self, register):
        """Read and validate the four-byte response to a command.

        :param register: the register the preceding command addressed.
        :returns: the two data bytes of the response, or the result of
            ``self.read_aea()`` when the device reports an AEA status.
        :raises TimeoutError: if fewer than four bytes were read.
        :raises Exception: on a checksum mismatch or when the echoed register
            differs from ``register``.
        :raises ExecutionError: if the status field is 0x01.
        :raises CPException: if the status field is 0x03.
        """
        response = self._device.read(4)

        logger.debug(f"response: {response.hex()}")

        # A short read would otherwise surface as an opaque IndexError.
        if len(response) != 4:
            raise TimeoutError(
                f"Expected a 4 byte response but received {len(response)} bytes."
            )

        # The high nibble of the first byte carries the checksum.
        checksum = response[0] >> 4
        computed_checksum = compute_checksum(response.hex())

        if computed_checksum != checksum:
            raise Exception(
                f"Communication error expected {checksum} got {computed_checksum}"
            )

        # The two low bits of the first byte carry the status.
        status = response[0] & 0b11
        logger.debug(f"status: {status}")

        outcome = self._response_status[status]
        if isinstance(outcome, AEAException):
            return self.read_aea()
        if isinstance(outcome, BaseException):
            raise outcome
        # Anything else is the "OK" marker string.

        if register != response[1]:
            raise Exception(
                "The returned register does not match "
                + "the register sent to the device. "
                + f"Got {response[1]} expected {register}."
            )

        return response[2:]

    def upgrade_firmware(self, firmware_file):
        """This function should update the firmware for the laser.

        :param firmware_file: currently unused.
        :raises NotImplementedError: always; the procedure is not written yet.
        """

        # STEPS:
        # 1) release
        # 2) set baudrate to 115200
        # 3) disconnect and reconnect at update baudrate
        # 4) send dlconfig signal
        # 5) ????
        # 6) profit
        raise NotImplementedError("this is not implemented yet.")


class ITLA:
    """
    The class that users should interface and create ITLA objects from.
    Basically just used to select the appropriate subclass of ITLABase.
    """

    def __new__(cls, *args, version="1.3", **kwargs):
        """
        This function is executed when an object is created, before __init__.
        Here we divert the initialization process to initialize
        one of the subclasses of ITLABase.

        I would love it if there were a way to make everything a
        subclass of ITLA and merge the ITLA and ITLABase classes.
        This would streamline some things and make it easier to do type
        checking since it only makes sense that all ITLA laser objects would
        be subclasses of ITLA not some random arbitrary ITLABase.

        The current issue in doing this is that we end up with a circular
        import I think.

        :param version: ``"1.3"`` or ``"1.2"``; numeric values such as ``1.3``
            are accepted too.
        :returns: an instance of the class selected by ``version``, built
            from ``*args`` and ``**kwargs``.
        :raises KeyError: if ``version`` is not supported.
        """
        # Imported here rather than at module level (circular import, see above).
        from .itla12 import ITLA12
        from .itla13 import ITLA13

        class_dict = {
            "1.3": ITLA13,
            "1.2": ITLA12,
        }

        try:
            laser_class = class_dict[str(version)]
        except KeyError:
            raise KeyError(
                f"Unsupported ITLA version {version!r}; "
                f"expected one of {sorted(class_dict)}"
            ) from None

        return laser_class(*args, **kwargs)
class ITLABase:
    """
    Represent an ITLA laser and expose a user-friendly API for controlling it.

    Things to figure out

    * What should be exposed to the user?
    * Can we abstract away the concept of registers and stuff
      in here so you don't have to deal with it.

    There are some functions that could be implemented like set_fatalstatus.
    I think this is probably a bad idea even though it isn't write only.

    set_frequency is my platonic ideal for the higher level functions.
    """

    # Error codes mapped to the exception instance describing them.
    # 0x00 maps to a plain string because it is not an error.
    _nop_errors = {
        0x00: "OK: No errors.",
        0x01: RNIError("RNI: Register not implemented."),
        0x02: RNWError("RNW: Register not writable"),
        0x03: RVEError("RVE: Register Value range Error."),
        0x04: CIPError("CIP: Command Ignored due to Pending operation"),
        0x05: CIIError("CII: Command Ignored Initializing"),
        0x06: EREError("ERE: Extended address Range Error (address invalid)"),
        0x07: EROError("ERO: Extended address is read only"),
        0x08: EXFError("EXF: Execution general failure"),
        0x09: CIEError("CIE: Command ignored while module's optical output is enabled"),
        0x0A: IVCError("IVC: Invalid configuration command ignored."),
        0x0B: NOPException(),
        0x0C: NOPException(),
        0x0D: NOPException(),
        0x0E: NOPException(),
        0x0F: VSEError("VSE: Vendor specific error"),
    }

    # Keyed by the two low bits of the first response byte (see get_response).
    _response_status = {
        0x00: "OK",
        0x01: ExecutionError("Command returned execution error."),
        0x02: AEAException(
            "AEA: automatic extended addressing being returned or ready to write."
        ),
        0x03: CPException("CP: Command not complete, pending."),
    }

    def __init__(self, serial_port, baudrate, timeout=0.5, register_files=None):
        """Store the connection settings and generate the register methods.

        No serial connection is opened here; call :meth:`connect` (or use the
        object as a context manager) to open it.

        :param serial_port: port passed to ``serial.Serial`` by :meth:`connect`.
        :param baudrate: baudrate passed to ``serial.Serial``.
        :param timeout: timeout passed to ``serial.Serial``.
        :param register_files: names of YAML files inside the ``registers/``
            directory of the ``itla`` package. Every entry of each file
            becomes a method named ``_<fnname>`` on ``ITLABase``.
        """
        self._port = serial_port
        self._baudrate = baudrate
        self._timeout = timeout
        self._device = None

        if register_files is None:
            register_files = []

        # this function creates register functions
        def mkfn(*, fnname, register, description, readonly, signed, **_):
            # _ here to absorb unused things. This way the yaml
            # can contain more info without causing errors here.
            if readonly:

                def reg_fun(self):
                    self.send_command(register, signed=signed)
                    return self.get_response(register)

            else:

                def reg_fun(self, data=None):
                    self.send_command(register, data, signed=signed)
                    return self.get_response(register)

            reg_fun.__doc__ = description
            reg_fun.__name__ = fnname
            return reg_fun

        for register_file in register_files:
            register_path = resource_filename("itla", "registers/" + register_file)
            # Explicit encoding so parsing does not depend on the platform locale.
            with open(register_path, "r", encoding="utf-8") as register_yaml:
                # safe_load returns None for an empty document.
                register_spec = yaml.safe_load(register_yaml) or {}

            for register_data in register_spec.values():
                # NOTE: the methods are attached to ITLABase itself, so they
                # are shared by every subclass and instance.
                setattr(
                    ITLABase, "_" + register_data["fnname"], mkfn(**register_data)
                )

    def __enter__(self):
        """Open the serial connection and return this object.

        :returns: ``self``, so ``with ITLA(...) as laser:`` binds the laser.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial connection when the ``with`` block ends.

        :param exc_type: unused.
        :param exc_value: unused.
        :param traceback: unused.
        :returns: None, so exceptions from the ``with`` body propagate.
        """
        self.disconnect()

    def __del__(self):
        """Disconnect on garbage collection if a device was ever opened."""
        # getattr: __init__ may have raised before _device was assigned.
        if getattr(self, "_device", None) is not None:
            self.disconnect()

    def connect(self):
        """Establishes a serial connection with the port provided

        **For some reason on Linux opening the serial port causes some
        power output from the laser before it has been activated. This behavior
        does not occur on Windows.**

        :raises SerialException: if the port cannot be opened.
        """
        try:
            self._device = serial.Serial(
                self._port, self._baudrate, timeout=self._timeout
            )
        except SerialException as err:
            # f-string: the port is not required to be a str.
            raise SerialException(
                f"Connection to {self._port} unsuccessful."
            ) from err

    def disconnect(self, leave_on=False):
        """Ends the serial connection to the laser

        Does nothing if no connection was ever opened or it is already closed.

        :param leave_on: when false (the default) ``self.disable()`` is called
            before the port is closed.
        :returns: None
        """
        device = self._device
        if device is None or not device.is_open:
            return

        try:
            if not leave_on:
                self.disable()
        finally:
            # Close the port even if disable() raised.
            try:
                device.close()
            except AttributeError:
                # Best-effort, kept from the original code. Presumably seen
                # when called from __del__ at interpreter exit - unconfirmed.
                pass

    def send_command(self, register, data=None, signed=False):
        """Build a four-byte command frame and write it to the device.

        The frame is ``header + register + data`` where the header byte holds
        the checksum in its high nibble and the write flag in its lowest bit.

        :param register: register number, encoded as one byte.
        :param data: value to write, encoded as two big-endian bytes. ``None``
            sends a read command with a zero data field.
        :param signed: whether ``data`` is encoded as a signed integer.
        :returns: nothing
        """
        is_write = data is not None
        register_bytes = register.to_bytes(1, "big")
        payload = data.to_bytes(2, "big", signed=signed) if is_write else bytes(2)

        # The checksum is computed with a checksum nibble of zero.
        checksum = compute_checksum(
            (bytes([is_write]) + register_bytes + payload).hex()
        )
        header_bytes = (checksum * 16 + is_write).to_bytes(1, "big")

        self._device.write(header_bytes + register_bytes + payload)

    def get_response(self, register):
        """Read and validate the four-byte response to a command.

        :param register: the register the preceding command addressed.
        :returns: the two data bytes of the response, or the result of
            ``self.read_aea()`` when the device reports an AEA status.
        :raises TimeoutError: if fewer than four bytes were read.
        :raises Exception: on a checksum mismatch or when the echoed register
            differs from ``register``.
        :raises ExecutionError: if the status field is 0x01.
        :raises CPException: if the status field is 0x03.
        """
        response = self._device.read(4)

        logger.debug(f"response: {response.hex()}")

        # A short read would otherwise surface as an opaque IndexError.
        if len(response) != 4:
            raise TimeoutError(
                f"Expected a 4 byte response but received {len(response)} bytes."
            )

        # The high nibble of the first byte carries the checksum.
        checksum = response[0] >> 4
        computed_checksum = compute_checksum(response.hex())

        if computed_checksum != checksum:
            raise Exception(
                f"Communication error expected {checksum} got {computed_checksum}"
            )

        # The two low bits of the first byte carry the status.
        status = response[0] & 0b11
        logger.debug(f"status: {status}")

        outcome = self._response_status[status]
        if isinstance(outcome, AEAException):
            return self.read_aea()
        if isinstance(outcome, BaseException):
            raise outcome
        # Anything else is the "OK" marker string.

        if register != response[1]:
            raise Exception(
                "The returned register does not match "
                + "the register sent to the device. "
                + f"Got {response[1]} expected {register}."
            )

        return response[2:]

    def upgrade_firmware(self, firmware_file):
        """This function should update the firmware for the laser.

        :param firmware_file: currently unused.
        :raises NotImplementedError: always; the procedure is not written yet.
        """

        # STEPS:
        # 1) release
        # 2) set baudrate to 115200
        # 3) disconnect and reconnect at update baudrate
        # 4) send dlconfig signal
        # 5) ????
        # 6) profit
        raise NotImplementedError("this is not implemented yet.")
A class that represents an ITLA and exposes a user-friendly API for controlling functionality.
Things to figure out
- What should be exposed to the user?
- Can we abstract away the concept of registers and stuff in here so you don't have to deal with it.
There are some functions that could be implemented, like set_fatalstatus. I think this is probably a bad idea even though it isn't write only.
set_frequency is my platonic ideal for the higher level functions.
def __init__(self, serial_port, baudrate, timeout=0.5, register_files=None):
    """Store the connection settings and generate the register methods.

    No serial connection is opened here; the stored settings are used later
    when the connection is established.

    :param serial_port: port later passed to ``serial.Serial``.
    :param baudrate: baudrate later passed to ``serial.Serial``.
    :param timeout: timeout later passed to ``serial.Serial``.
    :param register_files: names of YAML files inside the ``registers/``
        directory of the ``itla`` package. Every entry of each file becomes
        a method named ``_<fnname>`` on ``ITLABase``.
    """
    self._port = serial_port
    self._baudrate = baudrate
    self._timeout = timeout
    self._device = None

    if register_files is None:
        register_files = []

    # this function creates register functions
    def mkfn(*, fnname, register, description, readonly, signed, **_):
        # _ here to absorb unused things. This way the yaml
        # can contain more info without causing errors here.
        if readonly:

            def reg_fun(self):
                self.send_command(register, signed=signed)
                return self.get_response(register)

        else:

            def reg_fun(self, data=None):
                self.send_command(register, data, signed=signed)
                return self.get_response(register)

        reg_fun.__doc__ = description
        reg_fun.__name__ = fnname
        return reg_fun

    for register_file in register_files:
        register_path = resource_filename("itla", "registers/" + register_file)
        # Explicit encoding so parsing does not depend on the platform locale.
        with open(register_path, "r", encoding="utf-8") as register_yaml:
            # safe_load returns None for an empty document.
            register_spec = yaml.safe_load(register_yaml) or {}

        for register_data in register_spec.values():
            # NOTE: the methods are attached to ITLABase itself, so they are
            # shared by every subclass and instance.
            setattr(ITLABase, "_" + register_data["fnname"], mkfn(**register_data))
TODO describe function
Parameters
- serial_port:
- baudrate:
- timeout: :returns:
def connect(self):
    """Establishes a serial connection with the port provided

    **For some reason on Linux opening the serial port causes some
    power output from the laser before it has been activated. This behavior
    does not occur on Windows.**

    :raises SerialException: if the port cannot be opened. The original
        error is chained as ``__cause__``.
    """
    try:
        self._device = serial.Serial(
            self._port, self._baudrate, timeout=self._timeout
        )
    except SerialException as err:
        # f-string: the port is not required to be a str, and concatenation
        # would raise TypeError here and hide the real failure.
        raise SerialException(f"Connection to {self._port} unsuccessful.") from err
Establishes a serial connection with the port provided
For some reason on Linux opening the serial port causes some power output from the laser before it has been activated. This behavior does not occur on Windows.
def disconnect(self, leave_on=False):
    """Ends the serial connection to the laser

    Does nothing if no connection was ever opened or it is already closed.

    :param leave_on: when false (the default) ``self.disable()`` is called
        before the port is closed.
    :returns: None
    """
    device = self._device
    if device is None or not device.is_open:
        return

    try:
        if not leave_on:
            self.disable()
    finally:
        # Close the port even if disable() raised, so it is not leaked.
        try:
            device.close()
        except AttributeError:
            # Best-effort, kept from the original code. Presumably seen when
            # called from __del__ at interpreter exit - unconfirmed.
            pass
Ends the serial connection to the laser
Parameters
- leave_on: :returns:
def send_command(self, register, data=None, signed=False):
    """Build a four-byte command frame and write it to the device.

    The frame is ``header + register + data`` where the header byte holds the
    checksum in its high nibble and the write flag in its lowest bit.
    This function should probably be hidden from the user.

    :param register: register number, encoded as one byte.
    :param data: value to write, encoded as two big-endian bytes. ``None``
        sends a read command with a zero data field.
    :param signed: whether ``data`` is encoded as a signed integer.
    :returns: nothing
    """
    is_write = data is not None

    register_bytes = register.to_bytes(1, "big")
    payload = data.to_bytes(2, "big", signed=signed) if is_write else bytes(2)

    # The checksum is computed with a checksum nibble of zero.
    unsigned_frame = bytes([is_write]) + register_bytes + payload
    checksum = compute_checksum(unsigned_frame.hex())

    # Checksum goes in the high nibble, the write flag in the low bit.
    header_bytes = (checksum * 16 + is_write).to_bytes(1, "big")

    self._device.write(header_bytes + register_bytes + payload)
Sends commands to a device. This function takes the hexstring, turns it into a bytestring, and writes it to the device. This function should probably be hidden from the user.
Parameters
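- register: the register number to address; it is sent as a single byte.
- data: the value to write, sent as two big-endian bytes, or None to send a read command. signed: whether data is encoded as a signed integer. :returns: nothing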
def get_response(self, register):
    """Read and validate the four-byte response to a command.

    :param register: the register the preceding command addressed.
    :returns: the two data bytes of the response, or the result of
        ``self.read_aea()`` when the device reports an AEA status.
    :raises TimeoutError: if fewer than four bytes were read.
    :raises Exception: on a checksum mismatch or when the echoed register
        differs from ``register``.
    :raises ExecutionError: if the status field is 0x01.
    :raises CPException: if the status field is 0x03.
    """
    # read four bytes
    response = self._device.read(4)

    logger.debug(f"response: {response.hex()}")

    # A short read would otherwise surface as an opaque IndexError below.
    if len(response) != 4:
        raise TimeoutError(
            f"Expected a 4 byte response but received {len(response)} bytes."
        )

    # The high nibble of the first byte carries the checksum.
    checksum = response[0] >> 4
    computed_checksum = compute_checksum(response.hex())

    if computed_checksum != checksum:
        raise Exception(
            f"Communication error expected {checksum} got {computed_checksum}"
        )

    # The two low bits of the first byte carry the status.
    status = response[0] & 0b11
    logger.debug(f"status: {status}")

    # Dispatch explicitly instead of raising the "OK" string and catching
    # the resulting TypeError.
    outcome = self._response_status[status]
    if isinstance(outcome, AEAException):
        return self.read_aea()
    if isinstance(outcome, BaseException):
        raise outcome

    if register != response[1]:
        raise Exception(
            "The returned register does not match "
            + "the register sent to the device. "
            + f"Got {response[1]} expected {register}."
        )

    return response[2:]
This function reads a four-byte response from self._device. It should be called right after send_command, with the same register.
Parameters
- register: :returns: ???
def upgrade_firmware(self, firmware_file):
    """This function should update the firmware for the laser.

    :param firmware_file: currently unused.
    :raises NotImplementedError: always; the procedure is not written yet.
    """

    # STEPS:
    # 1) release
    # 2) set baudrate to 115200
    # 3) disconnect and reconnect at update baudrate
    # 4) send dlconfig signal
    # 5) ????
    # 6) profit
    # NotImplementedError is the built-in meant for unfinished methods;
    # Warning is a warning category and is not meant to be raised like this.
    raise NotImplementedError("this is not implemented yet.")
This function should update the firmware for the laser.
class ITLA:
    """
    The class that users should interface and create ITLA objects from.
    Basically just used to select the appropriate subclass of ITLABase.
    """

    def __new__(cls, *args, version="1.3", **kwargs):
        """
        This function is executed when an object is created, before __init__.
        Here we divert the initialization process to initialize
        one of the subclasses of ITLABase.

        I would love it if there were a way to make everything a
        subclass of ITLA and merge the ITLA and ITLABase classes.
        This would streamline some things and make it easier to do type
        checking since it only makes sense that all ITLA laser objects would
        be subclasses of ITLA not some random arbitrary ITLABase.

        The current issue in doing this is that we end up with a circular
        import I think.

        :param version: ``"1.3"`` or ``"1.2"``; numeric values such as ``1.3``
            are accepted too.
        :returns: an instance of the class selected by ``version``, built
            from ``*args`` and ``**kwargs``.
        :raises KeyError: if ``version`` is not supported.
        """
        # Imported here rather than at module level (circular import, see above).
        from .itla12 import ITLA12
        from .itla13 import ITLA13

        class_dict = {
            "1.3": ITLA13,
            "1.2": ITLA12,
        }

        # str() lets callers pass 1.3 as well as "1.3".
        try:
            laser_class = class_dict[str(version)]
        except KeyError:
            raise KeyError(
                f"Unsupported ITLA version {version!r}; "
                f"expected one of {sorted(class_dict)}"
            ) from None

        return laser_class(*args, **kwargs)
The class that users should interface and create ITLA objects from. Basically just used to select the appropriate subclass of ITLABase.
def __new__(cls, *args, version="1.3", **kwargs):
    """
    This function is executed when an object is created, before __init__.
    Here we divert the initialization process to initialize
    one of the subclasses of ITLABase.

    I would love it if there were a way to make everything a
    subclass of ITLA and merge the ITLA and ITLABase classes.
    This would streamline some things and make it easier to do type
    checking since it only makes sense that all ITLA laser objects would
    be subclasses of ITLA not some random arbitrary ITLABase.

    The current issue in doing this is that we end up with a circular
    import I think.

    :param version: ``"1.3"`` or ``"1.2"``; numeric values such as ``1.3``
        are accepted too.
    :returns: an instance of the class selected by ``version``, built from
        ``*args`` and ``**kwargs``.
    :raises KeyError: if ``version`` is not supported.
    """
    # Imported here rather than at module level (circular import, see above).
    from .itla12 import ITLA12
    from .itla13 import ITLA13

    class_dict = {
        "1.3": ITLA13,
        "1.2": ITLA12,
    }

    # str() lets callers pass 1.3 as well as "1.3".
    try:
        laser_class = class_dict[str(version)]
    except KeyError:
        raise KeyError(
            f"Unsupported ITLA version {version!r}; "
            f"expected one of {sorted(class_dict)}"
        ) from None

    return laser_class(*args, **kwargs)
This function is executed when an object is created, before __init__. Here we divert the initialization process to initialize one of the subclasses of ITLABase.
I would love it if there were a way to make everything a subclass of ITLA and merge the ITLA and ITLABase classes. This would streamline some things and make it easier to do type checking since it only makes sense that all ITLA laser objects would be subclasses of ITLA not some random arbitrary ITLABase.
The current issue in doing this is that we end up with a circular import I think.