Source code for etho.services.TemplateZeroService
#!/usr/bin/env python
# required imports
from .ZeroService import BaseZeroService # import super class
import time
import threading
from .utils.log_exceptions import for_all_methods, log_exceptions
import logging
import defopt
from typing import Optional
# decorate all methods in the class so that exceptions are properly logged
[docs]@for_all_methods(log_exceptions(logging.getLogger(__name__)))
class TMP(BaseZeroService):
LOGGING_PORT = 1443 # set this to range 1420-1460
SERVICE_PORT = 4243 # last two digits match logging port - but start with "42" instead of "14"
SERVICE_NAME = "TMP" # short, uppercase, 3-letter ID of the service (must equal class name)
def setup(self, duration):
self._time_started = None
self.duration = float(duration)
# APPLICATION SPECIFIC SETUP CODE HERE
# background jobs should be run and controlled via a thread
# threads can be stopped by setting an event: `_thread_stopper.set()`
self._thread_stopper = threading.Event()
# and/or via a timer
if self.duration > 0:
self._thread_timer = threading.Timer(self.duration, self.finish, kwargs={"stop_service": True})
#
self._worker_thread = threading.Thread(target=self._worker, args=(self._thread_stopper,))
def start(self):
self._time_started = time.time()
# background jobs should be run and controlled via a thread
self._worker_thread.start()
self.log.info("started")
if hasattr(self, "_thread_timer"):
self.log.info("duration {0} seconds".format(self.duration))
self._thread_timer.start()
self.log.info("finish timer started")
def _worker(self, stop_event):
RUN = True
while RUN:
if stop_event.is_set():
RUN = False
# APPLICATION SPECIFIC RUN CODE HERE
def finish(self, stop_service=False):
self.log.warning("stopping")
# stop thread if necessary
if hasattr(self, "_thread_stopper"):
self._thread_stopper.set()
if hasattr(self, "_thread_timer"):
self._thread_timer.cancel()
# clean up code here
self.log.warning(" stopped ")
# mode log file and savefilename
if stop_service:
time.sleep(2)
self.service_stop()
def disp(self):
pass
def is_busy(self):
return True # should return True/False
def test(self):
return True
def cleanup(self):
self.finish()
# your code here
return True
def info(self):
if self.is_busy():
pass # your code here
else:
return None
def cli(serializer: str = "default", port: Optional[str] = None):
if port is None:
port = TMP.SERVICE_PORT
s = TMP(serializer=serializer)
s.bind("tcp://0.0.0.0:{0}".format(port)) # broadcast on all IPs
print("running TMPZeroService")
s.run()
print("done")
if __name__ == "__main__":
defopt.run(cli)