Source code for secsgem.hsms.connectionmanager

#####################################################################
# connectionmanager.py
#
# (c) Copyright 2013-2021, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Contains class for handling multiple connections."""

import logging

import secsgem.common

from .protocol import HsmsProtocol
from .multi_passive_server import HsmsMultiPassiveServer


class HsmsConnectionManager:
    """High level class that handles multiple active and passive connections and the model for them."""

    def __init__(self):
        """Initialize a hsms connection manager."""
        self._event_producer = secsgem.common.EventProducer()
        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        # handlers are indexed by connection id (see get_connection_id),
        # servers are indexed by the TCP port they were created for
        self.handlers = {}
        self.servers = {}

        self.stopping = False

        # when set, no servers are started and this object is passed to every new handler instead
        self._test_server_object = None

    @property
    def events(self):
        """Property for event handling."""
        return self._event_producer

    def has_connection_to(self, index):
        """
        Look up the connection handler for a certain peer.

        :param index: Name of the requested handler.
        :type index: string
        :returns: The first handler whose name equals index, None if there is no such handler
        :rtype: connection handler or None
        """
        for handler in self.handlers.values():
            if handler.name == index:
                return handler

        return None

    def __getitem__(self, index):
        """Get a connection by using [] on the object."""
        return self.has_connection_to(index)

    @staticmethod
    def get_connection_id(address):
        """
        Generate connection ids used for internal indexing.

        :param address: The IP address for the affected remote.
        :type address: string
        :returns: String representation of the address
        :rtype: string
        """
        return f"{address}"

    def _update_required_servers(self, additional_port=-1):  # pragma: no cover
        """
        Stop servers that are no longer required and start the missing ones.

        A port is required if it is passed as additional_port or if a non-active
        handler is registered for it. Nothing is done while a test server object is set.

        .. warning:: Do not call this directly, for internal use only.

        :param additional_port: Port to keep/start a server for, ignored if not positive
        :type additional_port: integer
        """
        if self._test_server_object:
            return

        required_ports = []

        if additional_port > 0:
            required_ports.append(additional_port)

        for handler in self.handlers.values():
            if not handler.active and handler.port not in required_ports:
                required_ports.append(handler.port)

        # iterate over a snapshot, entries are removed from self.servers inside the loop
        # and deleting from a dict while iterating it raises a RuntimeError
        for server_port, server in list(self.servers.items()):
            if server_port not in required_ports:
                self.logger.debug("stopping server on port %d", server_port)
                server.stop()
                del self.servers[server_port]

        for required_port in required_ports:
            if required_port not in self.servers:
                self.logger.debug("starting server on port %d", required_port)
                self.servers[required_port] = HsmsMultiPassiveServer(required_port)
                self.servers[required_port].start()

    def add_peer(self, name, address, port, active, session_id, connection_handler=HsmsProtocol):
        """
        Add a new connection.

        The created handler is enabled, stored under the connection id of the address
        (replacing any handler previously stored for that address) and returned.

        :param name: Name of the peers configuration
        :type name: string
        :param address: IP address of peer
        :type address: string
        :param port: TCP port of peer
        :type port: integer
        :param active: Is the connection active (*True*) or passive (*False*)
        :type active: boolean
        :param session_id: session / device ID of peer
        :type session_id: integer
        :param connection_handler: Model handling this connection
        :type connection_handler: inherited from :class:`secsgem.hsms.protocol.HsmsProtocol`
        :returns: The handler created for the peer
        :rtype: instance of connection_handler
        """
        self.logger.debug("new remote %s at %s:%d", name, address, port)

        connection_id = self.get_connection_id(address)

        # NOTE(review): the port is requested for active peers as well, so a server is
        # started for it until the next update - confirm this is intended
        self._update_required_servers(port)

        if self._test_server_object:
            # in test mode active and passive handlers both receive the test server object
            handler = connection_handler(address, port, active, session_id, name, self._test_server_object)
        elif active:  # pragma: no cover
            handler = connection_handler(address, port, active, session_id, name)
        else:  # pragma: no cover
            handler = connection_handler(address, port, active, session_id, name, self.servers[port])

        handler._event_producer += self._event_producer
        handler.enable()

        self.handlers[connection_id] = handler

        return handler

    def remove_peer(self, name, address, port):
        """
        Remove a previously added connection.

        The handler is looked up by address only, name and port are used for logging.
        Nothing is disconnected if no handler is registered for the address.

        :param name: Name of the peers configuration
        :type name: string
        :param address: IP address of peer
        :type address: string
        :param port: TCP port of peer
        :type port: integer
        """
        self.logger.debug("disconnecting from %s at %s:%d", name, address, port)

        connection_id = self.get_connection_id(address)

        if connection_id in self.handlers:
            handler = self.handlers[connection_id]
            handler.connection.disconnect()
            handler.disable()
            del self.handlers[connection_id]

        self._update_required_servers()

    def stop(self):
        """Stop all servers and terminate the connections."""
        self.stopping = True

        for handler in self.handlers.values():
            handler.connection.disconnect()

        self.handlers.clear()

        # no handlers are left, so every running server is stopped here
        self._update_required_servers()