summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2003-03-14 22:53:03 +0000
committerMark Spruiell <mes@zeroc.com>2003-03-14 22:53:03 +0000
commit8da44c34ac9a2b7efa6b62da46e4136647fcc570 (patch)
tree4057933ee1b6af0f395a482c5c8ace56ddc12e20 /cpp/src/Ice/Connection.cpp
parentbug fix (diff)
downloadice-8da44c34ac9a2b7efa6b62da46e4136647fcc570.tar.bz2
ice-8da44c34ac9a2b7efa6b62da46e4136647fcc570.tar.xz
ice-8da44c34ac9a2b7efa6b62da46e4136647fcc570.zip
remove compressed message types; modify message header to include
compression status
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp109
1 files changed, 35 insertions, 74 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 042253209b2..1e8d9020fd9 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -188,6 +188,7 @@ IceInternal::Connection::validate()
os.write(encodingMajor);
os.write(encodingMinor);
os.write(validateConnectionMsg);
+ os.write((Byte)1); // Compression status.
os.write(headerSize); // Message size.
os.i = os.b.begin();
traceHeader("sending validate connection", os, _logger, _traceLevels);
@@ -196,7 +197,7 @@ IceInternal::Connection::validate()
else
{
//
- // Outgoing connection play the passive role with respect to
+ // Outgoing connections play the passive role with respect to
// connection validation.
//
BasicStream is(_instance.get());
@@ -269,6 +270,9 @@ IceInternal::Connection::validate()
throw ConnectionNotValidatedException(__FILE__, __LINE__);
}
+ Byte compress;
+ is.read(compress);
+
Int size;
is.read(size);
if(size != headerSize)
@@ -392,21 +396,21 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
if(compress)
{
//
- // Change message type.
+ // Set compression status.
//
- os->b[8] = compressedRequestMsg;
+ os->b[9] = 2; // Message is compressed.
//
// Do compression.
//
BasicStream cstream(_instance.get());
doCompress(*os, cstream);
-
+
//
// Send the request.
//
os->i = os->b.begin();
- traceRequest("sending compressed request", *os, _logger, _traceLevels);
+ traceRequest("sending request", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
_transceiver->write(cstream, _endpoint->timeout());
}
@@ -418,7 +422,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
const Byte* p;
Int sz = os->b.size();
p = reinterpret_cast<const Byte*>(&sz);
- copy(p, p + sizeof(Int), os->b.begin() + 9);
+ copy(p, p + sizeof(Int), os->b.begin() + 10);
//
// Send the request.
@@ -493,9 +497,9 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
if(compress)
{
//
- // Change message type.
+ // Set compression status.
//
- os->b[8] = compressedRequestMsg;
+ os->b[9] = 2; // Message is compressed.
//
// Do compression.
@@ -507,7 +511,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
// Send the request.
//
os->i = os->b.begin();
- traceRequest("sending compressed asynchronous request", *os, _logger, _traceLevels);
+ traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
_transceiver->write(cstream, _endpoint->timeout());
}
@@ -519,7 +523,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
const Byte* p;
Int sz = os->b.size();
p = reinterpret_cast<const Byte*>(&sz);
- copy(p, p + sizeof(Int), os->b.begin() + 9);
+ copy(p, p + sizeof(Int), os->b.begin() + 10);
//
// Send the request.
@@ -643,9 +647,9 @@ IceInternal::Connection::flushBatchRequest()
if(compress)
{
//
- // Change message type.
+ // Set compression status.
//
- _batchStream.b[8] = compressedRequestBatchMsg;
+ _batchStream.b[9] = 2; // Message is compressed.
//
// Do compression.
@@ -657,7 +661,7 @@ IceInternal::Connection::flushBatchRequest()
// Send the batch request.
//
_batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending compressed batch request", _batchStream, _logger, _traceLevels);
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
cstream.i = cstream.b.begin();
_transceiver->write(cstream, _endpoint->timeout());
}
@@ -669,7 +673,7 @@ IceInternal::Connection::flushBatchRequest()
const Byte* p;
Int sz = _batchStream.b.size();
p = reinterpret_cast<const Byte*>(&sz);
- copy(p, p + sizeof(Int), _batchStream.b.begin() + 9);
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
//
// Send the batch request.
@@ -702,7 +706,7 @@ IceInternal::Connection::flushBatchRequest()
}
void
-IceInternal::Connection::sendResponse(BasicStream* os)
+IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
@@ -725,16 +729,16 @@ IceInternal::Connection::sendResponse(BasicStream* os)
}
else
{
- compress = _endpoint->compress();
+ compress = _endpoint->compress() && compressFlag > 0;
}
if(compress)
{
//
- // Change message type.
+ // Set compression status.
//
- os->b[8] = compressedReplyMsg;
-
+ os->b[9] = 2; // Message is compressed.
+
//
// Do compression.
//
@@ -745,7 +749,7 @@ IceInternal::Connection::sendResponse(BasicStream* os)
// Send the reply.
//
os->i = os->b.begin();
- traceReply("sending compressed reply", *os, _logger, _traceLevels);
+ traceReply("sending reply", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
_transceiver->write(cstream, _endpoint->timeout());
}
@@ -757,7 +761,7 @@ IceInternal::Connection::sendResponse(BasicStream* os)
const Byte* p;
Int sz = os->b.size();
p = reinterpret_cast<const Byte*>(&sz);
- copy(p, p + sizeof(Int), os->b.begin() + 9);
+ copy(p, p + sizeof(Int), os->b.begin() + 10);
//
// Send the reply.
@@ -895,6 +899,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
Int invoke = 0;
Int requestId = 0;
+ Byte compress = 0;
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
@@ -961,9 +966,8 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
//
// Uncompress if necessary.
//
- if(messageType == compressedRequestMsg ||
- messageType == compressedRequestBatchMsg ||
- messageType == compressedReplyMsg)
+ stream.read(compress);
+ if(compress == 2)
{
BasicStream ustream(_instance.get());
doUncompress(stream, ustream);
@@ -992,24 +996,6 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
break;
}
- case compressedRequestMsg:
- {
- if(_state == StateClosing)
- {
- traceRequest("received compressed request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- traceRequest("received compressed request", stream, _logger, _traceLevels);
- stream.read(requestId);
- invoke = 1;
- ++_dispatchCount;
- }
- break;
- }
-
case requestBatchMsg:
{
if(_state == StateClosing)
@@ -1031,38 +1017,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
break;
}
- case compressedRequestBatchMsg:
- {
- if(_state == StateClosing)
- {
- traceBatchRequest("received compressed batch request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- traceBatchRequest("received compressed batch request", stream, _logger, _traceLevels);
- stream.read(invoke);
- if(invoke < 0)
- {
- throw NegativeSizeException(__FILE__, __LINE__);
- }
- _dispatchCount += invoke;
- }
- break;
- }
-
case replyMsg:
- case compressedReplyMsg:
{
- if(messageType == compressedReplyMsg)
- {
- traceReply("received compressed reply", stream, _logger, _traceLevels);
- }
- else
- {
- traceReply("received reply", stream, _logger, _traceLevels);
- }
+ traceReply("received reply", stream, _logger, _traceLevels);
stream.read(requestId);
@@ -1216,7 +1173,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
// Prepare the invocation.
//
bool response = !_endpoint->datagram() && requestId != 0;
- Incoming in(_instance.get(), this, _adapter, response);
+ Incoming in(_instance.get(), this, _adapter, response, compress);
BasicStream* is = in.is();
stream.swap(*is);
BasicStream* os = in.os();
@@ -1350,6 +1307,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
requestHdr[6] = encodingMajor;
requestHdr[7] = encodingMinor;
requestHdr[8] = requestMsg;
+ requestHdr[9] = 1; // Default compression status: compression supported but not used.
vector<Byte>& requestBatchHdr = const_cast<vector<Byte>&>(_requestBatchHdr);
requestBatchHdr[0] = magic[0];
@@ -1361,6 +1319,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
requestBatchHdr[6] = encodingMajor;
requestBatchHdr[7] = encodingMinor;
requestBatchHdr[8] = requestBatchMsg;
+ requestBatchHdr[9] = 1; // Default compression status: compression supported but not used.
vector<Byte>& replyHdr = const_cast<vector<Byte>&>(_replyHdr);
replyHdr[0] = magic[0];
@@ -1372,6 +1331,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
replyHdr[6] = encodingMajor;
replyHdr[7] = encodingMinor;
replyHdr[8] = replyMsg;
+ replyHdr[9] = 1; // Default compression status: compression supported but not used.
}
IceInternal::Connection::~Connection()
@@ -1551,6 +1511,7 @@ IceInternal::Connection::initiateShutdown() const
os.write(encodingMajor);
os.write(encodingMinor);
os.write(closeConnectionMsg);
+ os.write((Byte)1); // Compression status: compression supported but not used.
os.write(headerSize); // Message size.
os.i = os.b.begin();
traceHeader("sending close connection", os, _logger, _traceLevels);
@@ -1697,7 +1658,7 @@ IceInternal::Connection::doCompress(BasicStream& uncompressed, BasicStream& comp
//
Int compressedSize = compressed.b.size();
p = reinterpret_cast<const Byte*>(&compressedSize);
- copy(p, p + sizeof(Int), uncompressed.b.begin() + 9);
+ copy(p, p + sizeof(Int), uncompressed.b.begin() + 10);
//
// Add the size of the uncompressed stream before the message body