Created
January 21, 2026 15:11
-
-
Save Pymmdrza/65325d401cbe7039d71a74ad20d39b0c to your computer and use it in GitHub Desktop.
Bitcoin Blockchain Scanner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import threading | |
| from queue import Queue | |
| from rich.live import Live | |
| from rich.table import Table | |
| from rich import box | |
| import requests | |
| import json | |
| BLOCKCHAIN_ENDPOINT = "https://blockchain.info/unconfirmed-transactions?format=json" | |
| # Styles for different columns | |
| TIME_STYLE = "cyan" | |
| FROM_STYLE = "grey70" | |
| TO_STYLE = "dark_green" | |
| VALUE_STYLE = "white on dark_green" | |
| FEE_STYLE = "grey19" | |
| TXID_STYLE = "grey70" | |
| FROM_STYLE_2 = "light_green on dark_green" | |
| TO_STYLE_2 = "grey35 on grey70" | |
| VALUE_STYLE_2 = "white on dark_blue" | |
| TXID_STYLE_2 = "light_green on dark_green" | |
| def fetch_txs(): | |
| """Fetch unconfirmed transactions from blockchain API""" | |
| try: | |
| response = requests.get(BLOCKCHAIN_ENDPOINT, timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return data.get('txs', []) | |
| else: | |
| return [] | |
| except Exception as e: | |
| return [] | |
| def parse_tx(tx): | |
| """Parse transaction data from API response""" | |
| txid = tx.get('hash', 'N/A') | |
| time_unix = tx.get('time', 0) | |
| time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_unix)) | |
| inputs = tx.get('inputs', []) | |
| outputs = tx.get('out', []) | |
| total_input = sum(inp.get('prev_out', {}).get('value', 0) for inp in inputs) / 1e8 | |
| total_output = sum(out.get('value', 0) for out in outputs) / 1e8 | |
| fee = (total_input - total_output) if total_input > total_output else 0 | |
| return { | |
| 'txid': txid, | |
| 'time': time_str, | |
| 'total_input': total_input, | |
| 'total_output': total_output, | |
| 'inputs': inputs, | |
| 'outputs': outputs, | |
| 'fee': fee | |
| } | |
| def show_value(val): | |
| # Using right-aligned format with fixed total width | |
| formatted_num = f"{val:>10.6f}" | |
| rateNum = f"{formatted_num} BTC" | |
| return rateNum | |
| def format_tx_row(tx): | |
| """Format parsed transaction into a styled table row, highlighting high-value transactions""" | |
| from_addr = tx['inputs'][0]['prev_out'].get('addr', 'N/A') if tx['inputs'] else 'N/A' | |
| to_addr = tx['outputs'][0].get('addr', 'N/A') if tx['outputs'] else 'N/A' | |
| # Highlight transactions with value >= 1 BTC | |
| is_high_value = tx['total_output'] >= 1.0 | |
| from_style = FROM_STYLE_2 if is_high_value else FROM_STYLE | |
| to_style = TO_STYLE_2 if is_high_value else TO_STYLE | |
| value_style = VALUE_STYLE_2 if is_high_value else VALUE_STYLE | |
| txid_style = TXID_STYLE_2 if is_high_value else TXID_STYLE | |
| fee_style = FEE_STYLE | |
| return [ | |
| f"[{TIME_STYLE}]{tx['time']}[/{TIME_STYLE}]", | |
| f"[{value_style}]{show_value(tx['total_output'])}[/{value_style}]", | |
| f"[{fee_style}]{tx['fee']:.8f} BTC[/{fee_style}]", | |
| f"[{txid_style}]{tx['txid']}[/{txid_style}]", | |
| f"[{from_style}]{from_addr}[/{from_style}]", | |
| f"[{to_style}]{to_addr}[/{to_style}]", | |
| ] | |
| def create_table(): | |
| """Create table with formatted headers""" | |
| table = Table( | |
| box=box.ROUNDED, | |
| show_header=True, | |
| title="[bold white]Bitcoin Blockchain Scanner[/bold white]", | |
| header_style="grey66 on grey7", | |
| style="grey37", | |
| show_edge=True, | |
| expand=False, | |
| padding=(0, 1), | |
| row_styles=["none", "grey11"] | |
| ) | |
| table.add_column("Time", justify="center", no_wrap=True) | |
| table.add_column("Value", justify="full", no_wrap=True) | |
| table.add_column("Fee", no_wrap=False) | |
| table.add_column("TXID", no_wrap=False, max_width=64, min_width=32) | |
| table.add_column("From", justify="full", no_wrap=False) | |
| table.add_column("To", justify="full", no_wrap=False) | |
| return table | |
| def build_table(rows_data): | |
| """Build table from existing rows data""" | |
| table = create_table() | |
| for row in rows_data: | |
| table.add_row(*row) | |
| return table | |
| def fetch_txs_background(tx_queue, seen_txids, stop_event): | |
| """Background thread to continuously fetch transactions from API""" | |
| while not stop_event.is_set(): | |
| try: | |
| raw_txs = fetch_txs() | |
| if raw_txs: | |
| for tx in raw_txs: | |
| tx_hash = tx.get('hash') | |
| if tx_hash and tx_hash not in seen_txids: | |
| seen_txids.add(tx_hash) | |
| parsed = parse_tx(tx) | |
| formatted_row = format_tx_row(parsed) | |
| tx_queue.put(formatted_row) | |
| time.sleep(2) # Fetch every 2 seconds for faster updates | |
| except Exception as e: | |
| time.sleep(2) | |
| def main(): | |
| max_rows = 23 | |
| rows_data = [] | |
| tx_queue = Queue() | |
| seen_txids = set() | |
| stop_event = threading.Event() | |
| # Start background fetching thread | |
| fetch_thread = threading.Thread( | |
| target=fetch_txs_background, | |
| args=(tx_queue, seen_txids, stop_event), | |
| daemon=True | |
| ) | |
| fetch_thread.start() | |
| refresh_rate = 9 | |
| update_interval = 0.3 | |
| # Initialize table with initial rows from queue | |
| initial_fill_time = time.time() | |
| while len(rows_data) < max_rows and (time.time() - initial_fill_time) < 5: | |
| if not tx_queue.empty(): | |
| new_row = tx_queue.get() | |
| rows_data.append(new_row) | |
| else: | |
| time.sleep(0.05) | |
| with Live(build_table(rows_data), refresh_per_second=refresh_rate, screen=True, | |
| transient=True, auto_refresh=True) as live: | |
| try: | |
| while True: | |
| # Add new row at the beginning | |
| if not tx_queue.empty(): | |
| new_row = tx_queue.get() | |
| rows_data.insert(0, new_row) | |
| # Keep only max_rows | |
| if len(rows_data) > max_rows: | |
| rows_data = rows_data[:max_rows] | |
| # Update display | |
| live.update(build_table(rows_data)) | |
| time.sleep(update_interval) | |
| except KeyboardInterrupt: | |
| stop_event.set() | |
| fetch_thread.join(timeout=2) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment