Is the REST API Phase/Degrees endpoint working?

The QA403 does some really useful processing of the raw data when it computes the FFT. I wanted to use the REST API to access this frequency-domain data and also the phase. Yes, I can calculate it myself, but there will be discrepancies due to slight differences in the windowing functions I might use etc., and it would be good to have consistency between measurements made with the app and with the REST API.

All seems good, but I get HTTP status code 400 when I hit the /Phase/Degrees endpoint.

The other endpoints I have tried work fine… maybe I am using it wrong, or it’s a bug?

PS. I am now on the latest release, 1.197. ~~I am on version 1.189, I will update the software and see if that changes anything.~~

Actually, after doing a bit more reading of the REST API docs, it seems this endpoint is not what I was expecting. I need the complex part of the FFT.

I’ll do this by grabbing the time-domain waveform.

In other words, what I really need is an endpoint like /Data/Frequency/InputPhase.
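
In the meantime, computing the phase from the time-domain data looks something like the sketch below. The /Data/Time/Input endpoint and the Left field are what I've been using already; the localhost port is an assumption from my setup:

import base64
import numpy as np
import requests

url = "http://localhost:9402"  # assumed default address of the QA403 REST API

# pull the last acquisition's time-domain data and compute the phase myself
data = requests.get(url + "/Data/Time/Input").json()
left = np.frombuffer(base64.b64decode(data["Left"]), np.float64)

spectrum = np.fft.rfft(left)                # complex single-sided spectrum
phase_deg = np.degrees(np.angle(spectrum))  # phase per bin, in degrees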

Hi @Dan, I’ll look at the phase issue you ran into.

For a lot of algorithm development, I have an environment I made that uses live time-domain data from the QA403 and a lot of extensions in C#.

Some code from a recent exploration looks as follows:

 int sampleRate = 48000;
 int fftSize = 32768;
 double resolution = (double)sampleRate / fftSize;  // bin width, about 1.46 Hz; the cast avoids integer division

 TG = new TabGraph(tabControl1);

 double[] chirp = SigProcGen.GenChirp(fftSize, 24000, sampleRate, 20, 20000, 1.3);

 TG.AddTabGraph("Chirp", chirp);

 await QA402.SetDefaults();
 QA402.SetSampleRate(48000);
 QA402.SetBufferSize((uint)chirp.Length);       // acquisition length matches the chirp
 QA402.SetMaxInput(18);                         // max input level, in dBV
 await QA402.DoAcquisitionAsync(chirp, chirp);  // play the chirp on both channels and capture the inputs

 LeftRightTimeSeries lr = await QA402.GetInputTimeSeriesAsync();

 double[] outBuf = chirp;
 double[] inBuf = lr.Left;

 // Attempt to align the output and input data in time using cross-correlation
 if (true)
 {
     // apply cross correlation to align input data
     double[] xc = outBuf.RealToComplex().ForwardFft().ComplexMultiplyRealImag(inBuf.RealToComplex().ForwardFft().ConjugateRealImag()).ReverseFft().ComplexToMag();
     Array.Reverse(xc);
     int peakSampleIndex = xc.IndexOfMaxValue();

     TG.AddTabGraph("XCor", xc);
     if (peakSampleIndex > 0 && peakSampleIndex < 500)
     {
         // Re-center incoming data
         inBuf = inBuf.ShiftArrayLeft(peakSampleIndex + 1);
     }
 }

 TG.AddTabGraph("outBuf", outBuf);
 TG.AddTabGraph("inBuf", inBuf);

 double[] divResult = outBuf.RealToComplex().ForwardFft().ComplexDivision(inBuf.RealToComplex().ForwardFft());


 TG.AddTabGraph("divResult Mag", divResult.ComplexToMag().ToSingleSided());
 TG.AddTabGraph("divResult Phase", divResult.ComplexToPhase(true).ToSingleSided().RadiansToDegrees());

The extensions are nice because they allow you to chain a bunch of operations in a single line. For example, note above where the cross-correlation is computed:

double[] xc = outBuf.RealToComplex().ForwardFft().ComplexMultiplyRealImag(inBuf.RealToComplex().ForwardFft().ConjugateRealImag()).ReverseFft().ComplexToMag();
Array.Reverse(xc);

Note in the lines above how much is being done. And to me, it all works very intuitively based on a high-level understanding of signal processing. So, you start with the live time-domain data, pad every other sample with a zero to make it complex, and do the forward FFT on that. You do the same with the captured data (make complex, do the forward FFT), BUT you then take the conjugate of the input data series. Then you do a complex multiply between the output and input data, do the reverse FFT on that, convert the complex result to magnitude, reverse the array, and you have the cross-correlation! Precisely as it is described in a textbook.
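
For anyone more at home in Python, a rough numpy equivalent of that chained line looks as follows (a sketch only; it assumes both buffers have the same length and ignores circular-correlation edge effects):

import numpy as np

def cross_correlate_fft(out_buf, in_buf):
    # textbook FFT-based cross-correlation: FFT both signals, conjugate
    # one of them, multiply, inverse FFT, take the magnitude
    product = np.fft.fft(out_buf) * np.conj(np.fft.fft(in_buf))
    xc = np.abs(np.fft.ifft(product))
    return xc[::-1]  # reversed, matching the Array.Reverse above

# the index of the peak gives the sample offset between the two buffers:
# peak_index = np.argmax(cross_correlate_fft(out_buf, in_buf))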

The extensions do not compute in place; instead, they clone the buffer to ensure you aren’t stomping on the data. Not as efficient, but conceptually it guarantees a variable you assign hasn’t been stomped behind the scenes.

For example, the extension for RealToComplex appears as follows:

 /// <summary>
 /// Takes an array of real data, and pads every other sample with a zero.
 /// </summary>
 /// <param name="realData"></param>
 /// <returns></returns>
 public static double[] RealToComplex(this double[] realData)
 {
     double[] r = new double[realData.Length * 2];

     for (int i = 0; i < realData.Length; i++)
     {
         r[i * 2] = realData[i];
         r[i * 2 + 1] = 0;
     }

     return r;
 }

And ToSingleSided:

 /// <summary>
 /// Converts a spectrum computed via FFT to a single sided spectrum. The remaining
 /// side is scaled by Sqrt(2) to account for half of the energy being lost. The scaling
 /// is not applied to DC
 /// </summary>
 /// <param name="complexFreqSeries"></param>
 /// <returns></returns>
 public static double[] ToSingleSided(this double[] complexFreqSeries)
 {
     //double[] singleSided = complexFreqSeries.TakeFirstHalf();
     double[] r = new double[complexFreqSeries.Length / 2];

     // Convert to single sided. See page 5 of NI app note 041 to see why 
     // multiplying by sqrt(2). Note we skip DC
     for (int i = 2; i < complexFreqSeries.Length / 2; i++)
     {
         r[i] = complexFreqSeries[i] * Math.Sqrt(2);
     }

     r[0] = complexFreqSeries[0];
     r[1] = complexFreqSeries[1];

     return r;
 }

Extension naming is important; otherwise you will confuse yourself. The FFTs generally take interleaved real and imaginary data. You can build some very complex classes to handle it all, but I have tended to base almost all structuring around the way the FFT wants to handle the data. So, the base format for all my libs is interleaved real and imaginary data, which is just arrays of doubles, along with extensions to get from interleaved to something else (such as RealToComplex).
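
As an aside, if you ever want to move that interleaved layout into Python, numpy slicing makes the conversion trivial (a sketch, assuming the same even-real/odd-imaginary interleaving described above):

import numpy as np

def interleaved_to_complex(interleaved):
    # even indices hold the real parts, odd indices the imaginary parts
    return interleaved[0::2] + 1j * interleaved[1::2]

def complex_to_interleaved(z):
    out = np.empty(z.size * 2, dtype=np.float64)
    out[0::2] = z.real
    out[1::2] = z.imag
    return out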

And then, notice the TabGraph. I can add tabs to a project, and then quickly add a bunch of plots looking at intermediate results along the way.

I spent some time a while back on Jupyter notebooks with live data from the QA403, which is very nice. But in the end I was relying more and more on external libs for the signal processing, and I was running into issues understanding exactly what those libs were doing. So it just made more sense to me to build something more streamlined, and part of that was a generic app.

In short, if you have a good foundation in signal processing, as you do, think about an army of simple helper functions and just dealing with the live time-domain data from the QA403. It’s really nice to deal with the data in absolute terms, and building out the library of extensions will really help cement how a lot of this stuff works.

Hi Matt,

Thanks for posting the code snippets and the data processing/coding advice; that all makes a lot of sense. As I use the REST API more, I think I will do just that: a Python API that wraps it.

In any case, the REST API is really powerful. I have made some helper functions in my Jupyter notebook and wrote a routine to measure inductance using the signal generator (rather than the chirp automated test). It is working now! Numpy makes this kind of thing really easy in Python; take a look (at my messy code) below.

I get the time-domain waveforms, apply a flat-top windowing function, do a “real” FFT that automatically truncates the spectrum, do a complex division to get the impedance, and then do a few calculations to get the inductance.

import requests
import numpy as np
import base64
from scipy import signal

url = "http://localhost:9402"  # address of the QA403 app's REST server

# Use generator 1 to output a single frequency
fset = 100.0  # Hz
ampset = -30.0  # dBV
ret = requests.put(
    url + f"/Settings/AudioGen/Gen1/On/{fset}/{ampset}"
)

# Get the data.
print("Start")
requests.post(
    url + "/Acquisition"
)
print("Got Data!")
data = requests.get(
    url + "/Data/Time/Input"
).json()

# Time-domain in volts
left = np.frombuffer(base64.b64decode(data["Left"]), np.float64)
  
# Current sense resistor on right channel to get current
r_sense = 0.1  
right = np.frombuffer(base64.b64decode(data["Right"]), np.float64) / r_sense
  
# Here is where Python really spoils us!

# Apply a flat-top windowing function to the data in the time domain; this gives
# a somewhat similar frequency response to the same option in the QA403 software.
# Not identical, but good enough for what I need.
left = left * signal.windows.flattop(left.size)
right = right * signal.windows.flattop(right.size)

# Do a "real" FFT, i.e. throw away negative frequencies.
_left = np.fft.rfft(left)
_right = np.fft.rfft(right)

# Calculate the complex impedance
_z = _left / _right
zabs = np.abs(_z)  #  |Z|
# Phase angle between voltage and current, using the convention that
# current lags voltage in an inductor
phi = np.angle(1.0 / _z)

# Solve the phasor
R = zabs / np.sqrt(np.tan(phi)**2 + 1)  # resistance
X = zabs * np.tan(phi) / np.sqrt(np.tan(phi)**2 + 1)  # inductive reactance
L_mH = X / (2 * np.pi * fset) * 1000  # inductance in mH
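
(For reference, since 1/sqrt(tan(phi)**2 + 1) = |cos(phi)|, those two expressions are just the phasor identities R = |Z|*cos(phi) and X = |Z|*sin(phi), with the sign of X following the sign convention chosen for phi.)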

I cut out the plotting part, but it also saves an image of the data to disk etc. It’s a bit hacky; I’m doing some of the setup using the QA403 app, so it’s not exactly self-contained. But the REST API is very empowering for when things get a bit too specialised for the app - and you have not yet written one of your helpful automated test routines! :slight_smile:

It starts with waveforms like this; you immediately see they are very inductor-y.


Hi @Dan, very impressive. Do you prefer Jupyter for this type of work? Maybe it would make sense to build a Python library that spoke directly to the QA40x hardware over USB (no intermediate program needed) and that also knew how to pull the calibration from the box and apply it. Then Jupyter could run standalone (no QA40x app), and with something like scipy for the signal processing, most stuff is easy. A bit of cleanup in a smaller library for things like THD, RMS, generating chirps, setting attens, etc., and that lib could be pulled in via a pip command.

It would be so nice to download a compact standalone environment of some kind (like the Jupyter Lab Desktop App), install a lib from git via pip, and start working with live calibrated data, sharing code, applying filters, graphing intermediate results, etc.

Going back to the whole GPT thing, I think if there’s a ton of info out there, GPT does a good job of helping you write the code. And a simple, popular, cross-platform environment with an easy-to-install lib maybe makes programming for audio tasks accessible to a wider group. Feels like we’re on the cusp, anyway.

PS. A general question for those with experience… @IDC_Dragon wrote a bare-metal USB QA403 player in Python a while back. That code should be cross-platform as-is, right? That is, it would play on Linux and on Windows no problem? Or does the USB stuff need TLC as you move from platform to platform?

My player snippet works with either Windows or Linux, no modification necessary.
Just by intuition, though, I’m not convinced that the bare-metal realtime buffer handling of both playback and recording would be in the best hands with Python, since it does no real multithreading. I forget the details: is there a timestamp attached to buffers, or how would you sync rec and play?

Hi @IDC_Dragon, the sync is maintained by the hardware. It’s very similar to your code: you initially submit 10 overlapped/async buffers, and then a worker with a callback keeps the queue full. Since the hardware maintains the sync, for every DAC buffer you submit, you can also submit an ADC buffer. Then, as you are alerted that a DAC buffer has gone out, you submit another DAC buffer AND pull out a full ADC buffer. When all the DAC buffers have gone out, you know you still have a few ADC buffers in flight. Grab those, and then you’re done.

But I think your point is a keen observation: if playback alone isn’t glitching in Python cross-platform, then playback and record together shouldn’t glitch either, since it’s the same mechanism.

Here is some code that starts from @IDC_Dragon’s code and lets you send and receive buffers using the bare-metal interface. This is just a test to see how it works and whether it seems glitchy or not. It seems pretty solid, which is encouraging. I’m sure there’s a lot of room for improvement here; Python isn’t something I’m that familiar with, and without @IDC_Dragon’s code I’d never have figured this out.

Next, when time permits, I’ll see if I can pull the calibration constants from the QA40x hardware and apply those, so the amplitude accuracy of the Python code is just as good as the app’s.

# generate a sine and play that out the QA403 DAC, while capturing the same
# sine on the QA403 ADC input

import time
import struct
import numpy as np
import threading
import queue
import usb1  # 'pip install libusb1' if missing
import matplotlib.pyplot as plt  # for plotting

def generate_sine_wave(num_samples, frequency, amplitude, sample_rate=48000):
	"""
	Generates a sine wave with the specified parameters.

	Args:
		num_samples (int): Total number of samples.
		frequency (float): Frequency of the sine wave in Hz.
		amplitude (float): Peak amplitude of the sine wave.
		sample_rate (int, optional): Sample rate in Hz. Defaults to 48000 Hz.

	Returns:
		np.ndarray: Array of doubles representing the sine wave.
	"""
	# Generate the time array
	t = np.arange(num_samples) / sample_rate

	# Generate the sine wave
	sine_wave = amplitude * np.sin(2 * np.pi * frequency * t)

	return sine_wave

class Registers:  # low level register read/write
	def __init__(self, device):
		self.endpoint_read = usb1.ENDPOINT_IN | 0x01  # EP1 in
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x01  # EP1 out
		self.device = device

	def read(self, reg):
		self.write(0x80 | reg, 0)  # write the address, with MSB set
		data = self.device.bulkRead(self.endpoint_read, 4, 1000)  # read result
		(val,) = struct.unpack('>I', data)  # 32bit big endian
		return val

	def write(self, reg, val):
		buf = struct.pack('>BI', reg, val)  # 8 bit address and 32 bit big endian value
		self.device.bulkWrite(self.endpoint_write, buf, 1000)

class Control:  # device settings
	def __init__(self, registers):
		self.registers = registers
		self.output2reg = {-12: 0, -2: 1, 8: 2, 18: 3}
		self.input2reg = {0: 0, 6: 1, 12: 2, 18: 3, 24:4, 30:5, 36:6, 42:7}
		self.samplerate2reg = {48000: 0, 96000: 1, 192000: 2}

	def set_output(self, gain):  # output relays
		val = self.output2reg[gain]
		self.registers.write(6, val)

	def set_input(self, gain):  # input relays
		val = self.input2reg[gain]
		self.registers.write(5, val)

	def set_samplerate(self, rate):  # streaming rate
		val = self.samplerate2reg[rate]
		self.registers.write(9, val)
		time.sleep(0.1)  # it seems a little pause is needed for the rate change to settle

class Stream:  # buffer streaming, with event worker thread
	def __init__(self, context, device, registers):
		self.context = context
		self.device = device
		self.registers = registers
		self.endpoint_read = usb1.ENDPOINT_IN | 0x02  # EP2 in
		self.endpoint_write = usb1.ENDPOINT_OUT | 0x02  # EP2 out
		self.dacQueue = queue.Queue(maxsize=5)  # max. 5 overlapping buffers in flight, block on more
		self.adcQueue = queue.Queue()  # unlimited queue for received data buffers
		self.transfer_helper = usb1.USBTransferHelper()  # use the callback dispatcher
		self.transfer_helper.setEventCallback(usb1.TRANSFER_COMPLETED, self.callback)  # set ours
		self.received_data = bytearray()  # collection of received data bytes

	def start(self):  # start streaming, spawn the thread
		self.thread = threading.Thread(target=self.worker)
		self.running = True
		self.thread.start()
		self.registers.write(8, 0x05)  # start streaming

	def stop(self):  # stop streaming, end the thread
		self.running = False
		self.thread.join()
		self.registers.write(8, 0x00)  # stop streaming

	def write(self, buffer):  # add a buffer to the playback queue
		transfer = self.device.getTransfer()
		transfer.setBulk(self.endpoint_write, buffer, self.transfer_helper, None, 1000)
		transfer.submit()  # asynchronous transfer
		self.dacQueue.put(transfer)  # it doesn't matter what we put in here

		# Submit a USB bulk transfer to read
		read_transfer = self.device.getTransfer()
		read_transfer.setBulk(self.endpoint_read, 16384, self.transfer_helper, None, 1000)
		read_transfer.submit()  # asynchronous transfer
		self.adcQueue.put(read_transfer)  # it doesn't matter what we put in here

	def worker(self):  # event loop for the async transfers
		while self.running or not (self.dacQueue.empty() and self.adcQueue.empty()):  # play until the last
			self.context.handleEvents()

	def callback(self, transfer):  # callback of the worker thread
		if transfer.getEndpoint() == self.endpoint_read:
			self.received_data.extend(transfer.getBuffer())  # collect received data bytes
			self.adcQueue.get()  # unblock the producer (should pop same transfer)
		else:
			self.dacQueue.get()  # unblock the producer (should pop same transfer)

	def collect_remaining_adc_data(self):
		# Wait for all remaining ADC transfers to complete
		while not self.adcQueue.empty():
			self.context.handleEvents()
		return self.received_data

def play():
	with usb1.USBContext() as context:
		# if below fails under Linux with LIBUSB_ERROR_ACCESS, add udev rule with access rights
		device = context.openByVendorIDAndProductID(0x16c0, 0x4e37)  # QA402
		if device is None:
			device = context.openByVendorIDAndProductID(0x16c0, 0x4e39)  # QA403
			if device is None:
				raise SystemExit("no QA402/QA403 analyzer found")
		device.resetDevice()
		with device.claimInterface(0):
			# create our objects
			registers = Registers(device)  # low-level register access
			control = Control(registers)  # device settings
			stream = Stream(context, device, registers)  # buffer streaming

			control.set_output(-2)  # set full scale output
			control.set_input(0)    # set full scale input

			# generate a 1 kHz sine, at an RMS level of 0.1 = -20 dBFS 
			# and a peak level of -17 dBFS
			outData = generate_sine_wave(32768, 1000, 0.141)
			print(f"outData Len: {len(outData)} outData Bytes: {outData.nbytes}")

			# Create left and right channels as 32-bit floats
			leftData = outData.astype(np.float32)
			rightData = outData.astype(np.float32)
			print(f"leftData Len: {len(leftData)} leftData Bytes: {leftData.nbytes}")
			print(f"rightData Len: {len(rightData)} rightData Bytes: {rightData.nbytes}")

			# Interleave the left and right channels
			interleaved_data = np.empty((leftData.size + rightData.size,), dtype=np.float32)
			interleaved_data[0::2] = leftData
			interleaved_data[1::2] = rightData
			print(f"interleaved_data Len: {len(interleaved_data)} interleaved_data Bytes: {interleaved_data.nbytes}")

			# Convert to bytes, multiplying by max int value
			max_int_value = 2**31 - 1
			interleaved_data = (interleaved_data * max_int_value).astype(np.int32)

			# Pack the data into chunks of 16k bytes
			chunk_size = 16384  # 16k bytes
			num_ints_per_chunk = chunk_size // 4  # 32-bit ints, so 4 bytes per int
			total_chunks = len(interleaved_data) // num_ints_per_chunk
			print(f"total_chunks: {total_chunks}")

			stream.start()

			try:
				for i in range(total_chunks):
					print(f"chunk {i} of {total_chunks}")
					chunk = interleaved_data[i * num_ints_per_chunk:(i + 1) * num_ints_per_chunk]
					# Pack the data as 32-bit ints
					buffer = struct.pack('<%di' % len(chunk), *chunk)
					stream.write(buffer)
			finally:
				stream.stop()

			# Collect remaining ADC data
			adc_data = stream.collect_remaining_adc_data()

			# Convert collected ADC data back to int
			adc_data = np.frombuffer(adc_data, dtype=np.int32)

			# Separate interleaved ADC data into left and right channels
			left_adc_data = adc_data[0::2]
			right_adc_data = adc_data[1::2]

			# Convert left and right channels back to float
			left_adc_data = left_adc_data.astype(np.float32) / max_int_value
			right_adc_data = right_adc_data.astype(np.float32) / max_int_value

			# Plot leftData
			plt.figure(figsize=(10, 4))
			plt.plot(leftData[:1000])  # Plot the first 1000 samples for clarity
			plt.title("DAC Data Left Channel")
			plt.xlabel("Sample Number")
			plt.ylabel("Amplitude")
			plt.grid(True)
			plt.show()

			# Plot adc_data
			plt.figure(figsize=(10, 4))
			plt.plot(left_adc_data[:1000])  # Plot the first 1000 samples for clarity
			plt.title("ADC Data Left Channel")
			plt.xlabel("Sample Number")
			plt.ylabel("Amplitude")
			plt.grid(True)
			plt.show()

			# Print received data length
			print(f"Received data length: {len(adc_data)}")

play()

The environment was Jupyter Lab 4.1.8. The desktop app comes with a default Python install: you install Jupyter Lab, and when it first runs there’s an option at the bottom to install the Python environment.

The output is below. When you run it, you’ll see a constant phase relationship between the output and input. This means the ADC left-channel data can be slid to the left to get the desired sample offset you need.
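
For example, once the offset is known (say, via the cross-correlation trick earlier in the thread), the slide is one line of numpy. Here offset_samples is a hypothetical value you’d measure yourself:

import numpy as np

offset_samples = 123  # hypothetical; measure this, e.g. via cross-correlation
aligned = np.roll(left_adc_data, -offset_samples)  # note np.roll wraps around; trim the tail for a pure shift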

If I get the calibration stuff working, I’ll move to another thread, as Dan’s explorations and teachings here are very informative.

Can confirm this is running on my system too, nice!

Regarding the calibration part, I read the section about that here (GitHub - QuantAsylum/QA40x_BareMetal) and the source code comments. Do you have a routine to determine the calibration constant for a device, or is this magic number constant enough for all units?

Hi @Dan, it’s different for every unit. The ADC/DAC combo probably has +/- 100 mV of slop, and the attenuators and various opamps are made with 0.1% parts, but the errors add up to probably +/- 0.5% or so. If you look in Help->About in the QA40x app, you’ll see a section called Flash Page 0 (Factory Calibration Data). That is unique for each unit, and it has the settings needed to bring the error in to +/- 0.02 dB or so. When you first plug a new QA40x in, the flash data is copied from the flash and saved to the CalibrationData directory in mydocs/QuantAsylum/QA40x/CalibrationData (the file name uses your device serial number). That is in case your flash ever gets erased somehow.
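
Locating the backup file from Python is straightforward; a small sketch, assuming the Documents layout described above maps to the user’s home directory:

from pathlib import Path

# assumed location of the backup files described above
cal_dir = Path.home() / "Documents" / "QuantAsylum" / "QA40x" / "CalibrationData"
for f in cal_dir.glob("*.DAT"):  # file name contains the device serial number
    print(f)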

Gotcha, I had a go at reading the file. I guessed the encoding, but I’m not too sure how to decode this data into useful numbers.

with open("SN_30ABCD3E_B343.DAT", "r", encoding="cp1252") as f:
    txt = f.read()

This is the file on my system reduced to a 0 or 1 per byte; numpy has a helpful function for that.

"".join([str(int(x)) for x in np.fromfile('SN_30ABCD3E_B343.DAT', dtype=bool)])

00001000100010111000100000111100111110111110111110111110111110111110111110111110111110111110111110111110111110111110111111111111111111111111111110111110111110111110111110110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000011

A question, Matt. If I buy/use a second QA403 with its own calibration coefficients/data, and the units get moved or interchanged from machine to machine, does the software account for that? Does it only load/read the file for the S/N that corresponds to the attached QA device?
Cheers.

Hi @restorer-john, yes, the software will always pull the calibration out of the unit that is currently plugged in and use that. So, you can plug in different QA40x and the correct calibration will always be used. Saving the cal data to your machine is done so that there’s a backup. The QA402 was the first product with cal data stored in the flash, and so I guess the worry was that there could be a flash update that accidentally erased everything. And rather than needing everyone to send their units back, the cal data could be restored via some extra software.

Hi @Dan, the flash data is mapped into a C# struct. The first 6 32-bit words are header info. Then come chunks of data that are 6 bytes each: a two-byte int indicating the ADC level, and a 4-byte float indicating the gain that must be applied at that level. Ditto for the DAC.
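
A minimal sketch of what parsing that might look like in Python. The 6-word header and the 6-byte record layout come from the description above; the little-endian byte order and the records-running-to-end-of-file assumption are guesses on my part:

import struct

def parse_cal_data(path):
    with open(path, "rb") as f:
        raw = f.read()

    header = struct.unpack_from("<6I", raw, 0)  # six 32-bit header words

    # then repeated 6-byte records: 2-byte int level + 4-byte float gain
    records = []
    offset = 6 * 4
    while offset + 6 <= len(raw):
        level, gain = struct.unpack_from("<hf", raw, offset)
        records.append((level, gain))
        offset += 6
    return header, records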

Looks like once the calibration is working, this could become a library, go into a repo on GitHub, and be installed via pip. And then very quickly it could get built out with basic measurements like RMS and THD, if those were needed.

Is this Mac? Or a VM? or ??

Python install on macOS.