Skip to content

Instantly share code, notes, and snippets.

@LukeGary462
Created November 9, 2022 17:52
Show Gist options
  • Select an option

  • Save LukeGary462/ba40afc73ecdbd7a9320a88351b0384c to your computer and use it in GitHub Desktop.

Select an option

Save LukeGary462/ba40afc73ecdbd7a9320a88351b0384c to your computer and use it in GitHub Desktop.
oscilloscope instrument python using instrument.py some things are a little broken...
from instruments.instrument import Instrument
from pyvisa import (VisaIOError, InvalidSession, VisaIOWarning, log_to_screen, ResourceManager)
import pprint as pp
import numpy as np
class OscilloscopeModels:
"""
This class describes oscilloscope models.
"""
def __init__(self):
self.models = {}
self.models['DS1074Z'] = DS1074Z
self.models['DSOX1102G'] = DSOX1102G
self.models['MSOX3032T'] = MSOX3032T
def get(self, model: str) -> Instrument:
"""
Gets the specified model.
:param model: The model
:type model: str
:returns: oscilloscope class
:rtype: Instrument
"""
return self.models.get(model, None)
def is_valid(self, model: str) -> bool:
return model in self.models.keys()
def connect_to_oscilloscope(
model: str,
scope_serial: str = None,
rs232: bool = False,
tcpip: bool = False) -> object:
"""
Connects to oscilloscope.
:param model: The model
:type model: str
:param scope_serial: The oscilloscope serial number
:type scope_serial: str
:param tcpip: The tcpip
:type tcpip: bool
:returns: oscilloscope obbject if model is valid, none if not
:rtype: object
"""
scope_obj = None
scope = OscilloscopeModels().get(model)
if scope:
try:
scope_obj = scope(
serial_number=scope_serial,
include_tcpip=tcpip,
include_rs232=rs232,
)
except (VisaIOError, VisaIOWarning, InvalidSession):
raise Exception(f'Could not connect to scope {model}:{scope_serial}')
return scope_obj
class DS1074Z(Instrument):
"""
This class describes a Rigol ds1074z scope.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.__inst_init__(model='DS1074Z', **kwargs)
def get_waveform(self, ch, **kwargs):
'''get waveform data'''
self._debug_enable = True
self.debug(f'waveform capture start : \n\tkwargs{kwargs}')
self.write_configs(
configs=[
f'wav:sour chan{ch}',
'wav:mode raw',
'wav:form byte',
'wav:star raw',
'wav:stop raw'
]
)
sample_rate = float(self.query('acq:srate?'))
preamble_resp = self.query('wav:pre?')
preamble_keys = [
'format', 'type', 'points',
'count', 'xinc', 'xorigin',
'xref', 'yinc', 'yorigin',
'yref'
]
preamble = {}
for idx, p in enumerate(preamble_resp.split(',')):
preamble[preamble_keys[idx]] = float(p)
self.debug(pp.pformat(preamble))
timestep = preamble.get('xinc')
timestart = preamble.get('xorigin')
data = self.device.query_binary_values(
'wav:data?\n',
datatype='B',
is_big_endian=True
)
data = data[11:-1]
data = [
float(d)*float(preamble.get('yinc', 1.0))
for d in data
]
self.debug(f'first sample : {int(data[0])}')
self.debug(f'last sample : {int(data[-1])}')
t = []
for idx, d in enumerate(data):
t.append(timestart+(idx*timestep))
return {
'time': t,
'data': data,
'preamble': preamble,
'sample_rate': sample_rate
}
def get_waveform_ascii(self, ch: int, **kwargs):
"""
Gets the waveform.
:param ch: channel index
:type ch: int
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
''' read the trace data in ascii and let the scope
format to floats of the sampled voltages
'''
self._debug_enable = True
self.debug(f'waveform capture start : \n\tkwargs{kwargs}')
waveform_mode = kwargs.get('mode', 'norm')
self.write_configs(
configs=[
f'wav:sour chan{ch}',
f'wav:mode {waveform_mode}',
'wav:form ascii',
]
)
sample_rate = float(self.query('acq:srate?'))
preamble_resp = self.query('wav:pre?')
preamble_keys = [
'format', 'type', 'points',
'count', 'xinc', 'xorigin',
'xref', 'yinc', 'yorigin',
'yref'
]
preamble = {}
for idx, p in enumerate(preamble_resp.split(',')):
preamble[preamble_keys[idx]] = float(p)
self.debug(pp.pformat(preamble))
timestep = preamble.get('xinc')
timestart = preamble.get('xorigin')
self.debug_enable = False
data = self.query(
f'wav:data?',
is_ascii=True,
)
self.debug_enable = True
''' data is returned as csv, char by char. join all chars, then split on commas '''
data = ''.join(data).split(',')
''' throw out the header since it just describes the number of datapoints'''
data[0] = '.'+data[0].split('.')[-1]
self.debug(f'first sample : {data[0]}')
self.debug(f'last sample : {data[-1]}')
data = [float(d) for d in data]
t = []
for idx, d in enumerate(data):
t.append(timestart+(idx*timestep))
return {
'time': t,
'data': data,
'preamble': preamble,
'sample_rate': sample_rate
}
class MSOX3032T(Instrument):
"""
This class describes a msox 3032 t.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.__inst_init__(model='MSO-X 3032T', **kwargs)
def get_waveform(self, ch, **kwargs):
"""
Gets the waveform.
:param ch: channel name
:type ch: string
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
acq_source = kwargs.get('acq_source', 'raw')
self.debug(f'waveform capture start : \n\tkwargs{kwargs}')
self.write_configs(
configs=[
f'wav:sour chan{ch}',
f'wav:points:mode {acq_source}',
'wav:form byte',
'wav:uns 1',
]
)
sample_rate = float(self.query('acq:srate?'))
preamble_resp = self.query('wav:pre?')
preamble_keys = [
'format', 'type', 'points',
'count', 'xinc', 'xorigin',
'xref', 'yinc', 'yorigin',
'yref'
]
preamble = {}
for idx, p in enumerate(preamble_resp.split(',')):
preamble[preamble_keys[idx]] = float(p)
self.debug(pp.pformat(preamble))
timestep = preamble.get('xinc')
timestart = preamble.get('xorigin')
data = self.device.query_binary_values(
'wav:data?',
datatype='B',
)
print(f'{data[0:100]}')
self.debug(f'first sample : {data[0]}')
self.debug(f'last sample : {data[-1]}')
yinc = float(preamble.get('yinc', 1.0))
yorg = float(preamble.get('yorigin', 0.0))
yref = float(preamble.get('yref', 1.0))
data = [yorg+((float(d)-yref)*yinc) for d in data]
t = []
for idx, d in enumerate(data):
t.append(timestart+(idx*timestep))
return {
'time': t,
'data': data,
'preamble': preamble,
'sample_rate': sample_rate
}
def _get_preamble(self) -> dict:
preamble_resp = self.query('wav:pre?')
preamble_keys = [
'format', 'type', 'points',
'count', 'xinc', 'xorigin',
'xref', 'yinc', 'yorigin',
'yref'
]
preamble = {}
for idx, p in enumerate(preamble_resp.split(',')):
preamble[preamble_keys[idx]] = float(p)
self.debug(pp.pformat(preamble))
return preamble
def get_digitized_waveform(self, ch, **kwargs):
"""
Gets the waveform from digitizer.
:param ch: channel name
:type ch: string
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
requested_sample_rate = float(kwargs.get('sample_rate', 20e6))
requested_sample_duration = float(kwargs.get('sample_duration', None))
''' set the sample rate '''
sample_rate = self.set_sample_rate(
sample_rate=requested_sample_rate,
)
''' calculate the minimum number of points needed given the
requested sample duration. This will keep captures fast when xfering
over scpi
'''
valid_memory_depths = [
100,250,500,1000,
2000,5000,10000,20000,
50000,100000,200000,500000,
1000000,2000000,4000000,8000000,
]
mdepth_to_use = 100
num_samples_needed = sample_rate * requested_sample_duration
self.debug(f'samples needed : {num_samples_needed}')
for idx, mdepth in enumerate(valid_memory_depths):
if num_samples_needed > mdepth:
continue
if num_samples_needed <= mdepth:
mdepth_to_use = valid_memory_depths[idx+1]
break
self.debug(f'setting mdepth to : {mdepth_to_use} samples')
self.write(f'ACQ:POINT:ANAL {mdepth_to_use}')
''' set time range of full window '''
self.write(f'tim:range {requested_sample_duration}')
self.write_configs(
configs=[
f'wav:sour chan{ch}',
f'wav:points:mode raw',
'wav:form byte',
'wav:uns 1',
]
)
self.write(f'dig chan{ch}')
data = self.device.query_binary_values(
'wav:data?',
datatype='B',
)
yinc = float(self.query('wav:yinc?'))
yorg = float(self.query('wav:yor?'))
yref = float(self.query('wav:yref?'))
timestep = float(self.query('wav:xinc?'))
timestart = float(self.query('wav:xor?'))
data = [yorg+((float(d)-yref)*yinc) for d in data]
t = []
for idx, d in enumerate(data):
t.append(timestart+(idx*timestep))
return {
'time': t,
'data': data,
'sample_rate': sample_rate,
'preamble': {
'yinc': yinc,
'yorg': yorg,
'yref': yref,
'xinc': timestep,
'xorg': timestart,
}
}
def set_sample_rate(self, **kwargs) -> float:
'''
set the sample rate of the scope
'''
requested_sample_rate = kwargs.get('sample_rate')
requested_sample_duration = kwargs.get('sample_duration', None)
# Sampling Rate = Memory Depth / Time on Screen
current_sample_rate = float(self.query('acq:srate?'))
if float(current_sample_rate) == float(kwargs.get('sample_rate')):
self.debug('Sample Rate already correct')
return current_sample_rate
self.write(f'ACQ:SRAT:ANAL {requested_sample_rate}')
return float(self.query('acq:srate?'))
class DSOX1102G(Instrument):
"""
This class describes a dsox1102g keysight oscilloscope.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.__inst_init__(model='DSO-X 1102G', **kwargs)
def set_bode_enable(self, **kwargs):
enable = kwargs.get('enable', 0)
self.write(f'FRAN:ENAB {enable}')
enable = self.query(f'FRAN:ENAB?')
self.debug(enable)
def set_bode_freq_start(self, **kwargs):
startfreq = kwargs.get('startfreq', 10)
units = kwargs.get('units', 'Hz')
self.write(f'FRAN:FREQ:STAR {startfreq}{units}')
startfreq = self.query(f'FRAN:FREQ:STAR?')
self.debug(startfreq)
def set_bode_freq_stop(self, **kwargs):
stopfreq = kwargs.get('stopfreq', 1000)
units = kwargs.get('units', 'kHz')
self.write(f'FRAN:FREQ:STOP {stopfreq}{units}')
stopfreq = self.query(f'FRAN:FREQ:STOP?')
self.debug(stopfreq)
def set_bode_sourcein(self, **kwargs):
inchan = kwargs.get('inchan', 1)
self.write(f'FRAN:SOUR:INP {inchan}')
inchan = self.query(f'FRAN:SOUR:INP?')
self.debug(inchan)
def set_bode_sourceout(self, **kwargs):
outchan = kwargs.get('outchan', 2)
self.write(f'FRAN:SOUR:OUTP {outchan}')
outchan = self.query(f'FRAN:SOUR:OUTP?')
self.debug(outchan)
def set_bode_genload(self, **kwargs):
genload = kwargs.get('genload', 'ONEM')
self.write(f'FRAN:WGEN:LOAD {genload}')
genload = self.query(f'FRAN:WGEN:LOAD?')
self.debug(genload)
def set_bode_genvolt(self, **kwargs):
genvolt = kwargs.get('genvolt', 1)
self.write(f'FRAN:WGEN:VOLT {genvolt}')
genvolt = self.query(f'FRAN:WGEN:VOLT?')
self.debug(genvolt)
def set_bode_run(self):
self.write(f'FRAN:RUN')
def get_stdevtstatusreg(self): #bit 0 will go HIGH when FRAN process is complete
stdevtstatusreg = self.query(f'*ESR?')
self.debug(stdevtstatusreg)
return stdevtstatusreg
def get_bode_status(self):
status = self.query(f'FRAN?')
self.debug(f'status')
self.debug(status)
def get_bode_data(self):
#self.debug(f'Getting bode data')
#wavfor = self.write(f'SAVE:WAV:FORM?')
#print(wavfor)
#self.debug(wavfor)
data = self.query(
f'FRAN:DATA?',
#is_ascii=True,
is_binary=True,
datatype='s',
#is_big_endian = False
#container = numpy.array
)
# data = self.query_binary_values(
# f'FRAN:DATA?',
# datatype = 'd',
# is_big_endian = True,
# )
return data
def set_vertical_scale(self, channel: int = 1, **kwargs):
scale = kwargs.get('scale', 1.0)
units = kwargs.get('units', 'V')
self.write(f'CHAN{channel}:SCAL {scale}{units}')
scale = self.query(f'CHAN{channel}:SCAL?')
self.debug(scale)
def set_horizontal_scale(self, **kwargs):
timescale = kwargs.get('timescale', 0.001)
self.write(f'TIM:SCAL {timescale}')
timescale = self.query(f'TIM:SCAL?')
self.debug(timescale)
def set_waveform_byte_order(self, **kwargs):
byte_order = kwargs.get('byte_order', 'MSBFirst')
self.write(f'WAV:BYT {byte_order}')
byte_order = self.query(f'WAV:BYT?')
self.debug(byte_order)
def set_waveform_format(self, **kwargs):
wfformat = kwargs.get('wfformat', 'WORD')
self.write(f'WAV:FORM {wfformat}')
wfformat = self.query(f'WAV:FORM?')
self.debug(wfformat)
def set_waveform_source(self, **kwargs):
wfsource = kwargs.get('wfsource', 'CHAN1')
self.write(f'WAV:SOUR {wfsource}')
wfsource = self.query(f'WAV:SOUR?')
self.debug(wfsource)
def set_waveform_points_mode(self, **kwargs):
pointsmode = kwargs.get('pointsmode', 'NORM')
self.write(f'WAV:POIN:MODE {pointsmode}')
pointsmode = self.query(f'WAV:POIN:MODE?')
self.debug(pointsmode)
def get_waveform_count(self):
wfcount = self.query(f'WAV:COUN?')
self.debug(wfcount)
return wfcount
def get_waveform_format(self):
wfformat = self.query(f'WAV:FORM?')
self.debug(wfformat)
return wfformat
def get_xinc(self):
xinc = self.query(f'WAV:XINC?')
self.debug(xinc)
return float (xinc)
def get_xor(self):
xor = self.query(f'WAV:XOR?')
self.debug(xor)
return float (xor)
def get_yinc(self):
yinc = self.query(f'WAV:YINC?')
self.debug(yinc)
return float (yinc)
def get_yor(self):
yor = self.query(f'WAV:YOR?')
self.debug(yor)
return float (yor)
def get_yref(self):
yref = self.query(f'WAV:YREF?')
self.debug(yref)
return float (yref)
def do_query_ieee_block(self, cmd:str, **kwargs):
result = self.device.query_binary_values(cmd, datatype='s', is_big_endian=True)
self.debug(result)
return result
def measure_vmax_FFT(self):
self.write(f'MEAS:VMAX FFT')
vmax = self.query(f'MEAS:VMAX? FFT')
self.debug(vmax)
return (vmax)
# def get_measurement_vmax_FFT(self, **kwargs):
# #source = kwargs.get('SOURCE', 'FFT')
# vmax = self.query(f'MEAS:VMAX')
# self.debug(vmax)
# return (vmax)
def measure_vpp(self, **kwargs):
chan = kwargs.get('chan', 1)
self.write(f'MEAS:VPP CHAN {chan}')
vpp = self.query(f'MEAS:VPP? CHAN{chan}')
self.debug(vpp)
return (vpp)
def measure_vrms(self, **kwargs):
chan = kwargs.get('CHAN', 1)
self.write(f'MEAS:VRMS CHAN {chan}')
vrms = self.query(f'MEAS:VRMS? CHAN{chan}')
self.debug(vrms)
return (vrms)
def measure_freq(self, **kwargs):
chan = kwargs.get('chan', 1)
self.write(f'MEAS:FREQ CHAN {chan}')
freq = self.query(f'MEAS:FREQ? CHAN{chan}')
self.debug(freq)
return (freq)
def set_wgen_freq(self, **kwargs):
frequency = kwargs.get('FREQ', 1000)
self.write(f'WGEN:FREQ {frequency}')
frequency = self.query(f'WGEN:FREQ?')
self.debug(frequency)
def set_wgen_volt(self, **kwargs):
voltage = kwargs.get('VOLT', 1)
self.write(f'WGEN:VOLT {voltage}')
voltage = self.query(f'WGEN:VOLT?')
self.debug(voltage)
def set_wgen_offs(self, **kwargs):
offset = kwargs.get('OFFS', 1)
self.write(f'WGEN:VOLT:OFFS {offset}')
offset = self.query(f'WGEN:VOLT:OFFS?')
self.debug(offset)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment