Skip to content

Instantly share code, notes, and snippets.

@flatline
Last active June 19, 2025 19:43
Show Gist options
  • Select an option

  • Save flatline/e09bcb937c1a964196e17a2e9d6935af to your computer and use it in GitHub Desktop.

Select an option

Save flatline/e09bcb937c1a964196e17a2e9d6935af to your computer and use it in GitHub Desktop.
Parse SOL transactions from helius enhanced transaction endpoint
from datetime import datetime, timezone
SOL_ADDRESSES = [
"So11111111111111111111111111111111111111111", # SOL
"So11111111111111111111111111111111111111112", # WSOL
]
IGNORE_ADDRESSES = [
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", # USDC
]
def is_sol(addr: str):
return addr in SOL_ADDRESSES
def parse_trade(tx: dict) -> Trade:
"""
Parse the helius `tx` json payload. Will return `None` if the transaction
is not to/from SOL, e.g. for USDC or other cross-chain swaps.
"""
ts = datetime.fromtimestamp(tx["timestamp"], tz=timezone.utc)
id = tx["signature"]
wallet = tx["feePayer"]
transfers = [
t
for t in tx.get("tokenTransfers", [])
if (t["fromUserAccount"] == wallet or t["toUserAccount"] == wallet)
]
token_transfers = [t for t in transfers if not is_sol(t["mint"])]
sol_transfers = [t for t in transfers if is_sol(t["mint"])]
if len(token_transfers) == 0 or len(sol_transfers) == 0:
print(
f"No SOL token transfers found for txn {id}",
file=sys.stderr,
)
return None
token_address = token_transfers[0]["mint"]
if token_address in IGNORE_ADDRESSES:
print(f"Address in ignore list for {id}", file=sys.stderr)
return None
is_buy = token_transfers[0]["toUserAccount"] == wallet
token_amount = sum([float(t["tokenAmount"]) for t in token_transfers])
sol_amount = sum([float(t["tokenAmount"]) for t in sol_transfers])
return {
"time": ts,
"is_buy": is_buy,
"token_address": token_address,
"token_amount": token_amount,
"sol_amount": sol_amount
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment