"""
A text based interface. For example use over serial ports like
"/dev/ttyS1" or "/dev/ttyUSB0" on Linux machines or "COM1" on Windows.
The interface is a simple implementation that has been used for
recording CAN traces.

See the interface documentation for the format being used.
"""

import io
import logging
import struct
from collections.abc import Sequence
from typing import Any, Optional, cast

from can import (
    BusABC,
    CanInitializationError,
    CanInterfaceNotImplementedError,
    CanOperationError,
    CanProtocol,
    CanTimeoutError,
    Message,
)
from can.typechecking import AutoDetectedConfig

logger = logging.getLogger("can.serial")

# pyserial is optional at import time: when it is missing, ``serial`` is set
# to None and SerialBus refuses to initialise instead of failing the import.
try:
    import serial.tools.list_ports
except ImportError:
    logger.warning(
        "You won't be able to use the serial can backend without "
        "the `pyserial` package installed!"
    )
    serial = None

# Flag bits carried in the upper bits of the 32-bit arbitration-ID field of
# the serial frame (set by ``send`` and decoded by ``_recv_internal``).
CAN_ERR_FLAG = 0x20000000  # frame is an error frame
CAN_RTR_FLAG = 0x40000000  # frame is a remote frame
CAN_EFF_FLAG = 0x80000000  # frame uses an extended identifier

# Masks selecting the identifier bits of the arbitration-ID field.
CAN_ID_MASK_EXT = 0x1FFFFFFF  # low 29 bits, used for extended identifiers
CAN_ID_MASK_STD = 0x7FF  # low 11 bits, used for standard identifiers
class SerialBus(BusABC):
    """
    Enable basic can communication over a serial device.

    Frames on the wire have the following layout (as produced by
    :meth:`send` and parsed by :meth:`_recv_internal`):

    * 1 byte start marker ``0xAA``
    * 4 byte little-endian unsigned timestamp
    * 1 byte DLC
    * 4 byte little-endian unsigned arbitration ID, including the
      ``CAN_EFF_FLAG`` / ``CAN_RTR_FLAG`` / ``CAN_ERR_FLAG`` bits
    * the payload bytes
    * 1 byte end marker ``0xBB``

    .. note:: See :meth:`~_recv_internal` for some special semantics.

    """

    def __init__(
        self,
        channel: str,
        baudrate: int = 115200,
        timeout: float = 0.1,
        rtscts: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        :param channel:
            The serial device to open. For example "/dev/ttyS1" or
            "/dev/ttyUSB0" on Linux or "COM1" on Windows systems.

        :param baudrate:
            Baud rate of the serial device in bit/s (default 115200).

            .. warning::
                Some serial port implementations don't care about the baudrate.

        :param timeout:
            Timeout for the serial device in seconds (default 0.1).

        :param rtscts:
            turn hardware handshake (RTS/CTS) on and off

        :raises ~can.exceptions.CanInitializationError:
            If the given parameters are invalid.
        :raises ~can.exceptions.CanInterfaceNotImplementedError:
            If the serial module is not installed.
        :raises TypeError:
            If ``channel`` is empty.
        """
        if not serial:
            raise CanInterfaceNotImplementedError("the serial module is not installed")

        if not channel:
            raise TypeError("Must specify a serial port.")

        self.channel_info = f"Serial interface: {channel}"
        self._can_protocol = CanProtocol.CAN_20

        try:
            self._ser = serial.serial_for_url(
                channel, baudrate=baudrate, timeout=timeout, rtscts=rtscts
            )
        except ValueError as error:
            raise CanInitializationError(
                "could not create the serial device"
            ) from error

        super().__init__(channel, **kwargs)

    def shutdown(self) -> None:
        """
        Close the serial interface.
        """
        super().shutdown()
        self._ser.close()

    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """
        Send a message over the serial device.

        :param msg:
            Message to send.

            .. note:: If the timestamp is a float value it will be
                      converted to an integer.

        :param timeout:
            This parameter will be ignored. The timeout value of the channel is
            used instead.

        :raises ValueError:
            If the timestamp does not fit into an unsigned 32 bit integer
            after being multiplied by 1000.
        :raises ~can.exceptions.CanOperationError:
            If the serial port is not open.
        :raises ~can.exceptions.CanTimeoutError:
            If writing to the serial port timed out.
        """
        # Pack timestamp
        try:
            timestamp = struct.pack("<I", int(msg.timestamp * 1000))
        except struct.error:
            raise ValueError(f"Timestamp is out of range: {msg.timestamp}") from None

        # Pack arbitration ID, folding the frame-type flags into its upper bits
        if msg.is_extended_id:
            arbitration_id = msg.arbitration_id & CAN_ID_MASK_EXT
            arbitration_id |= CAN_EFF_FLAG
        else:
            arbitration_id = msg.arbitration_id & CAN_ID_MASK_STD

        if msg.is_error_frame:
            arbitration_id |= CAN_ERR_FLAG
        if msg.is_remote_frame:
            arbitration_id |= CAN_RTR_FLAG

        arbitration_id_bytes = struct.pack("<I", arbitration_id)

        # Assemble message
        byte_msg = bytearray()
        byte_msg.append(0xAA)
        byte_msg += timestamp
        byte_msg.append(msg.dlc)
        byte_msg += arbitration_id_bytes
        byte_msg += msg.data
        byte_msg.append(0xBB)

        # Write to serial device
        try:
            self._ser.write(byte_msg)
        except serial.PortNotOpenError as error:
            raise CanOperationError("writing to closed port") from error
        except serial.SerialTimeoutException as error:
            raise CanTimeoutError() from error

    def _read_exact(self, size: int) -> bytes:
        """
        Read exactly ``size`` bytes of the current frame from the serial device.

        :param size:
            Number of bytes that are required.

        :returns:
            The bytes read; always of length ``size``.

        :raises ~can.exceptions.CanOperationError:
            If the device returned fewer bytes than requested, i.e. the frame
            was cut short (for example because the read timed out).
        """
        chunk = self._ser.read(size)
        if len(chunk) != size:
            raise CanOperationError(
                "incomplete message received from serial device: "
                f"expected {size} byte(s), got {len(chunk)}"
            )
        return cast("bytes", chunk)

    def _recv_internal(
        self, timeout: Optional[float]
    ) -> tuple[Optional[Message], bool]:
        """
        Read a message from the serial device.

        :param timeout:

            .. warning::
                This parameter will be ignored. The timeout value of the
                channel is used.

        :returns:
            Received message and :obj:`False` (because no filtering has taken
            place). The message is :obj:`None` when no byte was available or
            the byte read was not the ``0xAA`` start marker.

        :raises ~can.exceptions.CanOperationError:
            If reading from the serial device failed, the frame was cut
            short, or the end marker was not ``0xBB``.
        :raises ValueError:
            If the received DLC exceeds 8.
        """
        try:
            rx_byte = self._ser.read()
            if not rx_byte or ord(rx_byte) != 0xAA:
                # Nothing received, or not the start of a frame.
                return None, False

            # From here on a frame has started, so every field must arrive
            # completely; a short read is reported as a CAN error instead of
            # surfacing as struct.error / TypeError.
            timestamp = struct.unpack("<I", self._read_exact(4))[0]

            dlc = ord(self._read_exact(1))
            if dlc > 8:
                raise ValueError("received DLC may not exceed 8 bytes")

            arbitration_id = struct.unpack("<I", self._read_exact(4))[0]
            is_extended_id = bool(arbitration_id & CAN_EFF_FLAG)
            is_error_frame = bool(arbitration_id & CAN_ERR_FLAG)
            is_remote_frame = bool(arbitration_id & CAN_RTR_FLAG)

            # Strip the flag bits so only the identifier remains.
            if is_extended_id:
                arbitration_id = arbitration_id & CAN_ID_MASK_EXT
            else:
                arbitration_id = arbitration_id & CAN_ID_MASK_STD

            data = self._read_exact(dlc)

            delimiter_byte = ord(self._read_exact(1))
            if delimiter_byte != 0xBB:
                raise CanOperationError(
                    f"invalid delimiter byte while reading message: {delimiter_byte}"
                )

            # received message data okay
            msg = Message(
                # TODO: We are only guessing that they are milliseconds
                timestamp=timestamp / 1000,
                arbitration_id=arbitration_id,
                dlc=dlc,
                data=data,
                is_extended_id=is_extended_id,
                is_error_frame=is_error_frame,
                is_remote_frame=is_remote_frame,
            )
            return msg, False

        except serial.SerialException as error:
            raise CanOperationError("could not read from serial") from error

    def fileno(self) -> int:
        """
        Return the file descriptor of the underlying serial device.

        :raises NotImplementedError:
            If the serial device does not support ``fileno``.
        :raises ~can.exceptions.CanOperationError:
            If fetching the file descriptor failed for any other reason.
        """
        try:
            return cast("int", self._ser.fileno())
        except io.UnsupportedOperation:
            raise NotImplementedError(
                "fileno is not implemented using current CAN bus on this platform"
            ) from None
        except Exception as exception:
            raise CanOperationError("Cannot fetch fileno") from exception

    @staticmethod
    def _detect_available_configs() -> Sequence[AutoDetectedConfig]:
        """
        List one configuration per serial port reported by pyserial.

        :returns:
            A list of ``{"interface": "serial", "channel": <device>}``
            entries; empty when pyserial is not installed.
        """
        if serial is None:
            return []

        configs: list[AutoDetectedConfig] = [
            {"interface": "serial", "channel": port.device}
            for port in serial.tools.list_ports.comports()
        ]
        return configs