https://github.com/pyvisa/pyvisa-py is a backend for https://github.com/pyvisa/pyvisa/ How do I add install_handler() to pyvisa-py so that I don't need to install NI VISA backend?
Implementing install_handler() in pyvisa-py to avoid the NI-VISA dependency is a significant but achievable task. It requires modifying the pyvisa-py source code to implement the event handling logic that the proprietary drivers usually handle for you.
To do this, you essentially need to build an event registry and a listening mechanism (like a background thread) that monitors the hardware interface for interrupts or service requests.
Here is a step-by-step guide on how to architect and implement this in the pyvisa-py codebase.
In pyvisa, the frontend (what you use in your script) calls the backend (the library implementing Visalib).
NI-VISA: The backend calls a C DLL which manages threads and hardware interrupts.
PyVISA-py: You are the backend. You must implement the logic to store the callback and the logic to trigger it when the hardware speaks.
You need to modify the main library class in pyvisa-py. This is typically found in pyvisa_py/highlevel.py. You will be looking for the class that inherits from VisaLibraryBase (usually named PyVisaLibrary).
You need to add or modify three key methods:
install_handler (Registers the callback)
uninstall_handler (Removes the callback)
enable_event (Starts the mechanism that listens for the event)
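First, your library instance needs a place to store the handlers. PyVisaLibrary does its setup in the _init method (pyvisa's VisaLibraryBase creates instances through __new__ and then calls _init, so __init__ is not the right hook), so initialize a dictionary structure there.

    # In pyvisa_py/highlevel.py -> PyVisaLibrary._init
    def _init(self):
        # ... keep the existing body of _init (it creates self.sessions) ...
        # Dictionary to map sessions to their event handlers
        # Structure: { session_id: { event_type: [ (handler, user_handle), ... ] } }
        # Caution: VisaLibraryBase already defines a `handlers` attribute that
        # install_visa_handler() appends to; check your pyvisa version for a
        # clash and rename this registry if needed.
        self.handlers = {}

This function does not talk to the hardware; it simply registers the Python function you want to call.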

    def install_handler(self, session, event_type, handler, user_handle=None):
        """Stores the handler for a specific session and event type."""
        # setdefault creates the session and event_type entries on first use
        per_session = self.handlers.setdefault(session, {})
        per_session.setdefault(event_type, []).append((handler, user_handle))
        # pyvisa's install_visa_handler() expects a 4-tuple back:
        # (handler, converted_user_handle, converted_handler, status_code).
        # A pure-Python backend has no ctypes conversion, so the handler is
        # returned unchanged (verify against your pyvisa version).
        return handler, user_handle, handler, constants.StatusCode.success

This is where pyvisa-py struggles and where you must write specific logic. When a user calls enable_event, they are asking you to start listening.
Depending on your interface (USB, TCPIP, GPIB), "listening" means different things:
TCPIP (VXI-11): You must listen on the interrupt channel.
USBTMC: You must read from the Interrupt IN endpoint.
You likely need to spawn a thread that monitors the specific interface session.

    def enable_event(self, session, event_type, mechanism, context=None):
        """Enables notification for an event."""
        # 1. Retrieve the actual session object (e.g., USBSession, TCPIPSession)
        #    In pyvisa-py, sessions are stored in self.sessions
        obj = self.sessions[session]
        # 2. Start a listener thread for this specific event/session
        if event_type == constants.VI_EVENT_SERVICE_REQ:
            # start_srq_listener is a hypothetical method you must add to the
            # specific session classes (e.g., in pyvisa_py/usb.py or pyvisa_py/tcpip.py)
            if hasattr(obj, 'start_srq_listener'):
                obj.start_srq_listener(callback=self._dispatch_event)
                return constants.VI_SUCCESS
        # Other event types and session classes are not supported
        return constants.VI_ERROR_NSUP_OPER

    def _dispatch_event(self, session_id, event_type):
        """Internal method called by the listener thread when hardware triggers."""
        if session_id in self.handlers and event_type in self.handlers[session_id]:
            for handler, user_handle in self.handlers[session_id][event_type]:
                # VISA handlers take (session, event_type, event_context, user_handle).
                # No event context object exists here, so None is passed;
                # you may need to create a dummy event_context object instead.
                handler(session_id, event_type, None, user_handle)

You will likely need to modify the specific interface files (e.g., pyvisa_py/usb.py).
If you are using USBTMC, the device has an "Interrupt IN" endpoint. You need a loop running in a thread that constantly tries to read from this endpoint.

    # Pseudo-code for pyvisa_py/usb.py modification
    # (self.dev, self.interrupt_in_endpoint and self._is_srq are placeholders)
    import threading
    import usb.core
    from pyvisa import constants

    def start_srq_listener(self, callback):
        self.listening = True
        t = threading.Thread(target=self._listen_loop, args=(callback,), daemon=True)
        t.start()

    def _listen_loop(self, callback):
        while self.listening:
            try:
                # PyUSB call to read the interrupt endpoint; the endpoint
                # address is found in the device descriptors.
                # A USB488 SRQ notification is two bytes (bNotify1, bNotify2).
                data = self.dev.read(self.interrupt_in_endpoint, 2, timeout=100)
                # If we get data, check if it is an SRQ (Service Request)
                if self._is_srq(data):
                    # Trigger the callback in the main library
                    callback(self.session_id, constants.VI_EVENT_SERVICE_REQ)
            except usb.core.USBError:
                # Timeouts are expected if no event occurs
                pass

highlevel.py: Add install_handler, uninstall_handler, and the registry dict.
highlevel.py: Update enable_event to trigger a background listener.
usb.py / tcpip.py: Add the threading logic to physically read the hardware interrupt line or packet.
By implementing these, you bridge the gap between the Python-only backend and the hardware events, removing the need for NI-VISA.
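The registry part of this design, including the uninstall_handler side that is listed above but not written out, can be sketched on its own. This is a minimal illustration under assumptions: `HandlerRegistry` and its method names are hypothetical and not part of pyvisa or pyvisa-py; only the handler call signature follows the VISA convention described above.

```python
from collections import defaultdict


class HandlerRegistry:
    """Hypothetical helper: maps (session, event_type) to installed handlers."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def install(self, session, event_type, handler, user_handle=None):
        """Remember a handler and its user handle for this session/event pair."""
        self._handlers[(session, event_type)].append((handler, user_handle))

    def uninstall(self, session, event_type, handler, user_handle=None):
        """Remove one matching entry; return True if something was removed."""
        entries = self._handlers.get((session, event_type), [])
        try:
            entries.remove((handler, user_handle))
        except ValueError:
            return False
        return True

    def dispatch(self, session, event_type, context=None):
        """Call every handler for the pair; return how many were called."""
        # Copy the list so a handler may uninstall itself while we iterate
        entries = list(self._handlers.get((session, event_type), []))
        for handler, user_handle in entries:
            handler(session, event_type, context, user_handle)
        return len(entries)
```

Keeping the registry separate from the library class makes it possible to test install, uninstall, and dispatch without any instrument attached.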
I want this to work with TCPIP, not USB
This is significantly more complex than USB because TCP/IP instruments (specifically those using the TCPIP::...::INSTR resource string) generally use the VXI-11 protocol.
Unlike USB, where you poll a specific endpoint, VXI-11 interrupts require a Reverse Channel. You (the host) must become a server. You open a port, tell the instrument "If you need me, call me at this IP and Port," and then wait for the instrument to connect to you.
Here is how to implement the architecture for TCPIP in pyvisa-py.
You need to modify pyvisa_py/tcpip.py. The standard VXI-11 procedure for events is:
Host: Creates a temporary TCP Server socket (on a random available port).
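Host: Sends the create_intr_chan RPC command to the Instrument, passing the Host's IP and the new temporary Port.
Host: Sends the device_enable_srq RPC (with enable set to true) on the existing link, so the instrument actually delivers service requests over the new channel.
Host: Spawns a thread to accept() connections on that port.
Instrument: When an SRQ occurs, sends a device_intr_srq call over the connection it opened to that port.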
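The host-side half of this procedure (bind to a free port, accept in a background thread) can be tried without an instrument. The sketch below is only a loopback demonstration: `start_reverse_channel` is a hypothetical name, the "instrument" is a plain client socket on the same machine, and no VXI-11 RPC is involved.

```python
import socket
import threading


def start_reverse_channel(on_data):
    """Listen on an OS-assigned port; pass each received chunk to on_data."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))      # port 0 = let the OS pick a free port
    server.listen(1)
    port = server.getsockname()[1]

    def serve():
        conn, _addr = server.accept()  # blocks until the peer connects
        with conn:
            while True:
                chunk = conn.recv(4096)
                if not chunk:          # peer closed the connection
                    break
                on_data(chunk)
        server.close()

    thread = threading.Thread(target=serve, daemon=True)
    thread.start()
    return port, thread


if __name__ == "__main__":
    received = []
    port, thread = start_reverse_channel(received.append)
    # Stand-in for the instrument connecting back to the host
    with socket.create_connection(("127.0.0.1", port)) as fake_instrument:
        fake_instrument.sendall(b"srq")
    thread.join(timeout=2)
    print(received)
```

In the real session class the bind address would be the wildcard address rather than loopback, and the port returned here is the value to hand to create_intr_chan.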
You need to find the TCPIPSession class (or the class handling INSTR resources). You will add the logic to start the "Reverse Channel" listener.
Add this method to the session class. It sets up a server socket and runs it in a background thread.

    import socket
    import struct
    import threading

    from pyvisa import constants

    # In pyvisa_py/tcpip.py -> TCPIPSession (or similar class)
    def start_tcp_listener(self, callback):
        """
        1. Opens a server socket on a random port.
        2. Tells the device where to connect (RPC 'create_intr_chan').
        3. Starts a thread to listen for the incoming connection.
        """
        # --- 1. Create a server socket on the host ---
        # AF_INET = IPv4, SOCK_STREAM = TCP
        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_sock.bind(('', 0))  # Bind to any available port
        self.server_sock.listen(1)
        # A wildcard bind reports 0.0.0.0 as its address, so only the port is usable
        host_port = self.server_sock.getsockname()[1]
        # Find the local address that routes to the instrument. Connecting a UDP
        # socket sends no packets. Assumption: self.parsed.host_address holds the
        # instrument's address, as in pyvisa-py's TCPIP sessions.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect((self.parsed.host_address, 111))
            host_ip = probe.getsockname()[0]
        # --- 2. Send 'create_intr_chan' RPC to the device ---
        # This is the tricky part: the call is a VXI-11 RPC, not a text command.
        # If pyvisa-py's internal RPC client offers it, use that;
        # otherwise you are sending raw bytes.
        # This is a conceptual example of the RPC call:
        error_code = self._send_create_intr_chan_rpc(host_ip, host_port)
        if error_code != 0:
            raise RuntimeError("Device refused to create interrupt channel")
        # --- 3. Start the listener thread ---
        self.listening = True
        t = threading.Thread(target=self._tcp_listen_loop, args=(callback,), daemon=True)
        t.start()

    def _tcp_listen_loop(self, callback):
        """Waits for the instrument to connect back to us."""
        while self.listening:
            try:
                # Wait for the instrument to connect (blocking)
                conn, addr = self.server_sock.accept()
                with conn:
                    # The instrument may keep this connection open and send
                    # several SRQs over it, so read until it closes.
                    while self.listening:
                        # We expect a 'device_intr_srq' RPC packet
                        data = conn.recv(1024)
                        if not data:
                            break
                        # _validate_srq_packet is a placeholder for the parser
                        if self._validate_srq_packet(data):
                            # Trigger the HighLevel callback with the session
                            # handle (assumed to be self.session) and the event type
                            callback(self.session, constants.VI_EVENT_SERVICE_REQ)
            except socket.error:
                if not self.listening:
                    break

The hardest part of this is _send_create_intr_chan_rpc. VXI-11 is based on ONC RPC (Sun RPC). You cannot just send text like ENABLE SRQ. You must send binary structs.
pyvisa-py ships its own ONC RPC layer (pyvisa_py/protocols/rpc.py) and a VXI-11 client built on it (pyvisa_py/protocols/vxi11.py), so check there first for an existing create_intr_chan call. If your version does not expose one, you need to construct the packet manually.
Here is a conceptual outline of that helper method:

    def _send_create_intr_chan_rpc(self, host_ip, port):
        # VXI-11 'create_intr_chan' is procedure 25 of the Device Core program.
        # This requires XDR encoding.
        # Simplified structure: [XID][MsgType][RPCVersion][Program][Version][Proc][Creds][Verf][Args]
        # You effectively need to tell the device:
        # "Call program 0x0607B1 (VXI-11 Device Intr) at port <port>"
        # Without an RPC builder this is tedious to hardcode, because every
        # call needs its own XID (transaction ID) and XDR-encoded arguments.
        raise NotImplementedError

Go back to pyvisa_py/highlevel.py and ensure the enable_event method routes to your new TCP logic.

    # In pyvisa_py/highlevel.py
    def enable_event(self, session, event_type, mechanism, context=None):
        obj = self.sessions[session]
        if event_type == constants.VI_EVENT_SERVICE_REQ:
            # Check if this is a TCP session
            if hasattr(obj, 'start_tcp_listener'):
                # Pass the internal dispatch method as the callback
                obj.start_tcp_listener(callback=self._dispatch_event)
                return constants.VI_SUCCESS
        return constants.VI_ERROR_NSUP_OPER

Important Warning: INSTR vs SOCKET
You must verify your resource string.
TCPIP::192.168.1.5::INSTR (VXI-11):
This requires the RPC implementation described above.
The instrument expects to connect back to you.
If you don't implement the RPC create_intr_chan, the instrument will never send the event, even if you are listening on a port.
TCPIP::192.168.1.5::5025::SOCKET (Raw Socket):
This uses raw SCPI over a port (usually 5025).
There is no standard "interrupt" mechanism for Raw Sockets.
Some devices send "Out of Band" (OOB) data, but most simply do not support SRQ over raw sockets.
If you are using SOCKET, install_handler is effectively impossible unless your specific device has a proprietary way of sending a specialized packet stream.
Next Step
To get this working, you likely need a lightweight RPC packer to send that create_intr_chan command.
Would you like me to provide a minimal pure-Python VXI-11 RPC packet structure so you can implement _send_create_intr_chan_rpc without adding large external dependencies?
Yes.
My resource command is open_resource('TCPIP::192.168.1.157') Is this INSTR or SOCKET?
It is INSTR (VXI-11).
When you omit the suffix in PyVISA (e.g., TCPIP::192.168.1.157), it defaults to TCPIP::192.168.1.157::INSTR.
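This is good news. It means PyVISA talks VXI-11 to the device, and VXI-11 is the protocol that defines the RPC interrupt channel. Whether your particular instrument implements that channel is worth confirming in its documentation. If it were SOCKET, you would be out of luck.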
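The defaulting rule can be made concrete with a small helper. This is a simplified sketch, not PyVISA's parser (that lives in pyvisa.rname): `resource_class` is a hypothetical function that only looks at the last `::`-separated field and treats a missing class suffix as INSTR.

```python
def resource_class(resource_name):
    """Return the resource class of a VISA resource string.

    Simplified: assumes '::' separators and that a missing
    class suffix means INSTR, as PyVISA does for TCPIP resources.
    """
    known = {"INSTR", "SOCKET", "INTFC", "BACKPLANE", "MEMACC", "SERVANT"}
    last = resource_name.split("::")[-1].upper()
    return last if last in known else "INSTR"


if __name__ == "__main__":
    for name in ("TCPIP::192.168.1.157",
                 "TCPIP::192.168.1.5::INSTR",
                 "TCPIP::192.168.1.5::5025::SOCKET"):
        print(name, "->", resource_class(name))
```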
The Minimal Pure-Python VXI-11 RPC Implementation
To make install_handler work without external C-bindings, you need to manually construct the binary packet that tells the instrument: "I am ready for interrupts. Call me back at this IP and Port."
You can add this code directly into pyvisa_py/tcpip.py.
VXI-11 uses XDR (External Data Representation). You need a few helper functions to pack data correctly (specifically strings, which must be padded to 4 bytes).

    import random
    import socket
    import struct

    # VXI-11 constants (program numbers and procedure IDs from the VXI-11 specification)
    DEVICE_CORE_PROG = 0x0607AF       # Device Core channel
    DEVICE_CORE_VERS = 1
    CREATE_INTR_CHAN_PROC = 25        # create_intr_chan
    DEVICE_INTR_PROG = 0x0607B1       # The program the device uses to call us back
    DEVICE_INTR_VERS = 1
    DEVICE_TCP = 0                    # Device_AddrFamily: 0 = TCP, 1 = UDP

    def pack_uint(n):
        """Pack a 32-bit unsigned integer (Big Endian)."""
        return struct.pack('>I', n)

    def pack_string(s):
        """Pack variable-length data (XDR format: Length + Data + Padding).

        Not needed for create_intr_chan itself, but used for opaque fields
        such as the handle passed to device_enable_srq.
        """
        if isinstance(s, str):
            s = s.encode('ascii')
        length = len(s)
        padding = (4 - (length % 4)) % 4
        return struct.pack('>I', length) + s + (b'\x00' * padding)

This function constructs the raw bytes required to send the create_intr_chan command.

    def build_create_intr_chan_packet(host_ip, host_port, xid=None):
        """
        Constructs the VXI-11 'create_intr_chan' RPC packet.
        Args:
            host_ip (str): The IPv4 address of YOUR computer (where the device should connect).
            host_port (int): The port YOUR computer is listening on.
        """
        if xid is None:
            xid = random.randint(1, 0xFFFFFFFF)
        # --- 1. RPC Header ---
        # Field: [XID] [MsgType:Call=0] [RPCVers:2] [Prog:Core] [Vers:1] [Proc:CreateIntr]
        header = (
            pack_uint(xid) +                   # Transaction ID
            pack_uint(0) +                     # Message Type (0 = Call)
            pack_uint(2) +                     # RPC Version (2)
            pack_uint(DEVICE_CORE_PROG) +      # Program (Device Core)
            pack_uint(DEVICE_CORE_VERS) +      # Program Version
            pack_uint(CREATE_INTR_CHAN_PROC)   # Procedure (25 = create_intr_chan)
        )
        # --- 2. Authentication (None) ---
        # Field: [Cred Flavor:0] [Cred Len:0] [Verf Flavor:0] [Verf Len:0]
        auth = (
            pack_uint(0) + pack_uint(0) +      # Credentials (Auth_None)
            pack_uint(0) + pack_uint(0)        # Verifier (Auth_None)
        )
        # --- 3. Arguments ---
        # Device_RemoteFunc in the VXI-11 spec:
        # (hostAddr, hostPort, progNum, progVers, progFamily)
        args = (
            socket.inet_aton(host_ip) +        # arg1: Host address as a 32-bit integer
            pack_uint(host_port) +             # arg2: Host port
            pack_uint(DEVICE_INTR_PROG) +      # arg3: Program number (0x0607B1)
            pack_uint(DEVICE_INTR_VERS) +      # arg4: Program version (1)
            pack_uint(DEVICE_TCP)              # arg5: Address family (0 = TCP)
        )
        # --- 4. Final Framing (Record Marking) ---
        # ONC RPC over TCP requires a 4-byte fragment header.
        # The top bit (0x80000000) indicates this is the "Last Fragment".
        payload = header + auth + args
        fragment_header = 0x80000000 | len(payload)
        return struct.pack('>I', fragment_header) + payload

Now, integrate this into the listener logic I gave you in the previous step.

    # In pyvisa_py/tcpip.py
    def _recv_exact(self, sock, n):
        """Read exactly n bytes; recv() may return fewer than requested."""
        buf = b''
        while len(buf) < n:
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("socket closed during RPC reply")
            buf += chunk
        return buf

    def _send_create_intr_chan_rpc(self, host_ip, host_port):
        """Sends the raw RPC packet to the device and returns its error code."""
        # 1. Build the packet
        packet = build_create_intr_chan_packet(host_ip, host_port)
        # 2. Send it over the EXISTING core connection to the device.
        # Assumption: self.interface is pyvisa-py's VXI-11 client and exposes
        # its TCP socket as .sock; check how your pyvisa-py version stores it.
        sock = self.interface.sock
        sock.sendall(packet)
        # 3. Read the RPC reply: 4-byte fragment header first, then the body
        header_data = self._recv_exact(sock, 4)
        frag_len = struct.unpack('>I', header_data)[0] & 0x7FFFFFFF
        reply_data = self._recv_exact(sock, frag_len)
        # 4. Parse the reply (simplified, assumes an empty verifier)
        # Layout: XID(4) MsgType(4) ReplyState(4) Verifier(8) AcceptStatus(4) Results
        # AcceptStatus therefore sits at offset 20 (4+4+4+8); 0 means SUCCESS
        accept_status = struct.unpack('>I', reply_data[20:24])[0]
        if accept_status != 0:
            return -1  # RPC error
        # The result of create_intr_chan is a Device_Error (one integer)
        # that follows AcceptStatus
        error_code = struct.unpack('>I', reply_data[24:28])[0]
        return error_code

How this flow works conceptually
- You: Start a listener on your PC at port 54321.
- You: Build that binary packet above. It effectively says: "Hey Instrument, call program 0x0607B1 at 192.168.1.5, port 54321 via TCP if you have an SRQ."
- Instrument: Decodes the binary, says "Okay", and sends back a success packet.
- Instrument: (Later, when an event happens) connects to your PC at port 54321.
- You: Accept the connection in your thread, read the data, and trigger the callback registered with install_handler.
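ONC RPC over TCP frames every message with record marking, and a single recv() call is not guaranteed to return a whole header or a whole record. A minimal sketch of a reader that handles both short reads and multi-fragment records follows; `recv_exact` and `read_rpc_record` are hypothetical helper names, and the demo uses a local socket pair in place of an instrument.

```python
import socket
import struct


def recv_exact(sock, n):
    """Read exactly n bytes from sock, looping over short reads."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection mid-record")
        buf += chunk
    return bytes(buf)


def read_rpc_record(sock):
    """Read one ONC RPC record, joining fragments until the last-fragment bit."""
    record = b""
    while True:
        (marker,) = struct.unpack(">I", recv_exact(sock, 4))
        is_last = bool(marker & 0x80000000)   # top bit: last fragment
        length = marker & 0x7FFFFFFF          # low 31 bits: fragment length
        record += recv_exact(sock, length)
        if is_last:
            return record


if __name__ == "__main__":
    a, b = socket.socketpair()
    # One record split into two fragments: "abc" then (last) "de"
    a.sendall(struct.pack(">I", 3) + b"abc")
    a.sendall(struct.pack(">I", 0x80000000 | 2) + b"de")
    print(read_rpc_record(b))
    a.close()
    b.close()
```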
Immediate Next Step
Do you want the logic for parsing the incoming callback packet (step 5)? When the instrument connects back to you, it will send a similar binary RPC packet (calling the device_intr_srq procedure) that you need to decode to confirm it is actually an SRQ.
Yes
Yes, this is the final piece of the puzzle.
When the instrument connects back to your computer to trigger the interrupt, it behaves like a client. It sends an RPC "Call" packet. You need to decode this packet to ensure it is actually an SRQ (Service Request) and not some other network noise.
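ONC RPC is normally a request-response protocol, but the VXI-11 specification defines device_intr_srq as a one-way call, so the instrument should not be waiting for an answer. The code below still sends a "Reply" packet, which some implementations tolerate or expect; drop it if your instrument misbehaves when it receives one.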
You need to add this method to your TCPIPSession class (or wherever you placed the listener loop). This function inspects the binary data sent by the instrument.

    import struct

    # Constants for parsing (values from the VXI-11 specification)
    DEVICE_INTR_PROG = 0x0607B1
    DEVICE_INTR_SRQ_PROC = 30
    RPC_MSG_CALL = 0

    def _validate_and_handle_srq(self, conn):
        """
        Reads the incoming RPC packet, validates it is an SRQ,
        extracts the opaque handle, and sends a Success Reply.
        Returns:
            handle (bytes) if valid SRQ, None otherwise.
        """
        # 1. Read the Fragment Header (4 bytes)
        header_data = conn.recv(4)
        if len(header_data) < 4:
            return None
        # The low 31 bits are the length
        frag_len = struct.unpack('>I', header_data)[0] & 0x7FFFFFFF
        # 2. Read the RPC Packet (recv may return less than requested)
        data = b''
        while len(data) < frag_len:
            chunk = conn.recv(frag_len - len(data))
            if not chunk:
                return None
            data += chunk
        if len(data) < 40:
            return None  # Too short for the call header plus empty creds/verifier
        # 3. Unpack Key Fields
        # Struct: [XID:4] [MsgType:4] [RPCVer:4] [Prog:4] [ProgVer:4] [Proc:4]
        xid, msg_type, rpc_vers, prog, prog_vers, proc = struct.unpack('>IIIIII', data[:24])
        # 4. Validate Context
        if not (msg_type == RPC_MSG_CALL and
                prog == DEVICE_INTR_PROG and
                proc == DEVICE_INTR_SRQ_PROC):
            return None
        # 5. Extract the argument.
        # Assuming empty Creds (8 bytes) and Verifier (8 bytes), the arguments
        # start at offset 24 + 16 = 40. device_intr_srq carries the opaque
        # handle registered with device_enable_srq, not the status byte;
        # read the status byte afterwards (device_readstb / inst.read_stb()).
        try:
            (handle_len,) = struct.unpack('>I', data[40:44])
            handle = data[44:44 + handle_len]
        except struct.error:
            handle = b''  # No handle present
        # 6. Send the reply
        self._send_srq_reply(conn, xid)
        return handle

The reply packet must carry the same XID (Transaction ID) that the instrument sent you.

    def _send_srq_reply(self, conn, xid):
        """Sends a successful RPC reply back to the instrument."""
        # Structure: [XID] [MsgType:Reply=1] [ReplyState:Accepted=0]
        #            [Verf:None] [AcceptStatus:Success=0]
        reply_payload = (
            struct.pack('>I', xid) +  # Match the XID from the Request
            struct.pack('>I', 1) +    # MsgType: Reply
            struct.pack('>I', 0) +    # ReplyState: Accepted
            struct.pack('>I', 0) +    # Verifier Flavor (None)
            struct.pack('>I', 0) +    # Verifier Length (0)
            struct.pack('>I', 0)      # AcceptStatus: Success
        )
        # Add Fragment Header (Last Fragment bit set)
        frag_header = 0x80000000 | len(reply_payload)
        full_packet = struct.pack('>I', frag_header) + reply_payload
        conn.sendall(full_packet)

Finally, update the _tcp_listen_loop method I gave you in step 2 to use these new validators.

    def _tcp_listen_loop(self, callback):
        """Waits for the instrument to connect back to us."""
        while self.listening:
            try:
                conn, addr = self.server_sock.accept()
                with conn:
                    # The instrument may keep this connection open and send
                    # several SRQs over it, so keep reading until it closes.
                    while self.listening:
                        # Validate, reply, and get the SRQ payload
                        srq = self._validate_and_handle_srq(conn)
                        if srq is None:
                            break
                        # Trigger the HighLevel callback with the session handle
                        # (assumed to be self.session) and the event type.
                        # PyVISA handlers expect (session, event_type, context,
                        # user_handle); the dispatcher fills in the last two.
                        callback(self.session, constants.VI_EVENT_SERVICE_REQ)
            except socket.error:
                if not self.listening:
                    break

- Script Start: You call enable_event.
- PyVISA-Py: Creates a server socket (e.g., port 4000).
- PyVISA-Py: Sends create_intr_chan RPC to instrument ("Call me at port 4000").
- PyVISA-Py: Enters background thread _tcp_listen_loop.
- Hardware: An event happens (e.g., measurement done).
- Instrument: Connects to your PC on port 4000 and sends the device_intr_srq RPC packet.
- PyVISA-Py:
  - Accepts connection.
  - _validate_and_handle_srq parses the packet.
  - Sends "Success" reply back to instrument.
  - Calls callback(...).
- User Code: Your Python function defined in install_handler executes.
You now have a complete, pure-Python implementation of VXI-11 Interrupts without needing the heavy NI-VISA drivers.
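The fixed byte offsets used when parsing the callback assume that the instrument sends empty credentials and an empty verifier. A more tolerant sketch, which walks the two variable-length authentication fields before locating the arguments, is shown below. `parse_rpc_call` is a hypothetical helper; the message layout follows ONC RPC version 2.

```python
import struct


def parse_rpc_call(data):
    """Return (xid, prog, vers, proc, args) for an ONC RPC call message.

    data is one complete RPC record without the 4-byte fragment header.
    """
    xid, msg_type, rpc_vers, prog, vers, proc = struct.unpack_from(">6I", data, 0)
    if msg_type != 0 or rpc_vers != 2:
        raise ValueError("not an ONC RPC version 2 call")
    offset = 24
    for _ in range(2):  # credentials first, then verifier
        _flavor, length = struct.unpack_from(">2I", data, offset)
        # Each body is padded to a multiple of 4 bytes (XDR opaque data)
        offset += 8 + ((length + 3) // 4) * 4
    return xid, prog, vers, proc, data[offset:]


if __name__ == "__main__":
    call = struct.pack(">6I", 7, 0, 2, 0x0607B1, 1, 30)       # call header
    call += struct.pack(">2I", 1, 5) + b"abcde\x00\x00\x00"   # 5-byte credentials
    call += struct.pack(">2I", 0, 0)                          # empty verifier
    call += struct.pack(">I", 4) + b"hndl"                    # opaque handle argument
    print(parse_rpc_call(call))
```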
Would you like me to summarize the exact list of files and method signatures you need to touch to paste this into your project?
Yes
Here is the summary of the exact files and method signatures you need to modify in the pyvisa-py source code.
Class: PyVisaLibrary
You need to add the event registry and the logic to delegate enable_event to the specific session implementations.
| Method | Action | Implementation Summary |
|---|---|---|
| _init | Modify | Initialize self.handlers = {} (nested dict structure). pyvisa-py's PyVisaLibrary is set up in _init rather than __init__. |
| install_handler | Add | def install_handler(self, session, event_type, handler, user_handle=None): Store handler in self.handlers[session][event_type]. |
| uninstall_handler | Add | def uninstall_handler(self, session, event_type, handler, user_handle=None): Remove handler from self.handlers. |
| enable_event | Modify | def enable_event(self, session, event_type, mechanism, context=None): Retrieve session object. If TCPIPSession, call obj.start_tcp_listener(self._dispatch_event). |
| _dispatch_event | Add | def _dispatch_event(self, session_id, event_type, status_byte=None): Look up session in self.handlers and execute the user's callback functions. |
Class: TCPIPSession
You need to add the VXI-11 RPC packet builders, the server socket listener, and the parsing logic.
Imports to add: struct, socket, threading, random
DEVICE_CORE_PROG, DEVICE_INTR_PROG, etc. (RPC Constants)
pack_uint(n), pack_string(s) (XDR packers)
build_create_intr_chan_packet(host_ip, host_port, xid) (The RPC builder)

    def start_tcp_listener(self, callback):
        """
        1. Bind server socket (port 0).
        2. Call self._send_create_intr_chan_rpc().
        3. Start thread target=self._tcp_listen_loop.
        """

    def _send_create_intr_chan_rpc(self, host_ip, host_port):
        """
        1. Construct packet using build_create_intr_chan_packet.
        2. Send it over the session's existing socket to the instrument.
        3. Recv and parse confirmation.
        """

    def _tcp_listen_loop(self, callback):
        """
        1. While self.listening:
        2. conn, addr = self.server_sock.accept()
        3. srq = self._validate_and_handle_srq(conn)
        4. If srq is valid, call callback(session_id, event_type).
        """

    def _validate_and_handle_srq(self, conn):
        """
        1. Recv header & payload.
        2. Parse XDR (check it is 'device_intr_srq').
        3. Extract the SRQ argument.
        4. Call self._send_srq_reply(conn, xid).
        5. Return the extracted value.
        """

    def _send_srq_reply(self, conn, xid):
        """
        1. Build 'RPC Reply' success packet (matching XID).
        2. conn.sendall(packet).
        """

Once you have pasted these in, your user script should look like this and work without NI-VISA:

    import pyvisa

    def my_handler(session, event_type, context, user_handle):
        print("Interrupt Received! SRQ Triggered.")
        # You might need to query the device here to clear the bit
        # inst.query("*ESR?")

    rm = pyvisa.ResourceManager('@py')  # Forces pyvisa-py backend
    inst = rm.open_resource('TCPIP::192.168.1.157::INSTR')
    # 1. Register the python function
    inst.install_handler(pyvisa.constants.VI_EVENT_SERVICE_REQ, my_handler)
    # 2. Tell the hardware to start monitoring (Starts your thread)
    inst.enable_event(pyvisa.constants.VI_EVENT_SERVICE_REQ, pyvisa.constants.VI_HNDLR)
    # 3. Enable SRQ on the device side (Standard SCPI)
    inst.write("*SRE 16")  # Enable Message Available bit (example)
    inst.write("*CLS")
    print("Waiting for interrupt...")
    input("Press Enter to exit")  # Keep script alive to listen
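While the interrupt channel is being brought up, the same service-request condition can be observed by polling the status byte, which is handy for checking that the `*SRE` setup on the instrument works at all. The sketch below is a hypothetical helper, not part of PyVISA: it takes any callable returning the status byte (for a real instrument that would typically be `inst.read_stb`, assuming the backend supports it for your resource) and waits for bit 6 (RQS, 0x40).

```python
import time


def wait_for_srq(read_stb, timeout=5.0, interval=0.05):
    """Poll read_stb() until bit 6 (RQS, 0x40) is set.

    Returns the status byte, or None if the timeout expires first.
    """
    deadline = time.monotonic() + timeout
    while True:
        stb = read_stb()
        if stb & 0x40:              # instrument is requesting service
            return stb
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)


if __name__ == "__main__":
    # Fake status source standing in for an instrument: RQS on the third read
    values = iter([0x00, 0x10, 0x50])
    print(hex(wait_for_srq(lambda: next(values), timeout=1.0, interval=0.0)))
```

Polling costs one round trip per check, so it is a diagnostic aid or fallback rather than a replacement for the handler mechanism.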