Example Programs
Play a Sound File
#!/usr/bin/env python3
"""Load an audio file and play its contents.
PySoundFile (https://github.com/bastibe/PySoundFile/) has to be installed!
"""
import argparse
import logging
# Command-line interface: one positional file name plus an optional device.
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('filename', help='audio file to be played back')
parser.add_argument('-d', '--device', type=int, help='device ID')
args = parser.parse_args()

try:
    # The third-party imports live inside the ``try`` so that an ImportError
    # is reported through the generic handler at the bottom.
    import sounddevice as sd
    import soundfile as sf

    samples, samplerate = sf.read(args.filename, dtype='float32')
    sd.play(samples, samplerate, device=args.device, blocking=True)

    # Report whatever status information was collected during playback.
    playback_status = sd.get_status()
    if playback_status:
        logging.warning(str(playback_status))
except KeyboardInterrupt:
    parser.exit('\nInterrupted by user')
except Exception as error:
    parser.exit(': '.join((type(error).__name__, str(error))))
Input to Output Pass-Through
#!/usr/bin/env python3
"""Pass input directly to output.
See https://www.assembla.com/spaces/portaudio/subversion/source/HEAD/portaudio/trunk/test/patest_wire.c
"""
import argparse
import logging
# Command-line options, registered in the order they appear in ``--help``.
parser = argparse.ArgumentParser(description=__doc__)
for flags, settings in (
    (('-i', '--input-device'), dict(type=int, help='input device ID')),
    (('-o', '--output-device'), dict(type=int, help='output device ID')),
    (('-c', '--channels'),
     dict(type=int, default=2, help='number of channels')),
    (('-t', '--dtype'), dict(help='audio data type')),
    (('-s', '--samplerate'), dict(type=float, help='sampling rate')),
    (('-b', '--blocksize'), dict(type=int, help='block size')),
    (('-l', '--latency'), dict(type=float, help='latency in seconds')),
):
    parser.add_argument(*flags, **settings)
args = parser.parse_args()

try:
    import sounddevice as sd

    # Status flags of all callback invocations are OR-ed into this object.
    cumulated_status = sd.CallbackFlags()

    def callback(indata, outdata, frames, time, status):
        """Copy the input block to the output and accumulate ``status``."""
        global cumulated_status
        cumulated_status |= status
        outdata[:] = indata

    stream_settings = dict(
        device=(args.input_device, args.output_device),
        samplerate=args.samplerate,
        blocksize=args.blocksize,
        dtype=args.dtype,
        latency=args.latency,
        channels=args.channels,
        callback=callback,
    )
    banner = "#" * 80
    with sd.Stream(**stream_settings):
        print(banner)
        print("press Return to quit")
        print(banner)
        input()  # keep the stream open until the user presses Return

    if cumulated_status:
        logging.warning(str(cumulated_status))
except KeyboardInterrupt:
    parser.exit('\nInterrupted by user')
except Exception as error:
    parser.exit(': '.join((type(error).__name__, str(error))))
Plot Microphone Signal(s) in Real-Time
#!/usr/bin/env python3
"""Plot the live microphone signal(s) with matplotlib."""
import argparse
from queue import Queue, Empty
def int_or_str(text):
    """Argument-parsing helper: return ``int(text)`` if possible.

    If ``text`` is not a valid integer literal (``int()`` raises
    ``ValueError``), it is handed back unchanged.
    """
    try:
        number = int(text)
    except ValueError:
        return text
    else:
        return number
parser = argparse.ArgumentParser(description=__doc__)
add_option = parser.add_argument
add_option('-l', '--list-devices', action='store_true',
           help='show list of audio devices and exit')
add_option('-d', '--device', type=int_or_str,
           help='input device (numeric ID or substring)')
add_option('-w', '--window', type=float, default=200, metavar='DURATION',
           help='visible time slot (default: %(default)s ms)')
add_option('-i', '--interval', type=float, default=30,
           help='minimum time between plot updates (default: %(default)s ms)')
add_option('-b', '--blocksize', type=int, help='block size (in samples)')
add_option('-r', '--samplerate', type=float,
           help='sampling rate of audio device')
add_option('-n', '--downsample', type=int, default=10, metavar='N',
           help='display every Nth sample (default: %(default)s)')
# Positional arguments: zero or more channel numbers.
add_option('channels', type=int, default=[1], nargs='*', metavar='CHANNEL',
           help='input channels to plot (default: the first)')
