Source code for etho.services.TemplateZeroService

#!/usr/bin/env python
# required imports
from .ZeroService import BaseZeroService  # import super class
import time
import threading
from . import register_service
from .utils.log_exceptions import for_all_methods, log_exceptions
from ..utils.config import undefaultify
import logging
import defopt
from typing import Optional


# decorate all methods in the class so that exceptions are properly logged
[docs] @for_all_methods(log_exceptions(logging.getLogger(__name__))) @register_service class TMP(BaseZeroService): LOGGING_PORT = 1443 # set this to range 1420-1460 SERVICE_PORT = 4243 # last two digits match logging port - but start with "42" instead of "14" SERVICE_NAME = "TMP" # short, uppercase, 3-letter ID of the service (must equal class name) CLIENT_START_GROUP = "pre" @classmethod def setup_client(cls, service_key, service_index, prot, defaults, playlistfile, save_prefix, preview, new_console): this = defaults.copy() this.update(prot[service_key]) if prot[service_key].get("port") is None: prot[service_key]["port"] = cls.SERVICE_PORT + service_index service = cls.make( this["serializer"], this["host"], this["python_exe"], new_console=new_console, port=prot[service_key]["port"], ) params = undefaultify(prot[service_key]) service.setup(params.get("duration", prot["maxduration"])) save_suffix = f"_{service_index + 1}" if service_index > 0 else "" service.init_local_logger(f"{this['savefolder']}/{save_prefix}/{save_prefix}{save_suffix}_{service_key.lower()}.log") return service def setup(self, duration): self._time_started = None self.duration = float(duration) # APPLICATION SPECIFIC SETUP CODE HERE # background jobs should be run and controlled via a thread # threads can be stopped by setting an event: `_thread_stopper.set()` self._thread_stopper = threading.Event() # and/or via a timer if self.duration > 0: self._thread_timer = threading.Timer(self.duration, self.finish, kwargs={"stop_service": True}) # self._worker_thread = threading.Thread(target=self._worker, args=(self._thread_stopper,)) def start(self): self._time_started = time.time() # background jobs should be run and controlled via a thread self._worker_thread.start() self.log.info("started") if hasattr(self, "_thread_timer"): self.log.info("duration {0} seconds".format(self.duration)) self._thread_timer.start() self.log.info("finish timer started") def _worker(self, stop_event): RUN = True while RUN: if stop_event.is_set(): RUN = False # APPLICATION SPECIFIC RUN CODE HERE def finish(self, stop_service=False): self.log.warning("stopping") # stop thread if necessary if hasattr(self, "_thread_stopper"): self._thread_stopper.set() if hasattr(self, "_thread_timer"): self._thread_timer.cancel() # clean up code here self.log.warning(" stopped ") # mode log file and savefilename if stop_service: time.sleep(2) self.service_stop() def disp(self): pass def is_busy(self): return True # should return True/False def test(self): return True def cleanup(self): self.finish() # your code here return True def info(self): if self.is_busy(): pass # your code here else: return None
def cli(serializer: str = "default", port: Optional[str] = None): if port is None: port = TMP.SERVICE_PORT s = TMP(serializer=serializer) s.bind("tcp://0.0.0.0:{0}".format(port)) # broadcast on all IPs print("running TMPZeroService") s.run() print("done") if __name__ == "__main__": defopt.run(cli)