refactor receiving socket code in its own method
This commit is contained in:
		| @@ -350,29 +350,10 @@ namespace ix | |||||||
|         } |         } | ||||||
|         else if (pollResult == PollResultType::ReadyForRead) |         else if (pollResult == PollResultType::ReadyForRead) | ||||||
|         { |         { | ||||||
|             while (true) |             if (!receiveFromSocket()) | ||||||
|             { |             { | ||||||
|                 ssize_t ret = _socket->recv((char*) &_readbuf[0], _readbuf.size()); |  | ||||||
|  |  | ||||||
|                 if (ret < 0 && Socket::isWaitNeeded()) |  | ||||||
|                 { |  | ||||||
|                     break; |  | ||||||
|                 } |  | ||||||
|                 else if (ret <= 0) |  | ||||||
|                 { |  | ||||||
|                     // if there are received data pending to be processed, then delay the abnormal |  | ||||||
|                     // closure to after dispatch (other close code/reason could be read from the |  | ||||||
|                     // buffer) |  | ||||||
|  |  | ||||||
|                     closeSocket(); |  | ||||||
|  |  | ||||||
|                 return PollResult::AbnormalClose; |                 return PollResult::AbnormalClose; | ||||||
|             } |             } | ||||||
|                 else |  | ||||||
|                 { |  | ||||||
|                     _rxbuf.insert(_rxbuf.end(), _readbuf.begin(), _readbuf.begin() + ret); |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
|         else if (pollResult == PollResultType::Error) |         else if (pollResult == PollResultType::Error) | ||||||
|         { |         { | ||||||
| @@ -1053,19 +1034,17 @@ namespace ix | |||||||
|             wsheader_type::TEXT_FRAME, message, _enablePerMessageDeflate, onProgressCallback); |             wsheader_type::TEXT_FRAME, message, _enablePerMessageDeflate, onProgressCallback); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     ssize_t WebSocketTransport::send() |  | ||||||
|     { |  | ||||||
|         std::lock_guard<std::mutex> lock(_socketMutex); |  | ||||||
|         return _socket->send((char*) &_txbuf[0], _txbuf.size()); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     bool WebSocketTransport::sendOnSocket() |     bool WebSocketTransport::sendOnSocket() | ||||||
|     { |     { | ||||||
|         std::lock_guard<std::mutex> lock(_txbufMutex); |         std::lock_guard<std::mutex> lock(_txbufMutex); | ||||||
|  |  | ||||||
|         while (_txbuf.size()) |         while (_txbuf.size()) | ||||||
|         { |         { | ||||||
|             ssize_t ret = send(); |             ssize_t ret = 0; | ||||||
|  |             { | ||||||
|  |                 std::lock_guard<std::mutex> lock(_socketMutex); | ||||||
|  |                 ret = _socket->send((char*) &_txbuf[0], _txbuf.size()); | ||||||
|  |             } | ||||||
|  |  | ||||||
|             if (ret < 0 && Socket::isWaitNeeded()) |             if (ret < 0 && Socket::isWaitNeeded()) | ||||||
|             { |             { | ||||||
| @@ -1086,6 +1065,34 @@ namespace ix | |||||||
|         return true; |         return true; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     bool WebSocketTransport::receiveFromSocket() | ||||||
|  |     { | ||||||
|  |         while (true) | ||||||
|  |         { | ||||||
|  |             ssize_t ret = _socket->recv((char*) &_readbuf[0], _readbuf.size()); | ||||||
|  |  | ||||||
|  |             if (ret < 0 && Socket::isWaitNeeded()) | ||||||
|  |             { | ||||||
|  |                 break; | ||||||
|  |             } | ||||||
|  |             else if (ret <= 0) | ||||||
|  |             { | ||||||
|  |                 // if there are received data pending to be processed, then delay the abnormal | ||||||
|  |                 // closure to after dispatch (other close code/reason could be read from the | ||||||
|  |                 // buffer) | ||||||
|  |  | ||||||
|  |                 closeSocket(); | ||||||
|  |                 return false; | ||||||
|  |             } | ||||||
|  |             else | ||||||
|  |             { | ||||||
|  |                 _rxbuf.insert(_rxbuf.end(), _readbuf.begin(), _readbuf.begin() + ret); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         return true; | ||||||
|  |     } | ||||||
|  |  | ||||||
|     void WebSocketTransport::sendCloseFrame(uint16_t code, const std::string& reason) |     void WebSocketTransport::sendCloseFrame(uint16_t code, const std::string& reason) | ||||||
|     { |     { | ||||||
|         bool compress = false; |         bool compress = false; | ||||||
|   | |||||||
| @@ -99,7 +99,6 @@ namespace ix | |||||||
|                    bool remote = false); |                    bool remote = false); | ||||||
|  |  | ||||||
|         void closeSocket(); |         void closeSocket(); | ||||||
|         ssize_t send(); |  | ||||||
|  |  | ||||||
|         ReadyState getReadyState() const; |         ReadyState getReadyState() const; | ||||||
|         void setReadyState(ReadyState readyState); |         void setReadyState(ReadyState readyState); | ||||||
| @@ -245,6 +244,8 @@ namespace ix | |||||||
|  |  | ||||||
|         bool flushSendBuffer(); |         bool flushSendBuffer(); | ||||||
|         bool sendOnSocket(); |         bool sendOnSocket(); | ||||||
|  |         bool receiveFromSocket(); | ||||||
|  |  | ||||||
|         WebSocketSendInfo sendData(wsheader_type::opcode_type type, |         WebSocketSendInfo sendData(wsheader_type::opcode_type type, | ||||||
|                                    const std::string& message, |                                    const std::string& message, | ||||||
|                                    bool compress, |                                    bool compress, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user