Skip to content

Instantly share code, notes, and snippets.

@kotobukid
Created December 10, 2024 13:41
Show Gist options
  • Select an option

  • Save kotobukid/0f586b1921dbd752e3f9d6fcdfc24341 to your computer and use it in GitHub Desktop.

Select an option

Save kotobukid/0f586b1921dbd752e3f9d6fcdfc24341 to your computer and use it in GitHub Desktop.
任意の処理を挟むチャンネル
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task;
fn process_channel<T, F>(
buffer_size: usize, // チャンネルのバッファサイズ
processor: F, // データ処理用の関数ポインタ/クロージャ
) -> (Sender<T>, Receiver<T>)
where
T: Send + 'static,
F: Fn(T) -> T + Send + Sync + 'static,
{
// 最初の (tx, rx) ペアを作成
let (tx, rx) = mpsc::channel::<T>(buffer_size);
// 処理済み用の (tx_processed, rx_processed) ペアを作成
let (tx_processed, rx_processed) = mpsc::channel::<T>(buffer_size);
// 中間処理を別スレッドで起動
task::spawn(async move {
pipe_with_processing(rx, tx_processed, processor).await;
});
// 最初の送信部分(tx)と最終的な受信部分(rx_processed)を返す
(tx, rx_processed)
}
async fn pipe_with_processing<T, F>(mut rx: Receiver<T>, tx: Sender<T>, mut processor: F)
where
T: Send + 'static,
F: FnMut(T) -> T + Send + 'static,
{
while let Some(data) = rx.recv().await {
let processed = processor(data); // 加工処理
if tx.send(processed).await.is_err() {
break; // チャンネルが閉じられたら終了
}
}
}
fn x2(x: i32) -> i32 {
x * 2
}
#[tokio::main]
async fn main() {
// 関数を渡す処理(ここでは値を2倍にする)
let (tx, mut rx) = process_channel(32, x2);
// クロージャを関数ポインタとして渡す処理(ここでは値を10倍にする)
// let (tx, mut rx) = process_channel(32, |x: i32| x * 10);
// データ送信
tokio::spawn(async move {
for i in 1..=5 {
if tx.send(i).await.is_err() {
break;
}
}
});
// 処理済みデータを受信
while let Some(data) = rx.recv().await {
println!("Processed: {}", data);
}
println!("Exit");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment