-
-
Save alamb/bd0e086448ef9b438aeebd6f550e23ed to your computer and use it in GitHub Desktop.
| // Please find the full, tested version in | |
| // https://github.com/influxdata/influxdb_iox/blob/fe155e15fb2ad166aee66b0458e63c24a8128dd4/query/src/exec/task.rs#L101-L118 | |
| /// Runs futures (and any `tasks` that are `tokio::task::spawned` by | |
| /// them) on a separate Tokio Executor | |
| pub struct DedicatedExecutor { | |
| /// Lock-protected state: the request channel and the handle of the | |
| /// thread that hosts the separate runtime. | |
| state: Arc<Mutex<State>>, | |
| } | |
| /// Internal state of a `DedicatedExecutor`. | |
| /// | |
| /// `spawn` treats `requests == None` as "the executor was shut down". | |
| struct State { | |
| /// Channel for requests -- the dedicated executor takes requests | |
| /// from here and runs them. | |
| requests: Option<std::sync::mpsc::Sender<Task>>, | |
| /// Thread which has a different Tokio runtime | |
| /// installed and spawns tasks there | |
| thread: Option<std::thread::JoinHandle<()>>, | |
| } | |
| impl DedicatedExecutor { | |
| /// Creates a new `DedicatedExecutor` with a dedicated Tokio | |
| /// executor that is separate from the threadpool created via | |
| /// `[tokio::main]`. | |
| pub fn new(thread_name: &str, num_threads: usize) -> Self { | |
| let thread_name = thread_name.to_string(); | |
| let (tx, rx) = std::sync::mpsc::channel::<Task>(); | |
| let thread = std::thread::spawn(move || { | |
| // Create a new Runtime to run tasks | |
| let runtime = tokio::runtime::Builder::new_multi_thread() | |
| .enable_all() | |
| .thread_name(&thread_name) | |
| .worker_threads(num_threads) | |
| // Lower OS priority of worker threads to prioritize main runtime | |
| .on_thread_start(move || set_current_thread_priority_low()) | |
| .build() | |
| .expect("Creating Tokio runtime"); | |
| // Pull task requests off the channel and send them to the executor | |
| runtime.block_on(async move { | |
| while let Ok(task) = rx.recv() { | |
| tokio::task::spawn(async move { | |
| task.run().await; | |
| }); | |
| } | |
| let state = State { | |
| requests: Some(tx), | |
| thread: Some(thread), | |
| }; | |
| Self { | |
| state: Arc::new(Mutex::new(state)), | |
| } | |
| } | |
| /// Runs the specified Future (and any tasks it spawns) on the | |
| /// `DedicatedExecutor`. | |
| pub fn spawn<T>(&self, task: T) -> Job<T::Output> | |
| where | |
| T: Future + Send + 'static, | |
| T::Output: Send + 'static, | |
| { | |
| let (tx, rx) = tokio::sync::oneshot::channel(); | |
| let fut = Box::pin(async move { | |
| let task_output = task.await; | |
| tx.send(task_output).ok() | |
| }); | |
| let mut state = self.state.lock(); | |
| let task = Task { | |
| fut, | |
| }; | |
| if let Some(requests) = &mut state.requests { | |
| // would fail if someone has started shutdown | |
| requests.send(task).ok(); | |
| } else { | |
| warn!("tried to schedule task on an executor that was shutdown"); | |
| } | |
| Job { rx, cancel } | |
| } | |
| /// Handle returned by `DedicatedExecutor::spawn`; holds the receiving | |
| /// half of the oneshot channel over which the task's output is sent. | |
| #[pin_project] | |
| pub struct Job<T> { | |
| /// Pinned so that `poll` can forward to the receiver through the | |
| /// pin projection. | |
| #[pin] | |
| rx: Receiver<T>, | |
| } | |
| /// Polling a `Job` polls its oneshot receiver. | |
| impl<T> Future for Job<T> { | |
| type Output = Result<T, Error>; | |
| /// Forwards the poll to the pinned `rx` field and returns its result. | |
| fn poll( | |
| self: Pin<&mut Self>, | |
| cx: &mut std::task::Context<'_>, | |
| ) -> std::task::Poll<Self::Output> { | |
| // Project the pin onto the receiver and poll it directly. | |
| self.project().rx.poll(cx) | |
| } | |
| } |
Thanks a lot, I learned something. Notably, the spawn_blocking docs mention rayon, and because they mention it right after spawn_blocking:
Hint: If using rayon, you can use a oneshot channel to send the result back to Tokio when the rayon task finishes.
I thought it meant that one would run the rayon task inside spawn_blocking, using a oneshot channel to talk back.
It seems like spawn_blocking is not required at all to run rayon tasks in Tokio. The only things needed are a oneshot channel and rayon::spawn...
Just noting you can see ^that in tokio-rayon: https://github.com/andybarron/tokio-rayon/blob/main/src/global.rs#L26
Is there any reason why you don't use the max_blocking_threads option when configuring your default Tokio Runtime?
I'm using this approach and spawn all blocking tasks through tokio::task::spawn_blocking here, but I'm wondering if I'm missing something.
I do not know of any reason to avoid max_blocking_threads
@alamb , so you think it's a viable alternative for the dedicated executor?
If the proposal is to use tokio::task::spawn_blocking instead of a dedicated executor, that is also a possibility but it comes with tradeoffs:
- Creating new threads is expensive (it requires several sys calls and allocations) compared to scheduling a new task on an existing executor
- As the number of threads increases past the available cores, the overhead of context switching as the OS does the scheduling becomes substantial as well
@Ciantic please refer to https://ryhl.io/blog/async-what-is-blocking/. There is no need to use rayon inside the spawn_blocking call, which uses the Tokio runtime; just use the rayon runtime/threadpool directly.