Секвенирование и реорганизация задач

У меня есть следующий сценарий, который, я думаю, может быть довольно распространенным:

  1. Существует задача (обработчик команд UI), которая может выполняться синхронно или асинхронно.

  2. Команды могут поступать быстрее, чем они обрабатываются.

  3. Если для команды уже есть ожидающая задача, задача нового обработчика команд должна быть поставлена ​​в очередь и обрабатываться последовательно.

  4. Результат каждой новой задачи может зависеть от результата предыдущей задачи.

Отмена должна быть соблюдена, но я хотел бы оставить ее вне сферы применения этого вопроса для простоты. Кроме того, безопасность streamов (параллелизм) не является обязательным требованием, но требуется поддержка повторного входа.

Вот основной пример того, чего я пытаюсь достичь (как консольное приложение, для простоты):

using System; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main(string[] args) { var asyncOp = new AsyncOp(); Func<int, Task> handleAsync = async (arg) => { Console.WriteLine("this task arg: " + arg); //await Task.Delay(arg); // make it async return await Task.FromResult(arg); // sync }; Console.WriteLine("Test #1..."); asyncOp.RunAsync(() => handleAsync(1000)); asyncOp.RunAsync(() => handleAsync(900)); asyncOp.RunAsync(() => handleAsync(800)); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to continue to test #2..."); Console.ReadLine(); asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); }); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to exit..."); Console.ReadLine(); } // AsyncOp class AsyncOp { Task _pending = Task.FromResult(default(T)); public Task CurrentTask { get { return _pending; } } public Task RunAsync(Func<Task> handler) { var pending = _pending; Func<Task> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; _pending = wrapper(); return _pending; } } } } 

Выход:

 Тест № 1 ...

 предыдущая задача: 0
 эта задача arg: 1000

 результат предыдущей задачи: 1000
 эта задача arg: 900

 предыдущая задача: 900
 эта задача arg: 800

 Нажмите любую клавишу, чтобы продолжить тестирование # 2 ...


 предыдущая задача задачи: 800

 предыдущая задача задачи: 800
 эта задача arg: 200
 эта задача arg: 100

 Нажмите любую клавишу для выхода...

Он работает в соответствии с требованиями, пока в тест № 2 не будет введено повторное включение:

 asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); }); 

Желаемый результат должен быть 100 , 200 , а не 200 , 100 , потому что уже существует ожидающая внешняя задача на 100 . Это, очевидно, потому, что внутренняя задача выполняется синхронно, ломая логику var pending = _pending; /* ... */ _pending = wrapper() var pending = _pending; /* ... */ _pending = wrapper() для внешней задачи.

Как заставить его работать для теста №2?

Одним из решений было бы обеспечить асинхронность для каждой задачи с помощью Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext() . Однако я не хочу накладывать асинхронное выполнение на обработчики команд, которые могут быть синхронны внутри. , Я не хочу зависеть от поведения какого-либо конкретного контекста синхронизации (т. Task.Factory.StartNew на то, что Task.Factory.StartNew должен вернуться до того, как созданная задача была фактически запущена).

В реальном проекте я отвечаю за то, что AsyncOp выше, но не имеет никакого контроля над обработчиками команд (то есть, что бы внутри handleAsync ).

Я почти забыл, что можно создать Task вручную, без запуска или планирования. Затем «Task.Factory.StartNew» и «новая задача (…). Начните« верните меня в нужное русло ». Я думаю, что это один из тех немногих случаев, когда конструктор Task может быть полезен вместе с вложенными задачами ( Task> ) и Task.Unwrap() :

 // AsyncOp class AsyncOp { Task _pending = Task.FromResult(default(T)); public Task CurrentTask { get { return _pending; } } public Task RunAsync(Func> handler, bool useSynchronizationContext = false) { var pending = _pending; Func> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; var task = new Task>(wrapper); var inner = task.Unwrap(); _pending = inner; task.RunSynchronously(useSynchronizationContext ? TaskScheduler.FromCurrentSynchronizationContext() : TaskScheduler.Current); return inner; } } 

Выход:

 Тест № 1 ...

 предыдущая задача: 0
 эта задача arg: 1000

 результат предыдущей задачи: 1000
 эта задача arg: 900

 предыдущая задача: 900
 эта задача arg: 800

 Нажмите любую клавишу, чтобы продолжить тестирование # 2 ...


 предыдущая задача задачи: 800
 эта задача arg: 100

 предыдущая задача: 100
 эта задача arg: 200

