I can’t edit my post above any more, so here is another update:
I did some reverse engineering and found out how to switch the sample rate. The script now also supports 96 and 192 kHz, which widens my choice of music.
#!/usr/bin/env python3
# play an audio file on QA402 / QA403(untested)
import sys # for argv
import time
import struct
import numpy
import threading
import queue
import usb1 # 'pip install libusb1' if missing
import soundfile # 'pip install PySoundFile' if missing
class Registers:
    """Low-level register read/write over the device's endpoint 1."""

    def __init__(self, device):
        """Store *device* (must offer bulkRead/bulkWrite) and the EP1 addresses."""
        self.device = device
        self.endpoint_write = usb1.ENDPOINT_OUT | 0x01  # EP1 out
        self.endpoint_read = usb1.ENDPOINT_IN | 0x01  # EP1 in

    def write(self, reg, val):
        """Write the 32-bit value *val* to register *reg*."""
        # wire format: 8-bit address followed by the 32-bit big-endian value
        packet = struct.pack('>BI', reg, val)
        self.device.bulkWrite(self.endpoint_write, packet, 1000)

    def read(self, reg):
        """Return the 32-bit value of register *reg*."""
        # a read is requested by writing the address with its MSB set ...
        self.write(0x80 | reg, 0)
        # ... after which the 4-byte big-endian result is fetched from EP1 in
        reply = self.device.bulkRead(self.endpoint_read, 4, 1000)
        return struct.unpack('>I', reply)[0]
class Control:
    """Device settings, expressed as writes to individual registers."""

    def __init__(self, registers):
        """Keep the register accessor and build the setting -> code tables."""
        self.registers = registers
        # output level key -> code for register 6
        self.output2reg = {
            level: code for code, level in enumerate((-12, -2, 8, 18))
        }
        # sample rate in Hz -> code for register 9
        self.samplerate2reg = {
            hz: code for code, hz in enumerate((48000, 96000, 192000))
        }

    def set_samplerate(self, rate):
        """Select the streaming rate; raises KeyError for an unsupported *rate*."""
        self.registers.write(9, self.samplerate2reg[rate])
        time.sleep(0.1)  # a little pause seems to be needed for the device to settle

    def set_output(self, gain):
        """Switch the output relays; raises KeyError for an unknown *gain*."""
        self.registers.write(6, self.output2reg[gain])
class Stream:
    """Buffer streaming on endpoint 2, with an event worker thread.

    Buffers are sent as asynchronous bulk transfers.  A bounded queue holds
    one entry per outstanding transfer and blocks the producer when it is
    full; the worker thread runs the USB event loop, and the transfer
    callback removes one entry for every transfer that has finished.
    """

    # Upper bound (seconds) for a single wait in the event loop, so the worker
    # re-checks its exit condition even when no transfer is in flight.
    EVENT_POLL_SECONDS = 0.1

    def __init__(self, context, device, registers):
        """Prepare endpoints, the in-flight queue and the transfer callbacks.

        context   -- USB context whose events the worker thread handles
        device    -- device object used to allocate transfers
        registers -- register accessor used to start/stop streaming
        """
        self.context = context
        self.device = device
        self.registers = registers
        self.endpoint_read = usb1.ENDPOINT_IN | 0x02  # EP2 in
        self.endpoint_write = usb1.ENDPOINT_OUT | 0x02  # EP2 out
        self.bufqueue = queue.Queue(maxsize=5)  # max. 5 queued buffers, block on more
        self.running = False  # defined up front so stop() is safe before start()
        self.thread = None
        self.transfer_helper = usb1.USBTransferHelper()  # use the callback dispatcher
        self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback)
        # Timed-out / failed / cancelled transfers must free their queue slot
        # too, otherwise the producer blocks forever once the queue is full.
        self.transfer_helper.setDefaultCallback(self.callback)

    def start(self):
        """Spawn the event worker thread, then tell the device to start streaming."""
        self.running = True
        self.thread = threading.Thread(target=self.worker)
        self.thread.start()
        try:
            self.registers.write(8, 0x05)  # start streaming
        except BaseException:
            # do not leave the (non-daemon) worker running if the start failed
            self.running = False
            self.thread.join()
            self.thread = None
            raise

    def stop(self):
        """Wait for queued buffers to finish, end the thread, stop streaming."""
        self.running = False
        if self.thread is not None:
            self.thread.join()  # worker exits once the queue has drained
            self.thread = None
        self.registers.write(8, 0x00)  # stop streaming

    def write(self, buffer):
        """Submit *buffer* for playback; blocks while the queue is full."""
        transfer = self.device.getTransfer()
        transfer.setBulk(self.endpoint_write, buffer, self.transfer_helper, None, 1000)
        transfer.submit()  # asynchronous transfer
        self.bufqueue.put(transfer)  # it doesn't matter what we put in here

    def worker(self):
        """Event loop for the async transfers; runs until stopped and drained."""
        while self.running or not self.bufqueue.empty():  # play until the last
            # bounded wait instead of a blocking handleEvents(), so the loop
            # condition is re-evaluated even when nothing is pending
            self.context.handleEventsTimeout(self.EVENT_POLL_SECONDS)

    def callback(self, transfer):
        """Transfer-finished callback (worker thread): free one queue slot.

        Returns False so the dispatcher does not resubmit the transfer.
        """
        self.bufqueue.get()  # unblock the producer (should pop same transfer)
        return False
def frames_to_stereo_bytes(frames):
    """Return *frames* as interleaved left/right 32-bit little-endian bytes.

    frames -- integer sample block of shape (n_frames, n_channels); a 1-D
              block is treated as mono.
    Mono input is duplicated onto both channels; of inputs with more than
    two channels only the first two are used.
    """
    block = numpy.asarray(frames)
    if block.ndim == 1:
        block = block.reshape(-1, 1)
    channels = block.shape[1]
    if channels == 1:
        block = numpy.repeat(block, 2, axis=1)  # same sample left and right
    elif channels > 2:
        block = block[:, :2]
    # row-major serialisation of (n_frames, 2) yields L, R, L, R, ...
    return block.astype('<i4').tobytes()


def play(filename):
    """Play the audio file *filename* on a QA402 / QA403 (untested) analyzer.

    Raises SystemExit if no analyzer is found or if the file's sample rate
    is not one the device supports.  Ctrl-C ends playback early.
    """
    with usb1.USBContext() as context:
        # if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
        device = context.openByVendorIDAndProductID(0x16c0, 0x4e37)  # QA402
        if device is None:
            device = context.openByVendorIDAndProductID(0x16c0, 0x4e39)  # QA403
        if device is None:
            raise SystemExit("no QA402/QA403 analyzer found")
        device.resetDevice()
        with device.claimInterface(0):
            # create our objects
            registers = Registers(device)  # low-level register access
            control = Control(registers)  # device settings
            stream = Stream(context, device, registers)  # buffer streaming
            control.set_output(-12)  # set your desired output level here
            with soundfile.SoundFile(filename, 'rb') as f:
                try:
                    control.set_samplerate(f.samplerate)  # raises KeyError if unsupported
                except KeyError:
                    # refuse rather than play at the wrong pitch
                    raise SystemExit("sample rate %d not supported (48|96|192kHz)" % f.samplerate)
                try:  # be thread-aware with Ctrl-C
                    stream.start()
                    while f.tell() < len(f):
                        data = f.read(1024, 'int32', True)  # some amount of frames, always 2-D
                        if len(data) == 0:
                            break  # nothing more to read; avoid spinning forever
                        stream.write(frames_to_stereo_bytes(data))  # will block if queue is full
                except KeyboardInterrupt:
                    pass  # fall into below end-of-stream signalling and join
                finally:
                    # always end the worker thread and stop the device, also
                    # when reading or a USB call failed
                    stream.stop()
if __name__ == "__main__":
    # Validate explicitly: an assert would be stripped under `python -O`,
    # turning a missing argument into an IndexError.
    if len(sys.argv) < 2:
        raise SystemExit("usage: %s <audio file>" % sys.argv[0])
    play(sys.argv[1])