Example Programs
Most of these examples use the argparse module to handle command-line
arguments.
To show a help text explaining all available arguments, use the --help
argument.
For example:
    python play_file.py --help
Play a Sound File
#!/usr/bin/env python3
"""Load an audio file into memory and play its contents.
NumPy and the soundfile module (https://python-soundfile.readthedocs.io/)
must be installed for this to work.
This example program loads the whole file into memory before starting
playback.
To play very long files, you should use play_long_file.py instead.
This example could simply be implemented like this::
import sounddevice as sd
import soundfile as sf
data, fs = sf.read('my-file.wav')
sd.play(data, fs)
sd.wait()
... but in this example we show a more low-level implementation
using a callback stream.
"""
import argparse
import threading
import sounddevice as sd
import soundfile as sf
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices, so the device
# list can be printed without supplying the mandatory FILENAME.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    'filename',
    metavar='FILENAME',
    help='audio file to be played back',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='output device (numeric ID or substring)',
)
args = parser.parse_args(remaining)

# Passed to the stream as finished_callback; set once playback is over.
event = threading.Event()
try:
    # always_2d=True guarantees data.shape[1] exists even for mono files.
    data, fs = sf.read(args.filename, always_2d=True)
    current_frame = 0

    def callback(outdata, frames, time, status):
        """Copy the next slice of ``data`` into ``outdata``."""
        global current_frame
        if status:
            print(status)
        frames_left = len(data) - current_frame
        count = frames_left if frames_left < frames else frames
        stop = current_frame + count
        outdata[:count] = data[current_frame:stop]
        if count < frames:
            # Data exhausted: pad the block with silence and end the stream.
            outdata[count:] = 0
            raise sd.CallbackStop()
        current_frame = stop

    stream = sd.OutputStream(
        callback=callback,
        finished_callback=event.set,
        samplerate=fs,
        channels=data.shape[1],
        device=args.device,
    )
    with stream:
        # Wait until playback is finished
        event.wait()
except KeyboardInterrupt:
    parser.exit('\nInterrupted by user')
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Play a Very Long Sound File
#!/usr/bin/env python3
"""Play an audio file using a limited amount of memory.
The soundfile module (https://python-soundfile.readthedocs.io/) must be
installed for this to work.
In contrast to play_file.py, which loads the whole file into memory
before starting playback, this example program only holds a given number
of audio blocks in memory and is therefore able to play files that are
larger than the available RAM.
This example is implemented using NumPy, see play_long_file_raw.py
for a version that doesn't need NumPy.
"""
import argparse
import queue
import sys
import threading
import sounddevice as sd
import soundfile as sf
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices, so the device
# list can be printed without supplying the mandatory FILENAME.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    'filename',
    metavar='FILENAME',
    help='audio file to be played back',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='output device (numeric ID or substring)',
)
parser.add_argument(
    '-b', '--blocksize',
    type=int,
    default=2048,
    help='block size (default: %(default)s)',
)
parser.add_argument(
    '-q', '--buffersize',
    type=int,
    default=20,
    help='number of blocks used for buffering (default: %(default)s)',
)
args = parser.parse_args(remaining)
if args.blocksize == 0:
    parser.error('blocksize must not be zero')
if args.buffersize < 1:
    parser.error('buffersize must be at least 1')

# Bounded queue of audio blocks between the reader loop and the callback.
q = queue.Queue(maxsize=args.buffersize)
# Passed to the stream as finished_callback; set once playback is over.
event = threading.Event()
def callback(outdata, frames, time, status):
    """Move one queued block into ``outdata``.

    Raises ``sd.CallbackAbort`` on an output underflow or when the queue is
    empty, and ``sd.CallbackStop`` after a block shorter than ``outdata``
    has been written (the remainder is zero-filled).
    """
    assert frames == args.blocksize
    if status.output_underflow:
        print('Output underflow: increase blocksize?', file=sys.stderr)
        raise sd.CallbackAbort
    assert not status
    try:
        block = q.get_nowait()
    except queue.Empty as e:
        print('Buffer is empty: increase buffersize?', file=sys.stderr)
        raise sd.CallbackAbort from e
    count = len(block)
    if count >= len(outdata):
        outdata[:] = block
        return
    # Short block: write it, pad with silence and stop the stream.
    outdata[:count] = block
    outdata[count:].fill(0)
    raise sd.CallbackStop
try:
    with sf.SoundFile(args.filename) as f:
        # Pre-fill queue
        for _ in range(args.buffersize):
            data = f.read(args.blocksize)
            if len(data) == 0:
                break
            q.put_nowait(data)
        stream = sd.OutputStream(
            callback=callback,
            finished_callback=event.set,
            samplerate=f.samplerate,
            channels=f.channels,
            blocksize=args.blocksize,
            device=args.device,
        )
        with stream:
            # Time the whole buffer takes to play; if put() blocks longer
            # than this, the callback has stopped consuming blocks.
            timeout = args.blocksize * args.buffersize / f.samplerate
            while len(data) > 0:
                data = f.read(args.blocksize)
                q.put(data, timeout=timeout)
            # Wait until playback is finished
            event.wait()
except KeyboardInterrupt:
    parser.exit('\nInterrupted by user')
except queue.Full:
    # A timeout occurred, i.e. there was an error in the callback
    parser.exit(1)
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Play a Very Long Sound File without Using NumPy
#!/usr/bin/env python3
"""Play an audio file using a limited amount of memory.
This is the same as play_long_file.py, but implemented without using NumPy.
"""
import argparse
import queue
import sys
import threading
import sounddevice as sd
import soundfile as sf
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices, so the device
# list can be printed without supplying the mandatory FILENAME.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    'filename',
    metavar='FILENAME',
    help='audio file to be played back',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='output device (numeric ID or substring)',
)
parser.add_argument(
    '-b', '--blocksize',
    type=int,
    default=2048,
    help='block size (default: %(default)s)',
)
parser.add_argument(
    '-q', '--buffersize',
    type=int,
    default=20,
    help='number of blocks used for buffering (default: %(default)s)',
)
args = parser.parse_args(remaining)
if args.blocksize == 0:
    parser.error('blocksize must not be zero')
if args.buffersize < 1:
    parser.error('buffersize must be at least 1')

# Bounded queue of raw audio buffers between the reader loop and the callback.
q = queue.Queue(maxsize=args.buffersize)
# Passed to the stream as finished_callback; set once playback is over.
event = threading.Event()
def callback(outdata, frames, time, status):
    """Move one queued raw buffer into ``outdata``.

    Raises ``sd.CallbackAbort`` on an output underflow or when the queue is
    empty, and ``sd.CallbackStop`` after a buffer shorter than ``outdata``
    has been written (the remainder is filled with zero bytes).
    """
    assert frames == args.blocksize
    if status.output_underflow:
        print('Output underflow: increase blocksize?', file=sys.stderr)
        raise sd.CallbackAbort
    assert not status
    try:
        chunk = q.get_nowait()
    except queue.Empty as e:
        print('Buffer is empty: increase buffersize?', file=sys.stderr)
        raise sd.CallbackAbort from e
    size = len(chunk)
    if size >= len(outdata):
        outdata[:] = chunk
        return
    # Short buffer: write it, pad with zero bytes and stop the stream.
    outdata[:size] = chunk
    outdata[size:] = bytes(len(outdata) - size)
    raise sd.CallbackStop
try:
    with sf.SoundFile(args.filename) as f:
        # Pre-fill queue
        for _ in range(args.buffersize):
            data = f.buffer_read(args.blocksize, dtype='float32')
            if not data:
                break
            q.put_nowait(data)
        stream = sd.RawOutputStream(
            callback=callback,
            finished_callback=event.set,
            samplerate=f.samplerate,
            channels=f.channels,
            dtype='float32',
            blocksize=args.blocksize,
            device=args.device,
        )
        with stream:
            # Time the whole buffer takes to play; if put() blocks longer
            # than this, the callback has stopped consuming buffers.
            timeout = args.blocksize * args.buffersize / f.samplerate
            while data:
                data = f.buffer_read(args.blocksize, dtype='float32')
                q.put(data, timeout=timeout)
            # Wait until playback is finished
            event.wait()
except KeyboardInterrupt:
    parser.exit('\nInterrupted by user')
except queue.Full:
    # A timeout occurred, i.e. there was an error in the callback
    parser.exit(1)
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Play a Web Stream
#!/usr/bin/env python3
"""Play a web stream.
ffmpeg-python (https://github.com/kkroening/ffmpeg-python) has to be installed.
If you don't know a stream URL, try http://icecast.spc.org:8000/longplayer
(see https://longplayer.org/ for a description).
"""
import argparse
import queue
import sys
import ffmpeg
import sounddevice as sd
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices, so the device
# list can be printed without supplying the mandatory URL.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    'url',
    metavar='URL',
    help='stream URL',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='output device (numeric ID or substring)',
)
parser.add_argument(
    '-b', '--blocksize',
    type=int,
    default=1024,
    help='block size (default: %(default)s)',
)
parser.add_argument(
    '-q', '--buffersize',
    type=int,
    default=20,
    help='number of blocks used for buffering (default: %(default)s)',
)
args = parser.parse_args(remaining)
if args.blocksize == 0:
    parser.error('blocksize must not be zero')
if args.buffersize < 1:
    parser.error('buffersize must be at least 1')

# Bounded queue of raw audio buffers between the reader loop and the callback.
q = queue.Queue(maxsize=args.buffersize)

print('Getting stream information ...')
try:
    info = ffmpeg.probe(args.url)
except ffmpeg.Error as e:
    # Forward ffprobe's own diagnostics before exiting.
    sys.stderr.buffer.write(e.stderr)
    parser.exit(e)

# Exactly one stream, and it must be audio.
streams = info.get('streams', [])
if len(streams) != 1:
    parser.exit('There must be exactly one stream available')
stream = streams[0]
if stream.get('codec_type') != 'audio':
    parser.exit('The stream must be an audio stream')
channels = stream['channels']
samplerate = float(stream['sample_rate'])
def callback(outdata, frames, time, status):
    """Move one queued raw buffer into ``outdata``.

    Raises ``sd.CallbackAbort`` on an output underflow or when the queue is
    empty; the queued buffer must have exactly the size of ``outdata``.
    """
    assert frames == args.blocksize
    if status.output_underflow:
        print('Output underflow: increase blocksize?', file=sys.stderr)
        raise sd.CallbackAbort
    assert not status
    try:
        chunk = q.get_nowait()
    except queue.Empty as e:
        print('Buffer is empty: increase buffersize?', file=sys.stderr)
        raise sd.CallbackAbort from e
    assert len(chunk) == len(outdata)
    outdata[:] = chunk
try:
    print('Opening stream ...')
    # ffmpeg decodes the URL to raw float32 PCM on its stdout pipe.
    process = (
        ffmpeg
        .input(args.url)
        .output(
            'pipe:',
            format='f32le',
            acodec='pcm_f32le',
            ac=channels,
            ar=samplerate,
            loglevel='quiet',
        )
        .run_async(pipe_stdout=True)
    )
    stream = sd.RawOutputStream(
        callback=callback,
        samplerate=samplerate,
        channels=channels,
        dtype='float32',
        blocksize=args.blocksize,
        device=args.device,
    )
    # Bytes per audio block: frames * channels * bytes per sample.
    read_size = args.blocksize * channels * stream.samplesize
    print('Buffering ...')
    for _ in range(args.buffersize):
        q.put_nowait(process.stdout.read(read_size))
    print('Starting Playback ...')
    with stream:
        # Time the whole buffer takes to play; if put() blocks longer than
        # this, the callback has stopped consuming buffers.
        timeout = args.blocksize * args.buffersize / samplerate
        while True:
            q.put(process.stdout.read(read_size), timeout=timeout)
except KeyboardInterrupt:
    parser.exit('\nInterrupted by user')
except queue.Full:
    # A timeout occurred, i.e. there was an error in the callback
    parser.exit(1)
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Play a Sine Signal
#!/usr/bin/env python3
"""Play a sine signal."""
import argparse
import sys
import numpy as np
import sounddevice as sd
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    'frequency',
    nargs='?',
    metavar='FREQUENCY',
    type=float,
    default=500,
    help='frequency in Hz (default: %(default)s)',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='output device (numeric ID or substring)',
)
parser.add_argument(
    '-a', '--amplitude',
    type=float,
    default=0.2,
    help='amplitude (default: %(default)s)',
)
args = parser.parse_args(remaining)

# Index of the first sample of the next block; advanced by the callback so
# the sine keeps its phase across blocks.
start_idx = 0
try:
    samplerate = sd.query_devices(args.device, 'output')['default_samplerate']

    def callback(outdata, frames, time, status):
        """Write the next ``frames`` samples of the sine into ``outdata``."""
        global start_idx
        if status:
            print(status, file=sys.stderr)
        # Column vector of sample times, matching the single output channel.
        t = ((start_idx + np.arange(frames)) / samplerate).reshape(-1, 1)
        outdata[:] = args.amplitude * np.sin(2 * np.pi * args.frequency * t)
        start_idx += frames

    with sd.OutputStream(
        callback=callback,
        samplerate=samplerate,
        channels=1,
        device=args.device,
    ):
        print('#' * 80)
        print('press Return to quit')
        print('#' * 80)
        input()
except KeyboardInterrupt:
    parser.exit('')
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Input to Output Pass-Through
#!/usr/bin/env python3
"""Pass input directly to output.
https://github.com/PortAudio/portaudio/blob/master/test/patest_wire.c
"""
import argparse
import sounddevice as sd
import numpy # Make sure NumPy is loaded before it is used in the callback
assert numpy # avoid "imported but unused" message (W0611)
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    '-i', '--input-device',
    type=int_or_str,
    help='input device (numeric ID or substring)',
)
parser.add_argument(
    '-o', '--output-device',
    type=int_or_str,
    help='output device (numeric ID or substring)',
)
parser.add_argument(
    '-c', '--channels',
    type=int,
    default=2,
    help='number of channels',
)
parser.add_argument('--dtype', help='audio data type')
parser.add_argument('--samplerate', help='sampling rate', type=float)
parser.add_argument('--blocksize', help='block size', type=int)
parser.add_argument('--latency', help='latency in seconds', type=float)
args = parser.parse_args(remaining)
def callback(indata, outdata, frames, time, status):
    """Copy the input block unchanged to the output block."""
    if status:
        print(status)
    outdata[:] = indata
try:
    with sd.Stream(
        callback=callback,
        device=(args.input_device, args.output_device),
        channels=args.channels,
        dtype=args.dtype,
        samplerate=args.samplerate,
        blocksize=args.blocksize,
        latency=args.latency,
    ):
        print('#' * 80)
        print('press Return to quit')
        print('#' * 80)
        # The stream runs in the background until the user presses Return.
        input()
except KeyboardInterrupt:
    parser.exit('')
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Plot Microphone Signal(s) in Real-Time
#!/usr/bin/env python3
"""Plot the live microphone signal(s) with matplotlib.
Matplotlib and NumPy have to be installed.
"""
import argparse
import queue
import sys
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
import numpy as np
import sounddevice as sd
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    'channels',
    type=int,
    default=[1],
    nargs='*',
    metavar='CHANNEL',
    help='input channels to plot (default: the first)',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='input device (numeric ID or substring)',
)
parser.add_argument(
    '-w', '--window',
    type=float,
    default=200,
    metavar='DURATION',
    help='visible time slot (default: %(default)s ms)',
)
parser.add_argument(
    '-i', '--interval',
    type=float,
    default=30,
    help='minimum time between plot updates (default: %(default)s ms)',
)
parser.add_argument(
    '-b', '--blocksize',
    type=int,
    help='block size (in samples)',
)
parser.add_argument(
    '-r', '--samplerate',
    type=float,
    help='sampling rate of audio device',
)
parser.add_argument(
    '-n', '--downsample',
    type=int,
    default=10,
    metavar='N',
    help='display every Nth sample (default: %(default)s)',
)
args = parser.parse_args(remaining)
if not all(c >= 1 for c in args.channels):
    parser.error('argument CHANNEL: must be >= 1')

# Channel numbers start with 1
mapping = [c - 1 for c in args.channels]
# Unbounded queue carrying audio blocks from the callback to the plot update.
q = queue.Queue()
def audio_callback(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block.

    Queues every ``args.downsample``-th frame of the selected channels.
    """
    if status:
        print(status, file=sys.stderr)
    # Fancy indexing with mapping creates a (necessary!) copy:
    selected = indata[::args.downsample, mapping]
    q.put(selected)
def update_plot(frame):
    """This is called by matplotlib for each plot update.

    Typically, audio callbacks happen more frequently than plot updates,
    therefore the queue tends to contain multiple blocks of audio data.

    Drains the queue into the rolling ``plotdata`` buffer (oldest rows are
    shifted out, newest rows are appended at the end) and refreshes every
    line artist.  Returns the list of line artists.
    """
    global plotdata
    while True:
        try:
            data = q.get_nowait()
        except queue.Empty:
            break
        # A block longer than the visible window (large block size combined
        # with a short window) would not fit into ``plotdata[-shift:]``;
        # only its newest rows can be shown anyway.
        data = data[max(len(data) - len(plotdata), 0):]
        shift = len(data)
        if shift == 0:
            # ``plotdata[-0:]`` would address the whole buffer, so an empty
            # block must be skipped rather than assigned.
            continue
        plotdata = np.roll(plotdata, -shift, axis=0)
        plotdata[-shift:, :] = data
    for column, line in enumerate(lines):
        line.set_ydata(plotdata[:, column])
    return lines
try:
    if args.samplerate is None:
        device_info = sd.query_devices(args.device, 'input')
        args.samplerate = device_info['default_samplerate']

    # Number of (downsampled) rows that fit into the visible window.
    length = int(args.window * args.samplerate / (1000 * args.downsample))
    plotdata = np.zeros((length, len(args.channels)))

    fig, ax = plt.subplots()
    lines = ax.plot(plotdata)
    if len(args.channels) > 1:
        labels = [f'channel {c}' for c in args.channels]
        ax.legend(labels, loc='lower left', ncol=len(args.channels))
    ax.axis((0, len(plotdata), -1, 1))
    ax.set_yticks([0])
    ax.yaxis.grid(True)
    ax.tick_params(
        bottom=False, top=False, labelbottom=False,
        right=False, left=False, labelleft=False,
    )
    fig.tight_layout(pad=0)

    stream = sd.InputStream(
        callback=audio_callback,
        samplerate=args.samplerate,
        channels=max(args.channels),
        device=args.device,
    )
    # Bound to a name so the animation object stays referenced.
    ani = FuncAnimation(fig, update_plot, interval=args.interval, blit=True)
    with stream:
        plt.show()
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Real-Time Text-Mode Spectrogram
#!/usr/bin/env python3
"""Show a text-mode spectrogram using live microphone data."""
import argparse
import math
import shutil
import numpy as np
import sounddevice as sd
# Key hint appended to the --help description and printed whenever the user
# enters an unsupported key.
usage_line = ' press <enter> to quit, +<enter> or -<enter> to change scaling '
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Default spectrogram width: the terminal width, falling back to 80 columns.
try:
    columns, _ = shutil.get_terminal_size()
except AttributeError:
    columns = 80

# Stage 1: a help-less parser that only knows -l/--list-devices.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__ + '\n\nSupported keys:' + usage_line,
)
parser.add_argument(
    '-b', '--block-duration',
    type=float,
    metavar='DURATION',
    default=50,
    help='block size (default %(default)s milliseconds)',
)
parser.add_argument(
    '-c', '--columns',
    type=int,
    default=columns,
    help='width of spectrogram',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='input device (numeric ID or substring)',
)
parser.add_argument(
    '-g', '--gain',
    type=float,
    default=10,
    help='initial gain factor (default %(default)s)',
)
parser.add_argument(
    '-r', '--range',
    type=float,
    nargs=2,
    metavar=('LOW', 'HIGH'),
    default=[100, 2000],
    help='frequency range (default %(default)s Hz)',
)
args = parser.parse_args(remaining)
low, high = args.range
if not high > low:
    parser.error('HIGH must be greater than LOW')

# Create a nice output gradient using ANSI escape sequences.
# Stolen from https://gist.github.com/maurisvh/df919538bcef391bc89f
colors = 30, 34, 35, 91, 93, 97
chars = ' :%#\t#%:'
gradient = []
for bg, fg in zip(colors, colors[1:]):
    for char in chars:
        if char != '\t':
            gradient.append(f'\x1b[{fg};{bg + 10}m{char}')
        else:
            # The tab is a marker, not output: swap foreground and
            # background for the second half of this colour step.
            bg, fg = fg, bg
try:
    samplerate = sd.query_devices(args.device, 'input')['default_samplerate']

    # Frequency resolution so that args.columns bins span [low, high].
    delta_f = (high - low) / (args.columns - 1)
    fftsize = math.ceil(samplerate / delta_f)
    low_bin = math.floor(low / delta_f)

    def callback(indata, frames, time, status):
        """Print one coloured spectrogram row for the input block."""
        if status:
            text = f' {status} '
            print('\x1b[34;40m', text.center(args.columns, '#'),
                  '\x1b[0m', sep='')
        if not any(indata):
            print('no input')
            return
        magnitude = np.abs(np.fft.rfft(indata[:, 0], n=fftsize))
        magnitude *= args.gain / fftsize
        top = len(gradient) - 1
        visible = magnitude[low_bin:low_bin + args.columns]
        line = (gradient[int(np.clip(x, 0, 1) * top)] for x in visible)
        print(*line, sep='', end='\x1b[0m\n')

    with sd.InputStream(
        callback=callback,
        samplerate=samplerate,
        blocksize=int(samplerate * args.block_duration / 1000),
        channels=1,
        device=args.device,
    ):
        while True:
            response = input()
            if response in ('', 'q', 'Q'):
                break
            for ch in response:
                if ch == '+':
                    args.gain *= 2
                elif ch == '-':
                    args.gain /= 2
                else:
                    # Unsupported key: show the hint, ignore the rest.
                    print('\x1b[31;40m', usage_line.center(args.columns, '#'),
                          '\x1b[0m', sep='')
                    break
except KeyboardInterrupt:
    parser.exit('Interrupted by user')
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Recording with Arbitrary Duration
#!/usr/bin/env python3
"""Create a recording with arbitrary duration.
The soundfile module (https://python-soundfile.readthedocs.io/)
has to be installed!
"""
import argparse
import tempfile
import queue
import sys
import sounddevice as sd
import soundfile as sf
import numpy # Make sure NumPy is loaded before it is used in the callback
assert numpy # avoid "imported but unused" message (W0611)
def int_or_str(text):
    """Return *text* as an ``int`` if it parses as one, else unchanged.

    Used as an argparse ``type=`` so a device can be given either as a
    numeric ID or as a name substring.
    """
    try:
        value = int(text)
    except ValueError:
        value = text
    return value
# Stage 1: a help-less parser that only knows -l/--list-devices.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices',
    help='show list of audio devices and exit',
    action='store_true',
)
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser inherits the option above and parses the rest.
parser = argparse.ArgumentParser(
    parents=[parser],
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
parser.add_argument(
    'filename',
    nargs='?',
    metavar='FILENAME',
    help='audio file to store recording to',
)
parser.add_argument(
    '-d', '--device',
    type=int_or_str,
    help='input device (numeric ID or substring)',
)
parser.add_argument(
    '-r', '--samplerate',
    type=int,
    help='sampling rate',
)
parser.add_argument(
    '-c', '--channels',
    type=int,
    default=1,
    help='number of input channels',
)
parser.add_argument(
    '-t', '--subtype',
    type=str,
    help='sound file subtype (e.g. "PCM_24")',
)
args = parser.parse_args(remaining)

# Unbounded queue carrying recorded blocks from the callback to the writer.
q = queue.Queue()
def callback(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block.

    Queues a copy of the block for the writer loop.
    """
    if status:
        print(status, file=sys.stderr)
    block_copy = indata.copy()
    q.put(block_copy)
try:
    if args.samplerate is None:
        device_info = sd.query_devices(args.device, 'input')
        # soundfile expects an int, sounddevice provides a float:
        args.samplerate = int(device_info['default_samplerate'])
    if args.filename is None:
        # Only a name is generated here; mode='x' below refuses to open a
        # file that already exists.
        args.filename = tempfile.mktemp(
            prefix='delme_rec_unlimited_', suffix='.wav', dir='')

    # Make sure the file is opened before recording anything:
    with sf.SoundFile(
        args.filename,
        mode='x',
        samplerate=args.samplerate,
        channels=args.channels,
        subtype=args.subtype,
    ) as file, sd.InputStream(
        callback=callback,
        samplerate=args.samplerate,
        channels=args.channels,
        device=args.device,
    ):
        print('#' * 80)
        print('press Ctrl+C to stop the recording')
        print('#' * 80)
        while True:
            file.write(q.get())
except KeyboardInterrupt:
    print(f'\nRecording finished: {args.filename!r}')
    parser.exit(0)
except Exception as e:
    parser.exit(f'{type(e).__name__}: {e}')
Using a stream in an asyncio coroutine
#!/usr/bin/env python3
"""An example for using a stream in an asyncio coroutine.
This example shows how to create a stream in a coroutine and how to wait for
the completion of the stream.
You need Python 3.7 or newer to run this.
"""
import asyncio
import sys
import numpy as np
import sounddevice as sd
async def record_buffer(buffer, **kwargs):
    """Fill *buffer* with recorded audio and return once it is full.

    *buffer* is indexed as (frames, channels); its ``dtype`` and
    ``shape[1]`` configure the input stream.  Extra keyword arguments are
    forwarded to ``sd.InputStream``.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine (Python 3.7+, which this script requires anyway).
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    idx = 0

    def callback(indata, frame_count, time_info, status):
        nonlocal idx
        if status:
            print(status)
        remainder = len(buffer) - idx
        if remainder == 0:
            # The callback runs outside the event loop's thread, so the
            # asyncio.Event has to be set through the loop.
            loop.call_soon_threadsafe(event.set)
            raise sd.CallbackStop
        # Drop frames that no longer fit into the buffer.
        indata = indata[:remainder]
        buffer[idx:idx + len(indata)] = indata
        idx += len(indata)

    stream = sd.InputStream(callback=callback, dtype=buffer.dtype,
                            channels=buffer.shape[1], **kwargs)
    with stream:
        await event.wait()
async def play_buffer(buffer, **kwargs):
    """Play back *buffer* and return once all of it has been handed out.

    *buffer* is indexed as (frames, channels); its ``dtype`` and
    ``shape[1]`` configure the output stream.  Extra keyword arguments are
    forwarded to ``sd.OutputStream``.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine (Python 3.7+, which this script requires anyway).
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    idx = 0

    def callback(outdata, frame_count, time_info, status):
        nonlocal idx
        if status:
            print(status)
        remainder = len(buffer) - idx
        if remainder == 0:
            # The callback runs outside the event loop's thread, so the
            # asyncio.Event has to be set through the loop.
            loop.call_soon_threadsafe(event.set)
            raise sd.CallbackStop
        valid_frames = min(frame_count, remainder)
        outdata[:valid_frames] = buffer[idx:idx + valid_frames]
        # Pad the final, partially filled block with silence.
        outdata[valid_frames:] = 0
        idx += valid_frames

    stream = sd.OutputStream(callback=callback, dtype=buffer.dtype,
                             channels=buffer.shape[1], **kwargs)
    with stream:
        await event.wait()
async def main(frames=150_000, channels=1, dtype='float32', **kwargs):
    """Record ``frames`` frames into a fresh buffer, then play them back.

    Extra keyword arguments are forwarded to both ``record_buffer`` and
    ``play_buffer``.
    """
    shape = (frames, channels)
    buffer = np.empty(shape, dtype=dtype)

    print('recording buffer ...')
    await record_buffer(buffer, **kwargs)

    print('playing buffer ...')
    await play_buffer(buffer, **kwargs)

    print('done')
if __name__ == '__main__':
    # Script entry point: run the demo, turning Ctrl+C into a clean exit.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        sys.exit('\nInterrupted by user')
Creating an asyncio generator for audio blocks
#!/usr/bin/env python3
"""Creating an asyncio generator for blocks of audio data.
This example shows how a generator can be used to analyze audio input blocks.
In addition, it shows how a generator can be created that yields not only input
blocks but also output blocks where audio data can be written to.
You need Python 3.7 or newer to run this.
"""
import asyncio
import queue
import sys
import numpy as np
import sounddevice as sd
async def inputstream_generator(channels=1, **kwargs):
    """Generator that yields blocks of input data as NumPy arrays.

    Each item is an ``(indata, status)`` tuple; ``indata`` is a copy of the
    block passed to the stream callback.  Extra keyword arguments are
    forwarded to ``sd.InputStream``.
    """
    q_in = asyncio.Queue()
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine (Python 3.7+, which this script requires anyway).
    loop = asyncio.get_running_loop()

    def callback(indata, frame_count, time_info, status):
        # asyncio.Queue is not thread-safe, so the put is scheduled on the
        # loop instead of being called directly from the callback.
        loop.call_soon_threadsafe(q_in.put_nowait, (indata.copy(), status))

    stream = sd.InputStream(callback=callback, channels=channels, **kwargs)
    with stream:
        while True:
            indata, status = await q_in.get()
            yield indata, status
async def stream_generator(blocksize, *, channels=1, dtype='float32',
                           pre_fill_blocks=10, **kwargs):
    """Generator that yields blocks of input/output data as NumPy arrays.

    The output blocks are uninitialized and have to be filled with
    appropriate audio signals.

    Each item is an ``(indata, outdata, status)`` tuple.  After the consumer
    resumes the generator, ``outdata`` is queued for playback.  The output
    queue starts with ``pre_fill_blocks`` blocks of zeros.  Extra keyword
    arguments are forwarded to ``sd.Stream``.
    """
    assert blocksize != 0
    q_in = asyncio.Queue()
    q_out = queue.Queue()
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine (Python 3.7+, which this script requires anyway).
    loop = asyncio.get_running_loop()

    def callback(indata, outdata, frame_count, time_info, status):
        # asyncio.Queue is not thread-safe, so the put is scheduled on the
        # loop; the thread-safe queue.Queue can be read directly.
        loop.call_soon_threadsafe(q_in.put_nowait, (indata.copy(), status))
        outdata[:] = q_out.get_nowait()

    # pre-fill output queue
    for _ in range(pre_fill_blocks):
        q_out.put(np.zeros((blocksize, channels), dtype=dtype))

    stream = sd.Stream(blocksize=blocksize, callback=callback, dtype=dtype,
                       channels=channels, **kwargs)
    with stream:
        while True:
            indata, status = await q_in.get()
            outdata = np.empty((blocksize, channels), dtype=dtype)
            yield indata, outdata, status
            q_out.put_nowait(outdata)
async def print_input_infos(**kwargs):
    """Show minimum and maximum value of each incoming audio block."""
    blocks = inputstream_generator(**kwargs)
    async for indata, status in blocks:
        if status:
            print(status)
        lowest = indata.min()
        highest = indata.max()
        print('min:', lowest, '\t', 'max:', highest)
async def wire_coro(**kwargs):
    """Create a connection between audio inputs and outputs.

    Asynchronously iterates over a stream generator and for each block
    simply copies the input data into the output block.
    """
    blocks = stream_generator(**kwargs)
    async for indata, outdata, status in blocks:
        if status:
            print(status)
        outdata[:] = indata
async def main(**kwargs):
    """Print input statistics for two seconds, then run the wire for ten.

    Keyword arguments are forwarded to ``wire_coro``.
    """
    print('Some informations about the input signal:')
    try:
        await asyncio.wait_for(print_input_infos(), timeout=2)
    except asyncio.TimeoutError:
        # Expected: the info loop never ends by itself.
        pass

    print('\nEnough of that, activating wire ...\n')
    audio_task = asyncio.create_task(wire_coro(**kwargs))
    # Count down from 10 to 1, one number per second.
    for i in reversed(range(1, 11)):
        print(i)
        await asyncio.sleep(1)
    audio_task.cancel()
    try:
        await audio_task
    except asyncio.CancelledError:
        print('\nwire was cancelled')
if __name__ == '__main__':
    # Script entry point: run the demo, turning Ctrl+C into a clean exit.
    try:
        asyncio.run(main(blocksize=1024))
    except KeyboardInterrupt:
        sys.exit('\nInterrupted by user')