Лучший способ в .NET управлять очередью задач на отдельном (одном) streamе

Я знаю, что за эти годы асинхронное программирование сильно изменилось. Я несколько смущен, что позволил себе получить этот ржавый всего лишь 34 года, но я рассчитываю на StackOverflow, чтобы довести меня до скорости.

То, что я пытаюсь сделать, это управлять очередью «работы» в отдельном streamе, но таким образом, что обрабатывается только один элемент за раз. Я хочу опубликовать работу над этим streamом, и ему не нужно передавать что-либо обратно вызывающему. Конечно, я мог бы просто развернуть новый объект Thread и связать его с общим объектом Queue , используя спящие, прерывания, дескрипторы ожидания и т. Д. Но я знаю, что с тех пор все стало лучше. У нас есть BlockingCollection , Task , async / await , не говоря уже о пакетах NuGet, которые, вероятно, многое абстрагируют.

Я знаю, что вопросы «Что является лучшим …», как правило, не одобряются, поэтому я буду перефразировать его, сказав «Что такое рекомендуемый …» способ сделать что-то подобное, используя предпочтительные механизмы .NET. Но если сторонний пакет NuGet упрощает работу, это так же хорошо.

Я считал экземпляр TaskScheduler с фиксированным максимальным параллелизмом в 1, но, похоже, на данный момент существует, вероятно, гораздо менее неуклюжий способ сделать это.

Задний план

В частности, то, что я пытаюсь сделать в этом случае, – это очередь задачи геолокации IP во время веб-запроса. Один и тот же IP-адрес может несколько раз оказаться в очереди на геолокацию, но задача будет знать, как его обнаружить и выпустить раньше, если оно уже разрешено. Но обработчик запроса просто собирается бросить эти () => LocateAddress(context.Request.UserHostAddress) вызовы в очередь и позволить методу LocateAddress обрабатывать LocateAddress обнаружение работы. API-интерфейс геолокации, который я использую, не любит бомбардировать запросы, поэтому я хочу ограничить его одной одновременной задачей одновременно. Однако было бы неплохо, если бы этот подход позволил легко масштабировать до более параллельных задач с простым изменением параметров.

    Чтобы создать асинхронную единую степень параллелизма в очереди, вы можете просто создать SemaphoreSlim , инициализированный одним, а затем использовать метод enqueing при приобретении этого семафора перед началом запрошенной работы.

     public class TaskQueue { private SemaphoreSlim semaphore; public TaskQueue() { semaphore = new SemaphoreSlim(1); } public async Task Enqueue(Func> taskGenerator) { await semaphore.WaitAsync(); try { return await taskGenerator(); } finally { semaphore.Release(); } } public async Task Enqueue(Func taskGenerator) { await semaphore.WaitAsync(); try { await taskGenerator(); } finally { semaphore.Release(); } } } 

    Конечно, чтобы иметь фиксированную степень параллелизма, отличную от одной, просто инициализировать семафор другому ряду.

    Ваш лучший вариант, как я вижу, это использовать ActionBlock TPL Dataflow ActionBlock :

     var actionBlock = new ActionBlock(address => { if (!IsDuplicate(address)) { LocateAddress(address); } }); actionBlock.Post(context.Request.UserHostAddress); 

    TPL Dataflow – это надежная, streamобезопасная, async и очень настраиваемая структура, основанная на актерах (доступная как nuget)

    Вот простой пример для более сложного случая. Предположим, вы хотите:

    • Включить параллелизм (ограничено доступными ядрами).
    • Ограничьте размер очереди (чтобы у вас не хватило памяти).
    • И как LocateAddress и вставка очереди будут async .
    • Отмените все через час.
     var actionBlock = new ActionBlock(async address => { if (!IsDuplicate(address)) { await LocateAddressAsync(address); } }, new ExecutionDataflowBlockOptions { BoundedCapacity = 10000, MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token }); await actionBlock.SendAsync(context.Request.UserHostAddress); 

    Используйте BlockingCollection чтобы создать шаблон производителя / потребителя с одним потребителем (только одна вещь, работающая одновременно, как вы хотите) и один или несколько производителей.

    Сначала определите общую очередь:

     BlockingCollection queue = new BlockingCollection(); 

    В вашей потребительской Thread или Task вы берете из нее:

     //This will block until there's an item available Action itemToRun = queue.Take() 

    Затем от любого количества производителей на других streamах просто добавьте в очередь:

     queue.Add(() => LocateAddress(context.Request.UserHostAddress)); 

    На самом деле вам не нужно запускать задачи в одном streamе, вам нужно, чтобы они запускались последовательно (один за другим) и FIFO. У TPL нет classа для этого, но вот моя очень легкая реализация с тестами. https://github.com/Gentlee/SerialQueue

    Также есть реализация @Servy, тесты показывают, что он в два раза медленнее, чем мой, и он не гарантирует FIFO.

    Пример:

     private readonly SerialQueue queue = new SerialQueue(); async Task SomeAsyncMethod() { var result = await queue.Enqueue(DoSomething); } 
    Давайте будем гением компьютера.