diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 135 |
1 files changed, 37 insertions, 98 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 839567b6f5f..5c71af33857 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -72,7 +72,7 @@ public: _servantManager(servantManager), _adapter(adapter), _outAsync(outAsync), - _stream(stream.instance()) + _stream(stream.instance(), currentProtocolEncoding) { _stream.swap(stream); } @@ -164,7 +164,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) } assert(str); - stream = new BasicStream(str->instance()); + stream = new BasicStream(str->instance(), currentProtocolEncoding); stream->swap(*str); adopted = true; } @@ -750,7 +750,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) // // Reset the batch. // - BasicStream dummy(_instance.get(), _batchAutoFlush); + BasicStream dummy(_instance.get(), currentProtocolEncoding, _batchAutoFlush); _batchStream.swap(dummy); _batchRequestNum = 0; _batchRequestCompress = false; @@ -805,7 +805,7 @@ Ice::ConnectionI::abortBatchRequest() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - BasicStream dummy(_instance.get(), _batchAutoFlush); + BasicStream dummy(_instance.get(), currentProtocolEncoding, _batchAutoFlush); _batchStream.swap(dummy); _batchRequestNum = 0; _batchRequestCompress = false; @@ -922,7 +922,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) // // Reset the batch stream. // - BasicStream dummy(_instance.get(), _batchAutoFlush); + BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding, _batchAutoFlush); _batchStream.swap(dummy); _batchRequestNum = 0; _batchRequestCompress = false; @@ -984,7 +984,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) // // Reset the batch stream. // - BasicStream dummy(_instance.get(), _batchAutoFlush); + BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding, _batchAutoFlush); _batchStream.swap(dummy); _batchRequestNum = 0; _batchRequestCompress = false; @@ -1256,34 +1256,13 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic)); throw ex; } - Byte pMajor; - Byte pMinor; - _readStream.read(pMajor); - _readStream.read(pMinor); - if(pMajor != protocolMajor - || static_cast<unsigned char>(pMinor) > static_cast<unsigned char>(protocolMinor)) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(pMajor); - ex.badMinor = static_cast<unsigned char>(pMinor); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - Byte eMajor; - Byte eMinor; - _readStream.read(eMajor); - _readStream.read(eMinor); - if(eMajor != encodingMajor - || static_cast<unsigned char>(eMinor) > static_cast<unsigned char>(encodingMinor)) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(eMajor); - ex.badMinor = static_cast<unsigned char>(eMinor); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } + ProtocolVersion pv; + pv.__read(&_readStream); + checkSupportedProtocol(pv); + EncodingVersion ev; + ev.__read(&_readStream); + checkSupportedProtocolEncoding(ev); + Byte messageType; _readStream.read(messageType); Byte compress; @@ -1782,14 +1761,14 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _asyncRequestsHint(_asyncRequests.end()), _batchAutoFlush( _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0), - _batchStream(_instance.get(), _batchAutoFlush), + _batchStream(_instance.get(), Ice::currentProtocolEncoding, _batchAutoFlush), _batchStreamInUse(false), _batchRequestNum(0), _batchRequestCompress(false), _batchMarker(0), - _readStream(_instance.get()), + _readStream(_instance.get(), Ice::currentProtocolEncoding), _readHeader(false), - _writeStream(_instance.get()), + _writeStream(_instance.get(), Ice::currentProtocolEncoding), _dispatchCount(0), _state(StateNotInitialized), _shutdownInitiated(false) @@ -2087,15 +2066,13 @@ Ice::ConnectionI::initiateShutdown() // // Before we shut down, we send a close connection message. // - BasicStream os(_instance.get()); + BasicStream os(_instance.get(), Ice::currentProtocolEncoding); os.write(magic[0]); os.write(magic[1]); os.write(magic[2]); os.write(magic[3]); - os.write(protocolMajor); - os.write(protocolMinor); - os.write(encodingMajor); - os.write(encodingMinor); + currentProtocol.__write(&os); + currentProtocolEncoding.__write(&os); os.write(closeConnectionMsg); os.write((Byte)1); // compression status: compression supported but not used. os.write(headerSize); // Message size. @@ -2155,10 +2132,8 @@ Ice::ConnectionI::validate(SocketOperation operation) _writeStream.write(magic[1]); _writeStream.write(magic[2]); _writeStream.write(magic[3]); - _writeStream.write(protocolMajor); - _writeStream.write(protocolMinor); - _writeStream.write(encodingMajor); - _writeStream.write(encodingMinor); + currentProtocol.__write(&_writeStream); + currentProtocolEncoding.__write(&_writeStream); _writeStream.write(validateConnectionMsg); _writeStream.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). _writeStream.write(headerSize); // Message size. @@ -2201,32 +2176,12 @@ Ice::ConnectionI::validate(SocketOperation operation) ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic)); throw ex; } - Byte pMajor; - Byte pMinor; - _readStream.read(pMajor); - _readStream.read(pMinor); - if(pMajor != protocolMajor) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(pMajor); - ex.badMinor = static_cast<unsigned char>(pMinor); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - Byte eMajor; - Byte eMinor; - _readStream.read(eMajor); - _readStream.read(eMinor); - if(eMajor != encodingMajor) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(eMajor); - ex.badMinor = static_cast<unsigned char>(eMinor); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } + ProtocolVersion pv; + pv.__read(&_readStream); + checkSupportedProtocol(pv); + EncodingVersion ev; + ev.__read(&_readStream); + checkSupportedProtocolEncoding(ev); Byte messageType; _readStream.read(messageType); if(messageType != validateConnectionMsg) @@ -2299,7 +2254,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // // Do compression. // - BasicStream stream(_instance.get()); + BasicStream stream(_instance.get(), Ice::currentProtocolEncoding); doCompress(*message->stream, stream); if(message->outAsync) @@ -2408,7 +2363,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Do compression. // - BasicStream stream(_instance.get()); + BasicStream stream(_instance.get(), Ice::currentProtocolEncoding); doCompress(*message.stream, stream); stream.i = stream.b.begin(); @@ -2674,7 +2629,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request stream.read(compress); if(compress == 2) { - BasicStream ustream(_instance.get()); + BasicStream ustream(_instance.get(), Ice::currentProtocolEncoding); doUncompress(stream, ustream); stream.b.swap(ustream.b); } @@ -2872,35 +2827,19 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B // Prepare the invocation. // bool response = !_endpoint->datagram() && requestId != 0; + assert(!response || invokeNum == 1); + Incoming in(_instance.get(), this, adapter, response, compress, requestId); - BasicStream* is = in.is(); - stream.swap(*is); - BasicStream* os = in.os(); // - // Prepare the response if necessary. + // Dispatch the invocation. // - if(response) - { - assert(invokeNum == 1); // No further invocations if a response is expected. - os->writeBlob(replyHdr, sizeof(replyHdr)); - - // - // Add the request ID. - // - os->write(requestId); - } + in.invoke(servantManager, &stream); - in.invoke(servantManager); - - // - // If there are more invocations, we need the stream back. - // - if(--invokeNum > 0) - { - stream.swap(*is); - } + --invokeNum; } + + stream.clear(); } catch(const LocalException& ex) { |