Source code for secsgem.gem.handler

#####################################################################
# handler.py
#
# (c) Copyright 2013-2021, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Handler for GEM commands. Used in combination with :class:`secsgem.hsms.HsmsConnectionManager`."""
import logging
import threading
import typing

import secsgem.common
import secsgem.secs


[docs]class GemHandler(secsgem.secs.SecsHandler): # pylint: disable=too-many-instance-attributes """Baseclass for creating Host/Equipment models. This layer contains GEM functionality.""" def __init__(self, connection: secsgem.common.Protocol): """ Initialize a gem handler. Inherit from this class and override required functions. :param connection: connection to use """ super().__init__(connection) self._protocol.events.hsms_selected += self._on_hsms_select self._mdln = "secsgem" #: model number returned by S01E13/14 self._softrev = "0.1.0" #: software version returned by S01E13/14 self._logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) self._is_host = True # not going to HOST_INITIATED_CONNECT because fysom doesn't support two states. # but there is a transistion to get out of EQUIPMENT_INITIATED_CONNECT when the HOST_INITIATED_CONNECT happens self._communication_state = secsgem.common.Fysom({ 'initial': 'DISABLED', # 1 'events': [ {'name': 'enable', 'src': 'DISABLED', 'dst': 'ENABLED'}, # 2 {'name': 'disable', 'src': ['ENABLED', 'NOT_COMMUNICATING', 'COMMUNICATING', 'EQUIPMENT_INITIATED_CONNECT', 'WAIT_DELAY', 'WAIT_CRA', "HOST_INITIATED_CONNECT", "WAIT_CR_FROM_HOST"], 'dst': 'DISABLED'}, # 3 {'name': 'select', 'src': 'NOT_COMMUNICATING', 'dst': 'EQUIPMENT_INITIATED_CONNECT'}, # 5 {'name': 'communicationreqfail', 'src': 'WAIT_CRA', 'dst': 'WAIT_DELAY'}, # 6 {'name': 'delayexpired', 'src': 'WAIT_DELAY', 'dst': 'WAIT_CRA'}, # 7 {'name': 'messagereceived', 'src': 'WAIT_DELAY', 'dst': 'WAIT_CRA'}, # 8 {'name': 's1f14received', 'src': 'WAIT_CRA', 'dst': 'COMMUNICATING'}, # 9 {'name': 'communicationfail', 'src': 'COMMUNICATING', 'dst': 'NOT_COMMUNICATING'}, # 14 # 15 (WAIT_CR_FROM_HOST is running in background - AND state - # so if s1f13 is received we go all communicating) {'name': 's1f13received', 'src': ['WAIT_CR_FROM_HOST', 'WAIT_DELAY', 'WAIT_CRA'], 'dst': 'COMMUNICATING'}, ], 'callbacks': { 'onWAIT_CRA': self._on_state_wait_cra, 'onWAIT_DELAY': self._on_state_wait_delay, 'onleaveWAIT_CRA': self._on_state_leave_wait_cra, 'onleaveWAIT_DELAY': self._on_state_leave_wait_delay, 'onCOMMUNICATING': self._on_state_communicating, # 'onselect': self.onStateSelect, }, 'autoforward': [ {'src': 'ENABLED', 'dst': 'NOT_COMMUNICATING'}, # 4 {'src': 'EQUIPMENT_INITIATED_CONNECT', 'dst': 'WAIT_CRA'}, # 5 {'src': 'HOST_INITIATED_CONNECT', 'dst': 'WAIT_CR_FROM_HOST'}, # 10 ] }) self._wait_cra_timer = None self._comm_delay_timer = None self._establish_communication_timeout = 10 self._report_id_counter = 1000 self._wait_event_list: typing.List[threading.Event] = [] def __repr__(self) -> str: """Generate textual representation for an object of this class.""" return f"{self.__class__.__name__} {str(self.serialize_data())}" @property def communication_state(self) -> secsgem.common.Fysom: """Get the communication state model.""" return self._communication_state
[docs] def serialize_data(self) -> typing.Dict[str, typing.Any]: """ Get serialized data. :returns: data to serialize for this object :rtype: dict """ data = self.protocol.serialize_data() data.update({'communicationState': self._communication_state.current, 'commDelayTimeout': self._establish_communication_timeout, 'reportIDCounter': self._report_id_counter}) return data
[docs] def enable(self) -> None: """Enable the connection.""" self.protocol.enable() self._communication_state.enable() # type: ignore self._logger.info("Connection enabled")
[docs] def disable(self) -> None: """Disable the connection.""" self.protocol.disable() self._communication_state.disable() # type: ignore self._logger.info("Connection disabled")
def _on_hsms_packet_received(self, data: typing.Dict[str, typing.Any]): """ Packet received from hsms layer. :param data: received event data """ packet = data["packet"] if self._communication_state.isstate('WAIT_CRA'): if packet.header.stream == 1 and packet.header.function == 13: if self._is_host: self.send_response(self.stream_function(1, 14)({"COMMACK": self.on_commack_requested(), "MDLN": []}), packet.header.system) else: self.send_response(self.stream_function(1, 14)({"COMMACK": self.on_commack_requested(), "MDLN": [self._mdln, self._softrev]}), packet.header.system) self._communication_state.s1f13received() # type: ignore elif packet.header.stream == 1 and packet.header.function == 14: self._communication_state.s1f14received() # type: ignore elif self._communication_state.isstate('WAIT_DELAY'): pass elif self._communication_state.isstate('COMMUNICATING'): threading.Thread(target=self._handle_stream_function, args=(packet, ), name=f"secsgem_gemHandler_callback_S{packet.header.stream}F{packet.header.function}" ).start() def _on_hsms_select(self, _): """Selected received from hsms layer.""" self._communication_state.select() def _on_wait_cra_timeout(self): """Linktest time timed out, so send linktest request.""" self._communication_state.communicationreqfail() def _on_wait_comm_delay_timeout(self): """Linktest time timed out, so send linktest request.""" self._communication_state.delayexpired() def _on_state_wait_cra(self, _): """ Connection state model changed to state WAIT_CRA. :param data: event attributes :type data: object """ self._logger.debug("connectionState -> WAIT_CRA") self._wait_cra_timer = threading.Timer(self.protocol.timeouts.t3, self._on_wait_cra_timeout) self._wait_cra_timer.start() if self._is_host: self.send_stream_function(self.stream_function(1, 13)()) else: self.send_stream_function(self.stream_function(1, 13)([self._mdln, self._softrev])) def _on_state_wait_delay(self, _): """ Connection state model changed to state WAIT_DELAY. :param data: event attributes :type data: object """ self._logger.debug("connectionState -> WAIT_DELAY") self._comm_delay_timer = threading.Timer(self._establish_communication_timeout, self._on_wait_comm_delay_timeout) self._comm_delay_timer.start() def _on_state_leave_wait_cra(self, _): """ Connection state model changed to state WAIT_CRA. :param data: event attributes :type data: object """ if self._wait_cra_timer is not None: self._wait_cra_timer.cancel() def _on_state_leave_wait_delay(self, _): """ Connection state model changed to state WAIT_DELAY. :param data: event attributes :type data: object """ if self._comm_delay_timer is not None: self._comm_delay_timer.cancel() def _on_state_communicating(self, _): """ Connection state model changed to state COMMUNICATING. :param data: event attributes :type data: object """ self._logger.debug("connectionState -> COMMUNICATING") self.events.fire("handler_communicating", {'handler': self}) for event in self._wait_event_list: event.set()
[docs] def on_connection_closed(self, connection): """Handle connection was closed event.""" self._logger.info("Connection was closed") # call parent handlers super().on_connection_closed(connection) if self._communication_state.current == "COMMUNICATING": # update communication state self._communication_state.communicationfail()
[docs] def on_commack_requested(self) -> int: """ Get the acknowledgement code for the connection request. override to accept or deny connection request :returns: 0 when connection is accepted, 1 when connection is denied :rtype: integer """ return 0
[docs] def send_process_program(self, ppid: typing.Union[int, str], ppbody: str): """ Send a process program. :param ppid: Transferred process programs ID :type ppid: string :param ppbody: Content of process program :type ppbody: string """ # send remote command self._logger.info("Send process program %s", ppid) return self.secs_decode(self.send_and_waitfor_response(self.stream_function(7, 3)( {"PPID": ppid, "PPBODY": ppbody}))).get()
[docs] def request_process_program(self, ppid: typing.Union[int, str]) -> typing.Tuple[typing.Union[int, str], str]: """ Request a process program. :param ppid: Transferred process programs ID :type ppid: string """ self._logger.info("Request process program %s", ppid) # send remote command s7f6 = self.secs_decode(self.send_and_waitfor_response(self.stream_function(7, 5)(ppid))) return s7f6.PPID.get(), s7f6.PPBODY.get()
[docs] def waitfor_communicating(self, timeout: typing.Optional[float] = None) -> bool: """ Wait until connection gets into communicating state. Returns immediately if state is communicating. :param timeout: seconds to wait before aborting :type timeout: float :returns: True if state is communicating, False if timed out :rtype: bool """ event = threading.Event() self._wait_event_list.append(event) if self._communication_state.isstate("COMMUNICATING"): self._wait_event_list.remove(event) return True result = event.wait(timeout) self._wait_event_list.remove(event) return result
def _on_s01f01(self, handler: secsgem.secs.SecsHandler, packet: secsgem.common.Packet) -> typing.Optional[secsgem.secs.SecsStreamFunction]: """ Handle Stream 1, Function 1, Are You There. :param handler: handler the message was received on :type handler: :class:`secsgem.secs.SecsHandler` :param packet: complete message received :type packet: :class:`secsgem.common.Packet` """ del handler, packet # unused parameters if self._is_host: return self.stream_function(1, 2)() return self.stream_function(1, 2)([self._mdln, self._softrev]) def _on_s01f13(self, handler: secsgem.secs.SecsHandler, packet: secsgem.common.Packet) -> typing.Optional[secsgem.secs.SecsStreamFunction]: """ Handle Stream 1, Function 13, Establish Communication Request. :param handler: handler the message was received on :type handler: :class:`secsgem.secs.SecsHandler` :param packet: complete message received :type packet: :class:`secsgem.common.Packet` """ del handler, packet # unused parameters if self._is_host: return self.stream_function(1, 14)({"COMMACK": self.on_commack_requested(), "MDLN": []}) return self.stream_function(1, 14)({"COMMACK": self.on_commack_requested(), "MDLN": [self._mdln, self._softrev]})