Что такое эквивалент async / await сервера ThreadPool?

Я работаю на tcp-сервере, который выглядит примерно так, используя синхронный apis и пул streamов:

TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); ThreadPool.QueueUserWorkItem(this.HandleConnection, client); //Or alternatively new Thread(HandleConnection).Start(client) } } 

Предполагая, что моя цель состоит в том, чтобы обрабатывать как можно больше параллельных подключений при минимальном использовании ресурсов, похоже, что это будет быстро ограничено количеством доступных streamов. Я подозреваю, что с помощью неблокирующей задачи apis я смогу обрабатывать гораздо больше с меньшим количеством ресурсов.

Мое первоначальное впечатление – это что-то вроде:

 async Task Serve(){ while(true){ var client = await listener.AcceptTcpClientAsync(); HandleConnectionAsync(client); //fire and forget? } } в async Task Serve(){ while(true){ var client = await listener.AcceptTcpClientAsync(); HandleConnectionAsync(client); //fire and forget? } } 

Но мне кажется, что это может вызвать узкие места. Возможно, HandleConnectionAsync займет необычно долгое время, чтобы нажать на первый вызов и остановит основной цикл принятия. Будет ли это использовать только один stream когда-либо, или будет ли время выполнения волшебным образом работать над несколькими streamами по своему усмотрению?

Есть ли способ объединить эти два подхода, так что мой сервер будет использовать точно количество streamов, которые ему нужны для количества активно запущенных задач, но чтобы он не блокировал streamи без необходимости в операциях ввода-вывода?

Существует ли идиоматический способ максимизации пропускной способности в такой ситуации?

Я бы позволил Framework управлять streamовой обработкой и не создавал бы лишних streamов, если бы тесты на профилирование не предполагали, что мне может понадобиться. Особенно, если вызовы внутри HandleConnectionAsync в основном связаны с IO.

В любом случае, если вы хотите выпустить вызывающий stream (диспетчер) в начале HandleConnectionAsync , есть очень простое решение. Вы можете перейти на новый stream от ThreadPool с await Yield() . Это работает, если сервер работает в среде выполнения, которая не имеет никакого контекста синхронизации, установленного в исходном streamе (консольное приложение, служба WCF), что обычно имеет место для TCP-сервера.

[EDITED] Ниже проиллюстрировано это (код изначально отсюда ). Обратите внимание: основной цикл while не создает нити явно:

 using System; using System.Collections.Generic; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; class Program { object _lock = new Object(); // sync lock List _connections = new List(); // pending connections // The core server task private async Task StartListener() { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); // if already faulted, re-throw any error on the calling context if (task.IsFaulted) task.Wait(); } } // Register and handle the connection private async Task StartHandleConnectionAsync(TcpClient tcpClient) { // start the new connection task var connectionTask = HandleConnectionAsync(tcpClient); // add it to the list of pending task lock (_lock) _connections.Add(connectionTask); // catch all errors of HandleConnectionAsync try { await connectionTask; // we may be on another thread after "await" } catch (Exception ex) { // log the error Console.WriteLine(ex.ToString()); } finally { // remove pending task lock (_lock) _connections.Remove(connectionTask); } } // Handle new connection private async Task HandleConnectionAsync(TcpClient tcpClient) { await Task.Yield(); // continue asynchronously on another threads using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } } // The entry point of the console app static void Main(string[] args) { Console.WriteLine("Hit Ctrl-C to exit."); new Program().StartListener().Wait(); } } 

Кроме того, код может выглядеть как ниже, без await Task.Yield() . Обратите внимание: я Task.Run async лямбду в Task.Run , потому что я все еще хочу использовать асинхронные API внутри HandleConnectionAsync и использовать там HandleConnectionAsync :

 // Handle new connection private static Task HandleConnectionAsync(TcpClient tcpClient) { return Task.Run(async () => { using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } }); } 

[UPDATE] Основываясь на комментарии: если это будет код библиотеки, среда выполнения действительно неизвестна и может иметь контекст синхронизации, отличной от настроек по умолчанию. В этом случае я предпочел бы запустить основной цикл сервера в streamе пула (который не имеет никакого контекста синхронизации):

 private static Task StartListener() { return Task.Run(async () => { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); // if already faulted, re-throw any error on the calling context if (task.IsFaulted) task.Wait(); } }); } 

Таким образом, все дочерние задачи, созданные внутри StartListener , не будут затронуты контекстом синхронизации кода клиента. Поэтому мне не нужно было бы явно Task.ConfigureAwait(false) где-нибудь.

Существующие ответы правильно предложили использовать Task.Run(() => HandleConnection(client)); , но не объяснил почему.

Вот почему: Вы обеспокоены тем, что HandleConnectionAsync может занять некоторое время, чтобы попасть в первый раз. Если вы придерживаетесь использования async IO (как и в этом случае), это означает, что HandleConnectionAsync выполняет работу с ЦП без каких-либо блокировок. Это идеальный случай для пула streamов. Он предназначен для выполнения короткой, неблокирующей работы ЦП.

И вы правы, что цикл accept будет дросселирован HandleConnectionAsync который HandleConnectionAsync много времени, прежде чем вернуться (возможно, потому, что в нем есть значительная работа с ЦП). Этого следует избегать, если вам нужна высокая частота новых соединений.

Если вы уверены, что нет существенной работы по дросселированию цикла, вы можете сохранить дополнительную Task пула streamов и не делать этого.

Кроме того, вы можете одновременно запускать несколько принимающих команд. Заменить await Serve(); (например):

 var serverTasks = Enumerable.Range(0, Environment.ProcessorCount) .Select(_ => Serve()); await Task.WhenAll(serverTasks); 

Это устраняет проблемы масштабируемости. Заметьте, что await будет проглотить все, кроме одной ошибки здесь.

Пытаться

 TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); Task.Run(() => this.HandleConnection(client)); //Or alternatively new Thread(HandleConnection).Start(client) } } в TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); Task.Run(() => this.HandleConnection(client)); //Or alternatively new Thread(HandleConnection).Start(client) } } 

Согласно Microsoft http://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnType , тип возврата void не должен использоваться, поскольку он не может улавливать исключения. Как вы указали, вам нужны задачи «стрелять и забывать», поэтому я пришел к выводу, что вы всегда должны возвращать задачу (как сказал Microsoft), но вы должны поймать ошибку, используя:

 TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted); 

Пример, который я использовал в качестве доказательства, приведен ниже:

 public static void Main() { Awaitable() .ContinueWith( i => { foreach (var exception in i.Exception.InnerExceptions) { Console.WriteLine(exception.Message); } }, TaskContinuationOptions.OnlyOnFaulted); Console.WriteLine("This needs to come out before my exception"); Console.ReadLine(); } public static async Task Awaitable() { await Task.Delay(3000); throw new Exception("Hey I can catch these pesky things"); } 

Есть ли причина, по которой вам нужно принимать соединения async? Я имею в виду, ожидает ли какое-либо соединение с клиентом какое-либо значение? Единственная причина для этого – это потому, что на сервере есть еще одна работа, ожидающая соединения. Если есть, возможно, вы можете сделать что-то вроде этого:

  public async void Serve() { while (true) { var client = await _listener.AcceptTcpClientAsync(); Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning); } } 

Таким образом, принятие будет освобождать текущий вариант оставления streamа для других вещей, которые нужно выполнить, и обработка выполняется в новом streamе. Единственные накладные расходы – это создать новый stream для обработки клиента, прежде чем он вернется к принятию нового соединения.

Edit: Просто понял, что это почти тот же код, который вы написали. Думаю, мне нужно снова прочитать ваш вопрос, чтобы лучше понять, что вы на самом деле спрашиваете: S

Edit2:

Есть ли способ объединить эти два подхода, так что мой сервер будет использовать точно количество streamов, которые ему нужны для количества активно запущенных задач, но чтобы он не блокировал streamи без необходимости в операциях ввода-вывода?

Думайте, что мое решение действительно отвечает на этот вопрос. Действительно ли это необходимо?

Edit3: Made Task.Factory.StartNew () фактически создает новый stream.

Interesting Posts

Как объединить данные из двух листов и условно форматировать строки на основе листа происхождения

Эффективные побитовые операции для подсчета бит или нахождения правого | левого большинства

Начало работы с Android

Соображения производительности для keySet () и entrySet () карты

Теперь, когда у нас есть std :: array, какие применения остаются для массивов в стиле C?

Как сопоставить динамический url / prj / noticeOpen / 2 в Spring MVC-controllerе

Поиск точной информации, полученной установщиками

Linux – установка устройства с конкретными правами пользователя

Где именно я могу загрузить последнюю версию Scene Builder для Java?

Лекция откачки для регулярного языка

Android Studio не может разрешить R в импортированном проекте?

Неожиданное исключение поймало параметр «xxx» в «classе xxx: ошибка установки выражения« xxx »со значением

Есть ли команда linux, например mv, но с регулярным выражением?

Как измерить производительность диска?

Восстановление данных, потерянных chkdsk

Давайте будем гением компьютера.