Created
August 19, 2025 05:18
-
-
Save mosajjal/19a49cd75e39b411696106199aa3e0f7 to your computer and use it in GitHub Desktop.
Get the first and last packet timestamps of a huge pcap file instantly.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import struct | |
| import datetime | |
| import sys | |
def read_pcap_timestamps(filename):
    """
    Quickly extract first and last packet timestamps from a pcap file
    by reading the binary structure directly.

    The first timestamp comes from the packet record that immediately
    follows the 24-byte global header. The last timestamp is found
    heuristically: the final 64 KiB of the file is searched for byte
    sequences that look like packet record headers, and the largest
    plausible timestamp (not earlier than the first one) wins. If the tail
    yields nothing newer than the first packet and the file is larger than
    the tail window, ``scan_file_for_last_timestamp`` is used as a fallback.

    Args:
        filename: Path of the pcap file to inspect.

    Returns:
        A dict with the keys ``first_timestamp`` and ``last_timestamp``
        (float seconds since the epoch), ``first_datetime`` and
        ``last_datetime`` (naive local-time ``datetime`` objects) and
        ``duration_seconds`` (last minus first).

    Raises:
        ValueError: If the file is shorter than a global header, does not
            start with a microsecond-resolution pcap magic number, or
            contains no packet record header.
        OSError: If the file cannot be opened or read.
    """
    global_header_len = 24
    record_header_len = 16
    tail_window = 65536              # bytes inspected at the end of the file
    max_caplen = 65536               # largest capture length considered sane
    min_ts_sec = 631152000           # 1990-01-01 00:00:00 UTC
    max_ts_sec = 2208988800          # 2040-01-01 00:00:00 UTC

    with open(filename, 'rb') as f:
        # Read pcap global header (24 bytes)
        global_header = f.read(global_header_len)
        if len(global_header) < global_header_len:
            raise ValueError("File too small to be a valid pcap")

        # The magic number 0xa1b2c3d4 is stored in the writer's byte order.
        # Compare the raw bytes so the result does not depend on the byte
        # order of the machine running this script.
        magic = global_header[:4]
        if magic == b'\xd4\xc3\xb2\xa1':
            endian = '<'  # Little endian
        elif magic == b'\xa1\xb2\xc3\xd4':
            endian = '>'  # Big endian
        else:
            raise ValueError("Not a valid pcap file")

        # Record header layout: ts_sec, ts_usec, caplen, origlen (4 x uint32)
        record = struct.Struct(f'{endian}IIII')

        # Read first packet header (16 bytes)
        packet_header = f.read(record_header_len)
        if len(packet_header) < record_header_len:
            raise ValueError("No packets in file")

        # Extract first packet timestamp
        ts_sec, ts_usec, _caplen, _origlen = record.unpack(packet_header)
        first_timestamp = ts_sec + ts_usec / 1000000.0

        # Read the last 64 KiB (or everything after the global header when
        # the file is smaller) to look for the final packet.
        f.seek(0, 2)  # Seek to end
        file_size = f.tell()
        tail_size = min(tail_window, file_size - global_header_len)
        f.seek(file_size - tail_size)
        tail_data = f.read(tail_size)
        tail_len = len(tail_data)

        last_valid_timestamp = first_timestamp

        # The tail usually starts in the middle of a packet, so slide one
        # byte at a time until something passes the sanity checks, then hop
        # from record to record.
        i = 0
        while i < tail_len - record_header_len:
            ts_sec, ts_usec, caplen, origlen = record.unpack_from(tail_data, i)
            if (min_ts_sec <= ts_sec <= max_ts_sec            # Reasonable timestamp range
                    and ts_usec < 1000000                     # Valid microseconds
                    and 0 < caplen <= max_caplen              # Reasonable capture length
                    and caplen <= origlen                     # Captured <= original
                    and i + record_header_len + caplen <= tail_len):  # Packet fits
                candidate_timestamp = ts_sec + ts_usec / 1000000.0
                if candidate_timestamp >= last_valid_timestamp:
                    last_valid_timestamp = candidate_timestamp
                # Skip to next potential packet (current header + data)
                i += record_header_len + caplen
            else:
                i += 1

        # If the tail produced nothing newer than the first packet, fall
        # back to scanning the file from the first packet record.
        if (last_valid_timestamp == first_timestamp
                and file_size > global_header_len + tail_size):
            f.seek(global_header_len)  # Start after global header
            scanned = scan_file_for_last_timestamp(f, endian, file_size)
            # The scanner yields 0 when it recognises no packet; never
            # report a last timestamp that precedes the first one.
            last_valid_timestamp = max(first_timestamp, scanned)

    # Convert to readable format and return
    first_dt = datetime.datetime.fromtimestamp(first_timestamp)
    last_dt = datetime.datetime.fromtimestamp(last_valid_timestamp)
    return {
        'first_timestamp': first_timestamp,
        'last_timestamp': last_valid_timestamp,
        'first_datetime': first_dt,
        'last_datetime': last_dt,
        'duration_seconds': last_valid_timestamp - first_timestamp
    }
def scan_file_for_last_timestamp(f, endian, file_size, chunk_size=1024 * 1024):
    """Fallback function to scan file if tail search fails.

    Walks the file from the current position of ``f`` to ``file_size``,
    hopping from packet record to packet record. Whenever the bytes at the
    current offset do not look like a record header, the walk advances one
    byte and tries again. The file is read in chunks, and each chunk starts
    exactly where the previous walk stopped, so a record that crosses a
    chunk boundary is neither skipped nor parsed twice.

    Args:
        f: Seekable binary file object, positioned where scanning starts.
        endian: ``struct`` byte-order prefix for the file (``'<'`` or ``'>'``).
        file_size: Offset at which scanning stops (normally the file size).
        chunk_size: Number of bytes read per chunk; values smaller than one
            record header are raised to the header size.

    Returns:
        The largest plausible packet timestamp seen, as float seconds since
        the epoch, or ``0`` if no plausible record header was found.
    """
    record = struct.Struct(f'{endian}IIII')   # ts_sec, ts_usec, caplen, origlen
    header_len = record.size
    min_ts_sec = 631152000                    # 1990-01-01 00:00:00 UTC
    max_ts_sec = 2208988800                   # 2040-01-01 00:00:00 UTC
    max_caplen = 65536
    # A chunk must hold at least one header or the walk could not advance.
    chunk_size = max(chunk_size, header_len)

    last_timestamp = 0
    pos = f.tell()
    while pos + header_len <= file_size:
        f.seek(pos)
        chunk = f.read(min(chunk_size, file_size - pos))
        if len(chunk) < header_len:
            break  # File is shorter than file_size claims

        # Highest chunk offset at which a complete header still fits.
        last_header_start = len(chunk) - header_len
        i = 0
        while i <= last_header_start:
            ts_sec, ts_usec, caplen, origlen = record.unpack_from(chunk, i)
            if (min_ts_sec <= ts_sec <= max_ts_sec
                    and ts_usec < 1000000
                    and 0 < caplen <= max_caplen
                    and caplen <= origlen):
                timestamp = ts_sec + ts_usec / 1000000.0
                if timestamp > last_timestamp:
                    last_timestamp = timestamp
                # Skip to next packet; this may land beyond the chunk.
                i += header_len + caplen
            else:
                i += 1

        # i is at least 1 here, so the scan always makes progress. It is
        # either the start of the next record or the first offset whose
        # header did not fit completely inside this chunk.
        pos += i

    return last_timestamp
def main():
    """Command-line entry point.

    Expects exactly one argument, the path of a pcap file. Prints the first
    and last packet times and the capture duration. Prints a usage line and
    exits with status 1 on a wrong argument count; prints the error and
    exits with status 1 if reading the file fails.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python pcap_timestamps.py <pcap_file>")
        sys.exit(1)

    pcap_path = cli_args[0]
    try:
        info = read_pcap_timestamps(pcap_path)
        first_dt, first_ts = info['first_datetime'], info['first_timestamp']
        last_dt, last_ts = info['last_datetime'], info['last_timestamp']
        duration = info['duration_seconds']
        print(f"First packet: {first_dt} ({first_ts})")
        print(f"Last packet: {last_dt} ({last_ts})")
        print(f"Duration: {duration:.2f} seconds")
    except Exception as exc:
        # Top-level boundary: report any failure and signal it via exit code.
        print(f"Error: {exc}")
        sys.exit(1)
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment