from .ZeroService import BaseZeroService
import time
import threading
import sys
import copy
from typing import Iterable, Sequence, Optional, Dict, Any
from .utils.log_exceptions import for_all_methods, log_exceptions
from .callbacks import callbacks
import logging
import numpy as np
# Holds the exception raised while importing the DAQmx-backed IOTask module,
# or None when the import succeeded. DAQ.setup() inspects this flag and
# re-raises it as an ImportError so the failure surfaces at setup time.
daqmx_import_error = None
try:
    from .daq.IOTask import *

    # Re-assert the success state after the star import, which may rebind
    # arbitrary module-level names.
    daqmx_import_error = None
except (ImportError, NameError, NotImplementedError) as import_exc:
    # Python 3 unbinds the `as` target when the except clause ends. Binding the
    # exception directly to `daqmx_import_error` would therefore delete the
    # module-level flag and turn the later check in setup() into a NameError.
    daqmx_import_error = import_exc
@for_all_methods(log_exceptions(logging.getLogger(__name__)))
class DAQ(BaseZeroService):
    """Bundles and synchronizes analog/digital input and output tasks."""

    LOGGING_PORT = 1449  # set this to range 1420-1460
    SERVICE_PORT = 4249  # last two digits match logging port - but start with "42" instead of "14"
    SERVICE_NAME = "DAQ"  # short, uppercase, 3-letter ID of the service (equals class name)

    def setup(
        self,
        savefilename: Optional[str] = None,
        play_order: Optional[Iterable] = None,
        playlist_info=None,
        duration: float = -1,
        fs: int = 10000,
        dev_name="Dev1",
        nb_inputsamples_per_cycle=None,
        clock_source=None,
        analog_chans_out: Optional[Sequence[str]] = None,
        analog_chans_in: Sequence[str] = ["ai0"],
        digital_chans_out: Optional[Sequence[str]] = None,
        analog_data_out: Optional[Sequence[np.ndarray]] = None,
        digital_data_out: Optional[Sequence] = None,
        metadata: Optional[Dict] = None,
        params=None,
    ):
        """Create and configure the input/output tasks and their callbacks.

        Args:
            savefilename (str, optional): Passed to every callback as `file_name`.
                Defaults to None.
            play_order (Iterable, optional): Deep-copied and handed to
                `data_playlist` for each output task. Defaults to None.
            playlist_info (optional): Handed to the analog-output `data_playlist`
                and stored under `self.info["playlist"]`. Defaults to None.
            duration (float, optional): If > 0, a timer is prepared that calls
                `finish(stop_service=True)` after this many seconds (it is
                started in `start()`). Also passed to the analog input task.
                Defaults to -1.
            fs (int, optional): Rate passed to all tasks. Defaults to 10000.
            dev_name (str, optional): Device name passed to all tasks.
                Defaults to "Dev1".
            nb_inputsamples_per_cycle (optional): Passed to the analog input
                task and to every callback. Defaults to None.
            clock_source (str, optional): None for AI-synced clock.
                Use 'OnboardClock' for boards that don't support this (USB-DAQ).
                Defaults to None.
            analog_chans_out (Sequence, optional): Analog output channel names.
                No analog output task is created if empty/None. Defaults to None.
            analog_chans_in (Sequence, optional): Analog input channel names.
                Defaults to ['ai0']. The default list is never mutated here.
            digital_chans_out (Sequence, optional): Digital output channel names.
                No digital output task is created if empty/None. Defaults to None.
            analog_data_out (Sequence, optional): Data for the analog output
                playlist; the last axis of the first item must match the number
                of analog output channels. Defaults to None.
            digital_data_out (Sequence, optional): Data for the digital output
                playlist. Defaults to None.
            metadata (dict, optional): Extra attributes merged into the `attrs`
                passed to the callbacks. Defaults to None (treated as {}).
            params (dict, optional): Part of prot dict (prot['DAQ']). Its
                "callbacks" entry, if present, maps callback names to a dict of
                extra keyword arguments (or None). Defaults to None.

        Raises:
            ImportError: If importing the DAQmx-backed IOTask module failed.
            ValueError: If the number of analog output channels does not match
                the channel count of `analog_data_out`.
        """
        self.status = "initializing"
        if daqmx_import_error is not None:
            raise ImportError(daqmx_import_error)

        self._time_started = None
        self.duration = duration
        self.fs = fs
        self.savefilename = savefilename
        self.metadata = metadata
        self.playlist_info = playlist_info

        self.analog_chans_out = analog_chans_out
        self.analog_chans_in = analog_chans_in
        self.digital_chans_out = digital_chans_out

        # Always defined so that finish() can iterate it even when no
        # analog input task (and hence no callback) was configured.
        self.callbacks = []

        # ANALOG OUTPUT
        if self.analog_chans_out:
            self.taskAO = IOTask(
                dev_name=dev_name,
                cha_name=self.analog_chans_out,
                rate=fs,
                clock_source=clock_source,
                logger=self.log,
            )
            # Compare by value: identity (`is not`) on ints is only reliable
            # for the small integers CPython happens to cache.
            if analog_data_out[0].shape[-1] != len(self.analog_chans_out):
                raise ValueError(f"Number of analog output channels ({len(self.analog_chans_out)}) does not match the number of channels in the sound files ({analog_data_out[0].shape[-1]}).")
            # Each output task gets its own copy of the play order.
            play_order_new = copy.deepcopy(play_order)
            self.taskAO.data_gen = data_playlist(analog_data_out, play_order_new, playlist_info, self.log, name="AO")
            if clock_source is None:
                # Output starts on the analog input start trigger.
                self.taskAO.CfgDigEdgeStartTrig("ai/StartTrigger", DAQmx_Val_Rising)
            else:
                self.taskAO.DisableStartTrig()

        # DIGITAL OUTPUT
        if self.digital_chans_out:
            self.taskDO = IOTask(
                dev_name=dev_name,
                cha_name=self.digital_chans_out,
                rate=fs,
                clock_source=clock_source,
                logger=self.log,
            )
            play_order_new = copy.deepcopy(play_order)
            self.taskDO.data_gen = data_playlist(digital_data_out, play_order_new, name="DO")
            if clock_source is None:
                self.taskDO.CfgDigEdgeStartTrig("ai/StartTrigger", DAQmx_Val_Rising)
            else:
                self.taskDO.DisableStartTrig()

        # ANALOG INPUT
        if self.analog_chans_in:
            self.taskAI = IOTask(
                dev_name=dev_name,
                cha_name=self.analog_chans_in,
                rate=fs,
                nb_inputsamples_per_cycle=nb_inputsamples_per_cycle,
                clock_source=clock_source,
                duration=self.duration,
                logger=self.log,
            )
            self.taskAI.data_rec = []

            if metadata is None:
                metadata = {}
            attrs = {
                "rate": fs,
                "analog_chans_in": analog_chans_in,
                "analog_chans_out": analog_chans_out,
                "digital_chans_out": digital_chans_out,
                **metadata,
            }
            common_task_kwargs = {
                "file_name": self.savefilename,
                "nb_inputsamples_per_cycle": nb_inputsamples_per_cycle,
                "nb_analog_chans_in": len(analog_chans_in),
                "attrs": attrs,
            }

            # `params` defaults to None, so guard before the membership test.
            if params is not None and "callbacks" in params and params["callbacks"]:
                for cb_name, cb_params in params["callbacks"].items():
                    # Give every callback its own kwargs dict; callback-specific
                    # parameters override the common ones.
                    task_kwargs = dict(common_task_kwargs)
                    if cb_params is not None:
                        task_kwargs.update(cb_params)
                    callback = callbacks[cb_name].make_concurrent(task_kwargs=task_kwargs)
                    self.callbacks.append(callback)
                    self.taskAI.data_rec.append(callback)

        if self.duration > 0:  # if zero, will stop when nothing is to be outputted
            self._thread_timer = threading.Timer(self.duration, self.finish, kwargs={"stop_service": True})

        self.status = "initialized"

        # NOTE(review): this instance attribute shadows the `info` method
        # defined on this class - confirm which one callers rely on.
        self.info: Dict[str, Dict[str, Any]] = dict()
        self.info["job"] = {
            "sample rate": f"{self.fs}Hz",
            "analog output": self.analog_chans_out,
            "digital output": self.digital_chans_out,
            "analog input": self.analog_chans_in,
            "duration": f"{self.duration}s",
            "savefilename": self.savefilename,
            "metadata": self.metadata,
        }
        self.info["playlist"] = self.playlist_info

    def start(self):
        """Start the callbacks, arm the output tasks and start the input task."""
        self.status = "running"
        self._time_started = time.time()

        for task in self.taskAI.data_rec:
            task.start()

        # Arm the output tasks - won't start until the AI start is triggered
        if self.analog_chans_out:
            self.taskAO.StartTask()
        if self.digital_chans_out:
            self.taskDO.StartTask()

        # Start the AI task - generates AI start trigger and triggers the output tasks
        self.taskAI.StartTask()
        self.log.debug("started")

        # The timer only exists if setup() was called with duration > 0.
        if hasattr(self, "_thread_timer"):
            self.log.debug("duration {0} seconds".format(self.duration))
            self._thread_timer.start()
            self.log.debug("finish timer started")

    def finish(self, stop_service=False):
        """Stop and clear all tasks and shut down the callbacks.

        Args:
            stop_service (bool, optional): If True, wait 0.5 s and then call
                `service_stop()`. Defaults to False.
        """
        self.status = "finishing"
        self.log.warning("stopping")

        if hasattr(self, "_thread_stopper"):
            self._thread_stopper.set()
        if hasattr(self, "_thread_timer"):
            self._thread_timer.cancel()

        # Best effort: a failing callback must not prevent the tasks from
        # being stopped, so errors are only logged.
        for callback in self.callbacks:
            try:
                callback.finish()
            except Exception as e:
                self.log.warning(e)

        try:
            self.taskAI.StopTask()
            self.log.warning("   stoppedAI")
        except InvalidTaskError as e:
            self.log.warning(e)

        # close any files in the callbacks
        for callback in self.callbacks:
            try:
                callback.close()
            except Exception as e:
                self.log.warning(e)

        self.taskAI.ClearTask()

        # Stop the output tasks via IOTask.stop() (an earlier version called
        # StopTask() and caught GenStoppedToPreventRegenOfOldSamplesError).
        if hasattr(self, "digital_chans_out") and self.digital_chans_out:
            self.taskDO.stop()
        if hasattr(self, "analog_chans_out") and self.analog_chans_out:
            self.taskAO.stop()

        if self.analog_chans_out:
            self.taskAO.ClearTask()
        if self.digital_chans_out:
            self.taskDO.ClearTask()

        self.log.warning("   stopped ")
        if stop_service:
            time.sleep(0.5)
            self.service_stop()

    def disp(self):
        """Do nothing (placeholder)."""
        pass

    def is_busy(self, ai=True, ao=True):
        """Report whether the selected tasks are still running.

        Args:
            ai (bool, optional): Query the analog input task. Defaults to True.
            ao (bool, optional): Query the analog output task (only if analog
                output channels are configured). Defaults to True.

        Returns:
            bool: True if no queried task reports being done and no query
            raised; False otherwise. A task that is not queried counts as
            "not done".
        """
        taskCheckFailed = False

        taskIsDoneAI = daq.c_ulong()
        if self.analog_chans_out:
            taskIsDoneAO = daq.c_ulong()

        if ai:
            try:
                self.taskAI.IsTaskDone(taskIsDoneAI)
            except (
                daq.InvalidTaskError,
                GenStoppedToPreventRegenOfOldSamplesError,
            ):
                taskCheckFailed = True
        else:
            taskIsDoneAI = 0

        if ao and self.analog_chans_out:
            try:
                self.taskAO.IsTaskDone(taskIsDoneAO)
            except (
                daq.InvalidTaskError,
                GenStoppedToPreventRegenOfOldSamplesError,
            ):
                taskCheckFailed = True
        else:
            taskIsDoneAO = 0

        return not bool(taskIsDoneAI) and not bool(taskIsDoneAO) and not taskCheckFailed

    def test(self):
        """Return True unconditionally."""
        return True

    def cleanup(self):
        """Call `finish()` and return True."""
        self.finish()
        return True

    def info(self):
        """Placeholder; currently returns None whether or not the service is busy.

        NOTE(review): `setup()` assigns a dict to `self.info`, which shadows
        this method on the instance - confirm the intended interface.
        """
        if self.is_busy():
            pass  # your code here
        else:
            return None
if __name__ == "__main__":
    # Rebinds the module-level import-error flag to None, so DAQ.setup() will
    # not raise ImportError when the module is run as a script.
    # NOTE(review): this also hides a genuine DAQmx import failure - confirm intent.
    daqmx_import_error = None

    # Usage: <script> [serializer] [port]
    ser = sys.argv[1] if len(sys.argv) > 1 else "default"
    port = sys.argv[2] if len(sys.argv) > 2 else DAQ.SERVICE_PORT

    logging.info("Starting DAQ service")
    s = DAQ(serializer=ser)  # expose class via zerorpc
    s.bind("tcp://0.0.0.0:{0}".format(port))  # broadcast on all IPs
    s.run()