Created
December 10, 2024 13:41
-
-
Save kotobukid/0f586b1921dbd752e3f9d6fcdfc24341 to your computer and use it in GitHub Desktop.
任意の処理を挟むチャンネル
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use tokio::sync::mpsc::{self, Receiver, Sender}; | |
| use tokio::task; | |
| fn process_channel<T, F>( | |
| buffer_size: usize, // チャンネルのバッファサイズ | |
| processor: F, // データ処理用の関数ポインタ/クロージャ | |
| ) -> (Sender<T>, Receiver<T>) | |
| where | |
| T: Send + 'static, | |
| F: Fn(T) -> T + Send + Sync + 'static, | |
| { | |
| // 最初の (tx, rx) ペアを作成 | |
| let (tx, rx) = mpsc::channel::<T>(buffer_size); | |
| // 処理済み用の (tx_processed, rx_processed) ペアを作成 | |
| let (tx_processed, rx_processed) = mpsc::channel::<T>(buffer_size); | |
| // 中間処理を別スレッドで起動 | |
| task::spawn(async move { | |
| pipe_with_processing(rx, tx_processed, processor).await; | |
| }); | |
| // 最初の送信部分(tx)と最終的な受信部分(rx_processed)を返す | |
| (tx, rx_processed) | |
| } | |
| async fn pipe_with_processing<T, F>(mut rx: Receiver<T>, tx: Sender<T>, mut processor: F) | |
| where | |
| T: Send + 'static, | |
| F: FnMut(T) -> T + Send + 'static, | |
| { | |
| while let Some(data) = rx.recv().await { | |
| let processed = processor(data); // 加工処理 | |
| if tx.send(processed).await.is_err() { | |
| break; // チャンネルが閉じられたら終了 | |
| } | |
| } | |
| } | |
| fn x2(x: i32) -> i32 { | |
| x * 2 | |
| } | |
| #[tokio::main] | |
| async fn main() { | |
| // 関数を渡す処理(ここでは値を2倍にする) | |
| let (tx, mut rx) = process_channel(32, x2); | |
| // クロージャを関数ポインタとして渡す処理(ここでは値を10倍にする) | |
| // let (tx, mut rx) = process_channel(32, |x: i32| x * 10); | |
| // データ送信 | |
| tokio::spawn(async move { | |
| for i in 1..=5 { | |
| if tx.send(i).await.is_err() { | |
| break; | |
| } | |
| } | |
| }); | |
| // 処理済みデータを受信 | |
| while let Some(data) = rx.recv().await { | |
| println!("Processed: {}", data); | |
| } | |
| println!("Exit"); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment