# servo2to1_core.py
# -*- coding: utf-8 -*-

"""
author  Benoit Dubois
copyright FEMTO Engineering, 2021
licence GPL v3+
brief   Generic (core) controller composed of 2 inputs 2 servo and 1 output:
details
               -------
   INPUT 1 >---|PID 1|---|
               -------   |
                         |-----> OUTPUT
               -------   |
   INPUT 2 >---|PID 2|---|
               -------
"""
import pyb
import ucollections
import uselect
from servo_serial_com import ServoSerialCom


# Mnemonics of the serial commands. These are the same names that
# Servo2to1Core.apply_cmd() matches on.
MNEMO_LIST = ["kp", "ki", "sp", "osp", "imax", "omax", "oscale", "fs"]

# Avoid use of timer 2, 3, 5 and 6
# see http://docs.micropython.org/en/latest/pyboard/library/pyb.Timer.html
# NOTE(review): not referenced in this file; presumably the id of a timer
# used for monitoring -- confirm against the modules importing it.
TIMER_MONITOR_ID = 9

# NOTE(review): not referenced in this file; presumably the frequency (Hz?)
# of the monitoring timer -- confirm.
TIMER_MONI_FREQ = 0.35

# Pin names of the four switches read by Servo2to1Core.build_switches().
PIN_KP1 = 'PE0'  # switch 'p1': selects controller 1
PIN_KI1 = 'PE2'  # switch 'i1': enables the integrator of controller 1
PIN_KP2 = 'PE4'  # switch 'p2': selects controller 2
PIN_KI2 = 'PE6'  # switch 'i2': enables the integrator of controller 2


#----------------------------------------------------------
class Servo2to1(object):
    """Two-input, single-output servo with one controller per input.

    The ``ctrl`` property selects which input/controller pair drives the
    output:
    - 1: input 1 is read and fed to controller 1,
    - 2: input 2 is read and fed to controller 2,
    - any other value (e.g. 0): the servo is disabled, the output is not
      written and the monitoring values are forced to 0.

    Notes:
    - input 1 is selected by the constructor,
    - every assignment of 1 or 2 to ``ctrl`` resets the matching controller.

    Notes on implementation:
    - inputs must have a read() method,
    - controllers must have compute() and reset() methods,
    - output must have a write() method,
    - timer must have a freq() method used both as getter (no argument) and
      as setter (one argument).
    """

    def __init__(self, in1, in2, ctrl1, ctrl2, out, timer, freq):
        """Store the collaborators, program the timer and select input 1.

        in1, in2     -- input objects (read()).
        ctrl1, ctrl2 -- controller objects (compute(), reset()).
        out          -- output object (write()).
        timer        -- timer object (freq()).
        freq         -- initial frequency, applied to the timer and kept as
                        the per-controller frequencies freq1 and freq2.
        """
        self.in1, self.in2 = in1, in2
        self.ctrl1, self.ctrl2 = ctrl1, ctrl2
        self.out = out
        self.timer = timer
        # Goes through the cfreq setter, i.e. programs the timer.
        self.cfreq = freq
        self.freq1 = self.freq2 = freq
        # Goes through the ctrl setter, i.e. resets controller 1.
        self.ctrl = 1
        # Last input sample (yn) and last command (un), kept for monitoring.
        self.yn = 0
        self.un = 0

    def compute(self):
        """Run one servo step with the currently selected controller.

        Reads the selected input, computes the command and writes it to the
        output. When the servo is disabled nothing is written and both
        monitoring values are set to 0.
        """
        selected = self.ctrl
        if selected == 1:
            self.yn = sample = self._value1()
            self.un = command = self._compute1(sample)
        elif selected == 2:
            self.yn = sample = self._value2()
            self.un = command = self._compute2(sample)
        else:
            # Servo disabled: clear monitoring values, leave output untouched.
            self.un = 0
            self.yn = 0
            return
        self._set_value(command)

    @property
    def ctrl(self):
        """Identifier of the selected controller (1, 2, other = disabled)."""
        return self._ctrl

    @ctrl.setter
    def ctrl(self, value):
        """Select a controller; controller 1 or 2 is reset when selected."""
        if value == 1:
            self._reset1()
        elif value == 2:
            self._reset2()
        self._ctrl = value

    @property
    def cfreq(self):
        """Current frequency, as reported by the timer."""
        return self.timer.freq()

    @cfreq.setter
    def cfreq(self, value):
        """Forward the new frequency to the timer."""
        self.timer.freq(value)

    def _value1(self):
        """Return a sample read from input 1."""
        return self.in1.read()

    def _value2(self):
        """Return a sample read from input 2."""
        return self.in2.read()

    def _compute1(self, value):
        """Return the command computed by controller 1 for value."""
        return self.ctrl1.compute(value)

    def _compute2(self, value):
        """Return the command computed by controller 2 for value."""
        return self.ctrl2.compute(value)

    def _reset1(self):
        """Reset controller 1."""
        self.ctrl1.reset()

    def _reset2(self):
        """Reset controller 2."""
        self.ctrl2.reset()

    def _set_value(self, value):
        """Write value to the output."""
        self.out.write(value)


#----------------------------------------------------------
class Servo2to1Core(object):
    """Main loop binding a two-input servo to four switches and a serial link.

    The object owns:
    - the servo passed to the constructor (its timer paces the servo steps),
    - four switches ('p1', 'i1', 'p2', 'i2') with an external interrupt each,
    - a ServoSerialCom instance used to receive commands and send log lines.

    Interrupt callbacks only set ``task``; the actual work is done in run():
    - task 0: nothing pending, serve the serial interface,
    - task 1: a timer tick occurred, run one servo step,
    - task 2: a switch changed, re-evaluate the servo state.
    """

    # Mnemonics that map onto a controller attribute of the same name.
    _CTRL_PARAMS = ("kp", "ki", "sp", "osp", "imax", "omax", "oscale")

    def __init__(self, servo):
        """Build switches and serial link, then start from the switch state.

        servo -- servo object exposing ctrl1, ctrl2, timer, ctrl, freq1,
                 freq2, yn, un and compute() (see Servo2to1).
        """
        self.task = 0
        self.switches, self.irqs = self.build_switches()
        self.servo = servo
        self.scom = ServoSerialCom()
        self.init_servo_parameters()
        # Bounded queue (5 entries) of log lines waiting to be sent.
        self.log_stack = ucollections.deque((), 5)
        # servo.timer is used to sync servo update.
        self.servo.timer.callback(lambda f: self.callback_servo())
        # Init servo and timer_servo
        self.handle_servo_state()

    def callback_servo(self):
        """Executed when the servo timer is triggered: request a servo step.
        """
        self.task = 1

    def callback_kx(self, line):
        """Executed when a change on one of the Kx switches is detected:
        request a re-evaluation of the servo state.

        line -- argument supplied by the interrupt, unused.
        """
        self.task = 2

    def apply_cmd(self, cmd):
        """Apply a decoded command to the servo.

        cmd -- sequence (mnemonic, index, value, ...):
               - mnemonic: one of "kp", "ki", "sp", "osp", "imax", "omax",
                 "oscale" (controller attribute) or "fs" (servo frequency),
               - index: '0' targets controller 1 / freq1, '1' targets
                 controller 2 / freq2,
               - value: stored as received, without conversion.

        A one-element command is a request and is currently not handled.
        Commands that are too short, have an unknown mnemonic or an unknown
        index are ignored.
        """
        if len(cmd) < 3:
            # 1 element: request (not implemented). 0 or 2 elements:
            # malformed, ignored so that a bad frame cannot raise an
            # IndexError inside the main loop.
            return
        mnemo, index, value = cmd[0], cmd[1], cmd[2]
        if index != '0' and index != '1':
            return
        first = index == '0'
        if mnemo in self._CTRL_PARAMS:
            target = self.servo.ctrl1 if first else self.servo.ctrl2
            setattr(target, mnemo, value)
        elif mnemo == "fs":
            # Only stored here; applied to the timer by handle_servo_state().
            if first:
                self.servo.freq1 = value
            else:
                self.servo.freq2 = value

    def handle_servo_state(self):
        """Select the controller to be used with respect to the switch values.

        Note: controller 2 has priority over controller 1 ('p2' is tested
        first). When neither 'p1' nor 'p2' is set the servo is disabled.
        Selecting a controller also applies its frequency to the servo timer
        and enables/disables its integrator according to the matching 'i'
        switch.
        """
        servo = self.servo
        if self.switches['p2'].value() == 1:
            servo.ctrl = 2
            servo.timer.freq(servo.freq2)
            # Avoid spike during switch to controller 2
            # NOTE(review): `un` is read from the controller object ctrl1,
            # whereas the last command is also stored on the servo itself
            # (servo.un) -- confirm the controller class exposes `un`.
            servo.ctrl2.osp = servo.ctrl1.un
            servo.ctrl2.enable_int(self.switches['i2'].value() == 1)
        elif self.switches['p1'].value() == 1:
            servo.ctrl = 1
            servo.timer.freq(servo.freq1)
            servo.ctrl1.enable_int(self.switches['i1'].value() == 1)
        else:
            servo.ctrl = 0

    def init_servo_parameters(self):
        """Set kp, ki and sp of both controllers to 0."""
        for ctrl in (self.servo.ctrl1, self.servo.ctrl2):
            ctrl.kp = 0
            ctrl.ki = 0
            ctrl.sp = 0

    def build_switches(self):
        """Create the switch pins and their external interrupts.

        Returns a tuple (switches, irqs) of two dicts keyed by 'p1', 'i1',
        'p2' and 'i2'. Every interrupt triggers on both edges, uses a
        pull-up and calls callback_kx().
        """
        names = ('p1', 'i1', 'p2', 'i2')
        pin_ids = (PIN_KP1, PIN_KI1, PIN_KP2, PIN_KI2)
        switches = {}
        for name, pin_id in zip(names, pin_ids):
            switches[name] = pyb.Pin(pin_id)
        irqs = {}
        for name in names:
            irqs[name] = pyb.ExtInt(switches[name],
                                    pyb.ExtInt.IRQ_RISING_FALLING,
                                    pyb.Pin.PULL_UP,
                                    self.callback_kx)
        return switches, irqs

    def run(self):
        """Run the main loop forever, dispatching on ``task``.

        NOTE(review): ``task`` is a single flag written by both interrupt
        callbacks and cleared here after handling, so an event raised while
        another one is pending or being handled can be overwritten -- confirm
        whether this is acceptable for the switch events.
        """
        while True:
            if self.task == 2: # Handle events on switches
                self.handle_servo_state()
                self.task = 0
            elif self.task == 1:  # Servo computation process
                self.servo.compute()
                self.log_stack.append("{:d}\t{:d}\t{:d}\t{:d}\n".format(
                    self.servo.yn,
                    self.servo.un,
                    self.servo.ctrl1.integrator,
                    self.servo.ctrl2.integrator))
                self.task = 0
            else: # => task = 0, Handle R/W on serial interface
                # Wait for serial activity; the returned event list is not
                # needed. (Presumably a 250 ms timeout -- confirm against
                # ServoSerialCom.)
                self.scom.poller.poll(250)
                if self.scom.vcom.any():
                    msg = self.scom.read_message()
                    if msg is not None:
                        cmd = self.scom.decode_message(msg)
                        if cmd is not None:
                            self.apply_cmd(cmd)
                # Send at most one pending log line per iteration.
                try:
                    message = self.log_stack.popleft()
                except IndexError:
                    pass
                else:
                    self.scom.write_message_simple(message)