summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp135
1 files changed, 37 insertions, 98 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 839567b6f5f..5c71af33857 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -72,7 +72,7 @@ public:
_servantManager(servantManager),
_adapter(adapter),
_outAsync(outAsync),
- _stream(stream.instance())
+ _stream(stream.instance(), currentProtocolEncoding)
{
_stream.swap(stream);
}
@@ -164,7 +164,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
}
assert(str);
- stream = new BasicStream(str->instance());
+ stream = new BasicStream(str->instance(), currentProtocolEncoding);
stream->swap(*str);
adopted = true;
}
@@ -750,7 +750,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
//
// Reset the batch.
//
- BasicStream dummy(_instance.get(), _batchAutoFlush);
+ BasicStream dummy(_instance.get(), currentProtocolEncoding, _batchAutoFlush);
_batchStream.swap(dummy);
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -805,7 +805,7 @@ Ice::ConnectionI::abortBatchRequest()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- BasicStream dummy(_instance.get(), _batchAutoFlush);
+ BasicStream dummy(_instance.get(), currentProtocolEncoding, _batchAutoFlush);
_batchStream.swap(dummy);
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -922,7 +922,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
//
// Reset the batch stream.
//
- BasicStream dummy(_instance.get(), _batchAutoFlush);
+ BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding, _batchAutoFlush);
_batchStream.swap(dummy);
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -984,7 +984,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
//
// Reset the batch stream.
//
- BasicStream dummy(_instance.get(), _batchAutoFlush);
+ BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding, _batchAutoFlush);
_batchStream.swap(dummy);
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -1256,34 +1256,13 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
throw ex;
}
- Byte pMajor;
- Byte pMinor;
- _readStream.read(pMajor);
- _readStream.read(pMinor);
- if(pMajor != protocolMajor
- || static_cast<unsigned char>(pMinor) > static_cast<unsigned char>(protocolMinor))
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- Byte eMajor;
- Byte eMinor;
- _readStream.read(eMajor);
- _readStream.read(eMinor);
- if(eMajor != encodingMajor
- || static_cast<unsigned char>(eMinor) > static_cast<unsigned char>(encodingMinor))
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
+ ProtocolVersion pv;
+ pv.__read(&_readStream);
+ checkSupportedProtocol(pv);
+ EncodingVersion ev;
+ ev.__read(&_readStream);
+ checkSupportedProtocolEncoding(ev);
+
Byte messageType;
_readStream.read(messageType);
Byte compress;
@@ -1782,14 +1761,14 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_asyncRequestsHint(_asyncRequests.end()),
_batchAutoFlush(
_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0),
- _batchStream(_instance.get(), _batchAutoFlush),
+ _batchStream(_instance.get(), Ice::currentProtocolEncoding, _batchAutoFlush),
_batchStreamInUse(false),
_batchRequestNum(0),
_batchRequestCompress(false),
_batchMarker(0),
- _readStream(_instance.get()),
+ _readStream(_instance.get(), Ice::currentProtocolEncoding),
_readHeader(false),
- _writeStream(_instance.get()),
+ _writeStream(_instance.get(), Ice::currentProtocolEncoding),
_dispatchCount(0),
_state(StateNotInitialized),
_shutdownInitiated(false)
@@ -2087,15 +2066,13 @@ Ice::ConnectionI::initiateShutdown()
//
// Before we shut down, we send a close connection message.
//
- BasicStream os(_instance.get());
+ BasicStream os(_instance.get(), Ice::currentProtocolEncoding);
os.write(magic[0]);
os.write(magic[1]);
os.write(magic[2]);
os.write(magic[3]);
- os.write(protocolMajor);
- os.write(protocolMinor);
- os.write(encodingMajor);
- os.write(encodingMinor);
+ currentProtocol.__write(&os);
+ currentProtocolEncoding.__write(&os);
os.write(closeConnectionMsg);
os.write((Byte)1); // compression status: compression supported but not used.
os.write(headerSize); // Message size.
@@ -2155,10 +2132,8 @@ Ice::ConnectionI::validate(SocketOperation operation)
_writeStream.write(magic[1]);
_writeStream.write(magic[2]);
_writeStream.write(magic[3]);
- _writeStream.write(protocolMajor);
- _writeStream.write(protocolMinor);
- _writeStream.write(encodingMajor);
- _writeStream.write(encodingMinor);
+ currentProtocol.__write(&_writeStream);
+ currentProtocolEncoding.__write(&_writeStream);
_writeStream.write(validateConnectionMsg);
_writeStream.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
_writeStream.write(headerSize); // Message size.
@@ -2201,32 +2176,12 @@ Ice::ConnectionI::validate(SocketOperation operation)
ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
throw ex;
}
- Byte pMajor;
- Byte pMinor;
- _readStream.read(pMajor);
- _readStream.read(pMinor);
- if(pMajor != protocolMajor)
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- Byte eMajor;
- Byte eMinor;
- _readStream.read(eMajor);
- _readStream.read(eMinor);
- if(eMajor != encodingMajor)
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
+ ProtocolVersion pv;
+ pv.__read(&_readStream);
+ checkSupportedProtocol(pv);
+ EncodingVersion ev;
+ ev.__read(&_readStream);
+ checkSupportedProtocolEncoding(ev);
Byte messageType;
_readStream.read(messageType);
if(messageType != validateConnectionMsg)
@@ -2299,7 +2254,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
//
// Do compression.
//
- BasicStream stream(_instance.get());
+ BasicStream stream(_instance.get(), Ice::currentProtocolEncoding);
doCompress(*message->stream, stream);
if(message->outAsync)
@@ -2408,7 +2363,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
// Do compression.
//
- BasicStream stream(_instance.get());
+ BasicStream stream(_instance.get(), Ice::currentProtocolEncoding);
doCompress(*message.stream, stream);
stream.i = stream.b.begin();
@@ -2674,7 +2629,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
stream.read(compress);
if(compress == 2)
{
- BasicStream ustream(_instance.get());
+ BasicStream ustream(_instance.get(), Ice::currentProtocolEncoding);
doUncompress(stream, ustream);
stream.b.swap(ustream.b);
}
@@ -2872,35 +2827,19 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
// Prepare the invocation.
//
bool response = !_endpoint->datagram() && requestId != 0;
+ assert(!response || invokeNum == 1);
+
Incoming in(_instance.get(), this, adapter, response, compress, requestId);
- BasicStream* is = in.is();
- stream.swap(*is);
- BasicStream* os = in.os();
//
- // Prepare the response if necessary.
+ // Dispatch the invocation.
//
- if(response)
- {
- assert(invokeNum == 1); // No further invocations if a response is expected.
- os->writeBlob(replyHdr, sizeof(replyHdr));
-
- //
- // Add the request ID.
- //
- os->write(requestId);
- }
+ in.invoke(servantManager, &stream);
- in.invoke(servantManager);
-
- //
- // If there are more invocations, we need the stream back.
- //
- if(--invokeNum > 0)
- {
- stream.swap(*is);
- }
+ --invokeNum;
}
+
+ stream.clear();
}
catch(const LocalException& ex)
{