Here is some code that builds on @IDC_Dragon’s code and lets you send and receive buffers using the bare_metal interface. This is just a test to see how it works and whether it seems glitchy. It seems pretty solid, which is encouraging. I’m sure there’s a lot of room for improvement in the code here; Python isn’t something I’m that familiar with, and without @IDC_Dragon’s code I’d never have figured this out.
Next, when time permits, I’ll see if I can pull the calibration constants from the QA40x hardware and apply them so the amplitude accuracy of the Python code is just as good as that of the QA40x application.
# generate a sine and play that out the QA403 DAC, while capturing the same
# sine on the QA403 ADC input
import time
import struct
import numpy as np
import threading
import queue
import usb1 # 'pip install libusb1' if missing
import matplotlib.pyplot as plt # for plotting
def generate_sine_wave(num_samples, frequency, amplitude, sample_rate=48000):
    """Generate a sine wave with the specified parameters.

    Args:
        num_samples (int): Total number of samples.
        frequency (float): Frequency of the sine wave in Hz.
        amplitude (float): Peak amplitude of the sine wave.
        sample_rate (int, optional): Sample rate in Hz. Defaults to 48000 Hz.

    Returns:
        np.ndarray: Array of doubles representing the sine wave.

    Raises:
        ValueError: If ``sample_rate`` is not positive.
    """
    # A zero or negative rate would otherwise yield inf/NaN samples (or a
    # time-reversed wave) without any error.
    if sample_rate <= 0:
        raise ValueError(f"sample_rate must be positive, got {sample_rate}")
    # Time of each sample in seconds.
    t = np.arange(num_samples) / sample_rate
    return amplitude * np.sin(2 * np.pi * frequency * t)
class Registers:
    """Low-level register read/write over the EP1 bulk endpoints.

    Every access is a 5-byte command: an 8-bit register address followed by a
    32-bit big-endian value. A read is requested by writing the address with
    its MSB set and then fetching a 4-byte big-endian reply.
    """

    READ_FLAG = 0x80    # MSB of the address byte marks a read request
    REPLY_SIZE = 4      # a register value is 32 bits
    TIMEOUT_MS = 1000   # timeout handed to the bulk transfers

    def __init__(self, device):
        """Remember the opened USB *device* handle and the EP1 addresses."""
        self.endpoint_read = usb1.ENDPOINT_IN | 0x01  # EP1 in
        self.endpoint_write = usb1.ENDPOINT_OUT | 0x01  # EP1 out
        self.device = device

    def read(self, reg):
        """Return the 32-bit value of register *reg*.

        Raises:
            OSError: If the device answers with anything but 4 bytes.
        """
        self.write(self.READ_FLAG | reg, 0)  # write the address, with MSB set
        data = self.device.bulkRead(
            self.endpoint_read, self.REPLY_SIZE, self.TIMEOUT_MS
        )
        # Fail with a readable message instead of an opaque struct.error.
        if len(data) != self.REPLY_SIZE:
            raise OSError(
                f"short reply reading register {reg}: "
                f"expected {self.REPLY_SIZE} bytes, got {len(data)}"
            )
        (val,) = struct.unpack('>I', data)  # 32bit big endian
        return val

    def write(self, reg, val):
        """Write the 32-bit value *val* to register *reg*."""
        # 8 bit address and 32 bit big endian value
        buf = struct.pack('>BI', reg, val)
        self.device.bulkWrite(self.endpoint_write, buf, self.TIMEOUT_MS)
class Control:
    """Device settings: output relays, input relays and streaming rate.

    Each setter maps a user-facing value to a register code through a lookup
    table and writes that code to the matching register.
    """

    def __init__(self, registers):
        """Store the register accessor and build the value -> code tables."""
        self.registers = registers
        self.output2reg = {-12: 0, -2: 1, 8: 2, 18: 3}
        self.input2reg = {0: 0, 6: 1, 12: 2, 18: 3, 24: 4, 30: 5, 36: 6, 42: 7}
        self.samplerate2reg = {48000: 0, 96000: 1, 192000: 2}

    @staticmethod
    def _lookup(table, key, what):
        """Return ``table[key]``, or raise a KeyError naming the valid keys."""
        try:
            return table[key]
        except KeyError:
            # Same exception type as the plain dict lookup, but the message
            # tells the caller which values are accepted.
            raise KeyError(
                f"unsupported {what} {key!r}; choose one of {sorted(table)}"
            ) from None

    def set_output(self, gain):
        """Switch the output relays to *gain* (a key of ``output2reg``).

        Raises:
            KeyError: If *gain* is not a supported output setting.
        """
        val = self._lookup(self.output2reg, gain, "output gain")
        self.registers.write(6, val)

    def set_input(self, gain):
        """Switch the input relays to *gain* (a key of ``input2reg``).

        Raises:
            KeyError: If *gain* is not a supported input setting.
        """
        val = self._lookup(self.input2reg, gain, "input gain")
        self.registers.write(5, val)

    def set_samplerate(self, rate):
        """Select the streaming rate *rate* (a key of ``samplerate2reg``).

        Raises:
            KeyError: If *rate* is not a supported sample rate.
        """
        val = self._lookup(self.samplerate2reg, rate, "sample rate")
        self.registers.write(9, val)
        time.sleep(0.1)  # it seems a little pause is needed to swing in
class Stream:
    """Buffer streaming over the EP2 bulk endpoints, with an event worker thread.

    ``write()`` submits one asynchronous OUT transfer (DAC data) and one
    asynchronous IN transfer (ADC data). A worker thread runs the libusb event
    loop; completed IN transfers are appended to ``received_data``. Transfers
    that end in any status other than "completed" are recorded in
    ``transfer_errors`` and still release their queue slot, so the producer and
    ``stop()`` cannot hang on them.
    """

    READ_SIZE = 16384          # bytes requested per ADC transfer
    TIMEOUT_MS = 1000          # timeout handed to each bulk transfer
    EVENT_POLL_SECONDS = 0.1   # upper bound for one event-loop wait

    def __init__(self, context, device, registers):
        """Set up endpoints, bookkeeping queues and the transfer callbacks."""
        self.context = context
        self.device = device
        self.registers = registers
        self.endpoint_read = usb1.ENDPOINT_IN | 0x02  # EP2 in
        self.endpoint_write = usb1.ENDPOINT_OUT | 0x02  # EP2 out
        # max. 5 overlapping buffers in flight, block on more
        self.dacQueue = queue.Queue(maxsize=5)
        # unlimited queue for received data buffers
        self.adcQueue = queue.Queue()
        self.transfer_helper = usb1.USBTransferHelper()  # use the callback dispatcher
        self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback)
        # Every other status (timeout, error, ...) must also free its slot.
        self.transfer_helper.setDefaultCallback(self._on_transfer_failed)
        self.received_data = bytearray()  # collection of received data bytes
        self.transfer_errors = []  # (endpoint, status) of unsuccessful transfers
        self.thread = None
        self.running = False

    def start(self):
        """Spawn the event worker thread, then tell the device to start streaming."""
        self.running = True
        self.thread = threading.Thread(target=self.worker)
        self.thread.start()
        self.registers.write(8, 0x05)  # start streaming

    def stop(self):
        """End the worker once all transfers have finished, then stop streaming."""
        self.running = False
        if self.thread is not None:  # tolerate stop() without a prior start()
            self.thread.join()
            self.thread = None
        self.registers.write(8, 0x00)  # stop streaming

    def write(self, buffer):
        """Queue *buffer* for playback and request one ADC buffer in return.

        Blocks while the maximum number of DAC buffers is already in flight.
        """
        self._submit(self.dacQueue, self.endpoint_write, buffer)
        self._submit(self.adcQueue, self.endpoint_read, self.READ_SIZE)

    def _submit(self, pending, endpoint, buffer_or_length):
        """Submit one asynchronous bulk transfer and track it in *pending*."""
        transfer = self.device.getTransfer()
        transfer.setBulk(
            endpoint, buffer_or_length, self.transfer_helper, None, self.TIMEOUT_MS
        )
        # Take the slot BEFORE submitting: the callback pops one entry per
        # finished transfer and must never find the queue empty, otherwise it
        # would block the event thread.
        pending.put(transfer)  # it doesn't matter what we put in here
        try:
            transfer.submit()  # asynchronous transfer
        except Exception:
            # Nothing is in flight for this slot; give it back. Only the
            # number of entries matters, not which object is removed.
            pending.get_nowait()
            raise

    def worker(self):
        """Run the event loop until stopped and no transfer is left in flight."""
        while self.running or not (self.dacQueue.empty() and self.adcQueue.empty()):
            # A bounded wait lets the loop re-check its exit condition even
            # when no further USB event arrives.
            self.context.handleEventsTimeout(self.EVENT_POLL_SECONDS)

    def callback(self, transfer):
        """Handle a completed transfer (called from the event loop).

        Returns False so the transfer helper does not resubmit the transfer.
        """
        if transfer.getEndpoint() == self.endpoint_read:
            # Keep only the bytes that were actually received; the buffer
            # itself always has the full requested size.
            received = transfer.getBuffer()[:transfer.getActualLength()]
            self.received_data.extend(received)
            self.adcQueue.get()  # release the slot of this ADC transfer
        else:
            self.dacQueue.get()  # unblock the producer
        return False

    def _on_transfer_failed(self, transfer):
        """Record a transfer that did not complete and release its slot."""
        endpoint = transfer.getEndpoint()
        self.transfer_errors.append((endpoint, transfer.getStatus()))
        if endpoint == self.endpoint_read:
            self.adcQueue.get()
        else:
            self.dacQueue.get()
        return False  # do not resubmit

    def collect_remaining_adc_data(self):
        """Wait for outstanding ADC transfers and return all received bytes."""
        while not self.adcQueue.empty():
            self.context.handleEventsTimeout(self.EVENT_POLL_SECONDS)
        return self.received_data
def _open_analyzer(context):
    """Open the first attached QA402/QA403 on *context*; exit if none is found."""
    # if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
    for product_id in (0x4e37, 0x4e39):  # QA402, QA403
        device = context.openByVendorIDAndProductID(0x16c0, product_id)
        if device is not None:
            return device
    raise SystemExit("no QA402/QA403 analyzer found")


def _plot_channel(samples, title):
    """Show the first 1000 samples of *samples* in a figure titled *title*."""
    plt.figure(figsize=(10, 4))
    plt.plot(samples[:1000])  # Plot the first 1000 samples for clarity
    plt.title(title)
    plt.xlabel("Sample Number")
    plt.ylabel("Amplitude")
    plt.grid(True)
    plt.show()


def play():
    """Play a 1 kHz sine on the DAC while capturing the ADC, then plot both.

    The sine is sent to both channels as interleaved little-endian int32
    samples in 16 KiB chunks. The captured ADC bytes are de-interleaved,
    converted back to floats, and the left channel is plotted next to the
    generated left channel.

    Raises:
        SystemExit: If no QA402/QA403 analyzer is found.
    """
    max_int_value = 2**31 - 1
    chunk_size = 16384  # 16k bytes
    num_ints_per_chunk = chunk_size // 4  # 32-bit ints, so 4 bytes per int

    with usb1.USBContext() as context:
        device = _open_analyzer(context)
        device.resetDevice()
        with device.claimInterface(0):
            # create our objects
            registers = Registers(device)  # low-level register access
            control = Control(registers)  # device settings
            stream = Stream(context, device, registers)  # buffer streaming
            control.set_output(-2)  # set full scale output
            control.set_input(0)  # set full scale input

            # generate a 1 kHz sine, at an RMS level of 0.1 = -20 dBFS
            # and a peak level of -17 dBFS
            outData = generate_sine_wave(32768, 1000, 0.141)
            print(f"outData Len: {len(outData)} outData Bytes: {outData.nbytes}")

            # Create left and right channels as 32-bit floats
            leftData = outData.astype(np.float32)
            rightData = outData.astype(np.float32)
            print(f"leftData Len: {len(leftData)} leftData Bytes: {leftData.nbytes}")
            print(f"rightData Len: {len(rightData)} rightData Bytes: {rightData.nbytes}")

            # Interleave the left and right channels
            interleaved_data = np.empty((leftData.size + rightData.size,), dtype=np.float32)
            interleaved_data[0::2] = leftData
            interleaved_data[1::2] = rightData
            print(f"interleaved_data Len: {len(interleaved_data)} interleaved_data Bytes: {interleaved_data.nbytes}")

            # Scale to int32. float32 cannot represent 2**31 - 1 exactly, so a
            # sample near full scale would round up to 2**31 and overflow the
            # int32 cast; scale in float64 and clip to the int32 range instead.
            scaled = interleaved_data.astype(np.float64) * max_int_value
            interleaved_data = np.clip(scaled, -max_int_value - 1, max_int_value).astype(np.int32)

            total_chunks = len(interleaved_data) // num_ints_per_chunk
            print(f"total_chunks: {total_chunks}")

            stream.start()
            try:
                for i in range(total_chunks):
                    print(f"chunk {i} of {total_chunks}")
                    chunk = interleaved_data[i * num_ints_per_chunk:(i + 1) * num_ints_per_chunk]
                    # Little-endian 32-bit ints; same bytes as packing every
                    # sample with struct, without the per-sample overhead.
                    stream.write(chunk.astype('<i4').tobytes())
            finally:
                stream.stop()

            # Collect remaining ADC data (still needs the open USB context)
            raw_adc = stream.collect_remaining_adc_data()

    # Keep only whole stereo frames (2 channels x 4 bytes) so the conversion
    # below cannot fail on a trailing partial frame.
    usable = len(raw_adc) - len(raw_adc) % 8
    adc_data = np.frombuffer(bytes(raw_adc[:usable]), dtype='<i4')

    # Separate interleaved ADC data into left and right channels and
    # convert them back to float
    left_adc_data = adc_data[0::2].astype(np.float32) / max_int_value
    right_adc_data = adc_data[1::2].astype(np.float32) / max_int_value  # noqa: F841 (kept for parity with the left channel)

    _plot_channel(leftData, "DAC Data Left Channel")
    _plot_channel(left_adc_data, "ADC Data Left Channel")

    # Print received data length
    print(f"Received data length: {len(adc_data)}")
# Run the demo only when executed as a script or notebook cell, so that
# importing this module does not touch the hardware.
if __name__ == "__main__":
    play()
The environment was Jupyter Lab 4.1.8. Jupyter Lab can provide a default Python install: install Jupyter Lab, and when it runs there’s an option at the bottom to install the Python environment.
The output is below. When you run it, you’ll see a constant phase relationship between the output and input. This means the ADC left-channel data can be shifted to the left to get the sample offset you need.
If I get the calibration stuff working, I’ll move to another thread as Dan’s explorations and teachings here are very informative.