args = parser.parse_args()

if not all(channel >= 1 for channel in args.channels):
    parser.error('argument CHANNEL: must be >= 1')

# Channel numbers start with 1, the column indices used for plotting with 0.
mapping = [channel - 1 for channel in args.channels]

# Carries audio blocks from the audio callback to the plot update function.
queue = Queue()
def audio_callback(indata, frames, time, status):
    """Queue the selected channels of one audio block for plotting.

    This is called (from a separate thread) for each audio block.  A truthy
    ``status`` is printed immediately.  Every ``args.downsample``-th row of
    ``indata`` is kept, restricted to the columns listed in ``mapping``.
    """
    if status:
        print(status, flush=True)
    # Indexing with the ``mapping`` list is fancy indexing, which creates the
    # (necessary!) copy of the data before it is put into the queue.
    selected = indata[::args.downsample, mapping]
    queue.put(selected)
def update_plot(frame):
    """Shift all queued audio data into ``plotdata`` and update the lines.

    This is called by matplotlib for each plot update.

    Typically, audio callbacks happen more frequently than plot updates,
    therefore the queue tends to contain multiple blocks of audio data.

    Args:
        frame: Argument passed in by the caller; it is not used here.

    Returns:
        The module-level ``lines`` sequence, after the y-data of each line
        has been replaced by the matching column of ``plotdata``.
    """
    global plotdata
    block = True  # The first read from the queue is blocking ...
    while True:
        try:
            data = queue.get(block=block)
        except Empty:
            break
        block = False  # ... all further reads are non-blocking
        # A block longer than the plot window cannot be assigned to the
        # slice below (shape mismatch); only its newest samples would be
        # visible anyway, so the older ones are dropped.
        window = len(plotdata)
        if len(data) > window:
            data = data[len(data) - window:]
        shift = len(data)
        if shift == 0:
            # ``plotdata[-0:]`` would address the whole array, not nothing.
            continue
        plotdata = np.roll(plotdata, -shift, axis=0)
        plotdata[-shift:, :] = data
    for column, line in enumerate(lines):
        line.set_ydata(plotdata[:, column])
    return lines
# Main part of the script: set up the plot and the input stream, then show
# the animated figure.  Any exception is reported via ``parser.exit()``.
try:
    from matplotlib.animation import FuncAnimation
    import matplotlib.pyplot as plt
    import numpy as np
    import sounddevice as sd

    if args.list_devices:
        print(sd.query_devices())
        parser.exit()
    if args.samplerate is None:
        # No rate given: use the default sampling rate of the input device.
        device_info = sd.query_devices(args.device, 'input')
        args.samplerate = device_info['default_samplerate']

    # Number of (downsampled) samples visible in the window.  np.ceil()
    # returns a float, but an array shape has to consist of integers.
    length = int(np.ceil(
        args.window * args.samplerate / (1000 * args.downsample)))
    plotdata = np.zeros((length, len(args.channels)))

    fig, ax = plt.subplots()
    lines = ax.plot(plotdata)
    if len(args.channels) > 1:
        ax.legend(['channel {}'.format(c) for c in args.channels],
                  loc='lower left', ncol=len(args.channels))
    ax.axis((0, len(plotdata), -1, 1))
    ax.set_yticks([0])
    ax.yaxis.grid(True)
    # Booleans instead of the strings 'off': a non-empty string is truthy
    # and therefore does not reliably switch ticks and labels off.
    ax.tick_params(bottom=False, top=False, labelbottom=False,
                   right=False, left=False, labelleft=False)
    fig.tight_layout(pad=0)

    # ``--blocksize`` is forwarded to the stream; if the option was not
    # given, the value is None.
    stream = sd.InputStream(
        device=args.device, channels=max(args.channels),
        samplerate=args.samplerate, blocksize=args.blocksize,
        callback=audio_callback)
    # NOTE(review): the animation object is bound to a name, presumably so
    # that it stays alive while the figure is shown -- confirm in the
    # matplotlib documentation.
    ani = FuncAnimation(fig, update_plot, interval=args.interval, blit=True)
    with stream:
        plt.show()
except Exception as e:
    parser.exit(type(e).__name__ + ': ' + str(e))
Real-Time Text-Mode Spectrogram
#!/usr/bin/env python3
"""Show a text-mode spectrogram using live microphone data."""
import argparse
import logging
import numpy as np
import shutil
usage_line = ' press <enter> to quit, +<enter> or -<enter> to change scaling '

# Default spectrogram width: the terminal width, or 80 columns if
# ``shutil.get_terminal_size`` is not available.
try:
    columns = shutil.get_terminal_size().columns
except AttributeError:
    columns = 80

# Command-line options, registered in the order they appear in ``--help``.
parser = argparse.ArgumentParser(description=__doc__)
for flags, settings in (
    (('-l', '--list-devices'),
     dict(action='store_true', help='list audio devices and exit')),
    (('-b', '--block-duration'),
     dict(type=float, metavar='DURATION', default=50,
          help='block size (default %(default)s milliseconds)')),
    (('-c', '--columns'),
     dict(type=int, default=columns, help='width of spectrogram')),
    (('-d', '--device'), dict(type=int, help='input device ID')),
    (('-g', '--gain'),
     dict(type=float, default=10,
          help='initial gain factor (default %(default)s)')),
    (('-r', '--range'),
     dict(type=float, nargs=2, metavar=('LOW', 'HIGH'),
          default=[100, 2000],
          help='frequency range (default %(default)s Hz)')),
):
    parser.add_argument(*flags, **settings)
args = parser.parse_args()

# The frequency range must not be empty or reversed.
low, high = args.range
if low >= high:
    parser.error("HIGH must be greater than LOW")
# Create a nice output gradient using ANSI escape sequences.
# Stolen from https://gist.github.com/maurisvh/df919538bcef391bc89f
colors = 30, 34, 35, 91, 93, 97
chars = ' :%#\t#%:'
# The tab is never emitted; it marks the position within each color pair
# where the foreground and background colors are exchanged.
before_swap, _, after_swap = chars.partition('\t')
gradient = []
for first, second in zip(colors, colors[1:]):
    # Foreground code first, background code (color code + 10) second.
    gradient += ['\x1b[{};{}m{}'.format(second, first + 10, symbol)
                 for symbol in before_swap]
    gradient += ['\x1b[{};{}m{}'.format(first, second + 10, symbol)
                 for symbol in after_swap]
# Main part of the script: open the input stream, print one spectrogram line
# per audio block and react to user input until the user quits.
try:
    import sounddevice as sd

    if args.list_devices:
        print(sd.query_devices())
        parser.exit()

    samplerate = sd.query_devices(args.device, 'input')['default_samplerate']

    # Frequency step per column, such that LOW..HIGH spans all columns.
    delta_f = (high - low) / (args.columns - 1)
    # np.ceil()/np.floor() return floats.  ``low_bin`` is used as a slice
    # index and ``fftsize`` as an FFT length, so both must be integers.
    fftsize = int(np.ceil(samplerate / delta_f))
    low_bin = int(np.floor(low / delta_f))

    # Status flags of all callback invocations are OR-ed into this object.
    cumulated_status = sd.CallbackFlags()

    def callback(indata, frames, time, status):
        """Print one spectrogram line for the given block of audio data.

        ``status`` is accumulated in ``cumulated_status``.  If the block
        contains only zeros, ``no input`` is printed instead of a line.
        """
        global cumulated_status
        cumulated_status |= status
        if indata.any():
            magnitude = np.abs(np.fft.rfft(indata[:, 0], n=fftsize))
            magnitude *= args.gain / fftsize
            # Map each magnitude (clipped to 0..1) to a gradient entry.
            line = (gradient[int(np.clip(x, 0, 1) * (len(gradient) - 1))]
                    for x in magnitude[low_bin:low_bin + args.columns])
            print(*line, sep='', end='\x1b[0m\n', flush=True)
        else:
            print('no input', flush=True)

    with sd.InputStream(device=args.device, channels=1, callback=callback,
                        blocksize=int(samplerate * args.block_duration / 1000),
                        samplerate=samplerate):
        while True:
            response = input()
            if response in ('', 'q', 'Q'):
                break
            for ch in response:
                if ch == '+':
                    args.gain *= 2
                elif ch == '-':
                    args.gain /= 2
                else:
                    # Unknown character: show the usage line and ignore the
                    # rest of this input line.
                    print('\x1b[31;40m', usage_line.center(args.columns, '#'),
                          '\x1b[0m', sep='', flush=True)
                    break

    if cumulated_status:
        logging.warning(str(cumulated_status))
except KeyboardInterrupt:
    parser.exit('Interrupted by user')
except Exception as e:
    parser.exit(type(e).__name__ + ': ' + str(e))