servo2to2_core.py 7.17 KB
# -*- coding: utf-8 -*-

"""
author    Benoit Dubois
copyright FEMTO Engineering
licence   LGPL v3
brief     Generic (core) controller composed of 2 servos.
details
               -------
   INPUT 1 >---|PID 1|---> OUTPUT1
               -------

               -------
   INPUT 2 >---|PID 2|---> OUTPUT1
               -------
"""
import pyb

from switches import Switches
from servo_serial_com import ServoSerialCom


MNEMO_LIST = ["kp", "ki", "sp", "osp", "imax", "omax", "oscale", "fs"]

# Avoid use of timer 2, 3, 5 and 6
# see http://docs.micropython.org/en/latest/pyboard/library/pyb.Timer.html
TIMER_SERVO1_ID = 4
TIMER_SERVO2_ID = 7
TIMER_MONITOR_ID = 12

DEFAULT_TIMER_SERVO_FREQ = 1
TIMER_MONI_FREQ = 0.35

PIN_KP1 = 'PE0'
PIN_KI1 = 'PE2'
PIN_KP2 = 'PE4'
PIN_KI2 = 'PE6'

#----------------------------------------------------------
class Servo1to1(object):
    """A class defining a basic servo with one input and one output.
    Note:
    - servo is disable at initialization,
    Note on implementation:
    - 'in' must have a read() method,
    - 'ctrl' must have a compute(), enable and reset() methods,
    - 'out' must have a write() method.
    """

    def __init__(self, in_, ctrl, out):
        self.in_ = in_
        self.ctrl = ctrl
        self.out = out
        self.yn = 0 # Needed for monitoring
        self.un = 0 # Needed for monitoring
        self._en = False

    def compute(self):
        self.yn = self.in_.read()
        if self._en is True:
            self.un = self.ctrl.compute(self.yn)
        else: # Servo disabled
            self.un = 0
        self.out.write(self.un)

    def enable(self, value=True):
        self._en = value

    def reset(self):
        self.ctrl.reset()
        self._en = False


#----------------------------------------------------------
class Servo2to2Core(object):

    def __init__(self, servo):
        self.task = 0
        self.servo = servo
        self.switches, self.irqs = self.build_switches()
        self.scom = ServoSerialCom()

        # Init servos
        self.init_servo_parameters()
        self.handle_servo_state()

        # timer_monitor is used to update monitoring output
        timer_monitor = pyb.Timer(TIMER_MONITOR_ID, freq=TIMER_MONI_FREQ)
        timer_monitor.callback(lambda f: self.callback_monitor())

        # timer_servo[x] are used to sync update of servos
        self.timer_servo = (pyb.Timer(TIMER_SERVO1_ID,
                                      freq=DEFAULT_TIMER_SERVO_FREQ),
                            pyb.Timer(TIMER_SERVO2_ID,
                                      freq=DEFAULT_TIMER_SERVO_FREQ))
        self.timer_servo[0].callback(lambda f: self.callback_servo1())
        self.timer_servo[1].callback(lambda f: self.callback_servo2())

    def callback_servo1(self):
        """Executed when timer_servo1 is triggered.
        """
        self.task = 1

    def callback_servo2(self):
        """Executed when timer_servo2 is triggered.
        """
        self.task = 2

    def callback_kx(self, line):
        """Executed when a changement on a Kx switches is detected.
        """
        self.task = 3

    def callback_monitor(self):
        """Executed when timer_monitor is triggered.
        """
        self.task = 4

    def apply_cmd(self, cmd):
        if len(cmd) == 1: # Request
            pass
        else:
            if cmd[0] == "kp":
                self.servo[int(cmd[1])].ctrl.kp = cmd[2]
            elif cmd[0] == "ki":
                self.servo[int(cmd[1])].ctrl.ki = cmd[2]
            elif cmd[0] == "sp":
                self.servo[int(cmd[1])].ctrl.sp = cmd[2]
            elif cmd[0] == "osp":
                self.servo[int(cmd[1])].ctrl.osp = cmd[2]
            elif cmd[0] == "imax":
                self.servo[int(cmd[1])].ctrl.imax = cmd[2]
            elif cmd[0] == "omax":
                self.servo[int(cmd[1])].ctrl.omax = cmd[2]
            elif cmd[0] == "oscale":
                self.servo[int(cmd[1])].ctrl.oscale = cmd[2]
            elif cmd[0] == "fs":
                self.timer_servo[int(cmd[1])].freq(cmd[2])

    def handle_servo_state(self):
        """Handle controller with respect to pin values.
        """
        if self.switches.p1.value() == 1:
            self.servo[0].enable(True)
            if self.switches.i1.value() == 1:
                self.servo[0].ctrl.enable_int(True)
            else:
                self.servo[0].ctrl.enable_int(False)
        else:
            self.servo[0].enable(False)
            self.servo[0].reset()
        if self.switches.p2.value() == 1:
            self.servo[1].enable(True)
            if self.switches.i2.value() == 1:
                self.servo[1].ctrl.enable_int(True)
            else:
                self.servo[1].ctrl.enable_int(False)
        else:
            self.servo[1].enable(False)
            self.servo[1].reset()

    def init_servo_parameters(self):
        for s in self.servo:
            s.ctrl.kp = 0
            s.ctrl.ki = 0
            s.ctrl.sp = 0

    def build_switches(self):
        switches = Switches(pyb.Pin(PIN_KP1),
                            pyb.Pin(PIN_KI1),
                            pyb.Pin(PIN_KP2),
                            pyb.Pin(PIN_KI2))
        irqs = Switches(pyb.ExtInt(switches.p1,
                                   pyb.ExtInt.IRQ_RISING_FALLING,
                                   pyb.Pin.PULL_UP,
                                   self.callback_kx),
                        pyb.ExtInt(switches.i1,
                                   pyb.ExtInt.IRQ_RISING_FALLING,
                                   pyb.Pin.PULL_UP,
                                   self.callback_kx),
                        pyb.ExtInt(switches.p2,
                                   pyb.ExtInt.IRQ_RISING_FALLING,
                                   pyb.Pin.PULL_UP,
                                   self.callback_kx),
                        pyb.ExtInt(switches.i2,
                                   pyb.ExtInt.IRQ_RISING_FALLING,
                                   pyb.Pin.PULL_UP,
                                   self.callback_kx))
        return switches, irqs

    def run(self):
        while True:
            if self.task == 3: # Handle events on switches
                self.handle_servo_state()
                self.task = 0
            elif self.task == 1:  # Servo1 computation process
                self.servo[0].compute()
                self.task = 0
            elif self.task == 2:  # Servo2 computation process
                self.servo[1].compute()
                self.task = 0
            elif self.task == 4: # Get values to user (monitoring)
                print("{:d}\t{:d}\t{:d}\t{:d}\t{:d}\t{:d}".format(
                    self.servo[0].yn, self.servo[0].un,
                    self.servo[0].ctrl.integrator,
                    self.servo[1].yn, self.servo[1].un,
                    self.servo[1].ctrl.integrator))
                self.task = 0
            else: # => task = 0, wait request from user
                self.scom.poller.poll(250)
                if self.scom.vcom.any():
                    msg = self.scom.read_message()
                    if msg is not None:
                        cmd = self.scom.decode_message(msg)
                        if cmd is not None:
                            self.apply_cmd(cmd)