QA40x_BareMetal

Can’t edit my above posting any more, another update:
Did some reverse engineering and found out how to switch the sample rate. Now with added support for 96 and 192 kHz, which widens my choice of music.

#!/usr/bin/env python3

# play an audio file on QA402 / QA403(untested)

import sys # for argv
import time
import struct
import numpy
import threading
import queue
import usb1 # 'pip install libusb1' if missing
import soundfile # 'pip install PySoundFile' if missing

class Registers: # low level register read/write
	def __init__(self, device):
		self.endpoint_read = usb1.ENDPOINT_IN | 0x01 # EP1 in
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x01 # EP1 out
		self.device = device
		
	def read(self, reg):
		self.write(0x80 | reg, 0) # write the address, with MSB set
		data = self.device.bulkRead(self.endpoint_read, 4, 1000) # read result
		(val,) = struct.unpack('>I', data) # 32bit big endian
		return val
		
	def write(self, reg, val):
		buf = struct.pack('>BI', reg, val) # 8 bit address and 32 bit big endian value
		self.device.bulkWrite(self.endpoint_write, buf, 1000)

class Control: # device settings
	def __init__(self, registers):
		self.registers = registers
		self.output2reg = {-12: 0, -2: 1, 8: 2, 18: 3}
		self.samplerate2reg = {48000: 0, 96000: 1, 192000: 2}
	
	def set_output(self, gain): # output relays
		val = self.output2reg[gain]
		self.registers.write(6, val)
		
	def set_samplerate(self, rate): # streaming rate
		val = self.samplerate2reg[rate]
		self.registers.write(9, val)
		time.sleep(0.1) # it seems a little pause is needed to swing in

class Stream: # buffer streaming, with event worker thread
	def __init__(self, context, device, registers):
		self.context = context
		self.device = device
		self.registers = registers
		self.endpoint_read = usb1.ENDPOINT_IN | 0x02 # EP2 in
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x02 # EP2 out
		self.bufqueue = queue.Queue(maxsize = 5) # max. 5 overlapping buffers in flight, block on more
		self.transfer_helper = usb1.USBTransferHelper() # use the callback dispatcher
		self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback) # set ours

	def start(self): # start streaming, spawn the thread
		self.thread = threading.Thread(target = self.worker)
		self.running = True
		self.thread.start()
		self.registers.write(8, 0x05) # start streaming

	def stop(self): # stop streaming, end the thread
		self.running = False
		self.thread.join()
		self.registers.write(8, 0x00) # stop streaming

	def write(self, buffer): # add a buffer to the playback queue
		transfer = self.device.getTransfer()
		transfer.setBulk(self.endpoint_write, buffer, self.transfer_helper, None, 1000)
		transfer.submit() # asynchronous transfer
		self.bufqueue.put(transfer) # it doesn't matter what we put in here
		
	def worker(self): # event loop for the async transfers
		while(self.running or not self.bufqueue.empty()): # play until the last
			self.context.handleEvents()
	
	def callback(self, transfer): # callback of the worker thread
		removed = self.bufqueue.get() # unblock the producer (should pop same transfer)

def play(filename):
	with usb1.USBContext() as context:
		# if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
		device = context.openByVendorIDAndProductID(0x16c0, 0x4e37) # QA402
		if device is None:
			device = context.openByVendorIDAndProductID(0x16c0, 0x4e39) # QA403
			if device is None:
				raise SystemExit("no QA402/QA403 analyzer found")
		device.resetDevice()
		with device.claimInterface(0):
			# create our objects
			registers = Registers(device) # low-level register access
			control = Control(registers) # device settings
			stream = Stream(context, device, registers) # buffer streaming

			control.set_output(-12) # set your desired output level here

			with soundfile.SoundFile(filename, 'rb') as f:
				try:
					control.set_samplerate(f.samplerate) # raises KeyError if unsupported
				except KeyError:
					raise SystemExit("sample rate %d not supported (48|96|192kHz)" % f.samplerate) # or pitch would be wrong
				try: # be thread-aware with Ctrl-C
					stream.start()
					while f.tell() < len(f):
						data = f.read(1024, 'int32', True) # some amount of samples
						data.resize((len(data) * 2, 1)) # one dimension, alternating left/right samples
						iter = numpy.nditer(data) # make it iterable for below
						buffer = struct.pack('<%di' % len(data), *iter) # hardware format, little endian
						stream.write(buffer) # will block if queue is full
				except KeyboardInterrupt:
					pass # fall into below end-of-stream signalling and join
				stream.stop()

if (__name__ == "__main__"):
	assert len(sys.argv) > 1, "usage: %s <audio file>" % sys.argv[0]
	play(sys.argv[1])
2 Likes