Last active
June 27, 2025 23:02
-
-
Save allenhark/510d3c8e8448af0bb1e952fde3475fa0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Import required modules | |
| import { Kafka, CompressionTypes, CompressionCodecs } from 'kafkajs'; | |
| import bs58 from 'bs58'; | |
| //@ts-ignore | |
| import { loadProto } from 'bitquery-protobuf-schema'; | |
| import LZ4 from 'kafkajs-lz4'; | |
| import { v4 as uuidv4 } from 'uuid'; | |
| import chalk from 'chalk'; | |
| import { extractPumpFunTokenDetails } from './ExtractToken' | |
| import { Connection, PublicKey, Finality, Keypair } from '@solana/web3.js'; | |
| import dotenv from 'dotenv'; | |
| import { PumpFunSDK } from "./pumpv3/pumpfunv2"; | |
| import NodeWallet from "@coral-xyz/anchor/dist/cjs/nodewallet"; | |
| import { AnchorProvider } from "@coral-xyz/anchor"; | |
| import connectionPool from './connectionpool'; | |
| // Types | |
| import { Buffer } from 'buffer'; | |
| dotenv.config(); | |
| // Enable LZ4 compression | |
| CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec; | |
| const username = 'xx'; | |
| const password = 'xx'; | |
| const topic = 'solana.tokens.proto'; | |
| const pumpFunProgram = '6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P'; | |
| const id = uuidv4(); | |
| // Initialize wallet | |
| const wallet = new NodeWallet(new Keypair()); | |
| //const connectionPool = new ConnectionPool(RPC_ENDPOINTS, rpcWsUrl); | |
| let isProcessing = false; | |
| // Initialize provider | |
| const provider = new AnchorProvider(connectionPool.getConnection(), wallet, { | |
| commitment: "processed", | |
| }); | |
| // Initialize SDKs | |
| const sdk = new PumpFunSDK(provider); | |
| const kafka = new Kafka({ | |
| clientId: username, | |
| brokers: ['rpk0.bitquery.io:9092', 'rpk1.bitquery.io:9092', 'rpk2.bitquery.io:9092'], | |
| sasl: { | |
| mechanism: 'scram-sha-512', | |
| username, | |
| password, | |
| }, | |
| }); | |
| const convertBytes = (buffer: Buffer, encoding: 'base58' | 'hex' = 'base58'): string => { | |
| return encoding === 'base58' ? bs58.encode(buffer) : buffer.toString('hex'); | |
| }; | |
| const convertProtobufToJson = (msg: any, encoding: 'base58' | 'hex' = 'base58'): any => { | |
| if (Array.isArray(msg)) { | |
| return msg.map(item => convertProtobufToJson(item, encoding)); | |
| } else if (msg && typeof msg === 'object' && !Buffer.isBuffer(msg)) { | |
| const result: Record<string, any> = {}; | |
| for (const [key, value] of Object.entries(msg)) { | |
| result[key] = convertProtobufToJson(value, encoding); | |
| } | |
| return result; | |
| } else if (Buffer.isBuffer(msg)) { | |
| return convertBytes(msg, encoding); | |
| } else { | |
| return msg; | |
| } | |
| }; | |
| const consumer = kafka.consumer({ groupId: `${username}-${id}` }); | |
| const run = async () => { | |
| try { | |
| const ParsedIdlBlockMessage = await loadProto(topic); | |
| await consumer.connect(); | |
| await consumer.subscribe({ topic, fromBeginning: false }); | |
| await consumer.run({ | |
| autoCommit: false, | |
| eachMessage: async ({ message }) => { | |
| if (!message.value) return; | |
| queueMicrotask(() => { | |
| processMessage(ParsedIdlBlockMessage, message) | |
| }) | |
| }, | |
| }); | |
| } catch (err) { | |
| console.error('Kafka consumer startup failed:', err); | |
| } | |
| }; | |
| async function processMessage(ParsedIdlBlockMessage, message: any) { | |
| try { | |
| const decoded = ParsedIdlBlockMessage.decode(message.value); | |
| const msgObj = ParsedIdlBlockMessage.toObject(decoded, { bytes: Buffer }); | |
| const jsonOutput = convertProtobufToJson(msgObj); | |
| const transactions = jsonOutput.Transactions || []; | |
| const filteredTxs = transactions.filter(tx => | |
| Array.isArray(tx.InstructionBalanceUpdates) && | |
| tx.InstructionBalanceUpdates.length > 0 && | |
| tx.Status?.Success === true | |
| ); | |
| for (const tx of filteredTxs) { | |
| const isPumpTx = tx.InstructionBalanceUpdates.some(inst => | |
| inst.Instruction?.Program?.Address === pumpFunProgram && | |
| inst.Instruction?.Program?.Method === 'create' && | |
| inst.Instruction?.Program?.Name === 'pump' | |
| ); | |
| if (!isPumpTx) continue; | |
| console.log(chalk.cyan('๐ Pump.fun token creation detected')); | |
| const meta = extractPumpFunTokenDetails(tx); | |
| // ๐ Fire-and-forget background task (non-blocking) | |
| setImmediate(() => { | |
| processToken(meta).catch((err) => | |
| console.error('Error in processToken:', err) | |
| ); | |
| }); | |
| } | |
| } catch (err) { | |
| console.error('Error decoding or processing Kafka message:', err); | |
| } | |
| } | |
| async function processToken(meta) { | |
| console.log(meta) | |
| let parsedTx = await connectionPool.getConnection().getParsedTransaction(meta.txSignature, { | |
| commitment: "confirmed" as Finality, | |
| maxSupportedTransactionVersion: 0 | |
| }); | |
| if (!parsedTx?.meta) { | |
| await new Promise(resolve => setTimeout(resolve, 100)); | |
| parsedTx = await connectionPool.getConnection().getParsedTransaction(meta.txSignature, { | |
| commitment: "confirmed" as Finality, | |
| maxSupportedTransactionVersion: 0 | |
| }); | |
| }; | |
| if (!parsedTx?.meta) { | |
| console.log('Meta missing') | |
| return; | |
| } | |
| console.time('Calculate time difference'); | |
| // Calculate time difference | |
| const blockTime = parsedTx.blockTime ? parsedTx.blockTime * 1000 : Date.now(); | |
| const timeDiff = Date.now() - blockTime; | |
| console.timeEnd('Calculate time difference'); | |
| console.log(chalk.red('Time Diff: ', timeDiff, 'ms')) | |
| } | |
| run().catch(console.error); | |
| process.on('SIGINT', async () => { | |
| console.log('\nCaught SIGINT, disconnecting Kafka consumer...'); | |
| try { | |
| await consumer.disconnect(); | |
| console.log('Kafka consumer disconnected.'); | |
| } catch (err) { | |
| console.error('Error disconnecting consumer:', err); | |
| } finally { | |
| process.exit(0); | |
| } | |
| }); | |
| process.on('SIGTERM', async () => { | |
| console.log('\nCaught SIGTERM, disconnecting Kafka consumer...'); | |
| try { | |
| await consumer.disconnect(); | |
| console.log('Kafka consumer disconnected.'); | |
| } catch (err) { | |
| console.error('Error disconnecting consumer:', err); | |
| } finally { | |
| process.exit(0); | |
| } | |
| }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment