diff options
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 109 |
1 files changed, 35 insertions, 74 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 042253209b2..1e8d9020fd9 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -188,6 +188,7 @@ IceInternal::Connection::validate() os.write(encodingMajor); os.write(encodingMinor); os.write(validateConnectionMsg); + os.write((Byte)1); // Compression status. os.write(headerSize); // Message size. os.i = os.b.begin(); traceHeader("sending validate connection", os, _logger, _traceLevels); @@ -196,7 +197,7 @@ IceInternal::Connection::validate() else { // - // Outgoing connection play the passive role with respect to + // Outgoing connections play the passive role with respect to // connection validation. // BasicStream is(_instance.get()); @@ -269,6 +270,9 @@ IceInternal::Connection::validate() throw ConnectionNotValidatedException(__FILE__, __LINE__); } + Byte compress; + is.read(compress); + Int size; is.read(size); if(size != headerSize) @@ -392,21 +396,21 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) if(compress) { // - // Change message type. + // Set compression status. // - os->b[8] = compressedRequestMsg; + os->b[9] = 2; // Message is compressed. // // Do compression. // BasicStream cstream(_instance.get()); doCompress(*os, cstream); - + // // Send the request. // os->i = os->b.begin(); - traceRequest("sending compressed request", *os, _logger, _traceLevels); + traceRequest("sending request", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); _transceiver->write(cstream, _endpoint->timeout()); } @@ -418,7 +422,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) const Byte* p; Int sz = os->b.size(); p = reinterpret_cast<const Byte*>(&sz); - copy(p, p + sizeof(Int), os->b.begin() + 9); + copy(p, p + sizeof(Int), os->b.begin() + 10); // // Send the request. @@ -493,9 +497,9 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) if(compress) { // - // Change message type. + // Set compression status. // - os->b[8] = compressedRequestMsg; + os->b[9] = 2; // Message is compressed. // // Do compression. @@ -507,7 +511,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) // Send the request. // os->i = os->b.begin(); - traceRequest("sending compressed asynchronous request", *os, _logger, _traceLevels); + traceRequest("sending asynchronous request", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); _transceiver->write(cstream, _endpoint->timeout()); } @@ -519,7 +523,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) const Byte* p; Int sz = os->b.size(); p = reinterpret_cast<const Byte*>(&sz); - copy(p, p + sizeof(Int), os->b.begin() + 9); + copy(p, p + sizeof(Int), os->b.begin() + 10); // // Send the request. @@ -643,9 +647,9 @@ IceInternal::Connection::flushBatchRequest() if(compress) { // - // Change message type. + // Set compression status. // - _batchStream.b[8] = compressedRequestBatchMsg; + _batchStream.b[9] = 2; // Message is compressed. // // Do compression. @@ -657,7 +661,7 @@ IceInternal::Connection::flushBatchRequest() // Send the batch request. // _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending compressed batch request", _batchStream, _logger, _traceLevels); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); cstream.i = cstream.b.begin(); _transceiver->write(cstream, _endpoint->timeout()); } @@ -669,7 +673,7 @@ IceInternal::Connection::flushBatchRequest() const Byte* p; Int sz = _batchStream.b.size(); p = reinterpret_cast<const Byte*>(&sz); - copy(p, p + sizeof(Int), _batchStream.b.begin() + 9); + copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); // // Send the batch request. @@ -702,7 +706,7 @@ IceInternal::Connection::flushBatchRequest() } void -IceInternal::Connection::sendResponse(BasicStream* os) +IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -725,16 +729,16 @@ IceInternal::Connection::sendResponse(BasicStream* os) } else { - compress = _endpoint->compress(); + compress = _endpoint->compress() && compressFlag > 0; } if(compress) { // - // Change message type. + // Set compression status. // - os->b[8] = compressedReplyMsg; - + os->b[9] = 2; // Message is compressed. + // // Do compression. // @@ -745,7 +749,7 @@ IceInternal::Connection::sendResponse(BasicStream* os) // Send the reply. // os->i = os->b.begin(); - traceReply("sending compressed reply", *os, _logger, _traceLevels); + traceReply("sending reply", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); _transceiver->write(cstream, _endpoint->timeout()); } @@ -757,7 +761,7 @@ IceInternal::Connection::sendResponse(BasicStream* os) const Byte* p; Int sz = os->b.size(); p = reinterpret_cast<const Byte*>(&sz); - copy(p, p + sizeof(Int), os->b.begin() + 9); + copy(p, p + sizeof(Int), os->b.begin() + 10); // // Send the reply. @@ -895,6 +899,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa Int invoke = 0; Int requestId = 0; + Byte compress = 0; { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -961,9 +966,8 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa // // Uncompress if necessary. // - if(messageType == compressedRequestMsg || - messageType == compressedRequestBatchMsg || - messageType == compressedReplyMsg) + stream.read(compress); + if(compress == 2) { BasicStream ustream(_instance.get()); doUncompress(stream, ustream); @@ -992,24 +996,6 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa break; } - case compressedRequestMsg: - { - if(_state == StateClosing) - { - traceRequest("received compressed request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceRequest("received compressed request", stream, _logger, _traceLevels); - stream.read(requestId); - invoke = 1; - ++_dispatchCount; - } - break; - } - case requestBatchMsg: { if(_state == StateClosing) @@ -1031,38 +1017,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa break; } - case compressedRequestBatchMsg: - { - if(_state == StateClosing) - { - traceBatchRequest("received compressed batch request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceBatchRequest("received compressed batch request", stream, _logger, _traceLevels); - stream.read(invoke); - if(invoke < 0) - { - throw NegativeSizeException(__FILE__, __LINE__); - } - _dispatchCount += invoke; - } - break; - } - case replyMsg: - case compressedReplyMsg: { - if(messageType == compressedReplyMsg) - { - traceReply("received compressed reply", stream, _logger, _traceLevels); - } - else - { - traceReply("received reply", stream, _logger, _traceLevels); - } + traceReply("received reply", stream, _logger, _traceLevels); stream.read(requestId); @@ -1216,7 +1173,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa // Prepare the invocation. // bool response = !_endpoint->datagram() && requestId != 0; - Incoming in(_instance.get(), this, _adapter, response); + Incoming in(_instance.get(), this, _adapter, response, compress); BasicStream* is = in.is(); stream.swap(*is); BasicStream* os = in.os(); @@ -1350,6 +1307,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, requestHdr[6] = encodingMajor; requestHdr[7] = encodingMinor; requestHdr[8] = requestMsg; + requestHdr[9] = 1; // Default compression status: compression supported but not used. vector<Byte>& requestBatchHdr = const_cast<vector<Byte>&>(_requestBatchHdr); requestBatchHdr[0] = magic[0]; @@ -1361,6 +1319,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, requestBatchHdr[6] = encodingMajor; requestBatchHdr[7] = encodingMinor; requestBatchHdr[8] = requestBatchMsg; + requestBatchHdr[9] = 1; // Default compression status: compression supported but not used. vector<Byte>& replyHdr = const_cast<vector<Byte>&>(_replyHdr); replyHdr[0] = magic[0]; @@ -1372,6 +1331,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, replyHdr[6] = encodingMajor; replyHdr[7] = encodingMinor; replyHdr[8] = replyMsg; + replyHdr[9] = 1; // Default compression status: compression supported but not used. } IceInternal::Connection::~Connection() @@ -1551,6 +1511,7 @@ IceInternal::Connection::initiateShutdown() const os.write(encodingMajor); os.write(encodingMinor); os.write(closeConnectionMsg); + os.write((Byte)1); // Compression status: compression supported but not used. os.write(headerSize); // Message size. os.i = os.b.begin(); traceHeader("sending close connection", os, _logger, _traceLevels); @@ -1697,7 +1658,7 @@ IceInternal::Connection::doCompress(BasicStream& uncompressed, BasicStream& comp // Int compressedSize = compressed.b.size(); p = reinterpret_cast<const Byte*>(&compressedSize); - copy(p, p + sizeof(Int), uncompressed.b.begin() + 9); + copy(p, p + sizeof(Int), uncompressed.b.begin() + 10); // // Add the size of the uncompressed stream before the message body |