Skip to content

Instantly share code, notes, and snippets.

@Austinhs
Created February 20, 2025 14:13
Show Gist options
  • Select an option

  • Save Austinhs/9c8a0a48fa6edd7de8f6f04047a9f4dd to your computer and use it in GitHub Desktop.

Select an option

Save Austinhs/9c8a0a48fa6edd7de8f6f04047a9f4dd to your computer and use it in GitHub Desktop.
Simple Anvil Wrapper
import { spawn } from "child_process";
import { closeSync, existsSync, mkdirSync, openSync } from "fs";
import net from 'net';
import { ethers } from "ethers";
import { getChain } from "./helpers.js";
/**
 * Thin wrapper around a local `anvil` child process that forks an existing chain.
 *
 * Usage: construct, `await start()`, drive the fork through the helper methods,
 * then call `stop()` to terminate the child process.
 */
export default class Anvil {
  /** RPC URL of the chain being forked (the `rpc` value returned by `getChain`). */
  forkRpc;
  /** Provider for the chain being forked (the `provider` value returned by `getChain`). */
  forkProvider;
  /** Block number to fork from; `start()` replaces it with the latest block when none was given. */
  blockNumber;
  /** Child process handle returned by `spawn`; set by `start()`. */
  process;
  /** `ethers.JsonRpcProvider` pointed at the local anvil node; set by `start()`. */
  provider;
  /** Chain id reported by `forkProvider.getNetwork()`; set by `start()`. */
  chainId;
  /** Chain name passed to the constructor; also used to name the log file. */
  chainName;
  /** Local port anvil is asked to listen on. */
  port;
  /** Value forwarded to anvil's `--block-time` option, or undefined to omit the option. */
  blockTime;

  /**
   * @param chain_name Name handed to `getChain` to look up the fork RPC URL and provider.
   * @param block_start Block number to fork from; when it is not an integer, `start()`
   *   forks from the latest block instead.
   * @param port Local port for anvil (default 8545).
   * @param blockTime Optional value for anvil's `--block-time` option.
   */
  constructor(chain_name, block_start, port = 8545, blockTime = undefined) {
    const { rpc: forkRpc, provider: forkProvider } = getChain(chain_name);
    this.forkRpc = forkRpc;
    this.forkProvider = forkProvider;
    this.blockNumber = block_start;
    this.chainName = chain_name;
    this.port = port;
    this.blockTime = blockTime;
  }

  /**
   * Spawns anvil, waits until its port accepts binds no more (i.e. is in use),
   * then connects `this.provider` to it.
   *
   * @throws {Error} When the port is already in use, when the anvil process cannot
   *   be launched, or when the port never becomes used within the retry budget.
   */
  async start() {
    const { chainId } = await this.forkProvider.getNetwork();
    this.chainId = chainId;

    // Check the port BEFORE touching the log file: opening it with 'w' truncates
    // it, which would wipe the log of an instance that is already running.
    if (await isPortInUse(this.port)) {
      throw new Error(`Anvil Port ${this.port} is already in use, make sure to terminate any existing anvil process`);
    }

    // Primitive numbers are never `instanceof Number`, so test the value itself;
    // otherwise a caller-supplied block number would always be discarded.
    const hasPinnedBlock =
      Number.isInteger(this.blockNumber) || typeof this.blockNumber === "bigint";
    if (!hasPinnedBlock) {
      const block = await this.forkProvider.getBlock('latest');
      this.blockNumber = block.number;
    }

    // Command-line arguments are strings; convert explicitly (chainId may be a bigint).
    const args = [
      "--fork-url", this.forkRpc,
      "--fork-chain-id", String(this.chainId),
      "--fork-block-number", String(this.blockNumber),
      "--port", String(this.port),
      "--no-rate-limit",
      "--no-cors",
    ];
    if (this.blockTime != null) {
      args.push("--block-time", String(this.blockTime));
    }

    const storageDir = `${process.cwd()}/storage`;
    if (!existsSync(storageDir)) {
      mkdirSync(storageDir);
    }
    const logPath = `${storageDir}/${this.chainName}-anvil.log`;
    const logFile = openSync(logPath, 'w');

    let anvil;
    try {
      anvil = spawn("anvil", args, {
        detached: false,
        stdio: ['ignore', logFile, logFile],
      });
    } finally {
      // The child holds its own copy of the descriptor once spawn() returns,
      // so the parent's copy can be released whether or not spawn succeeded.
      closeSync(logFile);
    }
    this.process = anvil;

    // A launch failure (e.g. anvil is not installed) is reported through the
    // child's 'error' event; without a listener it would crash the process.
    let onSpawnError;
    const spawnFailed = new Promise((_, reject) => {
      onSpawnError = (err) => {
        reject(new Error(`Failed to launch anvil: ${err.message}`, { cause: err }));
      };
      anvil.once("error", onSpawnError);
    });

    try {
      await Promise.race([waitUntilPortIsUsed(this.port), spawnFailed]);
    } catch (err) {
      // Do not leave a half-started child behind when startup fails.
      this.stop();
      throw err;
    } finally {
      anvil.off("error", onSpawnError);
    }

    this.provider = new ethers.JsonRpcProvider(`http://localhost:${this.port}`);
    // Await the setup calls so failures surface to the caller of start()
    // instead of becoming unhandled rejections.
    await this.provider.send("anvil_setRpcUrl", [this.forkRpc]);
    await this.provider.send("anvil_setLoggingEnabled", [true]);
  }

  /** Kills the anvil child process if one was started. */
  stop() {
    if (this.process) {
      this.process.kill();
    }
  }

  /**
   * Sends `anvil_setBalance` for the given address.
   *
   * @param wallet_address Address whose balance is set.
   * @param amount New balance, forwarded unchanged as the second RPC parameter.
   * @returns The result of the RPC call.
   */
  async modifyBalance(wallet_address, amount) {
    return this.provider.send("anvil_setBalance", [wallet_address, amount]);
  }

  /** @returns The `timestamp` of the latest block on the local node. */
  async getTimestamp() {
    const block = await this.provider.getBlock('latest');
    return block.timestamp;
  }

  /**
   * Sends `evm_setNextBlockTimestamp`, mines one block, then waits `blockTime` seconds.
   *
   * @param block_timestamp Timestamp forwarded to `evm_setNextBlockTimestamp`.
   */
  async setBlockTimestamp(block_timestamp) {
    await this.provider.send("evm_setNextBlockTimestamp", [block_timestamp]);
    await this.mineBlock();
    // Wait block time seconds; with no blockTime configured use 0 rather than
    // handing NaN to setTimeout.
    const waitMs = (this.blockTime ?? 0) * 1000;
    await new Promise((resolve) => setTimeout(resolve, waitMs));
  }

  /** Sends `evm_mine` to the local node. */
  async mineBlock() {
    await this.provider.send("evm_mine", []);
  }

  /** Sends `anvil_impersonateAccount` for `address`. */
  async impersonate(address) {
    await this.provider.send("anvil_impersonateAccount", [address]);
  }

  /** Sends `anvil_stopImpersonatingAccount` for `address`. */
  async stopImpersonating(address) {
    await this.provider.send("anvil_stopImpersonatingAccount", [address]);
  }
}
/**
 * Polls `isPortInUse(port)` until it reports the port as used.
 *
 * The port is probed once immediately and then up to `retries` more times,
 * pausing `retryTime` milliseconds between probes.
 *
 * @param port Port to probe.
 * @param retries Number of re-probes allowed after the first one (default 10).
 * @param retryTime Delay between probes in milliseconds (default 250).
 * @returns A promise that resolves (with no value) once the port is in use.
 * @throws {Error} When the port is still free after the last retry, or whatever
 *   error `isPortInUse` rejects with.
 */
async function waitUntilPortIsUsed(port, retries = 10, retryTime = 250) {
  // A plain async loop lets a failing probe reject this promise; probing from
  // an un-awaited async callback would leave the caller waiting forever.
  for (let attempt = 0; ; attempt++) {
    if (await isPortInUse(port)) {
      return;
    }
    if (attempt >= retries) {
      throw new Error(`Port ${port} is not in use after ${retries} retries. Ensure the port is available & there is no errors in the anvil log.`);
    }
    await new Promise((resolve) => setTimeout(resolve, retryTime));
  }
}
/**
 * Reports whether `port` on `host` is already taken by trying to listen on it
 * with a throwaway server.
 *
 * @param port Port to test.
 * @param host Interface to bind on (default '127.0.0.1').
 * @returns A promise resolving to `true` when binding fails with EADDRINUSE and
 *   `false` when the throwaway server manages to listen; it rejects with the
 *   original error for any other failure.
 */
function isPortInUse(port, host = '127.0.0.1') {
  return new Promise((resolve, reject) => {
    const probe = net.createServer();

    const handleError = (error) => {
      if (error.code !== 'EADDRINUSE') {
        // Anything other than "address in use" is a genuine failure.
        probe.close();
        reject(error);
        return;
      }
      // Somebody else already owns the port.
      resolve(true);
    };

    const handleListening = () => {
      // The bind worked, so the port was free; release it again.
      probe.close();
      resolve(false);
    };

    probe.once('error', handleError);
    probe.once('listening', handleListening);
    probe.listen(port, host);
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment