Skip to content

Instantly share code, notes, and snippets.

@Austinhs
Created December 13, 2022 19:21
Show Gist options
  • Select an option

  • Save Austinhs/6fa37d7e43b85848b89f7c4f1fa5a71b to your computer and use it in GitHub Desktop.

Select an option

Save Austinhs/6fa37d7e43b85848b89f7c4f1fa5a71b to your computer and use it in GitHub Desktop.
package observer
import (
"context"
"da/go-server/src/models"
"da/go-server/src/utils/constants"
"da/go-server/src/utils/database"
"database/sql"
"errors"
"log"
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
ens "github.com/wealdtech/go-ens/v3"
"gorm.io/gorm"
)
// Package-level state shared by Run and its helpers.
var (
// Websocket RPC endpoint currently in use. Starts as constants.RPC_WS and is
// flipped between the primary and backup endpoints by handleSubError.
soc = constants.RPC_WS
// Client is the go-ethereum RPC client, dialed during package initialization.
// sockerr holds the error from that dial; Run checks it before subscribing.
Client, sockerr = ethclient.Dial(soc)
// Channels the subscriptions deliver into: new block headers and filtered logs
blocks = make(chan *types.Header)
logs = make(chan types.Log)
// Retries: per-subscription reconnect counters (reset by Run once a message
// arrives again) and the threshold handleSubError compares them against
logRetries = 0
blockRetries = 0
maxRetry = 10
// Contexts passed to the block and log subscription calls
blockCtx = context.Background()
logCtx = context.Background()
// Most recent header received on blocks; Run overwrites it on every new head
// and reads it when logging a transfer
currentBlock = types.Header{}
)
// Run is the observer's main loop. It subscribes to Transfer events emitted by
// the tracked contracts and to new block headers, then dispatches every
// incoming log to ProcessTransferNFT or ProcessTransferERC20.
//
// Run never returns under normal operation. It exits the process if the
// initial websocket dial failed, and the subscription helpers panic when a
// (re)subscription cannot be established.
func Run() {
	if sockerr != nil {
		// log.Fatalf exits the process, so nothing placed after it would run;
		// put the endpoint and the underlying error into a single message.
		log.Fatalf("Error creating ws connection. (%s): %v", soc, sockerr)
	}

	// Query for what we're looking for: topic 0 must be the Transfer signature.
	transferHash := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)"))
	query := ethereum.FilterQuery{
		Topics: [][]common.Hash{{transferHash}},
		Addresses: []common.Address{
			common.HexToAddress(constants.ANIMAL_CONTRACT),
			common.HexToAddress(constants.APPLE_CONTRACT),
			common.HexToAddress(constants.WASTELAND_CONTRACT),
			common.HexToAddress(constants.OG_CONTRACT),
		},
	}

	// Subscribe to the logs. The deferred calls go through closures so that
	// they act on whatever subscription is current at exit time, not on the
	// one that existed when the defer statement ran.
	logSub := subscribeToLogs(query)
	defer func() { logSub.Unsubscribe() }()

	// Subscribe to the blocks
	blockSub := subscribeToBlocks()
	defer func() { blockSub.Unsubscribe() }()

	for {
		select {
		case err := <-logSub.Err():
			logSub.Unsubscribe()
			handleSubError(err, &logRetries, &logSub, query, "logs")

		case err := <-blockSub.Err():
			blockSub.Unsubscribe()
			handleSubError(err, &blockRetries, &blockSub, ethereum.FilterQuery{}, "block")

		case block := <-blocks:
			if blockRetries > 0 {
				log.Println("Successfully reconnected to block subscription")
				blockRetries = 0
			}
			currentBlock = *block
			// log.Println("[ETH-OBSERVER] - New block:", block.Number, "@", block.Time)

		case ethLog := <-logs:
			if logRetries > 0 {
				log.Println("Successfully reconnected to log subscription")
				logRetries = 0
			}

			// Only Transfer logs are handled. The length check keeps a log
			// without topics from causing an index-out-of-range panic.
			if len(ethLog.Topics) == 0 || ethLog.Topics[0] != transferHash {
				continue
			}
			log.Println("[ETH-OBSERVER] - Transfer found in block:", currentBlock.Number, "@", currentBlock.Time)

			// More than three topics means the third Transfer argument is
			// indexed as well; those logs are treated as NFT transfers.
			var err error
			if len(ethLog.Topics) > 3 {
				err = ProcessTransferNFT(ethLog)
			} else {
				err = ProcessTransferERC20(ethLog)
			}
			if err != nil {
				// Keep observing; one bad transfer must not stop the loop.
				log.Println("[ETH-OBSERVER] - Error processing transfer:", err)
			}
		}
	}
}
// handleSubError recovers from a failed subscription by dialing the other RPC
// endpoint (primary <-> backup), replacing the package-level Client and
// re-creating the subscription.
//
// err is the error reported by the failed subscription. retries points at the
// counter for this subscription and is incremented on every reconnect. sub is
// overwritten with the new subscription. query is only used when subType is
// "logs"; any other subType re-creates the block subscription.
//
// It panics once the retry counter has reached maxRetry, when the alternate
// endpoint cannot be dialed, or when re-subscribing fails.
func handleSubError(err error, retries *int, sub *ethereum.Subscription, query ethereum.FilterQuery, subType string) {
	log.Println("Found error in finding "+subType+", trying to reconnect", err)

	// Give up only after the retry budget is spent. The check must be
	// "retries >= max": the reverse comparison is true on the very first
	// failure and would abort before a single reconnect was attempted.
	if *retries >= maxRetry {
		panic("Max retries reached for " + subType)
	}

	// Dial opposite RPC
	if soc == constants.RPC_WS {
		soc = constants.BACKUP_RPC_WS
	} else {
		soc = constants.RPC_WS
	}

	newClient, dialErr := ethclient.Dial(soc)
	if dialErr != nil {
		panic("Error creating ws connection. (" + soc + "): " + dialErr.Error())
	}
	log.Println("Changing socket to", soc)

	// NOTE(review): the previous Client is not closed here because the other
	// subscription created in Run may still be attached to it.
	Client = newClient

	var newSub ethereum.Subscription
	if subType == "logs" {
		newSub = subscribeToLogs(query)
	} else {
		newSub = subscribeToBlocks()
	}

	*retries++
	*sub = newSub
}
// subscribeToBlocks opens a new-head subscription on the current Client that
// delivers headers into the package-level blocks channel. It panics when the
// subscription cannot be created.
func subscribeToBlocks() ethereum.Subscription {
	headSub, err := Client.SubscribeNewHead(blockCtx, blocks)
	if err == nil {
		return headSub
	}
	panic(err)
}
// subscribeToLogs opens a filtered log subscription on the current Client for
// the given query, delivering matches into the package-level logs channel.
// It panics when the subscription cannot be created.
func subscribeToLogs(query ethereum.FilterQuery) ethereum.Subscription {
	filterSub, err := Client.SubscribeFilterLogs(logCtx, query, logs)
	if err == nil {
		return filterSub
	}
	panic(err)
}
// ProcessTransferERC20 decodes a Transfer log whose amount is carried in the
// log data (sender and recipient in topics 1 and 2) and writes the decoded
// fields to the standard logger.
//
// It returns an error when the log has fewer than three topics, since sender
// and recipient cannot be read from it; otherwise it returns nil.
func ProcessTransferERC20(ethLog types.Log) error {
	// Topics 1 and 2 are indexed below; reject short logs instead of panicking.
	if len(ethLog.Topics) < 3 {
		return errors.New("transfer log has fewer than 3 topics in tx " + ethLog.TxHash.String())
	}

	tx := ethLog.TxHash.String()
	contract := ethLog.Address.String()
	from := common.BytesToAddress(ethLog.Topics[1].Bytes()).Hex()
	to := common.BytesToAddress(ethLog.Topics[2].Bytes()).Hex()

	// The raw amount is scaled down by 1e6 using integer division, so any
	// remainder is dropped.
	// NOTE(review): presumably this matches the token's decimals; confirm.
	rawValue := common.BytesToHash(ethLog.Data).Big()
	value := new(big.Int).Div(rawValue, big.NewInt(1000000)).String()

	log.Printf("Tx: %s\nContract: %s\nFrom: %s\nTo: %s\nValue: %s\n", tx, contract, from, to, value)
	log.Println("===================^ ERC20 ^===========================")
	return nil
}
// ProcessTransferNFT handles a Transfer log that carries the token id in
// topic 3 (sender and recipient in topics 1 and 2). It logs the decoded
// fields, updates the token's owner via updateMetadata and adjusts both
// wallets' counts via updateLeaderboard.
//
// It returns an error when the log has fewer than four topics, when the token
// id does not fit in an int64, or when the leaderboard update fails.
func ProcessTransferNFT(ethLog types.Log) error {
	// Topics 1-3 are indexed below; reject short logs instead of panicking.
	if len(ethLog.Topics) < 4 {
		return errors.New("nft transfer log has fewer than 4 topics in tx " + ethLog.TxHash.String())
	}

	tx := ethLog.TxHash.String()
	contract := ethLog.Address.String()
	from := common.BytesToAddress(ethLog.Topics[1].Bytes()).Hex()
	to := common.BytesToAddress(ethLog.Topics[2].Bytes()).Hex()

	// Int64 on an out-of-range big.Int yields an undefined value, which would
	// make the metadata update target the wrong token. Refuse such ids.
	rawTokenID := ethLog.Topics[3].Big()
	if !rawTokenID.IsInt64() {
		return errors.New("nft token id " + rawTokenID.String() + " does not fit in int64 in tx " + tx)
	}
	tokenID := rawTokenID.Int64()

	log.Printf("Tx: %s\nContract: %s\nFrom: %s\nTo: %s\nToken id: %v\n", tx, contract, from, to, tokenID)
	log.Println("===================^ NFT ^===========================")

	updateMetadata(to, tokenID)
	return updateLeaderboard(from, to, contract)
}
// updateMetadata sets the owner column of the "Metadata" rows whose token_id
// matches to the given address. The function has no error return, so a failed
// statement is reported through the standard logger instead of being dropped.
func updateMetadata(to string, token_id int64) {
	result := database.DB.Exec(`UPDATE "Metadata" SET owner = ? WHERE token_id = ?`, to, token_id)
	if result.Error != nil {
		log.Println("Error updating metadata owner for token", token_id, ":", result.Error)
	}
}
// updateLeaderboard moves one NFT from the sender's leaderboard record to the
// recipient's for the given contract, refreshes both wallets' ENS names and
// refreshes the mvw_leaderboard_ranks materialized view, all inside a single
// database transaction.
//
// Transfers on constants.APPLE_CONTRACT are ignored and return nil. An
// unrecognized contract, or any failing database statement, returns an error
// and rolls the transaction back.
func updateLeaderboard(from string, to string, contract string) error {
	if contract == constants.APPLE_CONTRACT {
		return nil
	}

	// A self-transfer leaves every balance unchanged. Processing it anyway
	// would load two copies of the same row and the second Save would
	// overwrite the first, leaving the count one too high.
	if from == to {
		return nil
	}

	return database.DB.Transaction(func(tx *gorm.DB) error {
		// If wallet exists, load it; otherwise start from a fresh record.
		fromRecord := models.Leaderboards{Address: from}
		toRecord := models.Leaderboards{Address: to}
		if err := tx.FirstOrInit(&fromRecord, fromRecord).Error; err != nil {
			return err
		}
		if err := tx.FirstOrInit(&toRecord, toRecord).Error; err != nil {
			return err
		}

		// Handle change in balance
		switch contract {
		case constants.OG_CONTRACT:
			fromRecord.OgNftCount--
			toRecord.OgNftCount++
		case constants.ANIMAL_CONTRACT:
			fromRecord.SaNftCount--
			toRecord.SaNftCount++
		case constants.WASTELAND_CONTRACT:
			fromRecord.WlNftCount--
			toRecord.WlNftCount++
		default:
			// This will cause a rollback
			return errors.New("Picked up wrong contract " + contract)
		}

		// ENS update
		updateEns(&fromRecord)
		updateEns(&toRecord)

		// Update the records. Returning the error is what triggers the
		// rollback; an unchecked failure would let the transaction commit.
		if err := tx.Save(&fromRecord).Error; err != nil {
			return err
		}
		if err := tx.Save(&toRecord).Error; err != nil {
			return err
		}

		// Refresh view
		if err := tx.Exec("REFRESH MATERIALIZED VIEW mvw_leaderboard_ranks").Error; err != nil {
			return err
		}

		log.Println("Updated leaderboard")
		return nil
	})
}
// updateEns reverse-resolves the record's address through the current Client
// and, when the resolved name differs from the stored one, writes it into
// record.Ens as a valid NullString. A failed lookup is logged and leaves the
// record untouched.
func updateEns(record *models.Leaderboards) {
	wallet := common.HexToAddress(record.Address)

	domain, err := ens.ReverseResolve(Client, wallet)
	if err != nil {
		log.Println("Error resolving ENS for " + record.Address)
		return
	}

	// Nothing to do when the stored name is already current.
	if domain == record.Ens.String {
		return
	}

	log.Println("Domain found: " + domain)
	record.Ens = sql.NullString{String: domain, Valid: true}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment