-
-
Save tiagocoutinho/fcfff62e6872553b8942a4cd953af700 to your computer and use it in GitHub Desktop.
| """ | |
| tsb.py -- A telnet <-> serial port bridge | |
| Copyright (C) 2005 Eli Fulkerson | |
| This program is free software; you can redistribute it and/or | |
| modify it under the terms of the GNU General Public License | |
| as published by the Free Software Foundation; either version 2 | |
| of the License, or (at your option) any later version. | |
| This program is distributed in the hope that it will be useful, | |
| but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| GNU General Public License for more details. | |
| You should have received a copy of the GNU General Public License | |
| along with this program; if not, write to the Free Software | |
| Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |
| ---------------------------------------------------------------------- | |
| Other license terms may be negotiable. Contact the author if you would | |
| like copy that is licensed differently. | |
| Additionally, this script requires the use of "pyserial", which is licensed | |
| separately by its author. This library can be found at http://pyserial.sourceforge.net/ | |
| Contact information (as well as this script) lives at http://www.elifulkerson.com | |
| Original version available at: | |
| https://www.elifulkerson.com/projects/telnet-serial-bridge.php | |
| Usage: tsb [OPTIONS] | |
| Creates a TCP<->Serial port bridge, which allows a telnet client to | |
| cross over a Serial port. | |
| Serial Options: | |
| -p, --port : Specify the desired serial port. | |
| (0 for COM1, 1 for COM2 etc) | |
| -r, --baudrate : Specify baudrate | |
| -s, --bytesize : Specify bytesize | |
| -y, --parity : Specify parity options | |
| -b, --stopbits : Specify number of stopbits | |
| -t, --timeout : Specify timeout | |
| -f, --flow : Specify flow-control options | |
| TCP Options: | |
| -l, --listen : Specify a TCP port to listen on | |
| General Options: | |
| -h, --help : Display this help messsage | |
| """ | |
| # standard libraries | |
| from socket import * | |
| from select import * | |
| from string import * | |
| import sys | |
| from getopt import getopt, GetoptError | |
| # nonstandard library | |
| import serial | |
| """ | |
| print usage, then exit | |
| """ | |
| def usage(): | |
| usagestring = """Usage: tsb [OPTIONS] | |
| Creates a TCP<->Serial port bridge, which allows a telnet client to | |
| cross over a Serial port. | |
| Serial Options: | |
| -p, --port : Specify the desired serial port. | |
| (0 for COM1, 1 for COM2 etc) | |
| -r, --baudrate : Specify baudrate | |
| -s, --bytesize : Specify bytesize | |
| -y, --parity : Specify parity options | |
| -b, --stopbits : Specify number of stopbits | |
| -t, --timeout : Specify timeout | |
| -f, --flow : Specify flow-control options | |
| TCP Options: | |
| -l, --listen : Specify a TCP port to listen on | |
| General Options: | |
| -h, --help : Display this help messsage | |
| """ | |
| print usagestring | |
| sys.exit(0) | |
| """ | |
| Function cleans up telnet's output for input into the serial port. | |
| Telnet is fancier than serial, so we have to strip some things out. | |
| """ | |
| def cleanup_for_serial(text): | |
| """ | |
| chr(255) is the "we are negotiating" leading bit. If it is the first bit in | |
| a packet, we do not want to send it on to the serial port | |
| """ | |
| if ord(text[:1]) == 255: | |
| return "" | |
| """ | |
| For some reason, windows likes to send "cr/lf" when you send a "cr". | |
| Strip that so we don't get a double prompt.\ | |
| """ | |
| text = replace(text, chr(13) + chr(10), chr(13)) | |
| return text | |
| class Connection: | |
| "A connection is a class that forwards requests between TCP and Serial" | |
| def __init__(self, socket, com): | |
| self.socket = socket | |
| self.com = com | |
| def fileno(self): | |
| "Required, look it up" | |
| return self.socket.fileno() | |
| def init_tcp(self): | |
| "Set up the TCP connection and do telnet negotiation" | |
| "telnet negotiation: we don't want linemode" | |
| " COMMAND, DONT, linemode" | |
| data = chr(255) + chr(254) + chr(34) | |
| self.socket.send(data) | |
| "telnet negotation: we don't want local echo" | |
| " COMMAND, DONT, echo" | |
| data = chr(255) + chr(254) + chr(1) | |
| self.socket.send(data) | |
| "send the header" | |
| self.socket.send("************************************************\r\n") | |
| self.socket.send("Telnet <--> Serial Bridge by Eli Fulkerson\r\n") | |
| self.socket.send("http://www.elifulkerson.com for updates \r\n") | |
| self.socket.send("\r\n") | |
| self.socket.send("This program uses non-standard python libraries:\r\n") | |
| self.socket.send(" - pyserial by Chris Liechti\r\n") | |
| self.socket.send(" - pywin32 by Mark Hammond (et al)\r\n") | |
| self.socket.send("\r\n") | |
| self.socket.send("************************************************\r\n") | |
| self.socket.send("\r\n") | |
| self.socket.send("You are now connected to %s.\r\n" % self.com.portstr) | |
| def recv_tcp(self): | |
| "Receive some data from the telnet client" | |
| data = self.socket.recv(1024) | |
| return data | |
| def send_tcp(self, data): | |
| "Send some data out to the telnet client" | |
| self.socket.send(data) | |
| def recv_serial(self): | |
| "Recieve some data from the serial port" | |
| data = self.com.read(self.com.inWaiting() ) | |
| return data | |
| def send_serial(self,data): | |
| "Send some data out to the serial port" | |
| data = cleanup_for_serial(data) | |
| try: | |
| if ord(data) == 3: | |
| self.com.sendbreak() | |
| return | |
| except: | |
| pass | |
| self.com.write(data) | |
| class Handler: | |
| def __init__(self): | |
| global LISTEN | |
| global com | |
| self.clist = [ ] | |
| self.tcpconnected = False | |
| self.serialconnected = False | |
| self.start_new_listener() | |
| print "TCP to Serial bridge is up: telnet to localhost:%s to access %s." % (LISTEN, com.portstr) | |
| print "(Control-C to exit)" | |
| def start_new_listener(self): | |
| self.listener = socket(AF_INET, SOCK_STREAM) | |
| self.listener.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
| self.listener.bind(('', LISTEN)) | |
| self.listener.listen(32) | |
| def run(self): | |
| """ | |
| yes, this was originally going to be multi-user, I don't feel like changing it | |
| now though. We shall loop. | |
| """ | |
| for conn in self.clist[:]: | |
| if conn.com.isOpen(): | |
| "pull data from serial and send it to tcp if possible" | |
| data = conn.recv_serial() | |
| if not data: | |
| pass | |
| else: | |
| conn.send_tcp(data) | |
| ready = self.clist[:] | |
| if self.listener: | |
| ready.append(self.listener) | |
| ready = select(ready, [], [], 0.1)[0] | |
| for conn in ready: | |
| if conn is self.listener: | |
| socket, address = self.listener.accept() | |
| global com | |
| try: | |
| com.close() | |
| com.open() | |
| except serial.SerialException: | |
| print "Error opening serial port. Is it in use?" | |
| sys.exit(1) | |
| conn = Connection(socket, com) | |
| self.clist.append(conn) | |
| "set up our initial telnet environment" | |
| conn.init_tcp() | |
| "we don't need to listen anymore" | |
| self.listener = None | |
| else: | |
| "pull some data from tcp and send it to serial, if possible." | |
| data = conn.recv_tcp() | |
| if not data: | |
| print "TCP connection closed." | |
| self.clist.remove(conn) | |
| self.start_new_listener() | |
| else: | |
| conn.send_serial(data) | |
| def main(argv=None): | |
| "Pull in our arguments if we were not spoonfed some" | |
| if argv is None: | |
| argv = sys.argv | |
| "Parse our arguments" | |
| try: | |
| options, args = getopt(argv[1:], "p:r:s:y:b:t:f:l:h", ["port=", "baudrate=", "bytesize=", "parity=", "stopbits=", "timeout=", "flow=", "listen=", "help"]) | |
| except GetoptError: | |
| usage() | |
| return | |
| global LISTEN # int, the TCP port to listen on | |
| global com # the serial connection itself | |
| "first, loop through and open the right port" | |
| got_a_serial_port = False | |
| for o,a in options: | |
| if o in ("-p", "--port"): | |
| a = int(a) | |
| try: | |
| com = serial.Serial(a) | |
| #print "Serial port opened: %s" % (com.portstr) | |
| got_a_serial_port = True | |
| except: | |
| print "Couldn't open serial port: %s" % (a) | |
| print "This should be a numerical value. 0 == COM1, 1 == COM2, etc" | |
| sys.exit(1) | |
| if o in ("-h", "--help"): | |
| usage() | |
| return | |
| if not got_a_serial_port: | |
| # we don't have a port. Fine, use the default. | |
| try: | |
| com = serial.Serial(0) | |
| #print "Serial port opened: %s" % (com.portstr) | |
| except: | |
| print "Couldn't open serial port: %s" % (0) | |
| sys.exit(1) | |
| # sensible defaults | |
| com.baudrate = 9600 | |
| com.timeout = 0 | |
| com.bytesize = serial.EIGHTBITS | |
| com.parity = serial.PARITY_NONE | |
| com.stopbits = serial.STOPBITS_ONE | |
| com.xonxoff = 0 | |
| com.rtscts = 0 | |
| LISTEN = 23 | |
| # now loop through the other options | |
| for o,a in options: | |
| if o in ("-l", "--listen"): | |
| a = int(a) | |
| if a < 1 or a > 65535: | |
| print "Invalid listening (tcp) port. Valid ports are 1-65535" | |
| sys.exit(1) | |
| else: | |
| LISTEN = a | |
| if o in ("-r", "--baudrate"): | |
| a = int(a) | |
| if a in com.BAUDRATES: | |
| #print "Setting baudrate to %s." % (a) | |
| com.baudrate = a | |
| else: | |
| print "Valid baudrates are:", com.BAUDRATES | |
| sys.exit(1) | |
| if o in ("-s", "--bytesize"): | |
| a = int(a) | |
| if a in com.BYTESIZES: | |
| #print "Setting bytesize to %s." % (a) | |
| com.bytesize = a | |
| else: | |
| print "Valid bytesizes are:", com.BYTESIZES | |
| sys.exit(1) | |
| if o in ("-y", "--parity"): | |
| if a in com.PARITIES: | |
| #print "Setting parity to %s." % (a) | |
| com.parity = a | |
| else: | |
| print "Valid parities are:", com.PARITIES | |
| sys.exit(1) | |
| if o in ("-b", "--stopbits"): | |
| a = float(a) | |
| if a in com.STOPBITS: | |
| #print "Setting stopbits to %s." % (a) | |
| com.stopbits = a | |
| else: | |
| print "Valid stopbits are:", com.STOPBITS | |
| sys.exit(1) | |
| if o in ("-t", "--timeout"): | |
| a = int(a) | |
| if a < 0 or a > 100: | |
| print "Valid timesouts are 0-100." | |
| sys.exit(1) | |
| else: | |
| com.timeout = a | |
| if o in ("-f", "--flow"): | |
| FLOWS = ("xonxoff", "rtscts", "none") | |
| if a in FLOWS: | |
| #print "Setting flow control to %s" % (a) | |
| if a == "xonxoff": | |
| com.xonxoff = True | |
| if a == "rtscts": | |
| com.rtscts = True | |
| else: | |
| print "Valid flow-controls are:", FLOWS | |
| sys.exit(1) | |
| # print out com's statistics | |
| print "------------------------" | |
| print "Serial Port Information:" | |
| print "------------------------" | |
| print "port: %s" % com.portstr | |
| print "baudrate: %s" % com.baudrate | |
| print "bytesize: %s" % com.bytesize | |
| print "parity: %s" % com.parity | |
| print "stopbits: %s" % com.stopbits | |
| print "timeout: %s" % com.timeout | |
| print "xonxoff: %s" % com.xonxoff | |
| print "rtscts: %s" % com.rtscts | |
| print "" | |
| print "------------------------" | |
| print "TCP/IP Port Information:" | |
| print "------------------------" | |
| print "host: %s" % "localhost" | |
| print "port: %s" % LISTEN | |
| print "" | |
| # start up our run loop | |
| connections = Handler() | |
| while 1: | |
| connections.run() | |
| if __name__== "__main__": | |
| try: | |
| main() | |
| except KeyboardInterrupt: | |
| print "Keyboard Interrupt" |
I also recommend having a look at the pyserial project.
The documentation provides examples on how to build:
I also develop a library called serialio which comes with a ready to use TCP bridge.
It is far less tested than the well known pyserial but if you want to have a try I would welcome the feedback :-)
What I wanted was quite straight forward so I used pyserial. I used this code:
import serial
import serial.tools.list_ports as port_list
import socket
host = '127.0.0.1'
port = 23200
port1 = 23300
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ports = list(port_list.comports())
for p in ports:
print (p)
ser = serial.Serial('com6', baudrate=9600)
print (ser.name)
while True:
data=(ser.readline())
print (data)
s.sendto(data, (host, port))
s.sendto(data, (host, port1))
ser.close()
I needed UDP but if you need TCP just change de socket definition.
The original code is here.
This is where you can also find instructions.
I strongly recommend fetching the code from the original author. I cannot find a good explanation on why I decided to make a gist out of the original code.
I also added some more information above on the usage but I strongly recommend checking the original version.
I hope it helps