Source code for etho.services.SndZeroService

#!/usr/bin/env python
import numpy as np
import threading
from .ZeroService import BaseZeroService
import time
import sys
from .utils.log_exceptions import for_all_methods, log_exceptions
import logging

try:
    import pygame

    pygame_import_error = None
except ImportError as e:
    pygame_import_error = e


@for_all_methods(log_exceptions(logging.getLogger(__name__)))
class SND(BaseZeroService):
    """Sound playback service that plays a playlist through pygame's mixer.

    Sounds are converted to pygame ``Sound`` objects in :meth:`setup` and fed
    one at a time to a single mixer channel by a background thread started in
    :meth:`start`. Every public method is wrapped by ``log_exceptions`` via
    the class decorator.
    """

    LOGGING_PORT = 1443
    SERVICE_PORT = 4243
    SERVICE_NAME = "SND"

    def setup(self, np_sounds, playlist, playlist_items, duration, fs):
        """Initialise the mixer and prepare sounds and the queueing thread.

        Args:
            np_sounds: Iterable of array-likes; each is wrapped in
                ``np.array`` and turned into a pygame ``Sound``.
            playlist: Table describing the sounds; must provide ``to_csv()``
                and ``iloc`` (presumably a pandas DataFrame - confirm with
                the caller).
            playlist_items: Mutable sequence of indices consumed from the
                front with ``pop(0)``; each index selects both a sound and a
                ``playlist`` row.
            duration: Stored on the instance and logged; not otherwise used
                in this class.
            fs: Sample rate handed to the mixer as ``frequency``.

        Raises:
            ImportError: Re-raises the error captured at import time if
                pygame could not be imported.
        """
        if pygame_import_error is not None:
            raise pygame_import_error

        self.duration = duration
        self._time_started = None
        self.log.info(f"duration {self.duration} seconds")

        # init sound engine
        pygame.mixer.pre_init(frequency=fs, size=-16, channels=2, buffer=4096)
        pygame.mixer.init()
        pygame.init()  # init event system
        self.chan1 = pygame.mixer.find_channel()
        # Setup the end track event
        self.chan1.set_endevent(pygame.USEREVENT)
        self.log.info(pygame.mixer.get_init())

        self.playlist = playlist
        self.log.info("playlist")
        self.log.info(self.playlist.to_csv())

        # np arrays -> pygame sndarrays
        self.soundlist = []
        for np_sound in np_sounds:
            this_sound = pygame.sndarray.make_sound(np.array(np_sound))
            this_sound.set_volume(1.0)
            self.soundlist.append(this_sound)

        self.playlist_items = playlist_items

        # The event lets finish() interrupt the queueing loop.
        self._thread_stopper = threading.Event()
        self._queue_thread = threading.Thread(
            target=self._queue_sounds, args=(self._thread_stopper,)
        )

    def start(self):
        """Record the start time and launch the queueing thread."""
        self._time_started = time.time()
        self._queue_thread.start()
        # TODO: log playlist etc
        self.log.info("started")

    def _queue_next(self):
        """Queue the next playlist item on the channel, if any remain."""
        if self.playlist_items:
            # return and remove first item on list
            self.current_item = self.playlist_items.pop(0)
            self.chan1.queue(self.soundlist[self.current_item])

    def _queue_sounds(self, stop_event):
        """Keep the channel's queue filled until the playlist is exhausted.

        Polls every 0.1 s. Leaves the loop when ``stop_event`` is set or when
        no items remain and nothing is playing, then stops the service.

        Args:
            stop_event: ``threading.Event`` used to interrupt the loop.
        """
        running = True
        playlist_cnt = -1
        while running and not stop_event.wait(0.1):
            # Only queue when the channel has no pending sound.
            if self.chan1.get_queue() is None and self.playlist_items:
                self._queue_next()
                playlist_cnt += 1
                msg = _format_playlist(self.playlist.iloc[self.current_item], playlist_cnt)
                self.log.info(msg)
            if not self.playlist_items and not self.is_busy():
                self.log.warning("playlist empty and no playback running - stop queueing")
                running = False
        # NOTE(review): shutdown is performed once the loop has ended, which
        # includes the case where stop_event was set externally - confirm
        # this matches the intended behaviour.
        self.log.warning("playlist empty - stopping service")
        self.finish(stop_service=True)

    def finish(self, stop_service=False):
        """Stop queueing and playback, flush loggers, optionally stop the service.

        Safe to call before ``setup`` and when pygame failed to import.

        Args:
            stop_service: If true, wait one second and call ``service_stop()``.
        """
        self.log.warning("stopping playback")
        if hasattr(self, "_thread_stopper"):
            self._thread_stopper.set()
        # `pygame` is unbound if its import failed, so check that first.
        if pygame_import_error is None and pygame.mixer.get_init():
            pygame.mixer.stop()
        self.log.warning(" stopped playback")
        self._flush_loggers()
        if stop_service:
            time.sleep(1)
            self.service_stop()

    def disp(self):
        """No-op."""
        pass

    def is_busy(self):
        """Return whether the mixer is currently playing any sound.

        Returns False when pygame is unavailable.
        """
        if pygame_import_error is not None:
            return False
        return pygame.mixer.get_busy()

    def test(self):
        """Always return True."""
        return True

    def cleanup(self):
        """Stop playback, drop the queueing thread reference and quit the mixer.

        Returns:
            True.
        """
        self.finish()
        if hasattr(self, "_queue_thread"):
            del self._queue_thread
        # Guarded for the same reason as in finish(): pygame may be unbound.
        if pygame_import_error is None:
            pygame.mixer.quit()
        return True

    def info(self):
        """Return the current playlist row serialised with msgpack, or None.

        Returns:
            The ``to_msgpack()`` output for the row of the item queued last
            while the mixer is busy; ``None`` otherwise.
        """
        if not self.is_busy():
            return None
        # NOTE(review): `to_msgpack` was removed from pandas in 1.0 - confirm
        # the pandas version in use still provides it.
        return self.playlist.iloc[self.current_item].to_msgpack()
def _format_playlist(playlist, cnt):
    """Format a playlist entry as a single ``key: value; `` log string.

    Args:
        playlist: Mapping-like object exposing ``items()`` (called with a
            playlist row elsewhere in this file).
        cnt: Running count of the entry, emitted first as ``cnt: <cnt>; ``.

    Returns:
        The concatenated string; every field, including the last, is
        terminated by ``"; "``.
    """
    # Collect the pieces and join once instead of growing a string in a loop.
    fields = [f"cnt: {cnt}; "]
    fields.extend(f"{key}: {val}; " for key, val in playlist.items())
    return "".join(fields)


if __name__ == "__main__":
    # An optional first command-line argument selects the serializer.
    ser = sys.argv[1] if len(sys.argv) > 1 else "default"
    s = SND(serializer=ser)
    s.bind("tcp://0.0.0.0:{0}".format(SND.SERVICE_PORT))
    s.run()