Source code for secsgem.hsms.protocol

# (c) Copyright 2013-2021, Benjamin Parzella. All rights reserved.
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Lesser General Public License for more details.
"""Contains class to create model for hsms endpoints."""
import logging
import queue
import random
import threading
import typing

import secsgem.common

from .connection import HSMS_STYPES, HsmsConnection
from .active_connection import HsmsActiveConnection
from .passive_connection import HsmsPassiveConnection
from .packet import HsmsPacket
from .select_req_header import HsmsSelectReqHeader
from .select_rsp_header import HsmsSelectRspHeader
from .deselect_req_header import HsmsDeselectReqHeader
from .deselect_rsp_header import HsmsDeselectRspHeader
from .linktest_req_header import HsmsLinktestReqHeader
from .linktest_rsp_header import HsmsLinktestRspHeader
from .reject_req_header import HsmsRejectReqHeader
from .separate_req_header import HsmsSeparateReqHeader
from .stream_function_header import HsmsStreamFunctionHeader
from .connectionstatemachine import ConnectionStateMachine

from ..secs.functions.base import SecsStreamFunction

[docs]class HsmsProtocol(secsgem.common.Protocol): # pylint: disable=too-many-instance-attributes """ Baseclass for creating Host/Equipment models. This layer contains the HSMS functionality. Inherit from this class and override required functions. """ def __init__(self, address, port, active, session_id, name, custom_connection_handler=None): """ Initialize hsms handler. :param address: IP address of remote host :type address: string :param port: TCP port of remote host :type port: integer :param active: Is the connection active (*True*) or passive (*False*) :type active: boolean :param session_id: session / device ID to use for connection :type session_id: integer :param name: Name of the underlying configuration :type name: string :param custom_connection_handler: object for connection handling (ie multi server) :type custom_connection_handler: :class:`secsgem.hsms.HsmsMultiPassiveServer` **Example**:: import secsgem.hsms def onConnect(event, data): print "Connected" client = secsgem.hsms.HsmsProtocol("", 5000, True, 0, "test") += onConnect client.enable() time.sleep(3) client.disable() """ super().__init__() self._logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) self._communication_logger = logging.getLogger("hsms_communication") self._address = address self._port = port self._active = active self._session_id = session_id self._name = name self._connected = False # system id counter self._system_counter = random.randint(0, (2 ** 32) - 1) # repeating linktest variables self._linktest_timer = None self._linktest_timeout = 30 # select request thread for active connections, to avoid blocking state changes self._select_req_thread = None # response queues self._system_queues = {} # hsms connection state fsm self._connection_state = ConnectionStateMachine({"on_enter_CONNECTED": self._on_state_connect, "on_exit_CONNECTED": self._on_state_disconnect, "on_enter_CONNECTED_SELECTED": self._on_state_select}) # setup connection if self._active: if custom_connection_handler is None: self._connection = HsmsActiveConnection(self._address, self._port, self._session_id, self) else: self._connection = custom_connection_handler.create_connection(self._address, self._port, self._session_id, self) else: if custom_connection_handler is None: self._connection = HsmsPassiveConnection(self._address, self._port, self._session_id, self) else: self._connection = custom_connection_handler.create_connection(self._address, self._port, self._session_id, self) @property def timeouts(self) -> secsgem.common.Timeouts: """Property for timeout.""" return self._connection.timeouts @property def name(self) -> str: """Property for name.""" return self._name @property def connection(self) -> HsmsConnection: """Property for connection.""" return self._connection @property def connection_state(self) -> HsmsConnection: """Property for connection state.""" return self._connection_state
[docs] def get_next_system_counter(self): """ Return the next System. :returns: System for the next command :rtype: integer """ self._system_counter += 1 if self._system_counter > ((2 ** 32) - 1): self._system_counter = 0 return self._system_counter
def _send_select_req_thread(self): response = self.send_select_req() if response is None: self._logger.warning("select request failed") def _start_linktest_timer(self): self._linktest_timer = threading.Timer(self._linktest_timeout, self._on_linktest_timer) self._linktest_timer.daemon = True # kill thread automatically on main program termination = "secsgem_hsmsProtocol_linktestTimer" self._linktest_timer.start() def _on_state_connect(self): """ Handle connection state model got event connect. :param data: event attributes :type data: object """ # start linktest timer self._start_linktest_timer() # start select process if connection is active if self._active: self._select_req_thread = threading.Thread( target=self._send_select_req_thread, name="secsgem_hsmsProtocol_sendSelectReqThread") self._select_req_thread.daemon = True # kill thread automatically on main program termination self._select_req_thread.start() def _on_state_disconnect(self): """ Handle connection state model got event disconnect. :param data: event attributes :type data: object """ # stop linktest timer if self._linktest_timer: self._linktest_timer.cancel() self._linktest_timer = None def _on_state_select(self): """ Handle connection state model got event select. :param data: event attributes :type data: object """ # send event'hsms_selected', {'connection': self}) def _on_linktest_timer(self): """Linktest time timed out, so send linktest request.""" # send linktest request and wait for response self.send_linktest_req() # restart the timer self._start_linktest_timer()
[docs] def on_connection_established(self, _): """Handle connection was established event.""" self._connected = True # update connection state self._connection_state.connect()"hsms_connected", {'connection': self})
[docs] def on_connection_before_closed(self, _): """Handle connection is about to be closed event.""" # send separate request self.send_separate_req()
[docs] def on_connection_closed(self, _): """Handle connection was closed event.""" # update connection state self._connected = False self._connection_state.disconnect()"hsms_disconnected", {'connection': self})
def __handle_hsms_requests(self, packet): # noqa: MC0001"< %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) # check if it is a select request if packet.header.s_type == 0x01: # if we are disconnecting send reject else send response if self._connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.s_type, 4) else: self.send_select_rsp(packet.header.system) # update connection state # check if it is a select response elif packet.header.s_type == 0x02: # update connection state if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # what to do if no sender for request waiting? # check if it is a deselect request elif packet.header.s_type == 0x03: # if we are disconnecting send reject else send response if self._connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.s_type, 4) else: self.send_deselect_rsp(packet.header.system) # update connection state self._connection_state.deselect() # check if it is a deselect response elif packet.header.s_type == 0x04: # update connection state self._connection_state.deselect() if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # what to do if no sender for request waiting? # check if it is a linktest request elif packet.header.s_type == 0x05: # if we are disconnecting send reject else send response if self._connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.s_type, 4) else: self.send_linktest_rsp(packet.header.system) else: if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # what to do if no sender for request waiting?
[docs] def on_connection_packet_received(self, _, packet): """ Packet received by connection. :param packet: received data packet :type packet: :class:`secsgem.hsms.HsmsPacket` """ if packet.header.s_type > 0: self.__handle_hsms_requests(packet) else: if callable(self._secs_decode): message = self._secs_decode(packet)"< %s\n%s", packet, message, extra=self._get_log_extra()) else:"< %s", packet, extra=self._get_log_extra()) if not self._connection_state.is_CONNECTED_SELECTED(): self._logger.warning("received message when not selected") out_packet = HsmsPacket(HsmsRejectReqHeader(packet.header.system, packet.header.s_type, 4)) "> %s\n %s", out_packet, HSMS_STYPES[out_packet.header.s_type], extra=self._get_log_extra()) self._connection.send_packet(out_packet) return # someone is waiting for this message if packet.header.system in self._system_queues: # send packet to request sender self._system_queues[packet.header.system].put_nowait(packet) # just log if nobody is interested else:"hsms_packet_received", {'connection': self, 'packet': packet})
def _get_queue_for_system(self, system_id): """ Create a new queue to receive responses for a certain system. :param system_id: system id to watch :type system_id: int :returns: queue to receive responses with :rtype: queue.Queue """ self._system_queues[system_id] = queue.Queue() return self._system_queues[system_id] def _remove_queue(self, system_id): """ Remove queue for system id from list. :param system_id: system id to remove :type system_id: int """ del self._system_queues[system_id] def __repr__(self): """Generate textual representation for an object of this class.""" return f"{self.__class__.__name__} {str(self.serialize_data())}"
[docs] def serialize_data(self) -> typing.Dict[str, typing.Any]: """ Return data for serialization. :returns: data to serialize for this object :rtype: dict """ return {'address': self._address, 'port': self._port, 'active': self._active, 'session_id': self._session_id, 'name': self._name, 'connected': self._connected}
[docs] def enable(self): """Enable the connection.""" self._connection.enable()
[docs] def disable(self): """Disable the connection.""" self._connection.disable()
[docs] def send_stream_function(self, function: SecsStreamFunction) -> bool: """ Send the packet and wait for the response. :param packet: packet to be sent :type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction` """ out_packet = HsmsPacket( HsmsStreamFunctionHeader(self.get_next_system_counter(),, function.function, function.is_reply_required, self._session_id), function.encode())"> %s\n%s", out_packet, function, extra=self._get_log_extra()) return self._connection.send_packet(out_packet)
[docs] def send_and_waitfor_response(self, function: SecsStreamFunction) -> typing.Optional[secsgem.common.Packet]: """ Send the packet and wait for the response. :param packet: packet to be sent :type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction` :returns: Packet that was received :rtype: :class:`secsgem.hsms.HsmsPacket` """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) out_packet = HsmsPacket(HsmsStreamFunctionHeader(system_id,, function.function, True, self._session_id), function.encode())"> %s\n%s", out_packet, function, extra=self._get_log_extra()) if not self._connection.send_packet(out_packet): self._logger.error("Sending packet failed") self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t3) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_response(self, function: SecsStreamFunction, system: int) -> bool: """ Send response function for system. :param function: function to be sent :type function: :class:`secsgem.secs.functionbase.SecsStreamFunction` :param system: system to reply to :type system: integer """ out_packet = HsmsPacket(HsmsStreamFunctionHeader(system,, function.function, False, self._session_id), function.encode())"> %s\n%s", out_packet, function, extra=self._get_log_extra()) return self._connection.send_packet(out_packet)
[docs] def send_select_req(self): """ Send a Select Request to the remote host. :returns: System of the sent request :rtype: integer """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) packet = HsmsPacket(HsmsSelectReqHeader(system_id)) "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t6) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_select_rsp(self, system_id): """ Send a Select Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsSelectRspHeader(system_id)) "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_linktest_req(self): """ Send a Linktest Request to the remote host. :returns: System of the sent request :rtype: integer """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) packet = HsmsPacket(HsmsLinktestReqHeader(system_id)) "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t6) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_linktest_rsp(self, system_id): """ Send a Linktest Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsLinktestRspHeader(system_id)) "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_deselect_req(self): """ Send a Deselect Request to the remote host. :returns: System of the sent request :rtype: integer """ system_id = self.get_next_system_counter() response_queue = self._get_queue_for_system(system_id) packet = HsmsPacket(HsmsDeselectReqHeader(system_id))"> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): self._remove_queue(system_id) return None try: response = response_queue.get(True, self.timeouts.t6) except queue.Empty: response = None self._remove_queue(system_id) return response
[docs] def send_deselect_rsp(self, system_id): """ Send a Deselect Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsDeselectRspHeader(system_id)) "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_reject_rsp(self, system_id, s_type, reason): """ Send a Reject Response to the remote host. :param system_id: System of the request to reply for :type system_id: integer :param s_type: s_type of rejected message :type s_type: integer :param reason: reason for rejection :type reason: integer """ packet = HsmsPacket(HsmsRejectReqHeader(system_id, s_type, reason)) "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) return self._connection.send_packet(packet)
[docs] def send_separate_req(self): """Send a Separate Request to the remote host.""" system_id = self.get_next_system_counter() packet = HsmsPacket(HsmsSeparateReqHeader(system_id)) "> %s\n %s", packet, HSMS_STYPES[packet.header.s_type], extra=self._get_log_extra()) if not self._connection.send_packet(packet): return None return system_id
# helpers def _get_log_extra(self): return {"address": self._address, "port": self._port, "session_id": self._session_id, "remoteName": self._name}