Source code for secsgem.hsms.protocol

#####################################################################
# handler.py
#
# (c) Copyright 2013-2021, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Contains class to create model for hsms endpoints."""
import logging
import queue
import random
import threading
import typing

import secsgem.common

from .connection import HSMS_STYPES, HsmsConnection
from .active_connection import HsmsActiveConnection
from .passive_connection import HsmsPassiveConnection
from .packet import HsmsPacket
from .select_req_header import HsmsSelectReqHeader
from .select_rsp_header import HsmsSelectRspHeader
from .deselect_req_header import HsmsDeselectReqHeader
from .deselect_rsp_header import HsmsDeselectRspHeader
from .linktest_req_header import HsmsLinktestReqHeader
from .linktest_rsp_header import HsmsLinktestRspHeader
from .reject_req_header import HsmsRejectReqHeader
from .separate_req_header import HsmsSeparateReqHeader
from .stream_function_header import HsmsStreamFunctionHeader
from .connectionstatemachine import ConnectionStateMachine

from ..secs.functions.base import SecsStreamFunction


[docs]class HsmsProtocol(secsgem.common.Protocol): # pylint: disable=too-many-instance-attributes """ Baseclass for creating Host/Equipment models. This layer contains the HSMS functionality. Inherit from this class and override required functions. """ def __init__(self, address, port, active, session_id, name, custom_connection_handler=None): """ Initialize hsms handler. :param address: IP address of remote host :type address: string :param port: TCP port of remote host :type port: integer :param active: Is the connection active (*True*) or passive (*False*) :type active: boolean :param session_id: session / device ID to use for connection :type session_id: integer :param name: Name of the underlying configuration :type name: string :param custom_connection_handler: object for connection handling (ie multi server) :type custom_connection_handler: :class:`secsgem.hsms.HsmsMultiPassiveServer` **Example**:: import secsgem.hsms def onConnect(event, data): print "Connected" client = secsgem.hsms.HsmsProtocol("10.211.55.33", 5000, True, 0, "test") client.events.hsms_connected += onConnect client.enable() time.sleep(3) client.disable() """ super().__init__() self._logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) self._communication_logger = logging.getLogger("hsms_communication") self._address = address self._port = port self._active = active self._session_id = session_id self._name = name self._connected = False # system id counter self._system_counter = random.randint(0, (2 ** 32) - 1) # repeating linktest variables self._linktest_timer = None self._linktest_timeout = 30 # select request thread for active connections, to avoid blocking state changes self._select_req_thread = None # response queues self._system_queues = {} # hsms connection state fsm self._connection_state = ConnectionStateMachine({"on_enter_CONNECTED": self._on_state_connect, "on_exit_CONNECTED": self._on_state_disconnect, "on_enter_CONNECTED_SELECTED": self._on_state_select}) # setup connection if self._active: if custom_connection_handler is None: self._connection = HsmsActiveConnection(self._address, self._port, self._session_id, self) else: self._connection = custom_connection_handler.create_connection(self._address, self._port, self._session_id, self) else: if custom_connection_handler is None: self._connection = HsmsPassiveConnection(self._address, self._port, self._session_id, self) else: self._connection = custom_connection_handler.create_connection(self._address, self._port, self._session_id, self) @property def timeouts(self) -> secsgem.common.Timeouts: """Property for timeout.""" return self._connection.timeouts @property def name(self) -> str: """Property for name.""" return self._name @property def connection(self) -> HsmsConnection: """Property for connection.""" return self._connection @property def connection_state(self) -> HsmsConnection: """Property for connection state.""" return self._connection_state
[docs] def get_next_system_counter(self): """ Return the next System. :returns: System for the next command :rtype: integer """ self._system_counter += 1 if self._system_counter > ((2 ** 32) - 1): self._system_counter = 0 return self._system_counter
def _send_select_req_thread(self): response = self.send_select_req() if response is None: self._logger.warning("select request failed") def _start_linktest_timer(self): self._linktest_timer = threading.Timer(self._linktest_timeout, self._on_linktest_timer) self._linktest_timer.daemon = True # kill thread automatically on main program termination self._linktest_timer.name = "secsgem_hsmsProtocol_linktestTimer" self._linktest_timer.start() def _on_state_connect(self): """ Handle connection state model got event connect. :param data: event attributes :type data: object """ # start linktest timer self._start_linktest_timer() # start select process if connection is active if self._active: self._select_req_thread = threading.Thread( target=self._send_select_req_thread, name="secsgem_hsmsProtocol_sendSelectReqThread") self._select_req_thread.daemon = True # kill thread automatically on main program termination self._select_req_thread.start() def _on_state_disconnect(self): """ Handle connection state model got event disconnect. :param data: event attributes :type data: object """ # stop linktest timer if self._linktest_timer: self._linktest_timer.cancel() self._linktest_timer = None def _on_state_select(self): """ Handle connection state model got event select. :param data: event attributes :type data: object """ # send event self.events.fire('hsms_selected', {'connection': self}) def _on_linktest_timer(self): """Linktest time timed out, so send linktest request.""" # send linktest request and wait for response self.send_linktest_req() # restart the timer self._start_linktest_timer()
[docs] def on_connection_established(self, _): """Handle connection was established event.""" self._connected = True # update connection state self._connection_state.connect() self.events.fire("hsms_connected", {'connection': self})
[docs] def on_connection_before_closed(self, _): """Handle connection is about to be closed event.""" # send separate request self.send_separate_req()
[docs] def on_connection_closed(self, _): """Handle connection was closed event.""" # update connection state self._connected = False self._connection_state.disconnect() self.events.fire("hsms_disconnected", {'connection': self})
def __handle_hsms_requests(self, packet): # noqa: MC0001 self._communication_logger.info("< %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) # check if it is a select request if packet.header.s_type == 0x01: # if we are disconnecting send reject else send response if self._connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.s_type, 4) else: self.send_select_rsp(packet.header.system) # update connection state self._connection_state.select() # check if it is a select response elif packet.header.s_type == 0x02: # update connection state self._connection_state.select() if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # what to do if no sender for request waiting? # check if it is a deselect request elif packet.header.s_type == 0x03: # if we are disconnecting send reject else send response if self._connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.s_type, 4) else: self.send_deselect_rsp(packet.header.system) # update connection state self._connection_state.deselect() # check if it is a deselect response elif packet.header.s_type == 0x04: # update connection state self._connection_state.deselect() if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # what to do if no sender for request waiting? # check if it is a linktest request elif packet.header.s_type == 0x05: # if we are disconnecting send reject else send response if self._connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.s_type, 4) else: self.send_linktest_rsp(packet.header.system) else: if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # what to do if no sender for request waiting?
[docs] def on_connection_packet_received(self, _, packet): """ Packet received by connection. :param packet: received data packet :type packet: :class:`secsgem.hsms.HsmsPacket` """ if packet.header.s_type > 0: self.__handle_hsms_requests(packet) else: if callable(self._secs_decode): message = self._secs_decode(packet) self._communication_logger.info("< %s\n%s", packet, message, extra=self._get_log_extra()) else: self._communication_logger.info("< %s", packet, extra=self._get_log_extra()) if not self._connection_state.is_CONNECTED_SELECTED(): self._logger.warning("received message when not selected") out_packet = HsmsPacket(HsmsRejectReqHeader(packet.header.system, packet.header.s_type, 4)) self._communication_logger.info( "> %s\n %s", out_packet, HSMS_STYPES[out_packet.header.s_type], extra=self._get_log_extra()) self._connection.send_packet(out_packet) return # someone is waiting for this message if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # just log if nobody is interested else: self.events.fire("hsms_packet_received", {'connection': self, 'packet': packet})
def _get_queue_for_system(self, system_id): """ Create a new queue to receive responses for a certain system. :param system_id: system id to watch :type system_id: int :returns: queue to receive responses with :rtype: queue.Queue """ self._system_queues[system_id] = queue.Queue() return self._system_queues[system_id] def _remove_queue(self, system_id): """ Remove queue for system id from list. :param system_id: system id to remove :type system_id: int """ del self._system_queues[system_id] def __repr__(self): """Generate textual representation for an object of this class.""" return f"{self.__class__.__name__} {str(self.serialize_data())}"
[docs] def serialize_data(self) -> typing.Dict[str, typing.Any]: """ Return data for serialization. :returns: data to serialize for this object :rtype: dict """ return {'address': self._address, 'port': self._port, 'active': self._active, 'session_id': self._session_id, 'name': self._name, 'connected': self._connected}
[docs] def enable(self): """Enable the connection.""" self._connection.enable()
[docs] def disable(self): """Disable the connection.""" self._connection.disable()
[docs] def send_stream_function(self, function: SecsStreamFunction) -> bool: """ Send the packet and wait for the response. :param packet: packet to be sent :type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction` """ out_packet = HsmsPacket( HsmsStreamFunctionHeader(self.get_next_system_counter(), function.stream, function.function, function.is_reply_required, self._session_id), function.encode()) self._communication_logger.info("> %s\n%s", out_packet, function, extra=self._get_log_extra()) return self._connection.send_packet(out_packet)
[docs] def send_and_waitfor_response(self, function: SecsStreamFunction) -> typing.Optional[secsgem.common.Packet]: """ Send the packet and wait for the response. :param packet: packet to be sent :type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction` :returns: Packet that was received :rtype: :class:`secsgem.hsms.HsmsPacket` """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) out_packet = HsmsPacket(HsmsStreamFunctionHeader(system_id, function.stream, function.function, True, self._session_id), function.encode()) self._communication_logger.info("> %s\n%s", out_packet, function, extra=self._get_log_extra()) if not self._connection.send_packet(out_packet): self._logger.error("Sending packet failed") self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t3) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_response(self, function: SecsStreamFunction, system: int) -> bool: """ Send response function for system. :param function: function to be sent :type function: :class:`secsgem.secs.functionbase.SecsStreamFunction` :param system: system to reply to :type system: integer """ out_packet = HsmsPacket(HsmsStreamFunctionHeader(system, function.stream, function.function, False, self._session_id), function.encode()) self._communication_logger.info("> %s\n%s", out_packet, function, extra=self._get_log_extra()) return self._connection.send_packet(out_packet)
[docs] def send_select_req(self): """ Send a Select Request to the remote host. :returns: System of the sent request :rtype: integer """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) packet = HsmsPacket(HsmsSelectReqHeader(system_id)) self._communication_logger.info( "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t6) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_select_rsp(self, system_id): """ Send a Select Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsSelectRspHeader(system_id)) self._communication_logger.info( "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_linktest_req(self): """ Send a Linktest Request to the remote host. :returns: System of the sent request :rtype: integer """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) packet = HsmsPacket(HsmsLinktestReqHeader(system_id)) self._communication_logger.info( "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t6) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_linktest_rsp(self, system_id): """ Send a Linktest Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsLinktestRspHeader(system_id)) self._communication_logger.info( "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_deselect_req(self): """ Send a Deselect Request to the remote host. :returns: System of the sent request :rtype: integer """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) packet = HsmsPacket(HsmsDeselectReqHeader(system_id)) self._communication_logger.info("> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t6) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_deselect_rsp(self, system_id): """ Send a Deselect Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsDeselectRspHeader(system_id)) self._communication_logger.info( "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_reject_rsp(self, system_id, s_type, reason): """ Send a Reject Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer :param s_type: s_type of rejected message :type s_type: integer :param reason: reason for rejection :type reason: integer """ packet = HsmsPacket(HsmsRejectReqHeader(system_id, s_type, reason)) self._communication_logger.info( "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_separate_req(self): """Send a Separate Request to the remote host.""" system_id = self.get_next_system_counter() packet = HsmsPacket(HsmsSeparateReqHeader(system_id)) self._communication_logger.info( "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): return None return system_id
# helpers def _get_log_extra(self): return {"address": self._address, "port": self._port, "session_id": self._session_id, "remoteName": self._name}