Last active
April 23, 2025 13:16
-
-
Save vlapenkov/27e33302cc7e14a87757ca94ce418c3a to your computer and use it in GitHub Desktop.
TaskThrottling выполняют любые N задач в M потоках и возвращает результат в порядок, даже с Exception
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Func<int, Task<string>> newAction = async (item) => | |
| { | |
| Console.WriteLine($"Task #{item} started"); | |
| string content; | |
| using (HttpClient client = new()) | |
| { | |
| using HttpResponseMessage result = await client.GetAsync("https://ya.ru"); | |
| content = await result.Content.ReadAsStringAsync(); | |
| // пример Exception на 3-ем элементе | |
| if (item == 3) | |
| { | |
| throw new Exception("something get wrong"); | |
| } | |
| } | |
| //await Task.Delay(item * 1000); | |
| Console.WriteLine($"Task #{item} completed"); | |
| return content[..100]; | |
| }; | |
| try | |
| { | |
| var service = new TasksHelper<string>(); | |
| results = await service.RunThreads(newAction, 10, 5, string.Empty); | |
| } | |
| catch (Exception e) | |
| { | |
| Console.WriteLine(e); | |
| } | |
| Console.WriteLine(results); | |
| namespace My.TaskThrottling | |
| { | |
| internal class TasksHelper<TResult> | |
| { | |
| public class TaskResult<TResult> | |
| { | |
| public TResult Result { get; set; } | |
| public int Index { get; set; } | |
| } | |
| public Task<TaskResult<TResult>> WhenAny(Dictionary<int, Task<TResult>> taskDict, TResult defaultError = default(TResult)) | |
| { | |
| var taskCompletionSource = new TaskCompletionSource<TaskResult<TResult>>(); | |
| foreach (var dictItem in taskDict) | |
| { | |
| dictItem.Value.ContinueWith(x => | |
| { | |
| if (x.Status == TaskStatus.RanToCompletion) | |
| { | |
| taskCompletionSource.TrySetResult(new TaskResult<TResult> | |
| { | |
| Index = dictItem.Key, | |
| Result = x.Result | |
| }); | |
| } | |
| if (x.Status == TaskStatus.Faulted) | |
| { | |
| taskCompletionSource.TrySetResult(new TaskResult<TResult> | |
| { | |
| Index = dictItem.Key, | |
| Result = defaultError | |
| }); | |
| } | |
| } | |
| ); | |
| } | |
| return taskCompletionSource.Task; | |
| } | |
| public async Task<TResult[]> RunThreads(Func<int, Task<TResult>> DoSomething, int totalThreads, int throttle, TResult defaultError = default(TResult)!) | |
| { | |
| TResult[] results = new TResult[totalThreads]; | |
| Dictionary<int, Task<TResult>> dict = new Dictionary<int, Task<TResult>>(); | |
| for (int n = 0; n < totalThreads; n++) | |
| { | |
| Task<TResult> task = DoSomething(n); | |
| dict.Add(n, task); | |
| if (dict.Count == throttle) | |
| { | |
| var taskResult = await WhenAny(dict, defaultError); | |
| results[taskResult.Index] = taskResult.Result; | |
| dict.Remove(taskResult.Index); | |
| } | |
| Console.WriteLine($"Number of tasks is:{dict.Count}"); | |
| } | |
| //return results; | |
| TasksHelper<TResult>.TaskResult<TResult>[] resultsOthers = await Task.WhenAll(dict.Select(async dictItem => | |
| { | |
| var tr = new TaskResult<TResult>(); | |
| try | |
| { | |
| tr.Result = await dictItem.Value; | |
| tr.Index = dictItem.Key; | |
| } | |
| catch (Exception ex) when (ex is OperationCanceledException || ex is TimeoutException) | |
| { | |
| throw; // Propagate cancellation or timeout exceptions | |
| } | |
| catch (Exception ex) | |
| { | |
| tr.Result = defaultError; | |
| tr.Index = dictItem.Key; | |
| } | |
| return tr; | |
| } | |
| )); | |
| foreach (var item in resultsOthers) | |
| { | |
| results[item.Index] = item.Result; | |
| } | |
| return results; | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment