etho.services.utils.concurrent_task#
Communication primitives with a common interface and a helper class for running tasks in independent processes.
- class etho.services.utils.concurrent_task.ConcurrentTask(task: Callable, task_kwargs: Dict[str, Any] = {}, comms: Literal['array', 'pipe', 'queue'] = 'queue', comms_kwargs: Dict[str, Any] = {}, taskstopsignal: Optional[Any] = None)[source]#
Helper class for running tasks in independent processes with communication tools attached.
[summary]
- Parameters
task (Callable) – First arg to task must be the end of the comms and is provided via
args
.task_kwargs (Dict[str, Any], optional) – Defaults to {}.
comms (Literal["array", "pipe", "queue"], optional) – For passing data to the task. Either “queue”, “pipe”, or “array”. “array” if you want speed and don’t mind loosing data (displaying data) or if you want to ensure you are always assessing fresh data (realtime feedback). “queue” is slower but great when data loss is unacceptable (saving data). “pipe” probably never?? Defaults to “queue”.
comms_kwargs (Dict[str, Any], optional) – kwargs for constructing comms. Defaults to {}.
taskstopsignal (Any, optional) – Data to send over comms that tells the task to stop. Defaults to None.
- Raises
ValueError – for unknown comms
- class etho.services.utils.concurrent_task.Faucet(connection: multiprocessing.connection.Connection)[source]#
Wrapper for Pipe connection objects that exposes
get
function for common interface with Queues.Wraps Connection object returned when calling Pipe and delegates function calls to have a common interface with the Queue.
- Parameters
connection (multiprocessing.connection.Connection) – [description]
- get(block: bool = True, timeout: Optional[float] = 0.001, empty_value: Optional[Any] = None) Any [source]#
Mimics the logic of the
Queue.get
.- Parameters
block (bool, optional) – [description]. Defaults to True.
timeout (float, optional) – [description]. Defaults to 0.001.
empty_value ([type], optional) – [description]. Defaults to None.
- Returns
[description]
- Return type
[type]
Class for sharing a numpy array between processes.
Contains functions for synchronized read/write access and a staleness indicator.
[summary]
- Parameters
shape (tuple/list-like) – [description]
ctype ([type], optional) – Data type of the numpy array. Defaults to ctypes.c_double (is np.float64).
Returns array values. Block is unsed and their for interface consistency with Queue.
Returns true if the array has been update since the last put.
Update the values in the shared array.
Thread safe access to whether or not the current array values have been get-ed already.