#!/usr/bin/env python
import threading
from .ZeroService import BaseZeroService
import zerorpc
import time
import sys
from .utils.log_exceptions import for_all_methods, log_exceptions
import logging
HAS_RPI4_LIB = False
HAS_RPI3_LIB = False
try:
import Adafruit_DHT
HAS_RPI3_LIB = True
ada_import_error_rpi3 = None
except Exception as ada_import_error_rpi3:
pass
try:
import adafruit_dht, board
HAS_RPI4_LIB = True
ada_import_error_rpi4 = None
except Exception as ada_import_error_rpi4:
pass
@for_all_methods(log_exceptions(logging.getLogger(__name__)))
class THU_rpi4:
def __init__(self, pin):
# if ada_import_error_rpi4 is not None:
# raise ada_import_error_rpi4
self.pin = pin
self.sensor = adafruit_dht.DHT22(getattr(board, f"D{self.pin}"))
def read(self):
try:
temperature = self.sensor.temperature
humidity = self.sensor.humidity
except RuntimeError:
temperature = None
humidity = None
return humidity, temperature
@for_all_methods(log_exceptions(logging.getLogger(__name__)))
class THU_rpi3:
def __init__(self, pin):
# if ada_import_error_rpi3 is not None:
# raise ada_import_error_rpi3
self.pin = pin
self.sensor = Adafruit_DHT.DHT22
def read(self):
humidity, temperature = Adafruit_DHT.read_retry(self.sensor, self.pin)
return humidity, temperature
[docs]@for_all_methods(log_exceptions(logging.getLogger(__name__)))
class THU(BaseZeroService):
"""
Temperature and Humidity sensor. connect to rpi via arduino. or directly to rpi if possible
try DHT22 and https://github.com/adafruit/Adafruit_Python_DHT
"""
LOGGING_PORT = 1446
SERVICE_PORT = 4246
SERVICE_NAME = "THU"
def setup(self, pin, delay, duration):
self.pin = pin # data pin
if ada_import_error_rpi4 is not None:
self.sensor = THU_rpi4(self.pin) # type of temperature sensor - make this arg?
elif ada_import_error_rpi3 is not None:
self.sensor = THU_rpi3(self.pin) # type of temperature sensor - make this arg?
else:
raise ValueError("Oh no, could not import dht lib")
self.delay = int(delay) # delay between reads
self.duration = duration # total duration of experiments
# initialize
self.temperature = None
self.humidity = None
# setup up thread
self._thread_timer = threading.Timer(self.duration, self.finish, kwargs={"stop_service": True})
self._thread_stopper = threading.Event() # not sure this is required here - but probably does not hurt
self._queue_thread = threading.Thread(target=self._read_temperature_and_humidity, args=(self._thread_stopper,))
def start(self):
self._time_started = time.time()
self._queue_thread.start()
if self.duration > 0:
self.log.info(f"duration {self.duration} seconds")
self.log.info(f"reading from pin {self.pin} every {self.delay} seconds.")
# will execute FINISH after N seconds
self._thread_timer.start()
self.log.info("finish timer started")
def _read_temperature_and_humidity(self, stop_event):
RUN = True
while RUN:
try:
# self.humidity, self.temperature = Adafruit_DHT.read_retry(self.sensor, self.pin)
self.humidity, self.temperature = self.sensor.read()
try:
self.log.info(f"temperature: {self.temperature:0.1f}; humidity: {self.humidity:0.1f}")
except TypeError as e:
self.log.warning(f"invalid values for temperature ({self.temperature}C) or humidity ({self.humidity}%).")
time.sleep(self.delay)
except Exception as e:
self.log.error(e)
def finish(self, stop_service=False):
self.log.warning("stopping")
if hasattr(self, "_thread_stopper"):
self._thread_stopper.set()
time.sleep(1) # wait for thread to stop
self.log.warning(" stopped ")
self._flush_loggers()
if stop_service:
time.sleep(2)
self.service_stop()
def disp(self):
pass
def is_busy(self):
return self._queue_thread.is_alive() # is this the right way to check whether thread is running?
def info(self):
if self.is_busy():
# NOTE: save to access thread variables? need lock or something?
try:
return f"temperature,{self.temperature:0.1f},C;humidity,{self.humidity:0.1f}%"
except TypeError as e:
return f"invalid values for temperature ({self.temperature}C) or humidity ({self.humidity}%)."
else:
return None
def test(self):
pass
def cleanup(self):
self.finish()
if hasattr(self, "_queue_thread"):
del self._queue_thread
if __name__ == "__main__":
if len(sys.argv) > 1:
ser = sys.argv[1]
else:
ser = "default"
s = THU(serializer=ser)
s.bind("tcp://0.0.0.0:{0}".format(THU.SERVICE_PORT))
s.run()