Created
August 26, 2025 23:50
-
-
Save jmcph4/a456b1558fd5512f28b0c3c9398f9cab to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
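| // cargo-deps: tokio={version="1", features=["full"]} | |
| // NOTE: the line above names only tokio; the code also needs eyre, color-eyre and chrono (see the dependency list at the end of this file). | |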
| // Event-driven Tokio example that reacts to multiple kinds of events using `tokio::select!`. | |
| // | |
| // It concurrently handles: | |
| // - incoming TCP connections | |
| // - periodic timer ticks | |
| // - messages arriving on an mpsc channel | |
| // - Ctrl+C for graceful shutdown | |
| // | |
| // Run: `cargo run` then (optionally) `nc 127.0.0.1 8080` in another shell. | |
| use std::time::Duration; | |
| use tokio::{ | |
| io::{AsyncReadExt, AsyncWriteExt}, | |
| net::TcpListener, | |
| signal, | |
| sync::{broadcast, mpsc}, | |
| time, | |
| }; | |
| #[tokio::main] | |
| async fn main() -> eyre::Result<()> { | |
| color_eyre::install()?; | |
| // === Event sources === | |
| let listener = TcpListener::bind("127.0.0.1:8080").await?; | |
| let mut ticker = time::interval(Duration::from_secs(1)); // periodic event | |
| // mpsc: external producer pushes "work items" into the system | |
| let (tx, mut rx) = mpsc::channel::<String>(128); | |
| // broadcast: one-to-many shutdown signal | |
| let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1); | |
| // === Spawn a simple producer that feeds the mpsc channel === | |
| tokio::spawn({ | |
| let tx = tx.clone(); | |
| async move { | |
| let mut i = 0u64; | |
| loop { | |
| time::sleep(Duration::from_millis(700)).await; | |
| if tx.send(format!("job-{i}")).await.is_err() { | |
| break; // receiver dropped | |
| } | |
| i += 1; | |
| } | |
| } | |
| }); | |
| // === Spawn Ctrl+C handler to trigger shutdown === | |
| tokio::spawn({ | |
| let shutdown_tx = shutdown_tx.clone(); | |
| async move { | |
| // Wait for SIGINT/Ctrl+C and notify all listeners | |
| let _ = signal::ctrl_c().await; | |
| let _ = shutdown_tx.send(()); | |
| } | |
| }); | |
| println!("listening on 127.0.0.1:8080 — press Ctrl+C to stop"); | |
| // OPTIONAL: an "idle" timer that fires if nothing else happens for 5s. | |
| // Because `sleep` is not cancellation-safe if recreated each loop, we pin one and reset it after firing. | |
| let mut idle = time::sleep(Duration::from_secs(5)); | |
| tokio::pin!(idle); | |
| // === Event loop === | |
| loop { | |
| tokio::select! { | |
| // 1) New TCP connection arrived | |
| accept_res = listener.accept() => { | |
| match accept_res { | |
| Ok((mut sock, addr)) => { | |
| println!("[net] connection from {addr}"); | |
| // Handle the socket without blocking other events | |
| tokio::spawn(async move { | |
| let _ = sock.write_all(b"hello from tokio\n").await; | |
| let mut buf = [0u8; 1024]; | |
| loop { | |
| match sock.read(&mut buf).await { | |
| Ok(0) => break, // closed | |
| Ok(n) => { | |
| // simple echo | |
| if sock.write_all(&buf[..n]).await.is_err() { | |
| break; | |
| } | |
| } | |
| Err(_) => break, | |
| } | |
| } | |
| }); | |
| // Reset idle timer because we just handled an event | |
| idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5)); | |
| } | |
| Err(err) => { | |
| eprintln!("[net] accept error: {err:?}"); | |
| } | |
| } | |
| } | |
| // 2) Periodic tick | |
| _ = ticker.tick() => { | |
| println!("[tick] {}", chrono::Local::now().format("%H:%M:%S")); | |
| idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5)); | |
| } | |
| // 3) Application message from mpsc producer | |
| Some(msg) = rx.recv() => { | |
| println!("[mpsc] received {msg}"); | |
| // ...do some useful work here... | |
| idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5)); | |
| } | |
| // 4) Idle timer elapsed (no other events for a while) | |
| _ = &mut idle => { | |
| println!("[idle] nothing happened for 5s"); | |
| // Arm it again | |
| idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5)); | |
| } | |
| // 5) Global shutdown broadcast | |
| _ = shutdown_rx.recv() => { | |
| println!("[shutdown] received signal, exiting event loop"); | |
| break; | |
| } | |
| } | |
| } | |
| // Notify any background tasks (if not already) and give them a moment to finish | |
| let _ = shutdown_tx.send(()); | |
| time::sleep(Duration::from_millis(200)).await; | |
| Ok(()) | |
| } | |
| // Dependencies: | |
| // | |
| // [dependencies] | |
| // tokio = { version = "1", features = ["full"] } | |
| // eyre = "0.6" | |
| // color-eyre = "0.6" | |
| // chrono = { version = "0.4", default-features = false, features = ["clock"] } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment