Files
pizerovoice/server_microphone.py

134 lines
5.3 KiB
Python

#!/usr/bin/env python3
import json
import os
import sys
import asyncio
import websockets
import logging
import sounddevice as sd
import argparse
# project-specific imports
import time
from ledpixels import Pixels
# LED controller instance, created once at import time and used by run_test().
# NOTE(review): presumably drives the status LEDs on the device; confirm in ledpixels.
pixels = Pixels()
import avm_sid
# subprocess is used to start the news/radio shell scripts in separate processes
import subprocess
# NOTE: a `global` statement at module level has no effect; a name assigned
# here is already a module global.
global activation_word
# Wake word that must appear in the recognized text before a command is acted on.
activation_word = "erwin"
global run_stop_only_once_flag
# NOTE: this module-level value is not read by run_test(), which tracks the
# "fire a command only once" state in a local variable of its own.
run_stop_only_once_flag = True
def int_or_str(text):
    """Convert ``text`` to an int when possible, otherwise hand it back as is.

    Used as an argparse ``type=`` converter so a device can be given either
    as a numeric ID or as a name substring.
    """
    try:
        number = int(text)
    except ValueError:
        # Not a number: keep the original value untouched.
        return text
    else:
        return number
def callback(indata, frames, time, status):
    """Audio-block callback: called (from a separate thread) for each block.

    Copies the block into an immutable ``bytes`` object and schedules it onto
    the module-level ``audio_queue`` via ``loop.call_soon_threadsafe``.
    ``frames``, ``time`` and ``status`` are accepted but not used.
    """
    chunk = bytes(indata)
    loop.call_soon_threadsafe(audio_queue.put_nowait, chunk)
async def _acknowledge(seconds):
    """Switch the LEDs on for ``seconds`` and then off again.

    The pause uses ``asyncio.sleep`` rather than ``time.sleep`` so that other
    work scheduled on the event loop is not stalled while the LEDs are lit.
    """
    pixels.wakeup()
    await asyncio.sleep(seconds)
    pixels.off()


def _launch(script):
    """Return a callable that starts ``script`` with ``sh`` in a new process."""
    return lambda: subprocess.Popen(["sh", script])


async def run_test():
    """Stream microphone audio to the ASR server and act on spoken commands.

    Opens the input device described by the module-level ``args``, connects
    to ``args.uri`` and sends a config message carrying the sample rate. Then,
    forever: take one audio block from ``audio_queue``, send it, wait for one
    reply, print it, and check the reply text for a command.

    A command fires only if the reply contains ``activation_word`` and all of
    the command's keywords. Keywords are matched as substrings of the raw
    reply text, and the first matching entry in the table wins, so the order
    of the table is significant. After a command has fired, no further
    command is executed until a reply containing ``"result"`` re-arms the
    handler (NOTE(review): presumably the server's final, non-partial result;
    confirm against the server protocol).

    This coroutine does not return on its own; it ends only when an exception
    propagates (e.g. the connection fails or the task is cancelled).
    """
    # (keywords, console message or None, LED seconds, action or None),
    # checked top to bottom.
    commands = (
        (("licht", "an"), "SCHALTER EIN!", 1, avm_sid.lights_on),
        (("licht", "aus"), "SCHALTER AUS!", 1, avm_sid.lights_off),
        (("blinken",), "Blinken!", 2, None),
        (("nachrichten",), None, 1, _launch("/home/pi/tagesschau/start_news.sh")),
        (("saarland",), None, 1, _launch("/home/pi/tagesschau/start_platt.sh")),
        (("nova",), None, 1, _launch("/home/pi/radio/start_nova.sh")),
        (("funk",), None, 1, _launch("/home/pi/radio/start_dlf.sh")),
        # "stop" is a substring of "stopp", so one keyword covers both spellings.
        (("stop",), None, 1, _launch("/home/pi/tagesschau/end_news.sh")),
    )

    with sd.RawInputStream(samplerate=args.samplerate, blocksize=4000,
                           device=args.device, dtype='int16',
                           channels=1, callback=callback) as device:
        async with websockets.connect(args.uri) as websocket:
            await websocket.send('{ "config" : { "sample_rate" : %d } }' % (device.samplerate))
            # True while the handler is allowed to execute a command.
            command_armed = True
            while True:
                data = await audio_queue.get()
                await websocket.send(data)
                finalResult = await websocket.recv()
                print(finalResult)
                if command_armed and activation_word in finalResult:
                    for keywords, announcement, led_seconds, action in commands:
                        if not all(word in finalResult for word in keywords):
                            continue
                        if announcement is not None:
                            print(announcement)
                        await _acknowledge(led_seconds)
                        if action is not None:
                            action()
                        # Block further commands until the next "result" reply.
                        command_armed = False
                        break
                # Checked after dispatch on purpose: a reply that both triggers
                # a command and contains "result" leaves the handler armed.
                if "result" in finalResult:
                    command_armed = True
async def main():
    """Parse the command line, set up the shared globals and run the client.

    Publishes three module-level names that the audio callback and
    ``run_test`` rely on: ``args`` (parsed options), ``loop`` (the running
    event loop) and ``audio_queue`` (queue of captured audio blocks).
    """
    global args, loop, audio_queue

    # First pass: only look for --list-devices so the device list can be
    # printed without processing any other option.
    device_parser = argparse.ArgumentParser(add_help=False)
    device_parser.add_argument(
        '-l', '--list-devices', action='store_true',
        help='show list of audio devices and exit')
    args, leftover = device_parser.parse_known_args()
    if args.list_devices:
        print(sd.query_devices())
        device_parser.exit(0)

    # Second pass: the full option set, inheriting -l from the first parser.
    full_parser = argparse.ArgumentParser(
        description="ASR Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        parents=[device_parser])
    full_parser.add_argument(
        '-u', '--uri', type=str, metavar='URL',
        help='Server URL', default='ws://192.168.178.15:2700')
    full_parser.add_argument(
        '-d', '--device', type=int_or_str,
        help='input device (numeric ID or substring)')
    full_parser.add_argument(
        '-r', '--samplerate', type=int, help='sampling rate', default=16000)
    args = full_parser.parse_args(leftover)

    loop = asyncio.get_running_loop()
    audio_queue = asyncio.Queue()
    logging.basicConfig(level=logging.INFO)
    await run_test()
# Script entry point: start the event loop and run main() when this file is
# executed directly.
if __name__ == "__main__":
    asyncio.run(main())