diff options
author | Jose <jose@zeroc.com> | 2014-08-13 22:30:18 +0200 |
---|---|---|
committer | Jose <jose@zeroc.com> | 2014-08-13 22:30:18 +0200 |
commit | edd76dbfad6a28c11e441571af493e1eb99ddf52 (patch) | |
tree | 6e8d1045d6e4c60120970468d68b4734e2fee6e0 /cpp/src/Ice/WSTransceiver.cpp | |
parent | - Added back the Ice.BackgroundIO property. (diff) | |
download | ice-edd76dbfad6a28c11e441571af493e1eb99ddf52.tar.bz2 ice-edd76dbfad6a28c11e441571af493e1eb99ddf52.tar.xz ice-edd76dbfad6a28c11e441571af493e1eb99ddf52.zip |
Rename WS transport files for consistency with TCP/UDP transports.
Diffstat (limited to 'cpp/src/Ice/WSTransceiver.cpp')
-rw-r--r-- | cpp/src/Ice/WSTransceiver.cpp | 1698 |
1 files changed, 1698 insertions, 0 deletions
diff --git a/cpp/src/Ice/WSTransceiver.cpp b/cpp/src/Ice/WSTransceiver.cpp new file mode 100644 index 00000000000..cb05c34d75b --- /dev/null +++ b/cpp/src/Ice/WSTransceiver.cpp @@ -0,0 +1,1698 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/WSTransceiver.h> +#include <Ice/Endpoint.h> +#include <Ice/Connection.h> +#include <Ice/ProtocolInstance.h> +#include <Ice/HttpParser.h> +#include <Ice/Communicator.h> +#include <Ice/LoggerUtil.h> +#include <Ice/Buffer.h> +#include <Ice/LocalException.h> +#include <Ice/Base64.h> +#include <IceUtil/Random.h> +#include <IceUtil/SHA1.h> +#include <IceUtil/StringUtil.h> + +#include <IceUtil/DisableWarnings.h> + +#include <stdint.h> +#include <climits> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +// +// WebSocket opcodes +// +#define OP_CONT 0x0 // Continuation frame +#define OP_TEXT 0x1 // Text frame +#define OP_DATA 0x2 // Data frame +#define OP_RES_0x3 0x3 // Reserved +#define OP_RES_0x4 0x4 // Reserved +#define OP_RES_0x5 0x5 // Reserved +#define OP_RES_0x6 0x6 // Reserved +#define OP_RES_0x7 0x7 // Reserved +#define OP_CLOSE 0x8 // Connection close +#define OP_PING 0x9 // Ping +#define OP_PONG 0xA // Pong +#define OP_RES_0xB 0xB // Reserved +#define OP_RES_0xC 0xC // Reserved +#define OP_RES_0xD 0xD // Reserved +#define OP_RES_0xE 0xE // Reserved +#define OP_RES_0xF 0xF // Reserved +#define FLAG_FINAL 0x80 // Last frame +#define FLAG_MASKED 0x80 // Payload is masked + +#define CLOSURE_NORMAL 1000 +#define CLOSURE_SHUTDOWN 1001 +#define CLOSURE_PROTOCOL_ERROR 1002 +#define CLOSURE_TOO_BIG 1009 + +namespace +{ + +const string _iceProtocol = "ice.zeroc.com"; +const string _wsUUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + +// +// Rename to avoid conflict with OS 10.10 htonll +// +void ice_htonll(Long v, Byte* dest) +{ + // + // Transfer a 64-bit integer in network (big-endian) order. + // +#ifdef ICE_BIG_ENDIAN + const Byte* src = reinterpret_cast<const Byte*>(&v); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; +#else + const Byte* src = reinterpret_cast<const Byte*>(&v) + sizeof(Long) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; +#endif +} + +// +// Rename to avoid conflict with OS 10.10 nlltoh +// +Long ice_nlltoh(const Byte* src) +{ + Long v; + + // + // Extract a 64-bit integer in network (big-endian) order. + // +#ifdef ICE_BIG_ENDIAN + Byte* dest = reinterpret_cast<Byte*>(&v); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; +#else + Byte* dest = reinterpret_cast<Byte*>(&v) + sizeof(Long) - 1; + *dest-- = *src++; + *dest-- = *src++; + *dest-- = *src++; + *dest-- = *src++; + *dest-- = *src++; + *dest-- = *src++; + *dest-- = *src++; + *dest = *src; +#endif + + return v; +} + +#if defined(ICE_OS_WINRT) +Short htons(Short v) +{ + Short result; + Byte* dest = reinterpret_cast<Byte*>(&result); + + // + // Transfer a short in network (big-endian) order. + // +#ifdef ICE_BIG_ENDIAN + const Byte* src = reinterpret_cast<const Byte*>(&v); + *dest++ = *src++; + *dest = *src; +#else + const Byte* src = reinterpret_cast<const Byte*>(&v) + sizeof(Short) - 1; + *dest++ = *src--; + *dest = *src; +#endif + return result; +} + +Short ntohs(Short value) +{ + const Byte* src = reinterpret_cast<Byte*>(&value); + Short v; + + // + // Extract a 64-bit integer in network (big-endian) order. + // +#ifdef ICE_BIG_ENDIAN + Byte* dest = reinterpret_cast<Byte*>(&v); + *dest++ = *src++; + *dest = *src; +#else + Byte* dest = reinterpret_cast<Byte*>(&v) + sizeof(Short) - 1; + *dest-- = *src++; + *dest = *src; +#endif + + return v; +} +#endif + +} + +NativeInfoPtr +IceInternal::WSTransceiver::getNativeInfo() +{ + return _delegate->getNativeInfo(); +} + +#if defined(ICE_USE_IOCP) +AsyncInfo* +IceInternal::WSTransceiver::getAsyncInfo(SocketOperation status) +{ + return _delegate->getNativeInfo()->getAsyncInfo(status); +} +#elif defined(ICE_OS_WINRT) +void +IceInternal::WSTransceiver::setCompletedHandler(IceInternal::SocketOperationCompletedHandler^ handler) +{ + _delegate->getNativeInfo()->setCompletedHandler(handler); +} +#endif + +SocketOperation +IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData) +{ + // + // Delegate logs exceptions that occur during initialize(), so there's no need to trap them here. + // + if(_state == StateInitializeDelegate) + { + SocketOperation op = _delegate->initialize(readBuffer, writeBuffer, hasMoreData); + if(op != SocketOperationNone) + { + return op; + } + _state = StateConnected; + } + + try + { + if(_state == StateConnected) + { + // + // We don't know how much we'll need to read. + // + _readBuffer.b.resize(1024); + _readI = _readBuffer.i = _readBuffer.b.begin(); + + // + // The server waits for the client's upgrade request, the + // client sends the upgrade request. + // + _state = StateUpgradeRequestPending; + if(!_incoming) + { + // + // Compose the upgrade request. + // + ostringstream out; + out << "GET " << _resource << " HTTP/1.1\r\n" + << "Host: " << _host << ":" << _port << "\r\n" + << "Upgrade: websocket\r\n" + << "Connection: Upgrade\r\n" + << "Sec-WebSocket-Protocol: " << _iceProtocol << "\r\n" + << "Sec-WebSocket-Version: 13\r\n" + << "Sec-WebSocket-Key: "; + + // + // The value for Sec-WebSocket-Key is a 16-byte random number, + // encoded with Base64. + // + vector<unsigned char> key(16); + IceUtilInternal::generateRandom(reinterpret_cast<char*>(&key[0]), key.size()); + _key = IceInternal::Base64::encode(key); + out << _key << "\r\n\r\n"; // EOM + + string str = out.str(); + _writeBuffer.b.resize(str.size()); + memcpy(&_writeBuffer.b[0], str.c_str(), str.size()); + _writeBuffer.i = _writeBuffer.b.begin(); + } + } + + // + // Try to write the client's upgrade request. + // + if(_state == StateUpgradeRequestPending && !_incoming) + { + if(_writeBuffer.i < _writeBuffer.b.end()) + { + SocketOperation s = _delegate->write(_writeBuffer); + if(s) + { + return s; + } + } + assert(_writeBuffer.i == _writeBuffer.b.end()); + _state = StateUpgradeResponsePending; + } + + while(true) + { + if(_readBuffer.i < _readBuffer.b.end()) + { + SocketOperation s = _delegate->read(_readBuffer, hasMoreData); + if(s == SocketOperationWrite || _readBuffer.i == _readBuffer.b.begin()) + { + return s; + } + } + + // + // Try to read the client's upgrade request or the server's response. + // + if((_state == StateUpgradeRequestPending && _incoming) || + (_state == StateUpgradeResponsePending && !_incoming)) + { + // + // Check if we have enough data for a complete message. + // + const Ice::Byte* p = _parser->isCompleteMessage(&_readBuffer.b[0], _readBuffer.i); + if(!p) + { + if(_readBuffer.i < _readBuffer.b.end()) + { + return SocketOperationRead; + } + + // + // Enlarge the buffer and try to read more. + // + const size_t oldSize = static_cast<size_t>(_readBuffer.i - _readBuffer.b.begin()); + if(oldSize + 1024 > _instance->messageSizeMax()) + { + throw MemoryLimitException(__FILE__, __LINE__); + } + _readBuffer.b.resize(oldSize + 1024); + _readBuffer.i = _readBuffer.b.begin() + oldSize; + continue; // Try again to read the response/request + } + + // + // Set _readI at the end of the response/request message. + // + _readI = _readBuffer.b.begin() + (p - &_readBuffer.b[0]); + } + + // + // We're done, the client's upgrade request or server's response is read. + // + break; + } + + try + { + // + // Parse the client's upgrade request. + // + if(_state == StateUpgradeRequestPending && _incoming) + { + if(_parser->parse(&_readBuffer.b[0], _readI)) + { + handleRequest(_writeBuffer); + _state = StateUpgradeResponsePending; + } + else + { + throw ProtocolException(__FILE__, __LINE__, "incomplete request message"); + } + } + + if(_state == StateUpgradeResponsePending) + { + if(_incoming) + { + if(_writeBuffer.i < _writeBuffer.b.end()) + { + SocketOperation s = _delegate->write(_writeBuffer); + if(s) + { + return s; + } + } + } + else + { + // + // Parse the server's response + // + if(_parser->parse(&_readBuffer.b[0], _readI)) + { + handleResponse(); + } + else + { + throw ProtocolException(__FILE__, __LINE__, "incomplete response message"); + } + } + } + } + catch(const WebSocketException& ex) + { + throw ProtocolException(__FILE__, __LINE__, ex.reason); + } + + _state = StateOpened; + _nextState = StateOpened; + + hasMoreData |= _readI < _readBuffer.i; + } + catch(const Ice::LocalException& ex) + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << protocol() << " connection HTTP upgrade request failed\n" << toString() << "\n" << ex; + } + throw; + } + + if(_instance->traceLevel() >= 1) + { + Trace out(_instance->logger(), _instance->traceCategory()); + if(_incoming) + { + out << "accepted " << protocol() << " connection HTTP upgrade request\n" << toString(); + } + else + { + out << protocol() << " connection HTTP upgrade request accepted\n" << toString(); + } + } + + return SocketOperationNone; +} + +SocketOperation +IceInternal::WSTransceiver::closing(bool initiator, const Ice::LocalException& reason) +{ + if(_instance->traceLevel() >= 1) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "gracefully closing " << protocol() << " connection\n" << toString(); + } + + State s = _nextState == StateOpened ? _state : _nextState; + + if(s == StateClosingRequestPending && _closingInitiator) + { + // + // If we initiated a close connection but also received a + // close connection, we assume we didn't initiated the + // connection and we send the close frame now. This is to + // ensure that if both peers close the connection at the same + // time we don't hang having both peer waiting for the close + // frame of the other. + // + assert(!initiator); + _closingInitiator = false; + return SocketOperationWrite; + } + else if(s >= StateClosingRequestPending) + { + return SocketOperationNone; + } + + _closingInitiator = initiator; + if(dynamic_cast<const Ice::CloseConnectionException*>(&reason)) + { + _closingReason = CLOSURE_NORMAL; + } + else if(dynamic_cast<const Ice::ObjectAdapterDeactivatedException*>(&reason) || + dynamic_cast<const Ice::CommunicatorDestroyedException*>(&reason)) + { + _closingReason = CLOSURE_SHUTDOWN; + } + else if(dynamic_cast<const Ice::ProtocolException*>(&reason)) + { + _closingReason = CLOSURE_PROTOCOL_ERROR; + } + else if(dynamic_cast<const Ice::MemoryLimitException*>(&reason)) + { + _closingReason = CLOSURE_TOO_BIG; + } + + if(_state == StateOpened) + { + _state = StateClosingRequestPending; + return initiator ? SocketOperationRead : SocketOperationWrite; + } + else + { + _nextState = StateClosingRequestPending; + return SocketOperationNone; + } +} + +void +IceInternal::WSTransceiver::close() +{ + _delegate->close(); + _state = StateClosed; +} + +SocketOperation +IceInternal::WSTransceiver::write(Buffer& buf) +{ + if(_writePending) + { + return SocketOperationWrite; + } + + if(_state < StateOpened) + { + if(_state < StateConnected) + { + return _delegate->write(buf); + } + else + { + return _delegate->write(_writeBuffer); + } + } + + do + { + if(preWrite(buf)) + { + if(_writeBuffer.i < _writeBuffer.b.end()) + { + SocketOperation s = _delegate->write(_writeBuffer); + if(s) + { + return s; + } + } + + if(_incoming && !buf.b.empty() && _writeState == WriteStatePayload) + { + SocketOperation s = _delegate->write(buf); + if(s) + { + return s; + } + } + } + } + while(postWrite(buf)); + + if(_state == StateClosingResponsePending && !_closingInitiator) + { + return SocketOperationRead; + } + return SocketOperationNone; +} + +SocketOperation +IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData) +{ + if(_readPending) + { + return SocketOperationRead; + } + + if(_state < StateOpened) + { + if(_state < StateConnected) + { + return _delegate->read(buf, hasMoreData); + } + else + { + if(_delegate->read(_readBuffer, hasMoreData) == SocketOperationWrite) + { + return SocketOperationWrite; + } + else + { + return SocketOperationNone; + } + } + } + + SocketOperation s = SocketOperationNone; + do + { + if(preRead(buf)) + { + if(_readState == ReadStatePayload) + { + // + // If the payload length is smaller than what remains to be read, we read + // no more than the payload length. The remaining of the buffer will be + // sent over in another frame. + // + size_t readSz = _readPayloadLength - (buf.i - _readStart); // Already read + if(static_cast<size_t>(buf.b.end() - buf.i) > readSz) + { + size_t size = buf.b.size(); + buf.b.resize(buf.i - buf.b.begin() + readSz); + s = _delegate->read(buf, hasMoreData); + buf.b.resize(size); + } + else + { + s = _delegate->read(buf, hasMoreData); + } + + } + else + { + s = _delegate->read(_readBuffer, hasMoreData); + } + + if(s == SocketOperationWrite) + { + postRead(buf); + return s; + } + } + } + while(postRead(buf)); + + if(buf.i == buf.b.end()) + { + hasMoreData |= _readI < _readBuffer.i; + s = SocketOperationNone; + } + else + { + hasMoreData = false; + s = SocketOperationRead; + } + + if(((_state == StateClosingRequestPending && !_closingInitiator) || + (_state == StateClosingResponsePending && _closingInitiator) || + _state == StatePingPending || + _state == StatePongPending) && + _writeState == WriteStateHeader) + { + // We have things to write, ask to be notified when writes are ready. + s = static_cast<SocketOperation>(s | SocketOperationWrite); + } + return s; +} + +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) +bool +IceInternal::WSTransceiver::startWrite(Buffer& buf) +{ + _writePending = true; + if(_state < StateOpened) + { + if(_state < StateConnected) + { + return _delegate->startWrite(buf); + } + else + { + return _delegate->startWrite(_writeBuffer); + } + } + + if(preWrite(buf)) + { + if(_writeBuffer.i < _writeBuffer.b.end()) + { + _delegate->startWrite(_writeBuffer); + return false; + } + else + { + assert(_incoming); + return _delegate->startWrite(buf); + } + } + else + { + _delegate->getNativeInfo()->completed(IceInternal::SocketOperationWrite); + return false; + } +} + +void +IceInternal::WSTransceiver::finishWrite(Buffer& buf) +{ + _writePending = false; + if(_state < StateOpened) + { + if(_state < StateConnected) + { + _delegate->finishWrite(buf); + } + else + { + _delegate->finishWrite(_writeBuffer); + } + return; + } + + if(_writeBuffer.i < _writeBuffer.b.end()) + { + _delegate->finishWrite(_writeBuffer); + } + else if(!buf.b.empty() && buf.i != buf.b.end()) + { + assert(_incoming); + _delegate->finishWrite(buf); + } + + postWrite(buf); +} + +void +IceInternal::WSTransceiver::startRead(Buffer& buf) +{ + _readPending = true; + if(_state < StateOpened) + { + if(_state < StateConnected) + { + _delegate->startRead(buf); + } + else + { + _delegate->startRead(_readBuffer); + } + return; + } + + if(preRead(buf)) + { + if(_readState == ReadStatePayload) + { + // + // If the payload length is smaller than what remains to be read, we read + // no more than the payload length. The remaining of the buffer will be + // sent over in another frame. + // + size_t readSz = _readPayloadLength - (buf.i - _readStart); + if(static_cast<size_t>(buf.b.end() - buf.i) > readSz) + { + size_t size = buf.b.size(); + buf.b.resize(buf.i - buf.b.begin() + readSz); + _delegate->startRead(buf); + buf.b.resize(size); + } + else + { + _delegate->startRead(buf); + } + } + else + { + _delegate->startRead(_readBuffer); + } + } + else + { + _delegate->getNativeInfo()->completed(IceInternal::SocketOperationRead); + } +} + +void +IceInternal::WSTransceiver::finishRead(Buffer& buf, bool& hasMoreData) +{ + _readPending = false; + if(_state < StateOpened) + { + if(_state < StateConnected) + { + _delegate->finishRead(buf, hasMoreData); + } + else + { + _delegate->finishRead(_readBuffer, hasMoreData); + } + return; + } + + if(buf.b.empty() || buf.i == buf.b.end()) + { + // Nothing to do. + } + else if(_readState == ReadStatePayload) + { + _delegate->finishRead(buf, hasMoreData); + } + else + { + _delegate->finishRead(_readBuffer, hasMoreData); + } + postRead(buf); +} +#endif + +string +IceInternal::WSTransceiver::protocol() const +{ + return _instance->protocol(); +} + +string +IceInternal::WSTransceiver::toString() const +{ + return _delegate->toString(); +} + +Ice::ConnectionInfoPtr +IceInternal::WSTransceiver::getInfo() const +{ + IPConnectionInfoPtr di = IPConnectionInfoPtr::dynamicCast(_delegate->getInfo()); + assert(di); + WSConnectionInfoPtr info = new WSConnectionInfo(); + info->localAddress = di->localAddress; + info->localPort = di->localPort; + info->remoteAddress = di->remoteAddress; + info->remotePort = di->remotePort; + return info; +} + +void +IceInternal::WSTransceiver::checkSendSize(const Buffer& buf, size_t messageSizeMax) +{ + _delegate->checkSendSize(buf, messageSizeMax); +} + +IceInternal::WSTransceiver::WSTransceiver(const ProtocolInstancePtr& instance, const TransceiverPtr& del, + const string& host, int port, const string& resource) : + _instance(instance), + _delegate(del), + _host(host), + _port(port), + _resource(resource), + _incoming(false), + _state(StateInitializeDelegate), + _parser(new HttpParser), + _readState(ReadStateOpcode), + _readBuffer(0), + _readBufferSize(1024), + _readLastFrame(false), + _readOpCode(0), + _readHeaderLength(0), + _readPayloadLength(0), + _writeState(WriteStateHeader), + _writeBuffer(0), + _writeBufferSize(1024), + _readPending(false), + _writePending(false), + _closingInitiator(false), + _closingReason(CLOSURE_NORMAL) +{ + // + // For client connections, the sent frame payload must be + // masked. So we copy and send the message buffer data in chuncks + // of data whose size is up to the write buffer size. + // + const_cast<size_t&>(_writeBufferSize) = max(IceInternal::getSendBufferSize(del->getNativeInfo()->fd()), 1024); + + // + // Write and read buffer size must be large enough to hold the frame header! + // + assert(_writeBufferSize > 256); + assert(_readBufferSize > 256); +} + +IceInternal::WSTransceiver::WSTransceiver(const ProtocolInstancePtr& instance, const TransceiverPtr& del) : + _instance(instance), + _delegate(del), + _port(-1), + _incoming(true), + _state(StateInitializeDelegate), + _parser(new HttpParser), + _readState(ReadStateOpcode), + _readBuffer(0), + _readBufferSize(1024), + _readLastFrame(false), + _readOpCode(0), + _readHeaderLength(0), + _readPayloadLength(0), + _writeState(WriteStateHeader), + _writeBuffer(0), + _writeBufferSize(1024), + _readPending(false), + _writePending(false), + _closingInitiator(false), + _closingReason(CLOSURE_NORMAL) +{ + // + // Write and read buffer size must be large enough to hold the frame header! + // + assert(_writeBufferSize > 256); + assert(_readBufferSize > 256); +} + +IceInternal::WSTransceiver::~WSTransceiver() +{ +} + +void +IceInternal::WSTransceiver::handleRequest(Buffer& responseBuffer) +{ + string val; + + // + // HTTP/1.1 + // + if(_parser->versionMajor() != 1 || _parser->versionMinor() != 1) + { + throw WebSocketException("unsupported HTTP version"); + } + + // + // "An |Upgrade| header field containing the value 'websocket', + // treated as an ASCII case-insensitive value." + // + if(!_parser->getHeader("Upgrade", val, true)) + { + throw WebSocketException("missing value for Upgrade field"); + } + else if(val != "websocket") + { + throw WebSocketException("invalid value `" + val + "' for Upgrade field"); + } + + // + // "A |Connection| header field that includes the token 'Upgrade', + // treated as an ASCII case-insensitive value. + // + if(!_parser->getHeader("Connection", val, true)) + { + throw WebSocketException("missing value for Connection field"); + } + else if(val.find("upgrade") == string::npos) + { + throw WebSocketException("invalid value `" + val + "' for Connection field"); + } + + // + // "A |Sec-WebSocket-Version| header field, with a value of 13." + // + if(!_parser->getHeader("Sec-WebSocket-Version", val, false)) + { + throw WebSocketException("missing value for WebSocket version"); + } + else if(val != "13") + { + throw WebSocketException("unsupported WebSocket version `" + val + "'"); + } + + // + // "Optionally, a |Sec-WebSocket-Protocol| header field, with a list + // of values indicating which protocols the client would like to + // speak, ordered by preference." + // + bool addProtocol = false; + if(_parser->getHeader("Sec-WebSocket-Protocol", val, true)) + { + vector<string> protocols; + if(!IceUtilInternal::splitString(val, ",", protocols)) + { + throw WebSocketException("invalid value `" + val + "' for WebSocket protocol"); + } + for(vector<string>::iterator p = protocols.begin(); p != protocols.end(); ++p) + { + if(IceUtilInternal::trim(*p) != _iceProtocol) + { + throw WebSocketException("unknown value `" + *p + "' for WebSocket protocol"); + } + addProtocol = true; + } + } + + // + // "A |Sec-WebSocket-Key| header field with a base64-encoded + // value that, when decoded, is 16 bytes in length." + // + string key; + if(!_parser->getHeader("Sec-WebSocket-Key", key, false)) + { + throw WebSocketException("missing value for WebSocket key"); + } + + vector<unsigned char> decodedKey = Base64::decode(key); + if(decodedKey.size() != 16) + { + throw WebSocketException("invalid value `" + key + "' for WebSocket key"); + } + + // + // Retain the target resource. + // + const_cast<string&>(_resource) = _parser->uri(); + + // + // Compose the response. + // + ostringstream out; + out << "HTTP/1.1 101 Switching Protocols\r\n" + << "Upgrade: websocket\r\n" + << "Connection: Upgrade\r\n"; + if(addProtocol) + { + out << "Sec-WebSocket-Protocol: " << _iceProtocol << "\r\n"; + } + + // + // The response includes: + // + // "A |Sec-WebSocket-Accept| header field. The value of this + // header field is constructed by concatenating /key/, defined + // above in step 4 in Section 4.2.2, with the string "258EAFA5- + // E914-47DA-95CA-C5AB0DC85B11", taking the SHA-1 hash of this + // concatenated value to obtain a 20-byte value and base64- + // encoding (see Section 4 of [RFC4648]) this 20-byte hash. + // + out << "Sec-WebSocket-Accept: "; + string input = key + _wsUUID; + vector<unsigned char> hash; + IceUtil::sha1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(), hash); + out << IceInternal::Base64::encode(hash) << "\r\n" << "\r\n"; // EOM + + string str = out.str(); + responseBuffer.b.resize(str.size()); + memcpy(&responseBuffer.b[0], str.c_str(), str.size()); + responseBuffer.i = responseBuffer.b.begin(); +} + +void +IceInternal::WSTransceiver::handleResponse() +{ + string val; + + // + // HTTP/1.1 + // + if(_parser->versionMajor() != 1 || _parser->versionMinor() != 1) + { + throw WebSocketException("unsupported HTTP version"); + } + + // + // "If the status code received from the server is not 101, the + // client handles the response per HTTP [RFC2616] procedures. In + // particular, the client might perform authentication if it + // receives a 401 status code; the server might redirect the client + // using a 3xx status code (but clients are not required to follow + // them), etc." + // + if(_parser->status() != 101) + { + ostringstream out; + out << "unexpected status value " << _parser->status(); + if(!_parser->reason().empty()) + { + out << ":" << endl << _parser->reason(); + } + throw WebSocketException(out.str()); + } + + // + // "If the response lacks an |Upgrade| header field or the |Upgrade| + // header field contains a value that is not an ASCII case- + // insensitive match for the value "websocket", the client MUST + // _Fail the WebSocket Connection_." + // + if(!_parser->getHeader("Upgrade", val, true)) + { + throw WebSocketException("missing value for Upgrade field"); + } + else if(val != "websocket") + { + throw WebSocketException("invalid value `" + val + "' for Upgrade field"); + } + + // + // "If the response lacks a |Connection| header field or the + // |Connection| header field doesn't contain a token that is an + // ASCII case-insensitive match for the value "Upgrade", the client + // MUST _Fail the WebSocket Connection_." + // + if(!_parser->getHeader("Connection", val, true)) + { + throw WebSocketException("missing value for Connection field"); + } + else if(val.find("upgrade") == string::npos) + { + throw WebSocketException("invalid value `" + val + "' for Connection field"); + } + + // + // "If the response includes a |Sec-WebSocket-Protocol| header field + // and this header field indicates the use of a subprotocol that was + // not present in the client's handshake (the server has indicated a + // subprotocol not requested by the client), the client MUST _Fail + // the WebSocket Connection_." + // + if(_parser->getHeader("Sec-WebSocket-Protocol", val, true) && val != _iceProtocol) + { + throw WebSocketException("invalid value `" + val + "' for WebSocket protocol"); + } + + // + // "If the response lacks a |Sec-WebSocket-Accept| header field or + // the |Sec-WebSocket-Accept| contains a value other than the + // base64-encoded SHA-1 of the concatenation of the |Sec-WebSocket- + // Key| (as a string, not base64-decoded) with the string "258EAFA5- + // E914-47DA-95CA-C5AB0DC85B11" but ignoring any leading and + // trailing whitespace, the client MUST _Fail the WebSocket + // Connection_." + // + if(!_parser->getHeader("Sec-WebSocket-Accept", val, false)) + { + throw WebSocketException("missing value for Sec-WebSocket-Accept"); + } + string input = _key + _wsUUID; + vector<unsigned char> hash; + IceUtil::sha1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(), hash); + if(val != IceInternal::Base64::encode(hash)) + { + throw WebSocketException("invalid value `" + val + "' for Sec-WebSocket-Accept"); + } +} + +bool +IceInternal::WSTransceiver::preRead(Buffer& buf) +{ + while(true) + { + if(_readState == ReadStateOpcode) + { + // + // Is there enough data available to read the opcode? + // + if(!readBuffered(2)) + { + return true; + } + + // + // Most-significant bit indicates whether this is the + // last frame. Least-significant four bits hold the + // opcode. + // + unsigned char ch = static_cast<unsigned char>(*_readI++); + _readLastFrame = (ch & FLAG_FINAL) == FLAG_FINAL; + _readOpCode = ch & 0xf; + + ch = static_cast<unsigned char>(*_readI++); + + // + // Check the MASK bit. Messages sent by a client must be masked; + // messages sent by a server must not be masked. + // + const bool masked = (ch & FLAG_MASKED) == FLAG_MASKED; + if(masked != _incoming) + { + throw ProtocolException(__FILE__, __LINE__, "invalid masking"); + } + + // + // Extract the payload length, which can have the following values: + // + // 0-125: The payload length + // 126: The subsequent two bytes contain the payload length + // 127: The subsequent eight bytes contain the payload length + // + _readPayloadLength = (ch & 0x7f); + if(_readPayloadLength < 126) + { + _readHeaderLength = 0; + } + else if(_readPayloadLength == 126) + { + _readHeaderLength = 2; // Need to read a 16-bit payload length. + } + else + { + _readHeaderLength = 8; // Need to read a 64-bit payload length. + } + if(masked) + { + _readHeaderLength += 4; // Need to read a 32-bit mask. + } + + _readState = ReadStateHeader; + } + + if(_readState == ReadStateHeader) + { + // + // Is there enough data available to read the header? + // + if(_readHeaderLength > 0 && !readBuffered(_readHeaderLength)) + { + return true; + } + + if(_readPayloadLength == 126) + { + _readPayloadLength = static_cast<size_t>(ntohs(*reinterpret_cast<uint16_t*>(_readI))); + _readI += 2; + } + else if(_readPayloadLength == 127) + { + assert(_readPayloadLength == 127); + Long l = ice_nlltoh(_readI); + _readI += 8; + if(l < 0 || l > INT_MAX) + { + ostringstream ostr; + ostr << "invalid WebSocket payload length: " << l; + throw ProtocolException(__FILE__, __LINE__, ostr.str()); + } + _readPayloadLength = static_cast<size_t>(l); + } + + // + // Read the mask if this is an incoming connection. + // + if(_incoming) + { + assert(_readBuffer.i - _readI >= 4); // We must have needed to read the mask. + memcpy(_readMask, _readI, 4); // Copy the mask. + _readI += 4; + } + + switch(_readOpCode) + { + case OP_CONT: // Continuation frame + { + // TODO: Add support for continuation frames? + throw ProtocolException(__FILE__, __LINE__, "continuation frames not supported"); + } + case OP_TEXT: // Text frame + { + throw ProtocolException(__FILE__, __LINE__, "text frames not supported"); + } + case OP_DATA: // Data frame + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "received " << protocol() << " data frame with payload length of " << _readPayloadLength; + out << " bytes\n" << toString(); + } + + if(!_readLastFrame) + { + throw ProtocolException(__FILE__, __LINE__, "continuation frames not supported"); + } + if(_readPayloadLength <= 0) + { + throw ProtocolException(__FILE__, __LINE__, "payload length is 0"); + } + _readState = ReadStatePayload; + break; + } + case OP_CLOSE: // Connection close + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "received " << protocol() << " connection close frame\n" << toString(); + } + + State s = _nextState == StateOpened ? _state : _nextState; + if(s == StateClosingRequestPending) + { + // + // If we receive a close frame while we were actually + // waiting to send one, change the role and send a + // close frame response. + // + if(!_closingInitiator) + { + _closingInitiator = true; + } + if(_state == StateClosingRequestPending) + { + _state = StateClosingResponsePending; + } + else + { + _nextState = StateClosingResponsePending; + } + return false; // No longer interested in reading + } + else + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + } + case OP_PING: + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "received " << protocol() << " connection ping frame\n" << toString(); + } + _readState = ReadStateControlFrame; + break; + } + case OP_PONG: // Pong + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "received " << protocol() << " connection pong frame\n" << toString(); + } + _readState = ReadStateControlFrame; + break; + } + default: + { + ostringstream ostr; + ostr << "unsupported opcode: " << _readOpCode; + throw ProtocolException(__FILE__, __LINE__, ostr.str()); + } + } + } + + if(_readState == ReadStateControlFrame) + { + if(_readPayloadLength > 0 && !readBuffered(_readPayloadLength)) + { + return true; + } + + if(_readPayloadLength > 0 && _readOpCode == OP_PING) + { + _pingPayload.clear(); + _pingPayload.resize(_readPayloadLength); + memcpy(&_pingPayload[0], _readI, _pingPayload.size()); + } + + _readI += _readPayloadLength; + _readPayloadLength = 0; + + if(_readOpCode == OP_PING) + { + if(_state == StateOpened) + { + _state = StatePongPending; // Send pong frame now + } + else if(_nextState < StatePongPending) + { + _nextState = StatePongPending; // Send pong frame next + } + } + + // + // We've read the payload of the PING/PONG frame, we're ready + // to read a new frame. + // + _readState = ReadStateOpcode; + } + + if(_readState == ReadStatePayload) + { + // + // This must be assigned before the check for the buffer. If the buffer is empty + // or already read, postRead will return false. + // + _readStart = buf.i; + + if(buf.b.empty() || buf.i == buf.b.end()) + { + return false; + } + + if(_readI < _readBuffer.i) + { + size_t n = min(_readBuffer.i - _readI, buf.b.end() - buf.i); + memcpy(buf.i, _readI, n); + buf.i += n; + _readI += n; + } + + // + // Continue reading if we didn't read the full message, otherwise give back + // the control to the connection + // + return buf.i < buf.b.end(); + } + } +} + +bool +IceInternal::WSTransceiver::postRead(Buffer& buf) +{ + if(_readState != ReadStatePayload) + { + return _readStart < _readBuffer.i; // Returns true if data was read. + } + + if(_readStart == buf.i) + { + return false; // Nothing was read or nothing to read. + } + assert(_readStart < buf.i); + + if(_incoming) + { + // + // Unmask the data we just read. + // + IceInternal::Buffer::Container::iterator p = _readStart; + for(size_t n = _readStart - buf.b.begin(); p < buf.i; ++p, ++n) + { + *p ^= _readMask[n % 4]; + } + } + + _readPayloadLength -= buf.i - _readStart; + _readStart = buf.i; + if(_readPayloadLength == 0) + { + // + // We've read the complete payload, we're ready to read a new frame. + // + _readState = ReadStateOpcode; + } + return buf.i != buf.b.end(); +} + +bool +IceInternal::WSTransceiver::preWrite(Buffer& buf) +{ + if(_writeState == WriteStateHeader) + { + if(_state == StateOpened) + { + if(buf.b.empty() || buf.i == buf.b.end()) + { + return false; + } + + assert(buf.i = buf.b.begin()); + prepareWriteHeader(OP_DATA, buf.b.size()); + + // + // For server connections, we use the _writeBuffer only to + // write the header, the message is sent directly from the + // message buffer. For client connections, we use the write + // buffer for both the header and message buffer since we need + // to mask the message data. + // + if(_incoming) + { + _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); + _writeBuffer.i = _writeBuffer.b.begin(); + } + _writeState = WriteStatePayload; + } + else if(_state == StatePingPending) + { + prepareWriteHeader(OP_PING, 0); // Don't send any payload + + _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); + _writeState = WriteStateControlFrame; + _writeBuffer.i = _writeBuffer.b.begin(); + } + else if(_state == StatePongPending) + { + prepareWriteHeader(OP_PONG, _pingPayload.size()); + if(_pingPayload.size() > static_cast<size_t>(_writeBuffer.b.end() - _writeBuffer.i)) + { + size_t pos = _writeBuffer.i - _writeBuffer.b.begin(); + _writeBuffer.b.resize(pos + _pingPayload.size()); + _writeBuffer.i = _writeBuffer.b.begin() + pos; + } + memcpy(_writeBuffer.i, &_pingPayload[0], _pingPayload.size()); + _writeBuffer.i += _pingPayload.size(); + _pingPayload.clear(); + + _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); + _writeState = WriteStateControlFrame; + _writeBuffer.i = _writeBuffer.b.begin(); + } + else if((_state == StateClosingRequestPending && !_closingInitiator) || + (_state == StateClosingResponsePending && _closingInitiator)) + { + prepareWriteHeader(OP_CLOSE, 2); + + // Write closing reason + *reinterpret_cast<uint16_t*>(_writeBuffer.i) = htons(static_cast<uint16_t>(_closingReason)); + if(!_incoming) + { + *_writeBuffer.i++ ^= _writeMask[0]; + *_writeBuffer.i++ ^= _writeMask[1]; + } + else + { + _writeBuffer.i += 2; + } + + _writeState = WriteStateControlFrame; + _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); + _writeBuffer.i = _writeBuffer.b.begin(); + } + else + { + assert(_state != StateClosed); + return false; // Nothing to write in this state + } + + _writePayloadLength = 0; + } + + if(_writeState == WriteStatePayload) + { + // + // For an outgoing connection, each message must be masked with a random + // 32-bit value, so we copy the entire message into the internal buffer + // for writing. + // + // For an incoming connection, we use the internal buffer to hold the + // frame header, and then write the caller's buffer to avoid copying. + // + if(!_incoming) + { + if((_writePayloadLength == 0 || _writeBuffer.i == _writeBuffer.b.end())) + { + if(_writeBuffer.i == _writeBuffer.b.end()) + { + _writeBuffer.i = _writeBuffer.b.begin(); + } + + size_t n = buf.i - buf.b.begin(); + for(; n < buf.b.size() && _writeBuffer.i < _writeBuffer.b.end(); ++_writeBuffer.i, ++n) + { + *_writeBuffer.i = buf.b[n] ^ _writeMask[n % 4]; + } + _writePayloadLength = n; + + if(_writeBuffer.i < _writeBuffer.b.end()) + { + _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); + } + _writeBuffer.i = _writeBuffer.b.begin(); + } + } + return true; + } + else + { + return _writeBuffer.i < _writeBuffer.b.end(); + } +} + +bool +IceInternal::WSTransceiver::postWrite(Buffer& buf) +{ + if(_state > StateOpened && _writeState == WriteStateControlFrame) + { + if(_writeBuffer.i == _writeBuffer.b.end()) + { + if(_state == StatePingPending) + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "sent " << protocol() << " connection ping frame\n" << toString(); + } + } + else if(_state == StatePongPending) + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "sent " << protocol() << " connection pong frame\n" << toString(); + } + } + else if((_state == StateClosingRequestPending && !_closingInitiator) || + (_state == StateClosingResponsePending && _closingInitiator)) + { + if(_instance->traceLevel() >= 2) + { + Trace out(_instance->logger(), _instance->traceCategory()); + out << "sent " << protocol() << " connection close frame\n" << toString(); + } + + if(_state == StateClosingRequestPending && !_closingInitiator) + { + _writeState = WriteStateHeader; + _state = StateClosingResponsePending; + return false; + } + else + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + } + else if(_state == StateClosed) + { + return false; + } + + _state = _nextState; + _nextState = StateOpened; + _writeState = WriteStateHeader; + } + else + { + return true; + } + } + + if(!_incoming && _writePayloadLength > 0) + { + if(_writeBuffer.i == _writeBuffer.b.end()) + { + buf.i = buf.b.begin() + _writePayloadLength; + } + } + + if(buf.b.empty() || buf.i == buf.b.end()) + { + _writeState = WriteStateHeader; + if(_state == StatePingPending || + _state == StatePongPending || + (_state == StateClosingRequestPending && !_closingInitiator) || + (_state == StateClosingResponsePending && _closingInitiator)) + { + return true; + } + } + else if(_state == StateOpened) + { + return true; + } + return false; +} + +bool +IceInternal::WSTransceiver::readBuffered(IceInternal::Buffer::Container::size_type sz) +{ + if(_readI == _readBuffer.i) + { + _readBuffer.b.resize(_readBufferSize); + _readI = _readBuffer.i = _readBuffer.b.begin(); + } + else + { + IceInternal::Buffer::Container::size_type available = _readBuffer.i - _readI; + if(available < sz) + { + if(_readI != &_readBuffer.b[0]) + { + memmove(&_readBuffer.b[0], _readI, available); + } + _readBuffer.b.resize(max(_readBufferSize, sz)); + _readI = _readBuffer.b.begin(); + _readBuffer.i = _readI + available; + } + } + + _readStart = _readBuffer.i; + if(_readI + sz > _readBuffer.i) + { + return false; // Not enough read. + } + assert(_readBuffer.i > _readI); + return true; +} + +void +IceInternal::WSTransceiver::prepareWriteHeader(Byte opCode, IceInternal::Buffer::Container::size_type payloadLength) +{ + // + // We need to prepare the frame header. + // + _writeBuffer.b.resize(_writeBufferSize); + _writeBuffer.i = _writeBuffer.b.begin(); + + // + // Set the opcode - this is the one and only data frame. + // + *_writeBuffer.i++ = static_cast<Byte>(opCode | FLAG_FINAL); + + // + // Set the payload length. + // + if(payloadLength <= 125) + { + *_writeBuffer.i++ = static_cast<Byte>(payloadLength); + } + else if(payloadLength > 125 && payloadLength <= USHRT_MAX) + { + // + // Use an extra 16 bits to encode the payload length. + // + *_writeBuffer.i++ = static_cast<Byte>(126); + *reinterpret_cast<uint16_t*>(_writeBuffer.i) = htons(static_cast<uint16_t>(payloadLength)); + _writeBuffer.i += 2; + } + else if(payloadLength > USHRT_MAX) + { + // + // Use an extra 64 bits to encode the payload length. + // + *_writeBuffer.i++ = static_cast<Byte>(127); + ice_htonll(payloadLength, _writeBuffer.i); + _writeBuffer.i += 8; + } + + if(!_incoming) + { + // + // Add a random 32-bit mask to every outgoing frame, copy the payload data, + // and apply the mask. + // + _writeBuffer.b[1] |= FLAG_MASKED; + IceUtilInternal::generateRandom(reinterpret_cast<char*>(_writeMask), sizeof(_writeMask)); + memcpy(_writeBuffer.i, _writeMask, sizeof(_writeMask)); + _writeBuffer.i += sizeof(_writeMask); + } +} + |