etho.services.utils.concurrent_task#

Communication primitives with a common interface and a helper class for running tasks in independent processes.

class etho.services.utils.concurrent_task.ConcurrentTask(task: Callable, task_kwargs: Dict[str, Any] = {}, comms: Literal['array', 'pipe', 'queue'] = 'queue', comms_kwargs: Dict[str, Any] = {}, taskstopsignal: Optional[Any] = None)[source]#

Helper class for running tasks in independent processes with communication tools attached.

[summary]

Parameters
  • task (Callable) – First arg to task must be the end of the comms and is provided via args.

  • task_kwargs (Dict[str, Any], optional) – Defaults to {}.

  • comms (Literal["array", "pipe", "queue"], optional) – For passing data to the task. Either “queue”, “pipe”, or “array”. “array” if you want speed and don’t mind loosing data (displaying data) or if you want to ensure you are always assessing fresh data (realtime feedback). “queue” is slower but great when data loss is unacceptable (saving data). “pipe” probably never?? Defaults to “queue”.

  • comms_kwargs (Dict[str, Any], optional) – kwargs for constructing comms. Defaults to {}.

  • taskstopsignal (Any, optional) – Data to send over comms that tells the task to stop. Defaults to None.

Raises

ValueError – for unknown comms

class etho.services.utils.concurrent_task.Faucet(connection: multiprocessing.connection.Connection)[source]#

Wrapper for Pipe connection objects that exposes get function for common interface with Queues.

Wraps Connection object returned when calling Pipe and delegates function calls to have a common interface with the Queue.

Parameters

connection (multiprocessing.connection.Connection) – [description]

get(block: bool = True, timeout: Optional[float] = 0.001, empty_value: Optional[Any] = None) Any[source]#

Mimics the logic of the Queue.get.

Parameters
  • block (bool, optional) – [description]. Defaults to True.

  • timeout (float, optional) – [description]. Defaults to 0.001.

  • empty_value ([type], optional) – [description]. Defaults to None.

Returns

[description]

Return type

[type]

class etho.services.utils.concurrent_task.SharedNumpyArray(shape, ctype=<class 'ctypes.c_double'>)[source]#

Class for sharing a numpy array between processes.

Contains functions for synchronized read/write access and a staleness indicator.

[summary]

Parameters
  • shape (tuple/list-like) – [description]

  • ctype ([type], optional) – Data type of the numpy array. Defaults to ctypes.c_double (is np.float64).

get(timeout=None, block=True)[source]#

Returns array values. Block is unsed and their for interface consistency with Queue.

poll()[source]#

Returns true if the array has been update since the last put.

put(data)[source]#

Update the values in the shared array.

property stale#

Thread safe access to whether or not the current array values have been get-ed already.