Source code for etho.services.Opt2ZeroService

#!/usr/bin/env python
import threading
from .ZeroService import BaseZeroService
from .utils.log_exceptions import for_all_methods, log_exceptions
import time
import logging
import sys

# Optional hardware dependency: the import is attempted once at module load and
# any failure is remembered in `import_error` so it can be re-raised later,
# when the service is actually set up.
try:
    from .utils.delay_pwmled import Delay_PWMLED

    import_error = None
except ImportError as err:
    # Python 3 unbinds the `as` target when the except clause exits, so the
    # exception has to be copied to a separate module-level name. Binding it
    # directly (`except ImportError as import_error`) would leave
    # `import_error` undefined and turn later checks into a NameError.
    import_error = err


@for_all_methods(log_exceptions(logging.getLogger(__name__)))
class OPT2(BaseZeroService):
    """Service for controlling multiple LEDs on a Raspberry Pi.

    Uses [gpiozero](https://gpiozero.readthedocs.io) to control LEDs via GPIO pins.
    Individual LEDs can be controlled with independent temporal patterns and amplitudes.
    """

    LOGGING_PORT = 1452
    SERVICE_PORT = 4252
    SERVICE_NAME = "OPT2"

    def setup(self, pins, duration, blink_pers, blink_durs, blink_paus, blink_nums, blink_dels, blink_amps):
        """Set up the service.

        The per-trial arguments are stored as given and indexed by trial number
        in `_worker`. `blink_pers` holds one value per trial; all other
        `blink_*` arguments hold, for each trial, one value per pin.

        Args:
            pins (List[int]): list of pin numbers.
            duration (float): duration (in seconds) the service should run.
                Values <= 0 disable the automatic finish timer.
            blink_pers: [trials,] trial periods, i.e. the wait in seconds before
                the next trial is started.
            blink_durs: [trials, number of pins] passed to `led.blink` as `on_time`.
            blink_paus: [trials, number of pins] passed to `led.blink` as `off_time`.
            blink_nums: [trials, number of pins] passed to `led.blink` as `n`.
            blink_dels: [trials, number of pins] passed to `led.blink` as `initial_delay`.
            blink_amps: [trials, number of pins] passed to `led.blink` as `value`.

        Raises:
            ImportError: if the LED driver could not be imported at module load.
        """
        if import_error is not None:
            raise import_error

        self.pins = [int(pin) for pin in pins]
        # try to set frequency to 1000
        self.LEDs = [Delay_PWMLED(pin, initial_value=0, frequency=1000) for pin in self.pins]
        self.duration = float(duration)  # total duration of experiments

        self.LED_blinkinterval = blink_pers  # common clock for all pins - [trials,]
        # the following are each indexed as [trial][pin]
        self.LED_blinkduration = blink_durs
        self.LED_blinkpause = blink_paus
        self.LED_blinknumber = blink_nums
        self.LED_blinkdelay = blink_dels
        self.LED_blinkamplitude = blink_amps

        self._turn_off()  # make sure LED is off at start of experiment
        self.trial = -1

        self._thread_stopper = threading.Event()
        # Compare the converted float, not the raw argument: `duration` may
        # arrive as a string, for which `duration > 0` raises a TypeError.
        if self.duration > 0:
            self._thread_timer = threading.Timer(
                interval=self.duration, function=self.finish, kwargs={"stop_service": True}
            )
        self._worker_thread = threading.Thread(target=self._worker, args=(self._thread_stopper,))

        # NOTE(review): zero-amplitude single blink on every LED; presumably
        # primes the blink machinery before the first trial - confirm.
        for led in self.LEDs:
            led.blink(on_time=0.1, off_time=0, initial_delay=0, value=0, n=1)

    def start(self):
        """Start the worker thread and, if a duration was set, the finish timer."""
        self._time_started = time.time()

        # background jobs should be run and controlled via a thread
        self._worker_thread.start()
        if self.duration > 0:
            self.log.info("duration {0} seconds".format(self.duration))
            # will execute FINISH after N seconds
            self._thread_timer.start()
            self.log.info("finish timer started")

    def _worker(self, stop_event):
        """Run the next trial and schedule the one after it.

        Does nothing once `stop_event` is set or all trials have been run.

        Args:
            stop_event (threading.Event): set by `finish` to stop the trial chain.
        """
        if stop_event.is_set():
            return

        next_trial = self.trial + 1
        if next_trial >= len(self.LED_blinkinterval):
            # All trials consumed: stop scheduling instead of raising an
            # IndexError inside a timer thread.
            self.log.info("all trials done")
            return
        self.trial = next_trial

        # schedule next execution - waits LED_blinkinterval[trial] seconds
        # before running the _worker function again
        threading.Timer(
            interval=self.LED_blinkinterval[self.trial], function=self._worker, args=[stop_event]
        ).start()

        # start the blink pattern of the current trial on every LED
        for led, dur, pau, num, amp, dely in zip(
            self.LEDs,
            self.LED_blinkduration[self.trial],
            self.LED_blinkpause[self.trial],
            self.LED_blinknumber[self.trial],
            self.LED_blinkamplitude[self.trial],
            self.LED_blinkdelay[self.trial],
        ):
            logging.info(f"{led}, {dur}, {pau}, {num}, {amp}, {dely}")
            led.blink(on_time=dur, off_time=pau, initial_delay=dely, value=amp, n=num)

    def finish(self, stop_service=False):
        """Stop the trial chain, cancel the finish timer and turn all LEDs off.

        Args:
            stop_service (bool): if True, also call `service_stop()` afterwards.
        """
        self.log.warning("stopping")
        if hasattr(self, "_thread_stopper"):
            self.log.info("stoppping thread stopper")
            self._thread_stopper.set()
        if hasattr(self, "_thread_timer"):
            self.log.info("cancelling thread timer")
            self._thread_timer.cancel()

        self.log.info("turning off LED")
        self._turn_off()

        self.log.warning("   stopped ")
        if stop_service:
            self.service_stop()

    def _turn_on(self):
        """Turn all LEDs on (no-op if the service has not been set up)."""
        for led in getattr(self, "LEDs", ()):
            led.on()

    def _turn_off(self):
        """Turn all LEDs off (no-op if the service has not been set up)."""
        for led in getattr(self, "LEDs", ()):
            led.off()

    def disp(self):
        pass

    def is_busy(self):
        """Return True while the service has been started and not yet stopped.

        "Started" means the worker thread created in `setup` has been started
        (`Thread.ident` is set); "stopped" means `finish` has set the stop event.
        """
        worker = getattr(self, "_worker_thread", None)
        stopper = getattr(self, "_thread_stopper", None)
        if worker is None or stopper is None:
            return False
        return worker.ident is not None and not stopper.is_set()

    def info(self):
        """Return a short status string while busy, otherwise None."""
        if self.is_busy():
            return "state of relay is {0}".format(self.is_busy())
        return None

    def test(self):
        pass

    def cleanup(self):
        """Stop everything via `finish` and drop the queue thread reference, if any."""
        self.finish()
        if hasattr(self, "_queue_thread"):
            del self._queue_thread
if __name__ == "__main__":
    # The optional first command-line argument selects the serializer.
    serializer_name = sys.argv[1] if len(sys.argv) > 1 else "default"
    service = OPT2(serializer=serializer_name)
    # Listen on all interfaces at the service's own port, then serve.
    endpoint = "tcp://0.0.0.0:{0}".format(service.SERVICE_PORT)
    service.bind(endpoint)
    service.run()