Here’s a very basic Python command line music player, for Windows or Linux: (only 48kHz files, lossless formats like FLAC recommended, no mp3)
#!/usr/bin/env python3
# play an audio file on QA402 / QA403(untested)
import sys # for argv
import struct
import numpy
import threading
import queue
import usb1 # 'pip install libusb1' if missing
import soundfile # 'pip install PySoundFile' if missing
class Registers: # low level register read/write
def __init__(self, device):
self.endpoint_read = usb1.ENDPOINT_IN | 0x01 # EP1 in
self.endpoint_write = usb1.ENDPOINT_OUT | 0x01 # EP1 out
self.device = device
def read(self, reg):
self.write(0x80 | reg, 0) # write the address, with MSB set
data = self.device.bulkRead(self.endpoint_read, 4, 1000) # read result
(val,) = struct.unpack('>I', data) # 32bit big endian
return val
def write(self, reg, val):
buf = struct.pack('>BI', reg, val) # 8 bit address and 32 bit big endian value
self.device.bulkWrite(self.endpoint_write, buf, 1000)
class Levels: # level settings, with the relays
def __init__(self, registers):
self.registers = registers
self.input2reg = {0: 0, 6: 1, 12: 2, 18: 3, 24: 4, 30: 5, 36: 6, 42: 7}
self.output2reg = {-12: 0, -2: 1, 8: 2, 18: 3}
def set_input(self, gain):
val = self.input2reg[gain]
self.registers.write(5, val)
def set_output(self, gain):
val = self.output2reg[gain]
self.registers.write(6, val)
class Stream: # buffer streaming, with event worker thread
def __init__(self, context, device, registers):
self.context = context
self.device = device
self.registers = registers
self.endpoint_read = usb1.ENDPOINT_IN | 0x02 # EP2 in
self.endpoint_write = usb1.ENDPOINT_OUT | 0x02 # EP2 out
self.bufqueue = queue.Queue(maxsize = 3) # max. 3 overlapping buffers in flight, block on more
self.transfer_helper = usb1.USBTransferHelper() # use the callback dispatcher
self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback) # set ours
def start(self): # start streaming, spawn the thread
self.thread = threading.Thread(target = self.worker)
self.running = True
self.thread.start()
self.registers.write(8, 0x05) # start streaming
def stop(self): # stop streaming, end the thread
self.running = False
self.thread.join()
self.registers.write(8, 0x00) # stop streaming
def write(self, buffer): # add a buffer to the playback queue
transfer = self.device.getTransfer()
transfer.setBulk(self.endpoint_write, buffer, self.transfer_helper, None, 1000)
transfer.submit() # asynchronous transfer
self.bufqueue.put(transfer) # it doesn't matter what we put in here
def worker(self): # event loop for the async transfers
while(self.running or not self.bufqueue.empty()): # play until the last
self.context.handleEvents()
def callback(self, transfer): # callback of the worker thread
removed = self.bufqueue.get() # unblock the producer (should pop same transfer)
def play(filename):
with usb1.USBContext() as context:
# if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
device = context.openByVendorIDAndProductID(0x16c0, 0x4e37) # QA402
if device is None:
device = context.openByVendorIDAndProductID(0x16c0, 0x4e39) # QA403
if device is None:
raise SystemExit("no QA402/QA403 analyzer found")
device.resetDevice()
with device.claimInterface(0):
# create our objects
registers = Registers(device) # low-level register access
levels = Levels(registers) # level setting
stream = Stream(context, device, registers) # buffer streaming
levels.set_output(-12) # set your desired output level here
with soundfile.SoundFile(filename, 'rb') as f:
assert f.samplerate == 48000, "only 48 kHz supported" # or pitch would be wrong
try: # be thread-aware with Ctrl-C
stream.start()
while f.tell() < len(f):
data = f.read(1024, 'int32', True) # some amount of samples
data.resize((len(data) * 2, 1)) # one dimension, alternating left/right samples
iter = numpy.nditer(data) # make it iterable for below
buffer = struct.pack('<%di' % len(data), *iter) # hardware format, little endian
stream.write(buffer) # will block if queue is full (with 3 entries)
except KeyboardInterrupt:
pass # fall into below end-of-stream signalling and join
stream.stop()
if (__name__ == "__main__"):
assert len(sys.argv) > 1, "usage: %s <audio file>" % sys.argv[0]
play(sys.argv[1])
Edit: Now with asynchronous transfer, switched to another libusb wrapper. Examples of async in Python are almost nonexisting.
This should be glitch-free now, I’m off to more listening…