QA40x_BareMetal

I’m not sure how useful this will be to most. But if you are working on a platform that isn’t supported and are able to write software, the code at the link below shows how to control the QA402, QA403 (and upcoming QA404) with raw USB reads and writes. You are on your own for computing THD etc. But this does give you outputs and inputs that are tightly controlled (0.1% thin-film resistors) and you also get a constant phase relationship between the DAC and ADC, access to the attenuators, etc. There is no application required to make this work. You write and control it all.

As the readme at the link explains, this means there are 4 ways to interact with the QA40x hardware: The QA40x application, Tractor (for automated tests without writing code), QA40x_REST (which still needs the application running, although it can be run headless), and the “bare metal” option shared here.

So, theoretically, someone could write a GUI that runs in a browser window.

In any case, I hope this is useful. Thanks, Matt

2 Likes

Very nice, thank you for the example code. I’d like to see an audio driver (like getting the ASIO QA401 driver running for current hardware). This could be a start.
The commands are hard-coded, no definition file? Some must be missing. How is the sample rate set, how to use the front panel I2S, how to read power stats, etc? There are 6 endpoints?
So far, got the relays clicking in Python, with this piece of code:

#!/usr/bin/env python3

import time # just for test
import struct
import libusb_package # pip install this, if missing

class Registers:
	"""Register access through the first two endpoints of a USB interface.

	A write sends five bytes: the register address followed by a 32-bit
	big-endian value. A read sends the address with bit 7 set (and a zero
	value), then fetches four bytes from the read endpoint.
	"""

	def __init__(self, interface):
		"""Use endpoint 0 of `interface` for reading and endpoint 1 for writing."""
		endpoints = interface.endpoints()
		self.endpoint_read = endpoints[0]
		self.endpoint_write = endpoints[1]

	def read(self, reg):
		"""Request register `reg` and return its value as an unsigned 32-bit int."""
		self.write(reg | 0x80, 0) # top address bit marks a read request
		raw = self.endpoint_read.read(4)
		return struct.unpack('>I', raw)[0]

	def write(self, reg, val):
		"""Send the address byte `reg` and the 32-bit value `val` as one packet."""
		packet = struct.pack('>B', reg) + struct.pack('>I', val)
		self.endpoint_write.write(packet)

class Levels:
	"""Input and output level selection through registers 5 and 6."""

	def __init__(self, registers):
		"""Keep the register accessor and build the level -> register code tables."""
		self.registers = registers
		# input levels 0..42 in steps of 6 map to codes 0..7
		self.input2reg = {level: code for code, level in enumerate(range(0, 42+1, 6))}
		# output levels -12..18 in steps of 10 map to codes 0..3
		self.output2reg = {level: code for code, level in enumerate(range(-12, 18+1, 10))}

	def set_input(self, gain):
		"""Write the code for input level `gain` to register 5 (KeyError if unknown)."""
		self.registers.write(5, self.input2reg[gain])

	def set_output(self, gain):
		"""Write the code for output level `gain` to register 6 (KeyError if unknown)."""
		self.registers.write(6, self.output2reg[gain])

	

def main():
	"""Relay test: open a QA402 (or QA403), poke register 0, then step through all levels."""
	for product_id in (0x4e37, 0x4e39): # QA402 first, then QA403
		device = libusb_package.find(idVendor=0x16c0, idProduct=product_id)
		if device is not None:
			break
	else:
		raise SystemExit("no analyzer found")

	device.set_configuration() # first one
	registers = Registers(device[0].interfaces()[0]) # create register object

	# register test: show the value, overwrite it, show it again
	print(registers.read(0))
	registers.write(0, 1234)
	print(registers.read(0))

	levels = Levels(registers) # create level setting object
	sweeps = (
		(levels.set_input, range(0, 42+1, 6)), # all input levels
		(levels.set_output, range(-12, 18+1, 10)), # all output levels
	)
	for apply_level, steps in sweeps:
		for level in steps:
			apply_level(level)
			time.sleep(1) # one second per step

if __name__ == "__main__": # run the relay test only when executed as a script
	main()

Now need to learn how to do asynchronous USB transfer in Python…
(Python was just for a quick start)

Here’s a very basic Python command line music player, for Windows or Linux: (only 48kHz files, lossless formats like FLAC recommended, no mp3)

#!/usr/bin/env python3

# play an audio file on QA402 / QA403(untested)

import sys # for argv
import struct
import numpy
import threading
import queue
import usb1 # 'pip install libusb1' if missing
import soundfile # 'pip install PySoundFile' if missing

class Registers: # low level register read/write
	"""Register access over bulk endpoint 1 of an opened usb1 device handle.

	Every transfer is issued with a timeout argument of 1000.
	"""

	def __init__(self, device):
		"""Remember the device handle and the EP1 in/out endpoint addresses."""
		self.device = device
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x01 # EP1 out
		self.endpoint_read = usb1.ENDPOINT_IN | 0x01 # EP1 in

	def read(self, reg):
		"""Return the unsigned 32-bit value of register `reg`."""
		self.write(reg | 0x80, 0) # MSB of the address marks a read request
		reply = self.device.bulkRead(self.endpoint_read, 4, 1000) # read result
		return struct.unpack('>I', reply)[0] # 32bit big endian

	def write(self, reg, val):
		"""Send the 8-bit address `reg` followed by `val` as 32-bit big endian."""
		packet = struct.pack('>B', reg) + struct.pack('>I', val)
		self.device.bulkWrite(self.endpoint_write, packet, 1000)

class Levels: # level settings, with the relays
	"""Selects input and output levels by writing codes to registers 5 and 6."""

	def __init__(self, registers):
		"""Store the register accessor and the level -> code lookup tables."""
		self.registers = registers
		self.input2reg = dict(zip(range(0, 48, 6), range(8))) # 0, 6, ... 42 -> 0..7
		self.output2reg = dict(zip((-12, -2, 8, 18), range(4))) # -> 0..3

	def set_input(self, gain):
		"""Write the code for input level `gain` to register 5 (KeyError if unknown)."""
		code = self.input2reg[gain]
		self.registers.write(5, code)

	def set_output(self, gain):
		"""Write the code for output level `gain` to register 6 (KeyError if unknown)."""
		code = self.output2reg[gain]
		self.registers.write(6, code)

class Stream: # buffer streaming, with event worker thread
	"""Asynchronous bulk streaming on endpoint 2, serviced by a worker thread.

	`bufqueue` is used purely as a counting semaphore: one entry is added per
	transfer before it is submitted and one is removed whenever a transfer
	finishes, so `write()` blocks once the queue's maximum is in flight.
	"""

	def __init__(self, context, device, registers):
		"""Store the USB objects and set up the transfer callback dispatcher."""
		self.context = context
		self.device = device
		self.registers = registers
		self.endpoint_read = usb1.ENDPOINT_IN | 0x02 # EP2 in
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x02 # EP2 out
		self.bufqueue = queue.Queue(maxsize = 3) # max. 3 overlapping buffers in flight, block on more
		self.running = False
		self.thread = None
		self.transfer_helper = usb1.USBTransferHelper() # use the callback dispatcher
		self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback) # set ours
		# Failed, timed-out or cancelled transfers must release their slot too,
		# otherwise write() and stop() would wait forever for them.
		self.transfer_helper.setDefaultCallback(self.callback)

	def start(self): # start streaming, spawn the thread
		"""Start the event worker thread, then tell the hardware to start streaming."""
		self.running = True # set before the worker evaluates its loop condition
		self.thread = threading.Thread(target = self.worker)
		self.thread.start()
		self.registers.write(8, 0x05) # start streaming

	def stop(self): # stop streaming, end the thread
		"""Wait until all queued buffers are done, end the worker, stop the hardware."""
		self.running = False
		if self.thread is not None: # tolerate stop() without a prior start()
			self.thread.join()
		self.registers.write(8, 0x00) # stop streaming

	def write(self, buffer): # add a buffer to the playback queue
		"""Submit `buffer` as an asynchronous bulk transfer; blocks while the queue is full."""
		transfer = self.device.getTransfer()
		transfer.setBulk(self.endpoint_write, buffer, self.transfer_helper, None, 1000)
		# Reserve the slot first: the number of transfers in flight then never
		# exceeds maxsize, and callback() always finds an entry to remove.
		self.bufqueue.put(transfer) # it doesn't matter what we put in here
		try:
			transfer.submit() # asynchronous transfer
		except Exception:
			self.bufqueue.get() # give the slot back, no callback will come for this one
			raise

	def worker(self): # event loop for the async transfers
		"""Handle USB events until stop() was called and every buffer has finished."""
		while self.running or not self.bufqueue.empty(): # play until the last
			# Poll with a short timeout so the loop condition is re-checked even
			# when no transfer is pending (e.g. stop() right after start()).
			self.context.handleEventsTimeout(0.1)

	def callback(self, transfer): # callback of the worker thread
		"""Release one queue slot for a finished transfer, whatever its status."""
		self.bufqueue.get() # unblock the producer (should pop same transfer)

def play(filename):
	"""Play the 48 kHz stereo audio file `filename` on the first QA402/QA403 found.

	Raises:
		SystemExit: if no analyzer is found, or the file is not 48 kHz stereo.
	"""
	with usb1.USBContext() as context:
		# if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
		device = context.openByVendorIDAndProductID(0x16c0, 0x4e37) # QA402
		if device is None:
			device = context.openByVendorIDAndProductID(0x16c0, 0x4e39) # QA403
		if device is None:
			raise SystemExit("no QA402/QA403 analyzer found")
		device.resetDevice()
		with device.claimInterface(0):
			# create our objects
			registers = Registers(device) # low-level register access
			levels = Levels(registers) # level setting
			stream = Stream(context, device, registers) # buffer streaming

			levels.set_output(-12) # set your desired output level here

			with soundfile.SoundFile(filename, 'rb') as f:
				# explicit checks instead of assert, which "python -O" would strip
				if f.samplerate != 48000:
					raise SystemExit("only 48 kHz supported") # or pitch would be wrong
				if f.channels != 2:
					raise SystemExit("only stereo files supported") # stream is alternating left/right samples
				try: # be thread-aware with Ctrl-C
					stream.start()
					for block in f.blocks(blocksize = 1024, dtype = 'int32', always_2d = True):
						# rows are frames, so the C-order bytes alternate left/right;
						# '<i4' is the hardware format (little endian), converted in one go
						stream.write(block.astype('<i4', copy = False).tobytes()) # will block if queue is full
				except KeyboardInterrupt:
					pass # fall into below end-of-stream signalling and join
				finally:
					# always end the worker thread, else any other error would
					# leave it running and the process would never exit
					stream.stop()

if __name__ == "__main__":
	# explicit check: an assert would be stripped when run with "python -O"
	if len(sys.argv) < 2:
		raise SystemExit("usage: %s <audio file>" % sys.argv[0])
	play(sys.argv[1])

Edit: Now with asynchronous transfer, switched to another libusb wrapper. Examples of async in Python are almost nonexistent.
This should be glitch-free now, I’m off to more listening… :slight_smile:

1 Like

Can’t edit my above posting any more, another update:
Did some reverse engineering and found out how to switch the sample rate. Now with added support for 96 and 192 kHz, which widens my choice of music.

#!/usr/bin/env python3

# play an audio file on QA402 / QA403(untested)

import sys # for argv
import time
import struct
import numpy
import threading
import queue
import usb1 # 'pip install libusb1' if missing
import soundfile # 'pip install PySoundFile' if missing

class Registers: # low level register read/write
	"""Register access over bulk endpoint 1 of an opened usb1 device handle."""

	_REQUEST = struct.Struct('>BI') # 8 bit address and 32 bit big endian value
	_REPLY = struct.Struct('>I') # 32bit big endian

	def __init__(self, device):
		"""Remember the device handle and the EP1 in/out endpoint addresses."""
		self.device = device
		self.endpoint_read = usb1.ENDPOINT_IN | 0x01 # EP1 in
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x01 # EP1 out

	def read(self, reg):
		"""Return the unsigned 32-bit value of register `reg`."""
		self.write(reg | 0x80, 0) # write the address, with MSB set
		answer = self.device.bulkRead(self.endpoint_read, 4, 1000) # read result
		(value,) = self._REPLY.unpack(answer)
		return value

	def write(self, reg, val):
		"""Send address `reg` and value `val` as one 5-byte packet."""
		self.device.bulkWrite(self.endpoint_write, self._REQUEST.pack(reg, val), 1000)

class Control: # device settings
	"""Output level (register 6) and streaming sample rate (register 9) settings."""

	def __init__(self, registers):
		"""Store the register accessor and the value -> register code tables."""
		self.registers = registers
		self.output2reg = dict(zip((-12, -2, 8, 18), range(4)))
		self.samplerate2reg = dict(zip((48000, 96000, 192000), range(3)))

	def set_output(self, gain): # output relays
		"""Write the code for output level `gain` to register 6 (KeyError if unknown)."""
		self.registers.write(6, self.output2reg[gain])

	def set_samplerate(self, rate): # streaming rate
		"""Write the code for sample rate `rate` to register 9 (KeyError if unknown)."""
		code = self.samplerate2reg[rate]
		self.registers.write(9, code)
		time.sleep(0.1) # it seems a little pause is needed to swing in

class Stream: # buffer streaming, with event worker thread
	"""Asynchronous bulk streaming on endpoint 2, serviced by a worker thread.

	`bufqueue` acts as a counting semaphore only: an entry is added for each
	transfer before submission and one is removed whenever a transfer
	finishes, so `write()` blocks once the queue's maximum is in flight.
	"""

	def __init__(self, context, device, registers):
		"""Store the USB objects and set up the transfer callback dispatcher."""
		self.context = context
		self.device = device
		self.registers = registers
		self.endpoint_read = usb1.ENDPOINT_IN | 0x02 # EP2 in
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x02 # EP2 out
		self.bufqueue = queue.Queue(maxsize = 5) # max. 5 overlapping buffers in flight, block on more
		self.running = False
		self.thread = None
		self.transfer_helper = usb1.USBTransferHelper() # use the callback dispatcher
		self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback) # set ours
		# Any other outcome (error, timeout, cancel) has to free its slot as
		# well, or write() and stop() would block forever on a lost transfer.
		self.transfer_helper.setDefaultCallback(self.callback)

	def start(self): # start streaming, spawn the thread
		"""Start the event worker thread, then tell the hardware to start streaming."""
		self.running = True # set before the worker evaluates its loop condition
		self.thread = threading.Thread(target = self.worker)
		self.thread.start()
		self.registers.write(8, 0x05) # start streaming

	def stop(self): # stop streaming, end the thread
		"""Wait until all queued buffers are done, end the worker, stop the hardware."""
		self.running = False
		if self.thread is not None: # tolerate stop() without a prior start()
			self.thread.join()
		self.registers.write(8, 0x00) # stop streaming

	def write(self, buffer): # add a buffer to the playback queue
		"""Submit `buffer` as an asynchronous bulk transfer; blocks while the queue is full."""
		transfer = self.device.getTransfer()
		transfer.setBulk(self.endpoint_write, buffer, self.transfer_helper, None, 1000)
		# Take the slot before submitting, so the in-flight count is bounded by
		# maxsize and callback() never has to wait for its entry.
		self.bufqueue.put(transfer) # it doesn't matter what we put in here
		try:
			transfer.submit() # asynchronous transfer
		except Exception:
			self.bufqueue.get() # give the slot back, no callback will come for this one
			raise

	def worker(self): # event loop for the async transfers
		"""Handle USB events until stop() was called and every buffer has finished."""
		while self.running or not self.bufqueue.empty(): # play until the last
			# A short timeout lets the loop condition be re-checked even when
			# nothing is pending (e.g. stop() called with an empty queue).
			self.context.handleEventsTimeout(0.1)

	def callback(self, transfer): # callback of the worker thread
		"""Release one queue slot for a finished transfer, whatever its status."""
		self.bufqueue.get() # unblock the producer (should pop same transfer)

def play(filename):
	"""Play the stereo audio file `filename` (48, 96 or 192 kHz) on the first QA402/QA403 found.

	Raises:
		SystemExit: if no analyzer is found, the sample rate is unsupported,
			or the file is not stereo.
	"""
	with usb1.USBContext() as context:
		# if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
		device = context.openByVendorIDAndProductID(0x16c0, 0x4e37) # QA402
		if device is None:
			device = context.openByVendorIDAndProductID(0x16c0, 0x4e39) # QA403
		if device is None:
			raise SystemExit("no QA402/QA403 analyzer found")
		device.resetDevice()
		with device.claimInterface(0):
			# create our objects
			registers = Registers(device) # low-level register access
			control = Control(registers) # device settings
			stream = Stream(context, device, registers) # buffer streaming

			control.set_output(-12) # set your desired output level here

			with soundfile.SoundFile(filename, 'rb') as f:
				if f.channels != 2:
					raise SystemExit("only stereo files supported") # stream is alternating left/right samples
				try:
					control.set_samplerate(f.samplerate) # raises KeyError if unsupported
				except KeyError:
					raise SystemExit("sample rate %d not supported (48|96|192kHz)" % f.samplerate) # or pitch would be wrong
				try: # be thread-aware with Ctrl-C
					stream.start()
					for block in f.blocks(blocksize = 1024, dtype = 'int32', always_2d = True):
						# rows are frames, so the C-order bytes alternate left/right;
						# '<i4' is the hardware format (little endian), converted in one go
						stream.write(block.astype('<i4', copy = False).tobytes()) # will block if queue is full
				except KeyboardInterrupt:
					pass # fall into below end-of-stream signalling and join
				finally:
					# always end the worker thread, else any other error would
					# leave it running and the process would never exit
					stream.stop()

if __name__ == "__main__":
	# explicit check: an assert would be stripped when run with "python -O"
	if len(sys.argv) < 2:
		raise SystemExit("usage: %s <audio file>" % sys.argv[0])
	play(sys.argv[1])
2 Likes

Good grief, @IDC_Dragon, that is impressive…

2 Likes

Thank you for the appreciation.

Here’s one more, now with left/right fixed and using a sample rate converter when needed, so it will play all rates.
(I shall move to github perhaps, instead of versioning in a forum thread.)

#!/usr/bin/env python3

# play an audio file on QA402 / QA403(untested)

import sys # for argv
import time
import struct
import numpy
import threading
import queue
import usb1 # 'pip install libusb1' if missing
import soundfile # 'pip install PySoundFile' if missing
import samplerate # 'pip install samplerate' if missing

class Registers: # low level register read/write
	"""Write-only register access over bulk endpoint 1 OUT of a usb1 device handle."""

	def __init__(self, device):
		"""Remember the device handle and the EP1 out endpoint address."""
		self.device = device
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x01 # EP1 out

	def write(self, reg, val):
		"""Send the 8-bit address `reg` followed by `val` as 32-bit big endian."""
		address = struct.pack('>B', reg)
		value = struct.pack('>I', val)
		self.device.bulkWrite(self.endpoint_write, address + value, 1000)

class Control: # device settings
	"""Output level (register 6) and streaming sample rate (register 9) settings."""

	def __init__(self, registers):
		"""Store the register accessor and the lookup tables."""
		self.registers = registers
		self.output2reg = dict(zip((-12, -2, 8, 18), range(4)))
		self.samplerate2reg = list(zip((48000, 96000, 192000), range(3))) # ascending rate

	def set_output(self, gain): # output relays
		"""Write the code for output level `gain` to register 6 (KeyError if unknown)."""
		self.registers.write(6, self.output2reg[gain])

	def set_samplerate(self, rate): # set streaming rate, or next higher
		"""Select the lowest supported rate that is >= `rate`, else the highest one.

		Returns the sample rate that was actually selected.
		"""
		streamrate, val = self.samplerate2reg[-1] # fallback: the last (highest) entry
		for candidate in self.samplerate2reg:
			if candidate[0] >= rate:
				streamrate, val = candidate
				break
		self.registers.write(9, val)
		time.sleep(0.1) # it seems a little pause is needed to swing in
		return streamrate

class Stream: # buffer streaming, with event worker thread
	"""Asynchronous bulk streaming to endpoint 2 OUT, serviced by a worker thread.

	`bufqueue` acts as a counting semaphore only: an entry is added for each
	transfer before submission and one is removed whenever a transfer
	finishes, so `write()` blocks once the queue's maximum is in flight.
	"""

	def __init__(self, context, device, registers):
		"""Store the USB objects and set up the transfer callback dispatcher."""
		self.context = context
		self.device = device
		self.registers = registers
		self.endpoint_dac = usb1.ENDPOINT_OUT | 0x02 # EP2 out
		self.bufqueue = queue.Queue(maxsize = 5*2) # max. 10 overlapping buffers in flight, block on more
		self.running = False
		self.thread = None
		self.transfer_helper = usb1.USBTransferHelper() # use the callback dispatcher
		self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback) # set ours
		# Any other outcome (error, timeout, cancel) has to free its slot as
		# well, or write() and stop() would block forever on a lost transfer.
		self.transfer_helper.setDefaultCallback(self.callback)

	def start(self): # start streaming, spawn the thread
		"""Start the event worker thread, then tell the hardware to start streaming."""
		self.running = True # set before the worker evaluates its loop condition
		self.thread = threading.Thread(target = self.worker)
		self.thread.start()
		self.registers.write(0x08, 0x05) # start streaming DAC (and ADC?)

	def stop(self): # stop streaming, end the thread
		"""Wait until all queued buffers are done, end the worker, stop the hardware."""
		self.running = False
		if self.thread is not None: # tolerate stop() without a prior start()
			self.thread.join()
		self.registers.write(0x08, 0x00) # stop streaming DAC

	def write(self, buffer): # add a buffer to the playback queue
		"""Queue a 2-D block of integer samples (rows = frames, columns = left/right).

		The columns are swapped for the QA hardware and the samples are sent as
		32-bit little endian, regardless of the array's own integer width or
		the byte order of the host. Blocks while the queue is full.
		"""
		transfer = self.device.getTransfer()
		swapped = numpy.fliplr(buffer) # swapped left/right view for QA hardware
		payload = swapped.astype('<i4', copy = False).tobytes() # no extra copy if already int32 little endian
		transfer.setBulk(self.endpoint_dac, payload, self.transfer_helper, None, 1000)
		# Take the slot before submitting, so the in-flight count is bounded by
		# maxsize and callback() never has to wait for its entry.
		self.bufqueue.put(transfer) # it doesn't matter what we put in here
		try:
			transfer.submit() # asynchronous transfer
		except Exception:
			self.bufqueue.get() # give the slot back, no callback will come for this one
			raise

	def worker(self): # event loop for the async transfers
		"""Handle USB events until stop() was called and every buffer has finished."""
		while self.running or not self.bufqueue.empty(): # play until the last
			# A short timeout lets the loop condition be re-checked even when
			# nothing is pending (e.g. stop() called with an empty queue).
			self.context.handleEventsTimeout(0.1)

	def callback(self, transfer): # callback of the worker thread
		"""Release one queue slot for a finished transfer, whatever its status."""
		self.bufqueue.get() # unblock the producer (should pop same transfer)

def play(filename):
	"""Play the stereo audio file `filename` on the first QA402/QA403 found.

	The hardware is set to the file's sample rate, or to the next higher
	supported one (the highest, if the file exceeds all); in that case the
	audio is resampled to the hardware rate on the fly.

	Raises:
		SystemExit: if no analyzer is found or the file is not stereo.
	"""
	with usb1.USBContext() as context:
		# if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
		device = context.openByVendorIDAndProductID(0x16c0, 0x4e37) # QA402
		if device is None:
			device = context.openByVendorIDAndProductID(0x16c0, 0x4e39) # QA403
		if device is None:
			raise SystemExit("no QA402/QA403 analyzer found")
		device.resetDevice()
		with device.claimInterface(0):
			# create our objects
			registers = Registers(device) # low-level register access
			control = Control(registers) # device settings
			stream = Stream(context, device, registers) # buffer streaming

			control.set_output(-2) # set your desired output level here

			with soundfile.SoundFile(filename, 'rb') as file:
				if file.channels != 2:
					raise SystemExit("only stereo files supported") # resampler and stream are fixed to 2 channels
				rate = control.set_samplerate(file.samplerate) # returns possible rate
				try: # be thread-aware with Ctrl-C
					stream.start()
					if rate == file.samplerate: # no sample rate conversion, integer directly to hardware
						for block in file.blocks(blocksize = 1024, dtype = 'int32', always_2d = True):
							stream.write(block) # will block if queue is full
					else: # use sample rate conversion
						resampler = samplerate.CallbackResampler(
							lambda: file.read(frames = 1024, always_2d = True),
							rate / file.samplerate, # ratio
							converter_type = 'sinc_best',
							channels = 2)
						float2int = 2**31 # start with 1:1 amplitude
						while len(block := resampler.read(1024)): # read as float
							peak = max(-block.min(), block.max()) # should stay below 1.0, but intersample overshoots may happen
							if peak * float2int > 0x7FFFFFB2: # magic threshold because of roundoff errors
								float2int = 0x7FFFFFB2 / peak # impose hard limit to stay below 2**31-1
							# explicit 32 bit: plain int is 64 bit on most platforms,
							# which would double the sample size on the wire
							block = (block * float2int).astype(numpy.int32) # scale up to integer range, convert
							stream.write(block) # will block if queue is full
				except KeyboardInterrupt:
					pass # fall into below end-of-stream signalling and join
				finally:
					# always end the worker thread, else any other error would
					# leave it running and the process would never exit
					stream.stop()

if __name__ == "__main__":
	# explicit check: an assert would be stripped when run with "python -O"
	if len(sys.argv) < 2:
		raise SystemExit("usage: %s <audio file>" % sys.argv[0])
	play(sys.argv[1])
1 Like