Skip to content

Instantly share code, notes, and snippets.

@jmcph4
Created August 26, 2025 23:50
Show Gist options
  • Select an option

  • Save jmcph4/a456b1558fd5512f28b0c3c9398f9cab to your computer and use it in GitHub Desktop.

Select an option

Save jmcph4/a456b1558fd5512f28b0c3c9398f9cab to your computer and use it in GitHub Desktop.
// cargo-deps: tokio={version="1", features=["full"]}
// Event-driven Tokio example that reacts to multiple kinds of events using `tokio::select!`.
//
// It concurrently handles:
// - incoming TCP connections
// - periodic timer ticks
// - messages arriving on an mpsc channel
// - Ctrl+C for graceful shutdown
//
// Run: `cargo run` then (optionally) `nc 127.0.0.1 8080` in another shell.
use std::time::Duration;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
signal,
sync::{broadcast, mpsc},
time,
};
#[tokio::main]
async fn main() -> eyre::Result<()> {
color_eyre::install()?;
// === Event sources ===
let listener = TcpListener::bind("127.0.0.1:8080").await?;
let mut ticker = time::interval(Duration::from_secs(1)); // periodic event
// mpsc: external producer pushes "work items" into the system
let (tx, mut rx) = mpsc::channel::<String>(128);
// broadcast: one-to-many shutdown signal
let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);
// === Spawn a simple producer that feeds the mpsc channel ===
tokio::spawn({
let tx = tx.clone();
async move {
let mut i = 0u64;
loop {
time::sleep(Duration::from_millis(700)).await;
if tx.send(format!("job-{i}")).await.is_err() {
break; // receiver dropped
}
i += 1;
}
}
});
// === Spawn Ctrl+C handler to trigger shutdown ===
tokio::spawn({
let shutdown_tx = shutdown_tx.clone();
async move {
// Wait for SIGINT/Ctrl+C and notify all listeners
let _ = signal::ctrl_c().await;
let _ = shutdown_tx.send(());
}
});
println!("listening on 127.0.0.1:8080 — press Ctrl+C to stop");
// OPTIONAL: an "idle" timer that fires if nothing else happens for 5s.
// Because `sleep` is not cancellation-safe if recreated each loop, we pin one and reset it after firing.
let mut idle = time::sleep(Duration::from_secs(5));
tokio::pin!(idle);
// === Event loop ===
loop {
tokio::select! {
// 1) New TCP connection arrived
accept_res = listener.accept() => {
match accept_res {
Ok((mut sock, addr)) => {
println!("[net] connection from {addr}");
// Handle the socket without blocking other events
tokio::spawn(async move {
let _ = sock.write_all(b"hello from tokio\n").await;
let mut buf = [0u8; 1024];
loop {
match sock.read(&mut buf).await {
Ok(0) => break, // closed
Ok(n) => {
// simple echo
if sock.write_all(&buf[..n]).await.is_err() {
break;
}
}
Err(_) => break,
}
}
});
// Reset idle timer because we just handled an event
idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5));
}
Err(err) => {
eprintln!("[net] accept error: {err:?}");
}
}
}
// 2) Periodic tick
_ = ticker.tick() => {
println!("[tick] {}", chrono::Local::now().format("%H:%M:%S"));
idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5));
}
// 3) Application message from mpsc producer
Some(msg) = rx.recv() => {
println!("[mpsc] received {msg}");
// ...do some useful work here...
idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5));
}
// 4) Idle timer elapsed (no other events for a while)
_ = &mut idle => {
println!("[idle] nothing happened for 5s");
// Arm it again
idle.as_mut().reset(time::Instant::now() + Duration::from_secs(5));
}
// 5) Global shutdown broadcast
_ = shutdown_rx.recv() => {
println!("[shutdown] received signal, exiting event loop");
break;
}
}
}
// Notify any background tasks (if not already) and give them a moment to finish
let _ = shutdown_tx.send(());
time::sleep(Duration::from_millis(200)).await;
Ok(())
}
// Dependencies:
//
// [dependencies]
// tokio = { version = "1", features = ["full"] }
// eyre = "0.6"
// color-eyre = "0.6"
// chrono = { version = "0.4", default-features = false, features = ["clock"] }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment