Skip to content

Instantly share code, notes, and snippets.

@vlapenkov
Last active April 23, 2025 13:16
Show Gist options
  • Select an option

  • Save vlapenkov/27e33302cc7e14a87757ca94ce418c3a to your computer and use it in GitHub Desktop.

Select an option

Save vlapenkov/27e33302cc7e14a87757ca94ce418c3a to your computer and use it in GitHub Desktop.
TaskThrottling выполняет любые N задач в M потоках и возвращает результаты по порядку, даже при возникновении Exception
// Demo: download the same page 10 times, at most 5 requests in flight, and print the
// results in task order. Task #3 fails on purpose to show the default-value substitution.

// One HttpClient is shared by every task; creating a client per request wastes sockets.
using HttpClient client = new();

Func<int, Task<string>> newAction = async (item) =>
{
    Console.WriteLine($"Task #{item} started");

    using HttpResponseMessage result = await client.GetAsync("https://ya.ru");
    string content = await result.Content.ReadAsStringAsync();

    // Example of an exception raised by the item with index 3.
    if (item == 3)
    {
        throw new Exception("something get wrong");
    }

    Console.WriteLine($"Task #{item} completed");

    // Guard the slice: content[..100] throws when the body is shorter than 100 characters.
    return content.Length > 100 ? content[..100] : content;
};

// Declared up front so the final print still works when RunThreads throws.
string[] results = Array.Empty<string>();
try
{
    var service = new TasksHelper<string>();
    results = await service.RunThreads(newAction, 10, 5, string.Empty);
}
catch (Exception e)
{
    Console.WriteLine(e);
}

// Print every entry; passing the array itself would only print its type name.
Console.WriteLine(string.Join(Environment.NewLine, results));
namespace My.TaskThrottling
{
    /// <summary>
    /// Starts a fixed number of asynchronous operations while limiting how many are in
    /// flight at once, and returns their results ordered by operation index.
    /// An operation that fails contributes a caller-supplied default value instead of
    /// failing the whole run; cancellation and timeouts are propagated to the caller.
    /// </summary>
    /// <typeparam name="TResult">Type of the value produced by each operation.</typeparam>
    internal class TasksHelper<TResult>
    {
        /// <summary>
        /// Pairs a produced value with the index of the operation that produced it.
        /// </summary>
        // NOTE: the type parameter keeps its original name (it shadows the outer one) so
        // that existing references to TaskResult<T> continue to compile unchanged.
        public class TaskResult<TResult>
        {
            /// <summary>Value produced by the operation, or the default used on failure.</summary>
            public TResult Result { get; set; }

            /// <summary>Key of the operation in the dictionary it was tracked under.</summary>
            public int Index { get; set; }
        }

        /// <summary>
        /// Returns a task that completes as soon as any task in <paramref name="taskDict"/>
        /// finishes, carrying that task's key and its result.
        /// </summary>
        /// <param name="taskDict">Tasks to observe, keyed by operation index. Must not be empty.</param>
        /// <param name="defaultError">Value reported when the finished task faulted.</param>
        /// <returns>
        /// A task yielding the key and result of the first finished task. It is canceled when
        /// that task was canceled, and faults when that task failed with an
        /// <see cref="OperationCanceledException"/> or a <see cref="TimeoutException"/>.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="taskDict"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="taskDict"/> is empty.</exception>
        public Task<TaskResult<TResult>> WhenAny(Dictionary<int, Task<TResult>> taskDict, TResult defaultError = default(TResult))
        {
            if (taskDict is null)
            {
                throw new ArgumentNullException(nameof(taskDict));
            }

            // With nothing to observe the returned task could never complete.
            if (taskDict.Count == 0)
            {
                throw new ArgumentException("At least one task is required.", nameof(taskDict));
            }

            // Run awaiters asynchronously so they do not execute inline inside the
            // continuation callback that happens to complete the source.
            var taskCompletionSource = new TaskCompletionSource<TaskResult<TResult>>(
                TaskCreationOptions.RunContinuationsAsynchronously);

            foreach (var dictItem in taskDict)
            {
                int index = dictItem.Key;

                dictItem.Value.ContinueWith(x =>
                {
                    switch (x.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            taskCompletionSource.TrySetResult(new TaskResult<TResult>
                            {
                                Index = index,
                                Result = x.Result
                            });
                            break;

                        case TaskStatus.Faulted:
                            // Reading Exception also marks the failure as observed.
                            var failure = x.Exception?.InnerException;
                            if (failure != null && ShouldPropagate(failure))
                            {
                                // Same policy as the final phase of RunThreads.
                                taskCompletionSource.TrySetException(failure);
                            }
                            else
                            {
                                taskCompletionSource.TrySetResult(new TaskResult<TResult>
                                {
                                    Index = index,
                                    Result = defaultError
                                });
                            }
                            break;

                        case TaskStatus.Canceled:
                            // Without this branch a canceled task would leave the source
                            // incomplete and the awaiting caller would wait forever.
                            taskCompletionSource.TrySetCanceled();
                            break;
                    }
                }, TaskScheduler.Default);
            }

            return taskCompletionSource.Task;
        }

        /// <summary>
        /// Invokes <paramref name="DoSomething"/> for every index from 0 to
        /// <paramref name="totalThreads"/> - 1, keeping at most <paramref name="throttle"/>
        /// operations in flight, and returns the results ordered by index.
        /// </summary>
        /// <param name="DoSomething">Factory that starts the operation for a given index.</param>
        /// <param name="totalThreads">Total number of operations to start.</param>
        /// <param name="throttle">
        /// Maximum number of operations in flight. A value below 1, or above
        /// <paramref name="totalThreads"/>, never matches the in-flight count, so all
        /// operations are started without waiting.
        /// </param>
        /// <param name="defaultError">Value stored for an operation that failed.</param>
        /// <returns>An array where element i holds the result of operation i.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="DoSomething"/> is null.</exception>
        /// <exception cref="OperationCanceledException">An operation was canceled.</exception>
        /// <exception cref="TimeoutException">An operation timed out.</exception>
        public async Task<TResult[]> RunThreads(Func<int, Task<TResult>> DoSomething, int totalThreads, int throttle, TResult defaultError = default(TResult)!)
        {
            if (DoSomething is null)
            {
                throw new ArgumentNullException(nameof(DoSomething));
            }

            TResult[] results = new TResult[totalThreads];
            var dict = new Dictionary<int, Task<TResult>>();

            for (int n = 0; n < totalThreads; n++)
            {
                dict.Add(n, DoSomething(n));

                // The limit is reached: wait for one slot to free up before starting more.
                if (dict.Count == throttle)
                {
                    var taskResult = await WhenAny(dict, defaultError);
                    results[taskResult.Index] = taskResult.Result;
                    dict.Remove(taskResult.Index);
                }

                Console.WriteLine($"Number of tasks is:{dict.Count}");
            }

            // Drain the operations that are still running once everything has been started.
            TaskResult<TResult>[] resultsOthers = await Task.WhenAll(dict.Select(async dictItem =>
            {
                var tr = new TaskResult<TResult> { Index = dictItem.Key };
                try
                {
                    tr.Result = await dictItem.Value;
                }
                catch (Exception ex) when (!ShouldPropagate(ex))
                {
                    // Ordinary failures are replaced by the default value; cancellation
                    // and timeouts skip this handler and propagate to the caller.
                    tr.Result = defaultError;
                }

                return tr;
            }));

            foreach (var item in resultsOthers)
            {
                results[item.Index] = item.Result;
            }

            return results;
        }

        /// <summary>
        /// Tells whether a failure must reach the caller instead of being replaced by the default value.
        /// </summary>
        private static bool ShouldPropagate(Exception ex)
        {
            return ex is OperationCanceledException || ex is TimeoutException;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment