boost asio async_write: как не чередовать вызовы async_write?

Вот моя реализация:

  • Клиент A отправить сообщение для клиента B
  • Сервер обрабатывает сообщение async_read нужным количеством данных и будет ожидать новых данных от клиента A (в порядке, чтобы не блокировать клиента A)
  • После этого сервер обработает информацию (возможно, выполнит mysql-запрос), а затем отправит сообщение клиенту B с помощью async_write .

Проблема в том, что если сообщение «Клиент A» посылает сообщение очень быстро, async_writes будет чередоваться до вызова предыдущего обработчика async_write.

Есть ли простой способ избежать этой проблемы?

РЕДАКТИРОВАТЬ 1: Если клиент C отправляет сообщение клиенту B сразу после клиента A, то такая же проблема должна появиться …

EDIT 2: Это сработает? потому что он, кажется, блокирует, я не знаю, где …

  namespace structure { class User { public: User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) : m_socket(io_service, context), m_strand(io_service), is_writing(false) {} ssl_socket& getSocket() { return m_socket; } boost::asio::strand getStrand() { return m_strand; } void push(std::string str) { m_strand.post(boost::bind(&structure::User::strand_push, this, str)); } void strand_push(std::string str) { std::cout << "pushing: " << boost::this_thread::get_id() << std::endl; m_queue.push(str); if (!is_writing) { write(); std::cout << "going to write" << std::endl; } std::cout << "Already writing" << std::endl; } void write() { std::cout << "writing" << std::endl; is_writing = true; std::string str = m_queue.front(); boost::asio::async_write(m_socket, boost::asio::buffer(str.c_str(), str.size()), boost::bind(&structure::User::sent, this) ); } void sent() { std::cout << "sent" << std::endl; m_queue.pop(); if (!m_queue.empty()) { write(); return; } else is_writing = false; std::cout << "done sent" << std::endl; } private: ssl_socket m_socket; boost::asio::strand m_strand; std::queue m_queue; bool is_writing; }; } #endif 

Есть ли простой способ избежать этой проблемы?

Да, поддерживать исходящую очередь для каждого клиента. Проверьте размер очереди в async_write завершения async_write , если ненулевое значение, запустите другую операцию async_write . Вот пример

 #include  #include  #include  #include  #include  class Connection { public: Connection( boost::asio::io_service& io_service ) : _io_service( io_service ), _strand( _io_service ), _socket( _io_service ), _outbox() { } void write( const std::string& message ) { _strand.post( boost::bind( &Connection::writeImpl, this, message ) ); } private: void writeImpl( const std::string& message ) { _outbox.push_back( message ); if ( _outbox.size() > 1 ) { // outstanding async_write return; } this->write(); } void write() { const std::string& message = _outbox[0]; boost::asio::async_write( _socket, boost::asio::buffer( message.c_str(), message.size() ), _strand.wrap( boost::bind( &Connection::writeHandler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ) ); } void writeHandler( const boost::system::error_code& error, const size_t bytesTransferred ) { _outbox.pop_front(); if ( error ) { std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl; return; } if ( !_outbox.empty() ) { // more messages to send this->write(); } } private: typedef std::deque Outbox; private: boost::asio::io_service& _io_service; boost::asio::io_service::strand _strand; boost::asio::ip::tcp::socket _socket; Outbox _outbox; }; int main() { boost::asio::io_service io_service; Connection foo( io_service ); } 

некоторые ключевые моменты

  • boost::asio::io_service::strand защищает доступ к Connection::_outbox
  • обработчик отправляется из Connection::write() поскольку он является общедоступным

для меня это не было очевидным, если вы использовали аналогичные методы в примере в своем вопросе, поскольку все методы являются общедоступными.

Просто пытаюсь улучшить великолепный ответ Сэма. Точками улучшения являются:

  • async_write пытается выполнить отправку каждого байта из буфера (ов) до завершения, что означает, что вы должны предоставить все входные данные, которые у вас есть для операции записи, в противном случае накладные расходы на кадрирование могут увеличиться из-за того, что пакеты TCP меньше, чем они могли бы было.

  • asio::streambuf , будучи очень удобным в использовании, не является нулевой копией. В приведенном ниже примере демонстрируется подход с нулевой копией : сохраняйте блоки данных ввода, где они есть, и используйте перегрузку / async_write перегрузки async_write которая принимает последовательность входных буферов (которые только указывают на фактические входные данные).

Полный исходный код:

 #include  #include  #include  #include  #include  #include  #include  #include  using namespace std::chrono_literals; using boost::asio::ip::tcp; class Server { class Connection : public std::enable_shared_from_this { friend class Server; void ProcessCommand(const std::string& cmd) { if (cmd == "stop") { server_.Stop(); return; } if (cmd == "") { Close(); return; } std::thread t([this, self = shared_from_this(), cmd] { for (int i = 0; i < 30; ++i) { Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n"); } server_.io_service_.post([this, self] { DoReadCmd(); }); }); t.detach(); } void DoReadCmd() { read_timer_.expires_from_now(server_.read_timeout_); read_timer_.async_wait([this](boost::system::error_code ec) { if (!ec) { std::cout << "Read timeout\n"; Shutdown(); } }); boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) { read_timer_.cancel(); if (!ec) { const char* p = boost::asio::buffer_cast(buf_in_.data()); std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1)); buf_in_.consume(bytes_read); ProcessCommand(cmd); } else { Close(); } }); } void DoWrite() { active_buffer_ ^= 1; // switch buffers for (const auto& data : buffers_[active_buffer_]) { buffer_seq_.push_back(boost::asio::buffer(data)); } write_timer_.expires_from_now(server_.write_timeout_); write_timer_.async_wait([this](boost::system::error_code ec) { if (!ec) { std::cout << "Write timeout\n"; Shutdown(); } }); boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) { write_timer_.cancel(); std::lock_guard lock(buffers_mtx_); buffers_[active_buffer_].clear(); buffer_seq_.clear(); if (!ec) { std::cout << "Wrote " << bytes_transferred << " bytes\n"; if (!buffers_[active_buffer_ ^ 1].empty()) // have more work DoWrite(); } else { Close(); } }); } bool Writing() const { return !buffer_seq_.empty(); } Server& server_; boost::asio::streambuf buf_in_; std::mutex buffers_mtx_; std::vector buffers_[2]; // a double buffer std::vector buffer_seq_; int active_buffer_ = 0; bool closing_ = false; bool closed_ = false; boost::asio::deadline_timer read_timer_, write_timer_; tcp::socket socket_; public: Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) { } void Start() { socket_.set_option(tcp::no_delay(true)); DoReadCmd(); } void Close() { closing_ = true; if (!Writing()) Shutdown(); } void Shutdown() { if (!closed_) { closing_ = closed_ = true; boost::system::error_code ec; socket_.shutdown(tcp::socket::shutdown_both, ec); socket_.close(); server_.active_connections_.erase(shared_from_this()); } } void Write(std::string&& data) { std::lock_guard lock(buffers_mtx_); buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer if (!Writing()) DoWrite(); } }; void DoAccept() { if (acceptor_.is_open()) { auto session = std::make_shared(*this); acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) { if (!ec) { active_connections_.insert(session); session->Start(); } DoAccept(); }); } } boost::asio::io_service io_service_; tcp::acceptor acceptor_; std::unordered_set> active_connections_; const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30); const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30); public: Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { } void Run() { std::cout << "Listening on " << acceptor_.local_endpoint() << "\n"; DoAccept(); io_service_.run(); } void Stop() { acceptor_.close(); { std::vector> sessionsToClose; copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose)); for (auto& s : sessionsToClose) s->Shutdown(); } active_connections_.clear(); io_service_.stop(); } }; int main() { try { Server srv(8888); srv.Run(); } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << "\n"; } } 
  • Неожиданный порядок оценки (ошибка компилятора?)
  • Перегрузка макроса по количеству аргументов
  • Макросъемка Variadic
  • Множественное наследование в C #
  • Как отформатировать указатель на функцию?
  • Наследование C ++ и функция переопределения
  • Обнаружено разбиение стека
  • Область действия объекта исключения в C ++
  • Сортировка 2D-массива в C ++ с использованием встроенных функций (или любого другого метода)?
  • ostream chaining, порядок вывода
  • WPF загружает анимацию в отдельный stream пользовательского интерфейса? (С #)
  • Давайте будем гением компьютера.