Skip to content

Instantly share code, notes, and snippets.

@N0dr4x
Last active January 11, 2025 23:21
Show Gist options
  • Select an option

  • Save N0dr4x/ffe99618a738978605719ce525a33042 to your computer and use it in GitHub Desktop.

Select an option

Save N0dr4x/ffe99618a738978605719ce525a33042 to your computer and use it in GitHub Desktop.
Simple Scapy TCP Session
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: N0dr4x ([email protected])
'''
Simple Scapy TCP Session class that provide ability
to :
- execute the 3-way handshake (eg. connect)
- properly close connection (->FIN/ACK, <-FIN/ACK, ->ACK )
- send automatic acknowledgment of received tcp data packet
- build a next packet to send with correct sequence number
- directly send data through the session
HINT : Don't forget to block TCP/RST packet that was send
by the linux kernel because no source port was bound.
# iptables -A OUTPUT -p tcp --sport 1337 --tcp-flags RST RST -j DROP
Source port is, for now, fixed to 1337 to facilitate wireshark filtering.
The purpose of this class is to easily build a working tcp session and
have complete scapy control of the next tcp packet.
Usage & example :
# Create the session object and connect to host 192.168.13.37 port 80
>>> sess = TcpSession(('192.168.13.37',80))
>>> sess.connect()
# Build next packet and send it fragmented (layer 2)
>>> p = sess.build('GET / HTTP/1.1\r\n\r\n')
>>> send(fragment(p, fragsize=16))
# Direct send data through the session and close
>>> sess.send('GET /index.html HTTP/1.1\r\n\r\n')
>>> sess.close()
# Session object can be reusable
>>> sess.connect()
>>> sess.send('GET /robot.txt HTTP/1.1\r\n\r\n')
>>> sess.close()
TODO :
1/ Optionally dump received data to a file
2/ Proper logging
'''
from scapy.all import *
from threading import Thread
class TcpSession:
def __init__(self,target):
self.seq = 0
self.ack = 0
self.ip = IP(dst=target[0])
self.sport = 1337
self.dport = target[1]
self.connected = False
self._ackThread = None
self._timeout = 3
def _ack(self, p):
self.ack = p[TCP].seq + len(p[Raw])
ack = self.ip/TCP(sport=self.sport, dport=self.dport, flags='A', seq=self.seq, ack=self.ack)
send(ack)
def _ack_rclose(self):
self.connected = False
self.ack += 1
fin_ack = self.ip/TCP(sport=self.sport, dport=self.dport, flags='FA', seq=self.seq, ack=self.ack)
ack = sr1(fin_ack, timeout=self._timeout)
self.seq += 1
assert ack.haslayer(TCP), 'TCP layer missing'
assert ack[TCP].flags & 0x10 == 0x10 , 'No ACK flag'
assert ack[TCP].ack == self.seq , 'Acknowledgment number error'
def _sniff(self):
s = L3RawSocket()
while self.connected:
p = s.recv(MTU)
if p.haslayer(TCP) and p.haslayer(Raw) \
and p[TCP].dport == self.sport :
self._ack(p)
if p.haslayer(TCP) and p[TCP].dport == self.sport \
and p[TCP].flags & 0x01 == 0x01 : # FIN
self._ack_rclose()
s.close()
self._ackThread = None
print('Acknowledgment thread stopped')
def _start_ackThread(self):
self._ackThread = Thread(name='AckThread',target=self._sniff)
self._ackThread.start()
def connect(self):
self.seq = random.randrange(0,(2**32)-1)
syn = self.ip/TCP(sport=self.sport, dport=self.dport, seq=self.seq, flags='S')
syn_ack = sr1(syn, timeout=self._timeout)
self.seq += 1
assert syn_ack.haslayer(TCP) , 'TCP layer missing'
assert syn_ack[TCP].flags & 0x12 == 0x12 , 'No SYN/ACK flags'
assert syn_ack[TCP].ack == self.seq , 'Acknowledgment number error'
self.ack = syn_ack[TCP].seq + 1
ack = self.ip/TCP(sport=self.sport, dport=self.dport, seq=self.seq, flags='A', ack=self.ack)
send(ack)
self.connected = True
self._start_ackThread()
print('Connected')
def close(self):
self.connected = False
fin = self.ip/TCP(sport=self.sport, dport=self.dport, flags='FA', seq=self.seq, ack=self.ack)
fin_ack = sr1(fin, timeout=self._timeout)
self.seq += 1
assert fin_ack.haslayer(TCP), 'TCP layer missing'
assert fin_ack[TCP].flags & 0x11 == 0x11 , 'No FIN/ACK flags'
assert fin_ack[TCP].ack == self.seq , 'Acknowledgment number error'
self.ack = fin_ack[TCP].seq + 1
ack = self.ip/TCP(sport=self.sport, dport=self.dport, flags='A', seq=self.seq, ack=self.ack)
send(ack)
print('Disconnected')
def build(self, payload):
psh = self.ip/TCP(sport=self.sport, dport=self.dport, flags='PA', seq=self.seq, ack=self.ack)/payload
self.seq += len(psh[Raw])
return psh
def send(self, payload):
psh = self.build(payload)
ack = sr1(psh, timeout=self._timeout)
assert ack.haslayer(TCP), 'TCP layer missing'
assert ack[TCP].flags & 0x10 == 0x10, 'No ACK flag'
assert ack[TCP].ack == self.seq , 'Acknowledgment number error'
@BrandonFanti
Copy link

Can similar be done to emulate a TCP server? Can Scapy receive TCP SYN packets and manually process them with a similar script set to receive SYN-ACK and respond with ACK, or will that host machine kernel always try to respond to the TCP session?

The answer to your first question is yes! I have a very basic little HTTP server running which sends the same small and lame response back - and it is stateless (as it sits - but that must change for my goals.)

Anything more complicated than what I have and I would imagine it must be stateful - in fact I can't really "officially" complete the transaction - (I just fire out the ACK for the page request, then server response + page content packet with TCP flags PSH ACK and FIN back-to-back after the request.) So, I never ACK the FIN+ACK, but the clients seem OK with that...

On the 2 devices I've tested there's no indicators of failure.
Total transaction takes 160ms - and I have a TON of mid-connection logging, and among other sub-optimal architecture decisions.

I'm using scapy.SuperSocket (not necessary, the sniffer would do) and scapy.layers.http.HTTPResponse

If you hadn't asked this - I wouldn't have stumbled into your gist. It's been over a year, are you still interested in it?
I would post it, but I'll have to do some refactoring, because this part is a snippet of a larger project I don't want to publish it yet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment