brewer.controller module
from os import getenv
import time
from . import str116
from . import settings
from .slack import BrewerBot
import serial
from .omega import Omega
from terminaltables import AsciiTable
from .fake_controller import FakeController
from .color import *


class Controller:
    """
    This is the class that interfaces between you and the hardware of the
    brew rig: relays (valves and pump) through `str116`, and the PID
    through `Omega`.
    """

    def __new__(cls):
        """
        Looks for hardware. If it doesn't find any, return a `FakeController`.

        Can be overridden by environment variables `force_fake_controller`
        and `force_real_controller` set to '1'. `force_fake_controller` is
        checked first.
        """
        if getenv("force_fake_controller") == '1':
            green("force_fake_controller enabled")
            green("Using FakeController()")
            return FakeController()
        if getenv("force_real_controller") == '1':
            green("force_real_controller enabled")
            green("Using Controller()")
            return super(Controller, cls).__new__(cls)

        try:
            # Constructing an Omega is the hardware probe: this is what
            # throws the error if there's no hardware. The instance itself
            # is not kept; `__init__` builds the one that is used.
            Omega(
                settings.port,
                settings.rimsAddress,
                settings.baudRate,
                settings.timeout
            )
        except serial.serialutil.SerialException:
            yellow("No hardware detected!")
            green("Using FakeController()")
            return FakeController()

        green("Hardware found")
        green("Using Controller()")
        return super(Controller, cls).__new__(cls)

    def __init__(self):
        """
        Connects to the Omega using the values from `settings` and creates
        the `BrewerBot` used for notifications.
        """
        self.omega = Omega(
            settings.port,
            settings.rimsAddress,
            settings.baudRate,
            settings.timeout
        )
        self.settings = settings
        self.slack = BrewerBot()

    @staticmethod
    def simulator():
        """
        Returns a new `FakeController`.
        """
        return FakeController()

    @staticmethod
    def relay_status(relay_num: int):
        """
        Returns the relay status of a specified relay.

        Declared static because it takes no `self`; this keeps both
        `Controller.relay_status(n)` and `instance.relay_status(n)` working.
        """
        return str116.get_relay(relay_num)

    def set_relay(self, relay_num: int, state: int):
        """
        Sets relay state to on or off.
        """
        str116.set_relay(relay_num, state)

    def pid_running(self):
        """
        Returns the value reported by `omega.is_running()`.
        """
        return self.omega.is_running()

    def pid_status(self):
        """
        Returns a dictionary of pid status, pv, and sv.
        """
        return {
            "pid_running": bool(self.pid_running()),
            "sv": self.sv(),
            "pv": self.pv()
        }

    def pid(self, state: int):
        """
        Set the pid on or off.

        Note: the pid should *never* be on if the pump is off.
        Liquid needs to circulate, or the heater will burn itself out.
        For that reason turning the pid on turns the pump on first.

        Raises ValueError if `state` is not the integer 0 or 1.
        """
        self._safegaurd_state(state)
        if state == 1:
            self.pump(1)
            self.omega.run()
        else:
            self.omega.stop()
        return True

    def hlt(self, state: int):
        """
        Opens or closes the hlt valve.

        Raises ValueError if `state` is not the integer 0 or 1.
        """
        self._safegaurd_state(state)
        self.set_relay(self.settings.relays["hlt"], state)
        return True

    def hlt_to(self, location: str):
        """
        Sets location of hlt divert valve to 'mash' or 'boil' tuns.

        Raises ValueError for any other location.
        """
        if location == "mash":
            self.set_relay(self.settings.relays["hltToMash"], 1)
            return True
        elif location == "boil":
            self.set_relay(self.settings.relays["hltToMash"], 0)
            return True
        else:
            raise ValueError("Location unknown: valid locations are 'mash' and 'boil'")

    def rims_to(self, location: str):
        """
        Sets location of rims divert valve to 'mash' or 'boil' tuns.

        Raises ValueError for any other location.
        """
        if location == "mash":
            self.set_relay(self.settings.relays["rimsToMash"], 1)
            return True
        elif location == "boil":
            self.set_relay(self.settings.relays["rimsToMash"], 0)
            return True
        else:
            raise ValueError("Location unknown: valid locations are 'mash' and 'boil'")

    def pump_status(self):
        """
        Returns the relay status of the pump relay.
        """
        return self.relay_status(self.settings.relays["pump"])

    def pump(self, state: int):
        """
        Turns the pump on or off.

        Turning the pump off switches the pid off first, so the heater
        never runs without circulation.

        Raises ValueError if `state` is not the integer 0 or 1.
        """
        self._safegaurd_state(state)
        if state == 0:
            self.pid(0)
        self.set_relay(self.settings.relays['pump'], state)
        return True

    def _safegaurd_state(self, state):
        """
        Ensures the argument is an int 0 or 1.

        Returns True when valid, raises ValueError otherwise.
        """
        if not isinstance(state, int):
            raise ValueError("Relay State needs to be an integer, " + str(type(state)) + " given.")
        if state < 0 or state > 1:
            raise ValueError("State needs to be integer 0 or 1, " + str(state) + " given.")
        return True

    def sv(self):
        """
        Returns the Setpoint value (SV) as a float.
        """
        return float(self.omega.sv())

    def set_sv(self, temp: float):
        """
        Sets the setpoint value. Should be float, int is acceptable.

        Returns `omega.sv()` as read back after the write.
        """
        self.omega.safeguard(temp, [int, float])
        self.omega.set_sv(temp)
        return self.omega.sv()

    def pv(self):
        """
        Returns the Process Value (PV) as a float.
        """
        return float(self.omega.pv())

    def watch(self):
        """
        Watches temperatures, and returns True when PV is >= SV.

        Polls every 2 seconds and sends a slack message with the PV once
        the setpoint has been reached.
        """
        # Strictly-less-than: a PV sitting exactly on the setpoint has
        # reached it and must end the wait.
        while self.pv() < self.sv():
            time.sleep(2)  # :nocov:

        self.slack.send("PV is now at " + str(self.pv()) + " f")
        return True

    def status_table(self):
        """
        Returns an `AsciiTable` instance of pid and pump status.
        """
        status = AsciiTable([
            ["Setting", "Value"],
            ["PID on?", str(self.pid_status()['pid_running'])],
            ["Pump on?", str(self.pump_status())],
            ["pv", str(self.pv())],
            ["sv", str(self.sv())]
        ])
        return status
Module variables
var BOLD
var END
var FAIL
var HEADER
var OKBLUE
var OKGREEN
var UNDERLINE
var WARNING
Classes
class Controller
This is the class that interfaces between you and the hardware of the brew rig.
class Controller:
    """
    This is the class that interfaces between you and the hardware of the
    brew rig: relays (valves and pump) through `str116`, and the PID
    through `Omega`.
    """

    def __new__(cls):
        """
        Looks for hardware. If it doesn't find any, return a `FakeController`.

        Can be overridden by environment variables `force_fake_controller`
        and `force_real_controller` set to '1'. `force_fake_controller` is
        checked first.
        """
        if getenv("force_fake_controller") == '1':
            green("force_fake_controller enabled")
            green("Using FakeController()")
            return FakeController()
        if getenv("force_real_controller") == '1':
            green("force_real_controller enabled")
            green("Using Controller()")
            return super(Controller, cls).__new__(cls)

        try:
            # Constructing an Omega is the hardware probe: this is what
            # throws the error if there's no hardware. The instance itself
            # is not kept; `__init__` builds the one that is used.
            Omega(
                settings.port,
                settings.rimsAddress,
                settings.baudRate,
                settings.timeout
            )
        except serial.serialutil.SerialException:
            yellow("No hardware detected!")
            green("Using FakeController()")
            return FakeController()

        green("Hardware found")
        green("Using Controller()")
        return super(Controller, cls).__new__(cls)

    def __init__(self):
        """
        Connects to the Omega using the values from `settings` and creates
        the `BrewerBot` used for notifications.
        """
        self.omega = Omega(
            settings.port,
            settings.rimsAddress,
            settings.baudRate,
            settings.timeout
        )
        self.settings = settings
        self.slack = BrewerBot()

    @staticmethod
    def simulator():
        """
        Returns a new `FakeController`.
        """
        return FakeController()

    @staticmethod
    def relay_status(relay_num: int):
        """
        Returns the relay status of a specified relay.

        Declared static because it takes no `self`; this keeps both
        `Controller.relay_status(n)` and `instance.relay_status(n)` working.
        """
        return str116.get_relay(relay_num)

    def set_relay(self, relay_num: int, state: int):
        """
        Sets relay state to on or off.
        """
        str116.set_relay(relay_num, state)

    def pid_running(self):
        """
        Returns the value reported by `omega.is_running()`.
        """
        return self.omega.is_running()

    def pid_status(self):
        """
        Returns a dictionary of pid status, pv, and sv.
        """
        return {
            "pid_running": bool(self.pid_running()),
            "sv": self.sv(),
            "pv": self.pv()
        }

    def pid(self, state: int):
        """
        Set the pid on or off.

        Note: the pid should *never* be on if the pump is off.
        Liquid needs to circulate, or the heater will burn itself out.
        For that reason turning the pid on turns the pump on first.

        Raises ValueError if `state` is not the integer 0 or 1.
        """
        self._safegaurd_state(state)
        if state == 1:
            self.pump(1)
            self.omega.run()
        else:
            self.omega.stop()
        return True

    def hlt(self, state: int):
        """
        Opens or closes the hlt valve.

        Raises ValueError if `state` is not the integer 0 or 1.
        """
        self._safegaurd_state(state)
        self.set_relay(self.settings.relays["hlt"], state)
        return True

    def hlt_to(self, location: str):
        """
        Sets location of hlt divert valve to 'mash' or 'boil' tuns.

        Raises ValueError for any other location.
        """
        if location == "mash":
            self.set_relay(self.settings.relays["hltToMash"], 1)
            return True
        elif location == "boil":
            self.set_relay(self.settings.relays["hltToMash"], 0)
            return True
        else:
            raise ValueError("Location unknown: valid locations are 'mash' and 'boil'")

    def rims_to(self, location: str):
        """
        Sets location of rims divert valve to 'mash' or 'boil' tuns.

        Raises ValueError for any other location.
        """
        if location == "mash":
            self.set_relay(self.settings.relays["rimsToMash"], 1)
            return True
        elif location == "boil":
            self.set_relay(self.settings.relays["rimsToMash"], 0)
            return True
        else:
            raise ValueError("Location unknown: valid locations are 'mash' and 'boil'")

    def pump_status(self):
        """
        Returns the relay status of the pump relay.
        """
        return self.relay_status(self.settings.relays["pump"])

    def pump(self, state: int):
        """
        Turns the pump on or off.

        Turning the pump off switches the pid off first, so the heater
        never runs without circulation.

        Raises ValueError if `state` is not the integer 0 or 1.
        """
        self._safegaurd_state(state)
        if state == 0:
            self.pid(0)
        self.set_relay(self.settings.relays['pump'], state)
        return True

    def _safegaurd_state(self, state):
        """
        Ensures the argument is an int 0 or 1.

        Returns True when valid, raises ValueError otherwise.
        """
        if not isinstance(state, int):
            raise ValueError("Relay State needs to be an integer, " + str(type(state)) + " given.")
        if state < 0 or state > 1:
            raise ValueError("State needs to be integer 0 or 1, " + str(state) + " given.")
        return True

    def sv(self):
        """
        Returns the Setpoint value (SV) as a float.
        """
        return float(self.omega.sv())

    def set_sv(self, temp: float):
        """
        Sets the setpoint value. Should be float, int is acceptable.

        Returns `omega.sv()` as read back after the write.
        """
        self.omega.safeguard(temp, [int, float])
        self.omega.set_sv(temp)
        return self.omega.sv()

    def pv(self):
        """
        Returns the Process Value (PV) as a float.
        """
        return float(self.omega.pv())

    def watch(self):
        """
        Watches temperatures, and returns True when PV is >= SV.

        Polls every 2 seconds and sends a slack message with the PV once
        the setpoint has been reached.
        """
        # Strictly-less-than: a PV sitting exactly on the setpoint has
        # reached it and must end the wait.
        while self.pv() < self.sv():
            time.sleep(2)  # :nocov:

        self.slack.send("PV is now at " + str(self.pv()) + " f")
        return True

    def status_table(self):
        """
        Returns an `AsciiTable` instance of pid and pump status.
        """
        status = AsciiTable([
            ["Setting", "Value"],
            ["PID on?", str(self.pid_status()['pid_running'])],
            ["Pump on?", str(self.pump_status())],
            ["pv", str(self.pv())],
            ["sv", str(self.sv())]
        ])
        return status
Ancestors (in MRO)
- Controller
- builtins.object
Static methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """
    Build the Omega connection from the module-level `settings`, keep a
    reference to `settings` on the instance, and create the `BrewerBot`.
    """
    connection_args = (
        settings.port,
        settings.rimsAddress,
        settings.baudRate,
        settings.timeout,
    )
    self.omega = Omega(*connection_args)
    self.settings = settings
    self.slack = BrewerBot()
def hlt(
self, state)
Opens or closes the hlt valve
def hlt(self, state: int): """ Opens or closes the hlt valve """ self._safegaurd_state(state) self.set_relay(self.settings.relays["hlt"], state) return True
def hlt_to(
self, location)
Sets location of hlt divert valve to 'mash' or 'boil' tuns.
def hlt_to(self, location: str): """ Sets location of hlt divert valve to 'mash' or 'boil' tuns. """ if location == "mash": self.set_relay(self.settings.relays["hltToMash"], 1) return True elif location == "boil": self.set_relay(self.settings.relays["hltToMash"], 0) return True else: raise ValueError("Location unknown: valid locations are 'mash' and 'boil'")
def pid(
self, state)
Set the pid on or off.
Note: the pid should never be on if the pump is off. Liquid needs to circulate, or the heater will burn itself out.
def pid(self, state: int): """ Set the pid on or off. Note: the pid should *never* be on if the pump is off. Liquid needs to circulate, or the heater will burn itself out. """ self._safegaurd_state(state) if state == 1: self.pump(1) self.omega.run() else: self.omega.stop() return True
def pid_running(
self)
Returns True if pid is running, False otherwise.
def pid_running(self): """ Returns True if pid is running, False otherwise. """ return self.omega.is_running()
def pid_status(
self)
Returns a dictionary of pid status, pv, and sv
def pid_status(self): """ Returns an a dictionary of pid status, pv, and sv """ return { "pid_running": bool(self.pid_running()), "sv": self.sv(), "pv": self.pv() }
def pump(
self, state)
Turns the pump on or off
def pump(self, state: int): """ Turns the pump on or off """ self._safegaurd_state(state) if state == 0: self.pid(0) self.set_relay(self.settings.relays['pump'], state) return True
def pump_status(
self)
def pump_status(self): return self.relay_status(self.settings.relays["pump"])
def pv(
self)
Returns the Process Value (PV)
def pv(self): """ Returns the Proccess Value (PV) """ return float(self.omega.pv())
def relay_status(
relay_num)
Returns the relay status of a specified relay
@staticmethod
def relay_status(relay_num: int):
    """
    Returns the relay status of a specified relay, as reported by
    `str116.get_relay`.

    The function takes no `self`, so it is declared static. Without the
    decorator any call through an instance (`self.relay_status(n)`)
    passes the instance as an extra argument and raises TypeError.
    `Controller.relay_status(n)` keeps working as before.
    """
    return str116.get_relay(relay_num)
def rims_to(
self, location)
Sets location of rims divert valve to 'mash' or 'boil' tuns.
def rims_to(self, location: str): """ Sets location of rims divert valve to 'mash' or 'boil' tuns. """ if location == "mash": self.set_relay(self.settings.relays["rimsToMash"], 1) return True elif location == "boil": self.set_relay(self.settings.relays["rimsToMash"], 0) return True else: raise ValueError("Location unknown: valid locations are 'mash' and 'boil'")
def set_relay(
self, relay_num, state)
Sets relay state to on or off.
def set_relay(self, relay_num: int, state: int):
    """
    Switch relay `relay_num` to `state` by delegating to
    `str116.set_relay`. Returns nothing.
    """
    switch_relay = str116.set_relay
    switch_relay(relay_num, state)
def set_sv(
self, temp)
Sets the setpoint value. Should be float, int is acceptable
def set_sv(self, temp: float): """ Sets the setpoint value. Should be float, int is acceptable """ self.omega.safeguard(temp, [int, float]) self.omega.set_sv(temp) return self.omega.sv()
def simulator(
)
@staticmethod
def simulator():
    """
    Build and return a fresh `FakeController`.
    """
    fake = FakeController()
    return fake
def status_table(
self)
Returns an AsciiTable instance of pid and pump status.
def status_table(self):
    """
    Build an `AsciiTable` with one header row and four rows: pid running,
    pump status, pv and sv, each rendered with `str()`.
    """
    # Values are read in the same order as the rows they fill.
    pid_on = str(self.pid_status()['pid_running'])
    pump_on = str(self.pump_status())
    process_value = str(self.pv())
    setpoint = str(self.sv())

    rows = [
        ["Setting", "Value"],
        ["PID on?", pid_on],
        ["Pump on?", pump_on],
        ["pv", process_value],
        ["sv", setpoint],
    ]
    return AsciiTable(rows)
def sv(
self)
Returns the Setpoint value (SV)
def sv(self): """ Returns the Setpoint value (SV) """ return float(self.omega.sv())
def watch(
self)
Watches temperatures, and returns true when PV is >= SV.
def watch(self): """ Watches temperatures, and returns true when PV is >= SV. """ while self.pv() <= self.sv(): time.sleep(2) # :nocov: self.slack.send("PV is now at " + str(self.pv()) + " f") return True
Instance variables
var omega
var settings
var slack