Секвенирование и реорганизация задач
У меня есть следующий сценарий, который, я думаю, может быть довольно распространенным:
-
Существует задача (обработчик команд UI), которая может выполняться синхронно или асинхронно.
-
Команды могут поступать быстрее, чем они обрабатываются.
-
Если для команды уже есть ожидающая задача, задача нового обработчика команд должна быть поставлена в очередь и обрабатываться последовательно.
-
Результат каждой новой задачи может зависеть от результата предыдущей задачи.
Отмена должна быть соблюдена, но я хотел бы оставить ее вне сферы применения этого вопроса для простоты. Кроме того, безопасность streamов (параллелизм) не является обязательным требованием, но требуется поддержка повторного входа.
Вот основной пример того, чего я пытаюсь достичь (как консольное приложение, для простоты):
using System; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main(string[] args) { var asyncOp = new AsyncOp(); Func<int, Task> handleAsync = async (arg) => { Console.WriteLine("this task arg: " + arg); //await Task.Delay(arg); // make it async return await Task.FromResult(arg); // sync }; Console.WriteLine("Test #1..."); asyncOp.RunAsync(() => handleAsync(1000)); asyncOp.RunAsync(() => handleAsync(900)); asyncOp.RunAsync(() => handleAsync(800)); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to continue to test #2..."); Console.ReadLine(); asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); }); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to exit..."); Console.ReadLine(); } // AsyncOp class AsyncOp { Task _pending = Task.FromResult(default(T)); public Task CurrentTask { get { return _pending; } } public Task RunAsync(Func<Task> handler) { var pending = _pending; Func<Task> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; _pending = wrapper(); return _pending; } } } }
Выход:
Тест № 1 ... предыдущая задача: 0 эта задача arg: 1000 результат предыдущей задачи: 1000 эта задача arg: 900 предыдущая задача: 900 эта задача arg: 800 Нажмите любую клавишу, чтобы продолжить тестирование # 2 ... предыдущая задача задачи: 800 предыдущая задача задачи: 800 эта задача arg: 200 эта задача arg: 100 Нажмите любую клавишу для выхода...
Он работает в соответствии с требованиями, пока в тест № 2 не будет введено повторное включение:
asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); });
Желаемый результат должен быть 100
, 200
, а не 200
, 100
, потому что уже существует ожидающая внешняя задача на 100
. Это, очевидно, потому, что внутренняя задача выполняется синхронно, ломая логику var pending = _pending; /* ... */ _pending = wrapper()
var pending = _pending; /* ... */ _pending = wrapper()
для внешней задачи.
Как заставить его работать для теста №2?
Одним из решений было бы обеспечить асинхронность для каждой задачи с помощью Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()
. Однако я не хочу накладывать асинхронное выполнение на обработчики команд, которые могут быть синхронны внутри. , Я не хочу зависеть от поведения какого-либо конкретного контекста синхронизации (т. Task.Factory.StartNew
на то, что Task.Factory.StartNew
должен вернуться до того, как созданная задача была фактически запущена).
В реальном проекте я отвечаю за то, что AsyncOp
выше, но не имеет никакого контроля над обработчиками команд (то есть, что бы внутри handleAsync
).
- Отмена ожидающей задачи синхронно в streamе пользовательского интерфейса
- Должны ли мы переключиться на использование асинхронного ввода-вывода по умолчанию?
- Вызов синхронного асинхронного метода
- Разница между TPL и async / await (Обработка streamов)
- Использовать Task.Run () в синхронном методе, чтобы избежать тупиковой остановки в асинхронном методе?
- Выполнение задач параллельно
- Запуск задач в foreach Loop использует значение последнего элемента
- Как ограничить количество одновременных операций асинхронного ввода-вывода?
Я почти забыл, что можно создать Task
вручную, без запуска или планирования. Затем «Task.Factory.StartNew» и «новая задача (…). Начните« верните меня в нужное русло ». Я думаю, что это один из тех немногих случаев, когда конструктор Task
может быть полезен вместе с вложенными задачами ( Task
) и Task.Unwrap()
:
// AsyncOp class AsyncOp { Task _pending = Task.FromResult(default(T)); public Task CurrentTask { get { return _pending; } } public Task RunAsync(Func> handler, bool useSynchronizationContext = false) { var pending = _pending; Func> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; var task = new Task>(wrapper); var inner = task.Unwrap(); _pending = inner; task.RunSynchronously(useSynchronizationContext ? TaskScheduler.FromCurrentSynchronizationContext() : TaskScheduler.Current); return inner; } }
Выход:
Тест № 1 ... предыдущая задача: 0 эта задача arg: 1000 результат предыдущей задачи: 1000 эта задача arg: 900 предыдущая задача: 900 эта задача arg: 800 Нажмите любую клавишу, чтобы продолжить тестирование # 2 ... предыдущая задача задачи: 800 эта задача arg: 100 предыдущая задача: 100 эта задача arg: 200
Теперь также очень легко сделать AsyncOp
streamобезопасным, добавив lock
для защиты _pending
, если это необходимо.
Обновлено ниже – это самая последняя версия этого шаблона, которая использует TaskCompletionSource
и является streamобезопасной:
/// /// AsyncOperation /// By Noseratio - http://stackoverflow.com/a/21427264 /// /// Task result type class AsyncOperation { readonly object _lock = new Object(); Task _currentTask = null; CancellationTokenSource _currentCts = null; // a client of this class (eg a ViewModel) has an option // to handle TaskSucceeded or TaskFailed, if needed public event EventHandler TaskSucceeded = null; public event EventHandler TaskFailing = null; public Task CurrentTask { get { lock (_lock) return _currentTask; } } public bool IsCurrent(Task task) { lock (_lock) return task == _currentTask; } public bool IsPending { get { lock (_lock) return _currentTask != null && !_currentTask.IsCompleted; } } public bool IsCancellationRequested { get { lock (_lock) return _currentCts != null && _currentCts.IsCancellationRequested; } } public void Cancel() { lock (_lock) { if (_currentTask != null && !_currentTask.IsCompleted) _currentCts.Cancel(); } } /// /// Start the task routine and observe the result of the previous task routine /// /// /// /// /// public Task StartAsync( Func> routine, CancellationToken token, bool cancelPrevious = true, bool throwImmediately = true) { Task previousTask = null; // pending instance CancellationTokenSource previousCts = null; // pending instance CTS CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token); TaskCompletionSource thisTcs = new TaskCompletionSource (); // this task CancellationToken thisToken; // this task's cancellation Token Task routineTask = null; // as returned by routine lock (_lock) { // remember the _currentTask as previousTask previousTask = _currentTask; previousCts = _currentCts; thisToken = thisCts.Token; // set the new _currentTask _currentTask = thisTcs.Task; _currentCts = thisCts; } Action startAsync = async () => { // because startAsync is "async void" method, // any exception not handled inside it // will be immediately thrown on the current synchronization context, // more details: http://stackoverflow.com/a/22395161/1768303 // run and await this task try { // await the previous task instance if (previousTask != null) { if (cancelPrevious) previousCts.Cancel(); try { await previousTask; } catch (OperationCanceledException) { // ignore previous cancellations } } thisToken.ThrowIfCancellationRequested(); routineTask = routine(thisToken); await routineTask; } catch (Exception ex) { // ignore cancellation if (ex is OperationCanceledException) { System.Diagnostics.Debug.Print("Task cancelled, id={0}", thisTcs.Task.Id); thisTcs.SetCanceled(); return; } // fire TaskFailing System.Diagnostics.Debug.Print("Task failing, id={0}", thisTcs.Task.Id); if (this.TaskFailing != null) { var args = new TaskEventArgs(thisTcs.Task, ex); this.TaskFailing(this, args); if (args.Handled) { // exception handled // make thisTcs cancelled rather than faulted thisTcs.SetCanceled(); return; } } // exception unhandled thisTcs.SetException(ex); if (throwImmediately) throw; // rethrow on the current synchronization context // exception should be observed via CurrentTask.Exception return; } // success, fire TaskSucceeded System.Diagnostics.Debug.Print("Task succeded, id={0}", thisTcs.Task.Id); thisTcs.SetResult(routineTask.Result); if (this.TaskSucceeded != null) this.TaskSucceeded(this, new TaskEventArgs(thisTcs.Task)); }; startAsync(); return thisTcs.Task; } // StartAsync with CancellationToken.None public Task StartAsync( Func> routine, bool cancelPrevious = true, bool throwImmediately = true) { return StartAsync(routine, CancellationToken.None, cancelPrevious: true, throwImmediately: true); } /// /// TaskEventArgs /// public class TaskEventArgs : EventArgs { public Task Task { get; private set; } public Exception Exception { get; private set; } public bool Handled { get; set; } public TaskEventArgs(Task task, Exception exception = null) { this.Task = task; this.Exception = exception; } } }