Skip to content

Instantly share code, notes, and snippets.

@Pymmdrza
Created January 21, 2026 15:11
Show Gist options
  • Select an option

  • Save Pymmdrza/65325d401cbe7039d71a74ad20d39b0c to your computer and use it in GitHub Desktop.

Select an option

Save Pymmdrza/65325d401cbe7039d71a74ad20d39b0c to your computer and use it in GitHub Desktop.
Bitcoin Blockchain Scanner
import time
import threading
from queue import Queue
from rich.live import Live
from rich.table import Table
from rich import box
import requests
import json
BLOCKCHAIN_ENDPOINT = "https://blockchain.info/unconfirmed-transactions?format=json"
# Styles for different columns
TIME_STYLE = "cyan"
FROM_STYLE = "grey70"
TO_STYLE = "dark_green"
VALUE_STYLE = "white on dark_green"
FEE_STYLE = "grey19"
TXID_STYLE = "grey70"
FROM_STYLE_2 = "light_green on dark_green"
TO_STYLE_2 = "grey35 on grey70"
VALUE_STYLE_2 = "white on dark_blue"
TXID_STYLE_2 = "light_green on dark_green"
def fetch_txs():
"""Fetch unconfirmed transactions from blockchain API"""
try:
response = requests.get(BLOCKCHAIN_ENDPOINT, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get('txs', [])
else:
return []
except Exception as e:
return []
def parse_tx(tx):
"""Parse transaction data from API response"""
txid = tx.get('hash', 'N/A')
time_unix = tx.get('time', 0)
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_unix))
inputs = tx.get('inputs', [])
outputs = tx.get('out', [])
total_input = sum(inp.get('prev_out', {}).get('value', 0) for inp in inputs) / 1e8
total_output = sum(out.get('value', 0) for out in outputs) / 1e8
fee = (total_input - total_output) if total_input > total_output else 0
return {
'txid': txid,
'time': time_str,
'total_input': total_input,
'total_output': total_output,
'inputs': inputs,
'outputs': outputs,
'fee': fee
}
def show_value(val):
# Using right-aligned format with fixed total width
formatted_num = f"{val:>10.6f}"
rateNum = f"{formatted_num} BTC"
return rateNum
def format_tx_row(tx):
"""Format parsed transaction into a styled table row, highlighting high-value transactions"""
from_addr = tx['inputs'][0]['prev_out'].get('addr', 'N/A') if tx['inputs'] else 'N/A'
to_addr = tx['outputs'][0].get('addr', 'N/A') if tx['outputs'] else 'N/A'
# Highlight transactions with value >= 1 BTC
is_high_value = tx['total_output'] >= 1.0
from_style = FROM_STYLE_2 if is_high_value else FROM_STYLE
to_style = TO_STYLE_2 if is_high_value else TO_STYLE
value_style = VALUE_STYLE_2 if is_high_value else VALUE_STYLE
txid_style = TXID_STYLE_2 if is_high_value else TXID_STYLE
fee_style = FEE_STYLE
return [
f"[{TIME_STYLE}]{tx['time']}[/{TIME_STYLE}]",
f"[{value_style}]{show_value(tx['total_output'])}[/{value_style}]",
f"[{fee_style}]{tx['fee']:.8f} BTC[/{fee_style}]",
f"[{txid_style}]{tx['txid']}[/{txid_style}]",
f"[{from_style}]{from_addr}[/{from_style}]",
f"[{to_style}]{to_addr}[/{to_style}]",
]
def create_table():
"""Create table with formatted headers"""
table = Table(
box=box.ROUNDED,
show_header=True,
title="[bold white]Bitcoin Blockchain Scanner[/bold white]",
header_style="grey66 on grey7",
style="grey37",
show_edge=True,
expand=False,
padding=(0, 1),
row_styles=["none", "grey11"]
)
table.add_column("Time", justify="center", no_wrap=True)
table.add_column("Value", justify="full", no_wrap=True)
table.add_column("Fee", no_wrap=False)
table.add_column("TXID", no_wrap=False, max_width=64, min_width=32)
table.add_column("From", justify="full", no_wrap=False)
table.add_column("To", justify="full", no_wrap=False)
return table
def build_table(rows_data):
"""Build table from existing rows data"""
table = create_table()
for row in rows_data:
table.add_row(*row)
return table
def fetch_txs_background(tx_queue, seen_txids, stop_event):
"""Background thread to continuously fetch transactions from API"""
while not stop_event.is_set():
try:
raw_txs = fetch_txs()
if raw_txs:
for tx in raw_txs:
tx_hash = tx.get('hash')
if tx_hash and tx_hash not in seen_txids:
seen_txids.add(tx_hash)
parsed = parse_tx(tx)
formatted_row = format_tx_row(parsed)
tx_queue.put(formatted_row)
time.sleep(2) # Fetch every 2 seconds for faster updates
except Exception as e:
time.sleep(2)
def main():
max_rows = 23
rows_data = []
tx_queue = Queue()
seen_txids = set()
stop_event = threading.Event()
# Start background fetching thread
fetch_thread = threading.Thread(
target=fetch_txs_background,
args=(tx_queue, seen_txids, stop_event),
daemon=True
)
fetch_thread.start()
refresh_rate = 9
update_interval = 0.3
# Initialize table with initial rows from queue
initial_fill_time = time.time()
while len(rows_data) < max_rows and (time.time() - initial_fill_time) < 5:
if not tx_queue.empty():
new_row = tx_queue.get()
rows_data.append(new_row)
else:
time.sleep(0.05)
with Live(build_table(rows_data), refresh_per_second=refresh_rate, screen=True,
transient=True, auto_refresh=True) as live:
try:
while True:
# Add new row at the beginning
if not tx_queue.empty():
new_row = tx_queue.get()
rows_data.insert(0, new_row)
# Keep only max_rows
if len(rows_data) > max_rows:
rows_data = rows_data[:max_rows]
# Update display
live.update(build_table(rows_data))
time.sleep(update_interval)
except KeyboardInterrupt:
stop_event.set()
fetch_thread.join(timeout=2)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment