Source code for etho.client

import time
import logging
from rich.progress import Progress
import rich
import threading
import _thread as thread
import queue
from typing import Optional, Union, Dict, Any
import psutil

from .utils.tui import rich_information

from . import config
from . import services as service_module
from .utils.config import defaultify, readconfig


def timed(fn, s, *args, **kwargs):
    quit_fun = thread.interrupt_main  # raises KeyboardInterrupt
    timer = threading.Timer(s, quit_fun)
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except:  # catch KeyboardInterrupt for silent timeouts
        result = 1
    finally:
        timer.cancel()
    return result


def kill_child_processes():
    try:
        parent = psutil.Process()
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for child in children:
        child.terminate()  # friendly termination
    _, still_alive = psutil.wait_procs(children, timeout=3)
    for child in still_alive:
        child.kill()  # unfriendly termination


def _reject_remote_host_blocks(prot: Dict[str, Any]) -> None:
    remote_services = []
    for service_name in prot["use_services"]:
        service_config = prot.get(service_name, {})
        if isinstance(service_config, dict) and "host" in service_config:
            remote_services.append(service_name)
    if remote_services:
        services = ", ".join(remote_services)
        raise ValueError(f"Remote service hosts are no longer supported. Remove the 'host' block from: {services}.")


def _setup_services(prot, defaults, playlistfile, save_prefix, preview, new_console):
    services = {}
    service_classes = {}
    service_counts = {}

    for service_name in prot["use_services"]:
        service_class = service_module.service_class_for(service_name)
        service_type = service_class.SERVICE_NAME
        service_index = service_counts.get(service_type, 0)
        service_counts[service_type] = service_index + 1

        service = service_class.setup_client(
            service_name,
            service_index,
            prot,
            defaults,
            playlistfile,
            save_prefix,
            preview,
            new_console,
        )
        if service is None:
            continue

        services[service_name] = service
        service_classes[service_name] = service_class

    return services, service_classes


def _start_services(services, service_classes):
    logging.info("Starting services")

    start_groups = {service_name: getattr(service_classes[service_name], "CLIENT_START_GROUP", "pre") for service_name in services}
    started_groups = set()
    time_last_prereq_started = time.time()

    def start_group(group):
        nonlocal time_last_prereq_started
        started_groups.add(group)
        for service_name, service in services.items():
            if start_groups[service_name] != group:
                continue
            logging.info(f"   {service_name}.")
            service.start()
            if group in {"pre", "trigger"}:
                time_last_prereq_started = time.time()

    start_group("pre")
    time.sleep(0.5)
    start_group("trigger")

    if "daq" in start_groups.values():
        wait_remaining = 5 - (time.time() - time_last_prereq_started)
        if wait_remaining > 0:
            time.sleep(wait_remaining)
    start_group("daq")

    for group in start_groups.values():
        if group not in started_groups:
            start_group(group)

    logging.info("All services started.")


[docs] def client( protocolfile: Optional[str], playlistfile: Optional[str] = None, *, protocol: Optional[Dict[str, Any]] = None, save_prefix: Optional[str] = None, show_progress: bool = True, monitor: bool = False, debug: bool = False, preview: bool = False, _stop_event: Optional[threading.Event] = None, _done_event: Optional[threading.Event] = None, _queue: Optional[queue.Queue] = None, ): """Starts an experiment. Args: protocolfile (Optional[str]): Path to the protocol file. playlistfile (Optional[str]): _description_. protocol (Optional[Dict[str, Any]]): Protocol parameters provided directly by the GUI. save_prefix (Optional[str]): Specify the stem of the filename for all saved data and logs. Will defaults to HOSTNAME-YYYYMMDD_hhmmss, where HOSTNAME is the computer name the service is run on (typically localhost). show_progress (bool): Show a progress bar. Disable if performance is criticial. monitor (bool): Keep the run in the progress/cleanup loop even when the progress display is hidden. debug (bool): More verbose logs. preview (bool): Preview the camera (will disable saving and logging and only open a window with the camera view). _stop_event (threading.Event, optional): Used to stop the task from an outside thread. Defaults to None. _done_event (threading.Event, optional): Set to signal that the task is done/stopped to an outside thread. Defaults to None. _queue (queue.Queue, optional): Signal the expected duration of the task to outside funs. Defaults to None. """ # load config/protocols if protocol is not None: prot = defaultify(protocol) else: prot = readconfig(protocolfile) logging.debug(prot) defaults = config if defaults["host"] is None: defaults["host"] = "localhost" if defaults["python_exe"] is None: defaults["python_exe"] = "python" if defaults["serializer"] is None: defaults["serializer"] = "pickle" _reject_remote_host_blocks(prot) rich.print(defaults) # unique file name for video and node-local logs if save_prefix is None: save_prefix = f"{defaults['host']}-{time.strftime('%Y%m%d_%H%M%S')}" logging.info(f"Saving as {save_prefix}.") new_console = debug services, service_classes = _setup_services(prot, defaults, playlistfile, save_prefix, preview, new_console) # display config info for key, s in services.items(): rich_information(s.information(), prefix=key) _start_services(services, service_classes) if monitor or show_progress: total = 0 for service_name, service in services.items(): total = max(total, service.progress()["total"]) if _queue is not None: _queue.put(total) cli_progress(services, save_prefix, _stop_event, _done_event, show_progress=show_progress) else: return services
[docs] def cli_progress( services: Dict[str, Any], save_prefix: str, stop_event: Optional[threading.Event] = None, done_event: Optional[threading.Event] = None, show_progress: bool = True, ): """_summary_ Args: services (_type_): Dictionary of intialized services. save_prefix (_type_): Name of the expt. stop_event (_type_, optional): Used to stop the task from an outside thread. Defaults to None. done_event (_type_, optional): Set to signal that the task is done/stopped to an outside thread. Defaults to None. """ STOPPED_PREMATURELY = False try: with Progress(disable=not show_progress) as progress: tasks = {} for service_name, service in services.items(): tasks[service_name] = progress.add_task(f"[red]{service_name}", total=service.progress()["total"]) RUN = True while RUN and not progress.finished: for task_name, task_id in tasks.items(): if stop_event is not None and stop_event.is_set(): break if progress._tasks[task_id].finished: continue try: p = timed(services[task_name].progress, 5) description = None if "framenumber" in p: description = f"{task_name} {p['framenumber_delta'] / p['elapsed_delta']: 7.2f} fps" progress.update(task_id, completed=p["elapsed"], description=description) except: # if call times out, stop progress display - this will stop the display whenever a task times out - not necessarily when a task is done progress.stop_task(task_id) time.sleep(1) if stop_event is not None and stop_event.is_set(): logging.info("Received STOP signal. Cancelling jobs:") for task_name, task_id in tasks.items(): progress.stop_task(task_id) RUN = False STOPPED_PREMATURELY = True time.sleep(1) if STOPPED_PREMATURELY: logging.info("Finishing jobs.") for service_name, service in services.items(): logging.info(f" {service_name}") try: service.finish() except Exception as e: logging.warning(" Failed.") print(e) logging.info(" done.") time.sleep(4) finally: logging.info("Cleaning up jobs.") kill_child_processes() if done_event is not None: done_event.set() logging.info(f"Done with experiment {save_prefix}.")