Теперь также очень легко сделать AsyncOp streamобезопасным, добавив lock для защиты _pending , если это необходимо.


Обновлено ниже – это самая последняя версия этого шаблона, которая использует TaskCompletionSource и является streamобезопасной:

 ///  /// AsyncOperation /// By Noseratio - http://stackoverflow.com/a/21427264 ///  /// Task result type class AsyncOperation { readonly object _lock = new Object(); Task _currentTask = null; CancellationTokenSource _currentCts = null; // a client of this class (eg a ViewModel) has an option // to handle TaskSucceeded or TaskFailed, if needed public event EventHandler TaskSucceeded = null; public event EventHandler TaskFailing = null; public Task CurrentTask { get { lock (_lock) return _currentTask; } } public bool IsCurrent(Task task) { lock (_lock) return task == _currentTask; } public bool IsPending { get { lock (_lock) return _currentTask != null && !_currentTask.IsCompleted; } } public bool IsCancellationRequested { get { lock (_lock) return _currentCts != null && _currentCts.IsCancellationRequested; } } public void Cancel() { lock (_lock) { if (_currentTask != null && !_currentTask.IsCompleted) _currentCts.Cancel(); } } ///  /// Start the task routine and observe the result of the previous task routine ///  ///  ///  ///  ///  public Task StartAsync( Func> routine, CancellationToken token, bool cancelPrevious = true, bool throwImmediately = true) { Task previousTask = null; // pending instance CancellationTokenSource previousCts = null; // pending instance CTS CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token); TaskCompletionSource thisTcs = new TaskCompletionSource(); // this task CancellationToken thisToken; // this task's cancellation Token Task routineTask = null; // as returned by routine lock (_lock) { // remember the _currentTask as previousTask previousTask = _currentTask; previousCts = _currentCts; thisToken = thisCts.Token; // set the new _currentTask _currentTask = thisTcs.Task; _currentCts = thisCts; } Action startAsync = async () => { // because startAsync is "async void" method, // any exception not handled inside it // will be immediately thrown on the current synchronization context, // more details: http://stackoverflow.com/a/22395161/1768303 // run and await this task try { // await the previous task instance if (previousTask != null) { if (cancelPrevious) previousCts.Cancel(); try { await previousTask; } catch (OperationCanceledException) { // ignore previous cancellations } } thisToken.ThrowIfCancellationRequested(); routineTask = routine(thisToken); await routineTask; } catch (Exception ex) { // ignore cancellation if (ex is OperationCanceledException) { System.Diagnostics.Debug.Print("Task cancelled, id={0}", thisTcs.Task.Id); thisTcs.SetCanceled(); return; } // fire TaskFailing System.Diagnostics.Debug.Print("Task failing, id={0}", thisTcs.Task.Id); if (this.TaskFailing != null) { var args = new TaskEventArgs(thisTcs.Task, ex); this.TaskFailing(this, args); if (args.Handled) { // exception handled // make thisTcs cancelled rather than faulted thisTcs.SetCanceled(); return; } } // exception unhandled thisTcs.SetException(ex); if (throwImmediately) throw; // rethrow on the current synchronization context // exception should be observed via CurrentTask.Exception return; } // success, fire TaskSucceeded System.Diagnostics.Debug.Print("Task succeded, id={0}", thisTcs.Task.Id); thisTcs.SetResult(routineTask.Result); if (this.TaskSucceeded != null) this.TaskSucceeded(this, new TaskEventArgs(thisTcs.Task)); }; startAsync(); return thisTcs.Task; } // StartAsync with CancellationToken.None public Task StartAsync( Func> routine, bool cancelPrevious = true, bool throwImmediately = true) { return StartAsync(routine, CancellationToken.None, cancelPrevious: true, throwImmediately: true); } ///  /// TaskEventArgs ///  public class TaskEventArgs : EventArgs { public Task Task { get; private set; } public Exception Exception { get; private set; } public bool Handled { get; set; } public TaskEventArgs(Task task, Exception exception = null) { this.Task = task; this.Exception = exception; } } } 
  • Когда использовать Task.Delay, когда использовать Thread.Sleep?
  • Почему это асинхронное действие зависает?
  • Каков правильный способ отмены асинхронной операции, которая не принимает CancellationToken?
  • WaitAll vs WhenAll
  • Запуск двух асинхронных задач параллельно и сбор результатов в .NET 4.5
  • Очередь процесса с многопоточным или задачами
  • Выполнение задачи в фоновом режиме в приложении WPF
  • Давайте будем гением компьютера.