-
-
Save wirepair/c0db3d67e599a5a0cc8fdbda47f6dc8f to your computer and use it in GitHub Desktop.
| // An attempt at an optimized udp client/server implementation that has clients sending 10pps. | |
| // run the server: go build && main -server -num 5000 | |
| // run the client: go build && main -num 5000 | |
| // i was only able to get 9000 clients sending for 30 seconds with 0 packet loss in windows | |
| // after that i started get drops | |
| // | |
| // author: isaac dawson @ https://twitter.com/_wirepair | |
| package main | |
| import ( | |
| "encoding/binary" | |
| "errors" | |
| "flag" | |
| "github.com/pkg/profile" | |
| "log" | |
| "math/rand" | |
| "net" | |
| "os" | |
| "os/signal" | |
| "sync" | |
| "sync/atomic" | |
| "time" | |
| ) | |
| const ( | |
| MAX_PACKET_BYTES = 1220 | |
| SOCKET_BUF_SIZE = 1024 * 1024 | |
| ) | |
| var serverMode bool | |
| var maxClients int | |
| var runTime float64 | |
| var runProfile bool | |
| var randomizeStart bool | |
| var clientTotalSend uint64 | |
| var clientTotalRecv uint64 | |
| var addr = &net.UDPAddr{IP: net.ParseIP("::1"), Port: 40000} | |
| func init() { | |
| flag.BoolVar(&serverMode, "server", false, "pass this flag to run the server") | |
| flag.BoolVar(&runProfile, "prof", false, "pass this flag to enable profiling") | |
| flag.BoolVar(&randomizeStart, "rand", false, "pass this flag to randomize client startups") | |
| flag.IntVar(&maxClients, "num", 64, "number of clients to serve or to create") | |
| flag.Float64Var(&runTime, "runtime", 5.0, "how long to run clients for/clear client buffer in seconds") | |
| } | |
| // our struct for passing data and client addresses around | |
| type netcodeData struct { | |
| data []byte | |
| from *net.UDPAddr | |
| } | |
| // allows for supporting custom handlers | |
| type NetcodeRecvHandler func(data *netcodeData) | |
| type NetcodeConn struct { | |
| conn *net.UDPConn // the underlying connection | |
| closeCh chan struct{} // used for closing the connection/signaling | |
| isClosed bool // is this connection open/closed? | |
| maxBytes int // maximum allowed bytes for read/write | |
| xmitBuf sync.Pool // re-use recv buf to reduce allocs | |
| recvSize int | |
| sendSize int | |
| recvHandlerFn NetcodeRecvHandler // allow custom recv handlers | |
| } | |
| // Creates a new netcode connection | |
| func NewNetcodeConn() *NetcodeConn { | |
| c := &NetcodeConn{} | |
| c.closeCh = make(chan struct{}) | |
| c.isClosed = true | |
| c.maxBytes = MAX_PACKET_BYTES | |
| return c | |
| } | |
| // set a custom recv handler (must be called before Dial or Listen) | |
| func (c *NetcodeConn) SetRecvHandler(recvHandlerFn NetcodeRecvHandler) { | |
| c.recvHandlerFn = recvHandlerFn | |
| } | |
| // Write to the connection | |
| func (c *NetcodeConn) Write(b []byte) (int, error) { | |
| if c.isClosed { | |
| return -1, errors.New("unable to write, socket has been closed") | |
| } | |
| return c.conn.Write(b) | |
| } | |
| // Write to an address (only usable via Listen) | |
| func (c *NetcodeConn) WriteTo(b []byte, to *net.UDPAddr) (int, error) { | |
| if c.isClosed { | |
| return -1, errors.New("unable to write, socket has been closed") | |
| } | |
| return c.conn.WriteToUDP(b, to) | |
| } | |
| // Shutdown time. | |
| func (c *NetcodeConn) Close() error { | |
| if !c.isClosed { | |
| close(c.closeCh) | |
| } | |
| c.isClosed = true | |
| return c.conn.Close() | |
| } | |
| // Dial the server | |
| func (c *NetcodeConn) Dial(address *net.UDPAddr) error { | |
| var err error | |
| if c.recvHandlerFn == nil { | |
| return errors.New("packet handler must be set before calling listen") | |
| } | |
| c.closeCh = make(chan struct{}) | |
| c.conn, err = net.DialUDP(address.Network(), nil, address) | |
| if err != nil { | |
| return err | |
| } | |
| c.sendSize = SOCKET_BUF_SIZE | |
| c.recvSize = SOCKET_BUF_SIZE | |
| c.create() | |
| return nil | |
| } | |
| // Listen for connections on address | |
| func (c *NetcodeConn) Listen(address *net.UDPAddr) error { | |
| var err error | |
| if c.recvHandlerFn == nil { | |
| return errors.New("packet handler must be set before calling listen") | |
| } | |
| c.conn, err = net.ListenUDP(address.Network(), address) | |
| if err != nil { | |
| return err | |
| } | |
| c.sendSize = SOCKET_BUF_SIZE * maxClients | |
| c.recvSize = SOCKET_BUF_SIZE * maxClients | |
| c.create() | |
| return nil | |
| } | |
| // setup xmit buffer pool, socket read/write sizes and kick off readloop | |
| func (c *NetcodeConn) create() { | |
| c.isClosed = false | |
| c.xmitBuf.New = func() interface{} { | |
| return make([]byte, c.maxBytes) | |
| } | |
| c.conn.SetReadBuffer(c.recvSize) | |
| c.conn.SetWriteBuffer(c.sendSize) | |
| go c.readLoop() | |
| } | |
| // read blocks, so this must be called from a go routine | |
| func (c *NetcodeConn) receiver(ch chan *netcodeData) { | |
| for { | |
| if err := c.read(); err == nil { | |
| select { | |
| case <-c.closeCh: | |
| return | |
| default: | |
| } | |
| } else { | |
| log.Printf("error reading data from socket: %s\n", err) | |
| } | |
| } | |
| } | |
| // read does the actual connection read call, verifies we have a | |
| // buffer > 0 and < maxBytes before we bother to attempt to actually | |
| // dispatch it to the recvHandlerFn. | |
| func (c *NetcodeConn) read() error { | |
| var n int | |
| var from *net.UDPAddr | |
| var err error | |
| data := c.xmitBuf.Get().([]byte)[:c.maxBytes] | |
| n, from, err = c.conn.ReadFromUDP(data) | |
| if err != nil { | |
| return err | |
| } | |
| if n == 0 { | |
| return errors.New("socket error: 0 byte length recv'd") | |
| } | |
| if n > c.maxBytes { | |
| return errors.New("packet size was > maxBytes") | |
| } | |
| netData := &netcodeData{} | |
| netData.data = data[:n] | |
| netData.from = from | |
| go c.recvHandlerFn(netData) | |
| return nil | |
| } | |
| // dispatch the netcodeData to the bound recvHandler function. | |
| func (c *NetcodeConn) readLoop() { | |
| dataCh := make(chan *netcodeData) | |
| c.receiver(dataCh) | |
| <-c.closeCh | |
| } | |
| func main() { | |
| flag.Parse() | |
| buf := make([]byte, MAX_PACKET_BYTES) | |
| for i := 0; i < len(buf); i += 1 { | |
| buf[i] = byte(i) | |
| } | |
| if runProfile { | |
| p := profile.Start(profile.CPUProfile, profile.ProfilePath("."), profile.NoShutdownHook) | |
| defer p.Stop() | |
| } | |
| if serverMode { | |
| runServer(buf) | |
| return | |
| } | |
| wg := &sync.WaitGroup{} | |
| for i := 0; i < maxClients; i += 1 { | |
| wg.Add(1) | |
| go runClient(wg, buf, i) | |
| } | |
| wg.Wait() | |
| log.Printf("client total send/recv: %d/%d\n", clientTotalSend, clientTotalRecv) | |
| } | |
| func runServer(buf []byte) { | |
| conn := NewNetcodeConn() | |
| recvCount := make([]uint64, maxClients) | |
| // set our recv handler to just get client ids, increment and spew a buffer back to client | |
| conn.SetRecvHandler(func(data *netcodeData) { | |
| // obviously this is dumb and you'd never use userinput to index into a slice, but, testing. | |
| clientId := binary.LittleEndian.Uint16(data.data) | |
| atomic.AddUint64(&recvCount[clientId], 1) | |
| conn.WriteTo(buf, data.from) | |
| }) | |
| if err := conn.Listen(addr); err != nil { | |
| log.Fatalf("error in listen: %s\n", err) | |
| } | |
| log.Printf("listening on %s\n", addr.String()) | |
| c := make(chan os.Signal, 1) | |
| signal.Notify(c, os.Interrupt) | |
| // wait for the good ol' ctrl+c | |
| <-c | |
| total := uint64(0) | |
| for i := 0; i < maxClients; i += 1 { | |
| log.Printf("clientId: %d recv'd/sent %d", i, recvCount[i]) | |
| total += recvCount[i] | |
| } | |
| log.Printf("\ntotal: %d\n", total) | |
| conn.Close() | |
| } | |
| // run our client, sending packets at 10z | |
| func runClient(wg *sync.WaitGroup, buf []byte, index int) { | |
| clientBuf := make([]byte, len(buf)) | |
| copy(clientBuf, buf) | |
| binary.LittleEndian.PutUint16(clientBuf[:2], uint16(index)) | |
| doneTimer := time.NewTimer(time.Duration(runTime * float64(time.Second))) | |
| sendPacket := time.NewTicker(100 * time.Millisecond) // 10hz | |
| sendCount := uint64(0) | |
| recvCount := uint64(0) | |
| conn := NewNetcodeConn() | |
| conn.SetRecvHandler(func(data *netcodeData) { | |
| atomic.AddUint64(&recvCount, 1) | |
| }) | |
| // randomize start up of clients | |
| if randomizeStart { | |
| time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) | |
| } | |
| if err := conn.Dial(addr); err != nil { | |
| log.Fatalf("error connecting to %s\n", err) | |
| } | |
| for { | |
| select { | |
| // time to send the packets! | |
| case <-sendPacket.C: | |
| if _, err := conn.Write(clientBuf); err != nil { | |
| log.Fatalf("error sending packets: %s\n", err) | |
| } | |
| atomic.AddUint64(&sendCount, 1) | |
| case <-doneTimer.C: | |
| sendPacket.Stop() | |
| doneTimer.Stop() | |
| time.Sleep(500 * time.Millisecond) | |
| rxcnt := atomic.LoadUint64(&recvCount) | |
| txcnt := atomic.LoadUint64(&sendCount) | |
| log.Printf("client: %d recv'd: %d sent: %d\n", index, rxcnt, txcnt) | |
| atomic.AddUint64(&clientTotalRecv, rxcnt) | |
| atomic.AddUint64(&clientTotalSend, txcnt) | |
| wg.Done() | |
| return | |
| } | |
| } | |
| } |
@gafferongames, wow yeah i multiplied SOCKET_BUF_SIZE by # of clients and i can now get about 9000 clients with 0 packet loss!
BAM!
@wirepair I came across this on twitter and thought it was good example app to replicate to learn Rust/Tokio. I'm still smoothing out some things and figuring out the best way to do things, but here's what I have:
https://gist.github.com/bschwind/c2d9ab615a78f6370890f31f061b1a01
Repo here: https://github.com/bschwind/udp-stress-test/tree/master
You can run the server with cargo run --release -- -s. Currently the server is single-threaded...I'm curious to see how it performs on your machine with your go clients running against it. I'm on a 2014 Macbook now and don't have a lot of RAM to spare, I was getting no buffer space available errors at ~4000-5000 clients.
Sorry, the config for client count and duration don't have command line args yet, they're defined near the top of the file. I also need to add a Ctrl-C handler to stop the server and print the statistics.
Try increasing SOCKET_BUF_SIZE? I've found it needs to be proportional to the # of clients on the server, but the client can have a smaller buffer size.