eddsctrlserver 15.2 KB
#!/usr/bin/python3
# -*- coding: utf-8 -*-

"""package eddsctrl_server
author    Benoit Dubois
copyright FEMTO ENGINEERING
license   GPL v3.0+
brief     DDS controller, server part.
details   This package provides interface to handle DDS package.
          This package implements server part of a client/server architecture.
          Code inspired from Python Module of the Week website (BSD licence):
          http://pymotw.com/2/select/
          for the logic and from:
          http://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers/
          for the message using the length prefix technique.
"""

# Ctrl-c closes the application
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)

import os
import logging
import configparser
from queue import Queue, Empty
from struct import pack, unpack
from select import select
from socket import socket, AF_INET, SOCK_STREAM

from dds.ad9912dev import Ad9912Dev as DdsDevice    # for actual use
#from dds.dds_emul import TestUsbDds as DdsDevice   # for test without DDS
from eddsctrl_server.constants import SERVER_PORT, SERVERHOST, \
    DEFAULT_IFREQ, DEFAULT_OFREQ, DEFAULT_AMP, DEFAULT_PHY, \
    CFG_SERVER_FILE, LOG_SERVER_FILE

CONSOLE_LOG_LEVEL = logging.DEBUG
FILE_LOG_LEVEL = logging.DEBUG

#==============================================================================
class EDdsCtrlServer(object):
    """Class dedicated to interface socket client with DDS device.
    Provide a basic interface to handle client queries:
    Message is built using the length prefix technique: length is sent as a
    packed 4-byte little-endian integer.
    Allow only 4 queries, changement of output frequency, input frequency,
    phase or amplitude value.
    Message structure is simple:
      - Length of message,
      - Exclamation mark
      - Character identifier,
      - Exclamation mark
      - Value
      - Exclamation mark
    There are 8 valid identifiers:
      - 'o[?]' for set/get output frequency
      - 'i[?]' for set/get input frequency
      - 'p[?]' for set/get phase
      - 'a[?]' for set/get amplitude
    """

    def __init__(self, settings_file=None):
        """Constructor: initialize the server.
        :param settings_file: file containing setting data value (str)
        :returns: None
        """
        # Retrieve parameter values from 'ini' file
        if settings_file is None:
            raise ValueError("Parameter settings_file missing.")
        self._settingsf = settings_file
        config = configparser.ConfigParser()
        config.read(settings_file)
        try:
            self._port = config.getint('dds_ctrl', 'server_port')
            ifreq = config.getfloat('dds_ctrl', 'ifreq')
            ofreq = config.getfloat('dds_ctrl', 'ofreq')
            phase = config.getfloat('dds_ctrl', 'phase')
            amp = config.getint('dds_ctrl', 'amp')
        except KeyError as ex:
            logging.critical("Correct or delete the configuration file.")
            raise KeyError("Key %s not found in configuration file:"% str(ex))
        # Init devices
        self._init_dds(ifreq, ofreq, phase, amp)
        self._init_server()
        # Start threaded server
        self._server_loop()

    def _init_dds(self, ifreq, ofreq, phase, amp):
        """Create and configure a DDS object.
        :param ifreq: dds input frequency (float)
        :param ofreq: dds output frequency (float)
        :param phase: dds output phase (float)
        :param amp: dds output amplitude (int)
        :returns: None
        """
        self._dds = DdsDevice()
        try:
            self._dds.set_ifreq(ifreq)
            self._dds.set_ofreq(ofreq)
            self._dds.set_phy(phase)
            self._dds.set_amp(amp)
            self._dds.set_hstl_output_state(False)
            self._dds.set_cmos_output_state(False)
        except Exception as ex:
            raise Exception("Unexpected error during DDS initialisation: %s" \
                            % str(ex))
        logging.debug("DDS inititialization done")

    def _init_server(self):
        """Create and configure a basic server object.
        :returns: None
        """
        try:
            self.server = socket(AF_INET, SOCK_STREAM)
            self.server.setblocking(0)
            host = SERVERHOST    # Get local machine name
            self.server.bind((host, self._port))    # Bind to the port
            self.server.listen(3)    # Now wait for client connection
        except IOError as ex:
            raise IOError("EDDSCTRLSERVER: %s, have you closed properly the " \
                          "server? " % str(ex))
        except Exception as ex:
            raise Exception("EDDSCTRLSERVER: unexpected error during server " \
                            "initialisation: %s" % str(ex))
        # Sockets from which we expect to read
        self.inputs = [self.server]
        # Sockets to which we expect to write
        self.outputs = []
        # Outgoing message queues (socket:Queue)
        self.message_queues = {}
        logging.debug("Server initialization done")

    def _server_loop(self):
        """Main server loop: Handle connection, read and write on server socket.
        :returns: None
        """
        while self.inputs:
            # Await an event on a readable socket descriptor
            readable, writable, exceptional = select(self.inputs, \
                                                     self.outputs, \
                                                     self.inputs)
            # Handle inputs
            self._handle_readable(readable)
            # Handle outputs
            self._handle_writable(writable)
            # Handle exceptional condition
            self._handle_exceptional(exceptional)

    def _handle_readable(self, socket_list):
        """Read message from client.
        Message is built using the length prefix technique. See:
        http://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers/
        :param socket_list: List of socket usable to receive data
        :returns: None
        """
        for sock in socket_list:
            # Received a connect to the server (listening) socket
            if sock is self.server:
                self._accept_connect()
            # Established connection
            else:
                header_data = self._recv_n_bytes(sock, 4)
                if header_data is None:
                    logging.error("Message seems received but no data to read")
                    return
                if len(header_data) == 4:
                    msg_len = unpack('<L', header_data)[0]
                    data = self._recv_n_bytes(sock, msg_len).decode('utf-8')
                    # Check message validity
                    if data is None:
                        logging.error("Message received but no data to read")
                        return
                    if len(data) != msg_len:
                        logging.error("Bad message length")
                        return
                    logging.debug("receive %s from %s", \
                                  data, sock.getpeername())
                    # Handle message
                    ret_data = self._input_msg_handler(data)
                    # Return data to client
                    self.message_queues[sock].put(ret_data)
                    # Add output channel for response
                    if sock not in self.outputs:
                        self.outputs.append(sock)
                else:
                    # Interpret empty result as closed connection
                    logging.info("Closing connection after reading no data")
                    # Stop listening for input on the connection
                    if sock in self.outputs:
                        self.outputs.remove(sock)
                    self.inputs.remove(sock)
                    sock.close()
                    # Remove message queue
                    del self.message_queues[sock]

    def _handle_writable(self, socket_list):
        """Send message to client.
        :param socket_list: List of socket object usable to send data
        :returns: None
        """
        for sock in socket_list:
            try:
                next_msg = self.message_queues[sock].get_nowait()
            except Empty:
                self.outputs.remove(sock)
            else:
                self._send_msg(sock, next_msg)
                logging.debug("send %s to %s", next_msg, sock.getpeername())

    def _handle_exceptional(self, socket_list):
        """Handle error with socket by closing it.
        :param socket_list: List of socket object in exceptional condition
        :returns: None
        """
        try:
            for sock in socket_list:
                logging.warning("handling exceptional condition for %s", \
                                sock.getpeername())
                # Stop listening for input on the connection
                self.inputs.remove(sock)
                if sock in self.outputs:
                    self.outputs.remove(sock)
                sock.close()
                # Remove message queue
                del self.message_queues[sock]
        except Exception as ex:
            logging.error("Unexpected error: %s", ex)

    def _input_msg_handler(self, msg):
        """Handle message from clients. Message format is defined in
        :class:`eddsctrl.server.eddsctrlsserver.EDdsCtrlServer`.
        Message contains a command to update DDS device state.
        :param msg: A formated string message (str)
        :returns: Return actual value of parameter in DDS (float)
        """
        split_msg = msg.split("!")
        index = split_msg[1].strip('\0') # Remove extra binary NULL characters
        value = split_msg[2]
        config = configparser.ConfigParser()
        config.read(self._settingsf)
        if index == "o":
            retval = self._dds.set_ofreq(float(value))
            config.set('dds_ctrl', 'ofreq', str(self._dds.get_ofreq()))
        elif index == "p":
            retval = self._dds.set_phy(float(value))
            config.set('dds_ctrl', 'phase', str(self._dds.get_phy()))
        elif index == "a":
            retval = self._dds.set_amp(int(value))
            config.set('dds_ctrl', 'amp', str(self._dds.get_amp()))
        elif index == "i":
            retval = self._dds.set_ifreq(float(value))
            config.set('dds_ctrl', 'ifreq', str(self._dds.get_ifreq()))
        elif index == "o?":
            retval = self._dds.get_ofreq()
        elif index == "p?":
            retval = self._dds.get_phy()
        elif index == "a?":
            retval = self._dds.get_amp()
        elif index == "i?":
            retval = self._dds.get_ifreq()
        else: # Bad identifier, message not valid
            logging.error("Bad identifier, expected o[?], p[?], a[?] or i[?]: "
                          + index + " given")
            return
        # Write modification to setting file
        with open(self._settingsf, 'w') as fd:
            config.write(fd)
        # Note that value actually writed in DDS (retval) can be a bit different
        # than value sended to DDS (value).
        msg = "!" + index + "!" + str(retval) + "!"
        return msg

    @staticmethod
    def _send_msg(sock, msg):
        """Method for sending 'msg' to socket.
        Message is built using the length prefix technique. See:
        http://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers/
        :param sock: a valid socket object
        :param msg: message to be send (str)
        :returns: None
        """
        header = pack('<L', len(msg))
        try:
            sock.sendall(header + msg.encode('utf-8'))
        except IOError as ex:
            logging.error("Error during sending: %s", ex)

    @staticmethod
    def _recv_n_bytes(sock, nb):
        """Convenience method for receiving exactly n bytes from socket
        (assuming it's open and connected).
        :param sock: socket object which receives the n bytes
        :param nb: number of bytes to be received (int)
        :returns: data received (str)
        """
        data = ''
        while len(data) < nb:
            try:
                chunk = sock.recv(nb - len(data))
                if chunk == '':
                    break
                data += chunk
            except IOError as ex:
                logging.error("Socket error in _recv_n_bytes: %s", ex)
                return
            except Exception as ex:
                logging.error("Error in _recv_n_bytes: %s", ex)
                return
        return data

    def _accept_connect(self):
        """Server accept connection from client.
        :returns: a new socket object related to client connection (socket)
        """
        # A "readable" server socket is ready to accept a connection
        connection, client_address = self.server.accept()
        logging.info("New connection from %s", client_address)
        connection.setblocking(0)
        self.inputs.append(connection)
        self.outputs.append(connection)
        # Give the connection a queue for data we want to send
        self.message_queues[connection] = Queue()
        return connection


#==============================================================================
def reset_settings(settings_file):
    """Resets the "settings" file with default values.
    :param settings_file: file containing setting data value (str)
    :returns: None
    """
    config = configparser.ConfigParser()
    config.add_section('dds_ctrl')
    config.set('dds_ctrl', 'server_port', str(SERVER_PORT))
    config.set('dds_ctrl', 'ofreq', str(DEFAULT_OFREQ))
    config.set('dds_ctrl', 'phase', str(DEFAULT_PHY))
    config.set('dds_ctrl', 'amp', str(DEFAULT_AMP))
    config.set('dds_ctrl', 'ifreq', str(DEFAULT_IFREQ))
    # Write modification to setting file
    with open(settings_file, 'w') as fd:
        fd.truncate(0) # Reset file contents
        config.write(fd)
    logging.info("Settings file reseted.")


#==============================================================================
def configure_logging():
    """Configures logs.
    """
    date_fmt = "%d/%m/%Y %H:%M:%S"
    log_format = "%(asctime)s %(levelname) -8s %(filename)s " + \
                 " %(funcName)s (%(lineno)d): %(message)s"
    logging.basicConfig(level=FILE_LOG_LEVEL, \
                        datefmt=date_fmt, \
                        format=log_format, \
                        filename=LOG_SERVER_FILE, \
                        filemode='w')
    console = logging.StreamHandler()
    # define a Handler which writes messages to the sys.stderr
    console.setLevel(CONSOLE_LOG_LEVEL)
    # set a format which is simpler for console use
    console_format = '%(levelname) -8s %(filename)s (%(lineno)d): %(message)s'
    formatter = logging.Formatter(console_format)
    # tell the handler to use this format
    console.setFormatter(formatter)
    # add the handler to the root logger
    logging.getLogger('').addHandler(console)


#==============================================================================
def main(settings_file):
    """Main
    """
    configure_logging()
    if os.path.isfile(settings_file) is False:
        logging.error("Settings file missing: create one with default values.")
        reset_settings(settings_file)
    EDdsCtrlServer(settings_file)


#==============================================================================
if __name__ == '__main__':
    main(CFG_SERVER_FILE)