#!/usr/bin/env python3 import json import os import sys import asyncio import websockets import logging import sounddevice as sd import argparse # meine imports import time from ledpixels import Pixels pixels = Pixels() import avm_sid # spawn process for news to run it in a new process import subprocess global activation_word activation_word = "erwin" global run_stop_only_once_flag run_stop_only_once_flag = True def int_or_str(text): """Helper function for argument parsing.""" try: return int(text) except ValueError: return text def callback(indata, frames, time, status): """This is called (from a separate thread) for each audio block.""" loop.call_soon_threadsafe(audio_queue.put_nowait, bytes(indata)) async def run_test(): with sd.RawInputStream(samplerate=args.samplerate, blocksize = 4000, device=args.device, dtype='int16', channels=1, callback=callback) as device: async with websockets.connect(args.uri) as websocket: await websocket.send('{ "config" : { "sample_rate" : %d } }' % (device.samplerate)) run_stop_only_once_flag = True while True: data = await audio_queue.get() await websocket.send(data) finalResult = await websocket.recv() print(finalResult) if "licht" in finalResult and "an" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER EIN!") pixels.think() time.sleep(1) pixels.off() avm_sid.lights_on() run_stop_only_once_flag = False if "licht" in finalResult and "aus" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER AUS!") pixels.think() time.sleep(1) pixels.off() avm_sid.lights_off() run_stop_only_once_flag = False if "vogel" in finalResult and "an" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER EIN!") pixels.think() time.sleep(1) pixels.off() avm_sid.table_on() run_stop_only_once_flag = False if "vogel" in finalResult and "aus" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER AUS!") pixels.think() time.sleep(1) pixels.off() avm_sid.table_off() run_stop_only_once_flag = False if "ecke" in finalResult and "an" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER EIN!") pixels.think() time.sleep(1) pixels.off() avm_sid.corner_on() run_stop_only_once_flag = False if "ecke" in finalResult and "aus" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER AUS!") pixels.think() time.sleep(1) pixels.off() avm_sid.corner_off() run_stop_only_once_flag = False if "kaffee" in finalResult and "an" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER EIN!") pixels.think() time.sleep(1) pixels.off() avm_sid.coffee_on() run_stop_only_once_flag = False if "kaffee" in finalResult and "aus" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("SCHALTER AUS!") pixels.think() time.sleep(1) pixels.off() avm_sid.coffee_off() run_stop_only_once_flag = False if "blinken" in finalResult and activation_word in finalResult and run_stop_only_once_flag: print("Blinken!") pixels.think() time.sleep(2) pixels.off() run_stop_only_once_flag = False if "nachrichten" in finalResult and activation_word in finalResult and run_stop_only_once_flag: pixels.think() time.sleep(1) pixels.off() subprocess.Popen(["sh", "/home/pi/tagesschau/start_news.sh"]) run_stop_only_once_flag = False if "saarland" in finalResult and activation_word in finalResult and run_stop_only_once_flag: pixels.think() time.sleep(1) pixels.off() subprocess.Popen(["sh", "/home/pi/tagesschau/start_platt.sh"]) run_stop_only_once_flag = False if "nova" in finalResult and "leise" in finalResult and activation_word in finalResult and run_stop_only_once_flag: pixels.think() time.sleep(1) pixels.off() subprocess.Popen(["sh", "/home/pi/radio/start_nova.sh"]) run_stop_only_once_flag = False if "funk" in finalResult and "leise" in finalResult and activation_word in finalResult and run_stop_only_once_flag: pixels.think() time.sleep(1) pixels.off() subprocess.Popen(["sh", "/home/pi/radio/start_dlf.sh"]) run_stop_only_once_flag = False if "nova" in finalResult and "laut" in finalResult and activation_word in finalResult and run_stop_only_once_flag: pixels.think() time.sleep(1) pixels.off() subprocess.Popen(["sh", "/home/pi/radio/start_nova_loud.sh"]) run_stop_only_once_flag = False if "funk" in finalResult and "laut" in finalResult and activation_word in finalResult and run_stop_only_once_flag: pixels.think() time.sleep(1) pixels.off() subprocess.Popen(["sh", "/home/pi/radio/start_dlf_loud.sh"]) run_stop_only_once_flag = False if ("stop" in finalResult or "stopp" in finalResult) and activation_word in finalResult and run_stop_only_once_flag: pixels.think() time.sleep(1) pixels.off() subprocess.Popen(["sh", "/home/pi/tagesschau/end_news.sh"]) run_stop_only_once_flag = False if "result" in finalResult and not run_stop_only_once_flag: run_stop_only_once_flag = True pixels.wakeup() time.sleep(1) pixels.off() async def main(): global args global loop global audio_queue parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-l', '--list-devices', action='store_true', help='show list of audio devices and exit') args, remaining = parser.parse_known_args() if args.list_devices: print(sd.query_devices()) parser.exit(0) parser = argparse.ArgumentParser(description="ASR Server", formatter_class=argparse.RawDescriptionHelpFormatter, parents=[parser]) parser.add_argument('-u', '--uri', type=str, metavar='URL', help='Server URL', default='ws://192.168.178.15:2700') parser.add_argument('-d', '--device', type=int_or_str, help='input device (numeric ID or substring)') parser.add_argument('-r', '--samplerate', type=int, help='sampling rate', default=16000) args = parser.parse_args(remaining) loop = asyncio.get_running_loop() audio_queue = asyncio.Queue() logging.basicConfig(level=logging.INFO) await run_test() if __name__ == '__main__': asyncio.run(main())