Skip to content

Instantly share code, notes, and snippets.

@Felhamed
Created September 22, 2017 01:10
Show Gist options
  • Select an option

  • Save Felhamed/8211d62d54bcb10e5dd6f8ff530ee962 to your computer and use it in GitHub Desktop.

Select an option

Save Felhamed/8211d62d54bcb10e5dd6f8ff530ee962 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#---------------------------------------------------------------------------#
# import needed libraries
#---------------------------------------------------------------------------#
from twisted.internet import reactor, protocol
from pymodbus.constants import Defaults
#---------------------------------------------------------------------------#
# choose the requested modbus protocol
#---------------------------------------------------------------------------#
from pymodbus.client.async import ModbusClientProtocol
#---------------------------------------------------------------------------#
# configure the client logging
#---------------------------------------------------------------------------#
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
#---------------------------------------------------------------------------#
# helper method to test deferred callbacks
#---------------------------------------------------------------------------#
def dassert(deferred, callback):
def _assertor(value):
assert(value)
deferred.addCallback(lambda r: _assertor(callback(r)))
deferred.addErrback(lambda _: _assertor(False))
#---------------------------------------------------------------------------#
# specify slave to query
#---------------------------------------------------------------------------#
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
#---------------------------------------------------------------------------#
def exampleRequests(client):
rr = client.read_coils(1, 1, unit=0x02)
#---------------------------------------------------------------------------#
# example requests
#---------------------------------------------------------------------------#
# simply call the methods that you would like to use. An example session
# is displayed below along with some assert checks. Note that unlike the
# synchronous version of the client, the asynchronous version returns
# deferreds which can be thought of as a handle to the callback to send
# the result of the operation. We are handling the result using the
# deferred assert helper(dassert).
#---------------------------------------------------------------------------#
def beginAsynchronousTest(client):
rr = client.read_holding_registers(199,1)
# test that we are not an error
dassert(rr, lambda r: r.registers[0] == 10) # test the expected value
arguments = {
'read_address': 1,
'read_count': 8,
}
#-----------------------------------------------------------------------#
# close the client at some time later
#-----------------------------------------------------------------------#
reactor.callLater(1, client.transport.loseConnection)
reactor.callLater(2, reactor.stop)
#---------------------------------------------------------------------------#
# extra requests
#---------------------------------------------------------------------------#
# If you are performing a request that is not available in the client
# mixin, you have to perform the request like this instead::
#
# from pymodbus.diag_message import ClearCountersRequest
# from pymodbus.diag_message import ClearCountersResponse
#
# request = ClearCountersRequest()
# response = client.execute(request)
# if isinstance(response, ClearCountersResponse):
# ... do something with the response
#
#---------------------------------------------------------------------------#
#---------------------------------------------------------------------------#
# choose the client you want
#---------------------------------------------------------------------------#
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
#---------------------------------------------------------------------------#
defer = protocol.ClientCreator(reactor, ModbusClientProtocol
).connectTCP("10.10.10.253", 502)
defer.addCallback(beginAsynchronousTest)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment