summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp280
1 files changed, 245 insertions, 35 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 2c82d6eaa58..3cc5c9a757c 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -21,6 +21,7 @@
#include <Ice/Incoming.h>
#include <Ice/LocalException.h>
#include <Ice/Protocol.h>
+#include <bzlib.h>
using namespace std;
using namespace Ice;
@@ -83,7 +84,7 @@ IceInternal::Connection::prepareRequest(Outgoing* out)
}
void
-IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
+IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp)
{
IceUtil::RecMutex::Lock sync(*this);
@@ -98,15 +99,10 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
try
{
BasicStream* os = out->os();
- os->i = os->b.begin();
//
- // Fill in the message size and request ID.
+ // Fill in the request ID.
//
- const Byte* p;
- Int sz = os->b.size();
- p = reinterpret_cast<Byte*>(&sz);
- copy(p, p + sizeof(Int), os->i + 3);
if (!_endpoint->datagram() && !oneway)
{
requestId = _nextRequestId++;
@@ -114,12 +110,50 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
{
_nextRequestId = 1;
requestId = _nextRequestId++;
- }
- p = reinterpret_cast<Byte*>(&requestId);
- copy(p, p + sizeof(Int), os->i + headerSize);
+ }
+ const Byte* p;
+ p = reinterpret_cast<const Byte*>(&requestId);
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+ }
+
+ if (comp)
+ {
+ //
+ // Change message type.
+ //
+ os->b[2] = compressedRequestMsg;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance);
+ compress(*os, cstream);
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending compressed request", *os, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ //
+ // No compression, just fill in the message size.
+ //
+ const Byte* p;
+ Int sz = os->b.size();
+ p = reinterpret_cast<const Byte*>(&sz);
+ copy(p, p + sizeof(Int), os->b.begin() + 3);
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending request", *os, _logger, _traceLevels);
+ _transceiver->write(*os, _endpoint->timeout());
}
- traceRequest("sending request", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
}
catch (const LocalException& ex)
{
@@ -191,7 +225,7 @@ IceInternal::Connection::abortBatchRequest()
}
void
-IceInternal::Connection::flushBatchRequest()
+IceInternal::Connection::flushBatchRequest(bool comp)
{
IceUtil::RecMutex::Lock sync(*this);
@@ -210,15 +244,44 @@ IceInternal::Connection::flushBatchRequest()
_batchStream.i = _batchStream.b.begin();
- //
- // Fill in the message size.
- //
- const Byte* p;
- Int sz = _batchStream.b.size();
- p = reinterpret_cast<Byte*>(&sz);
- copy(p, p + sizeof(Int), _batchStream.i + 3);
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- _transceiver->write(_batchStream, _endpoint->timeout());
+ if (comp)
+ {
+ //
+ // Change message type.
+ //
+ _batchStream.b[2] = compressedRequestMsg;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance);
+ compress(_batchStream, cstream);
+
+ //
+ // Send the batch request.
+ //
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending compressed batch request", _batchStream, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ //
+ // No compression, just fill in the message size.
+ //
+ const Byte* p;
+ Int sz = _batchStream.b.size();
+ p = reinterpret_cast<const Byte*>(&sz);
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + 3);
+
+ //
+ // Send the batch request.
+ //
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ _transceiver->write(_batchStream, _endpoint->timeout());
+ }
//
// Reset _batchStream so that new batch messages can be sent.
@@ -303,6 +366,7 @@ void
IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
bool invoke = false;
+ bool comp = false;
bool batch = false;
{
@@ -323,6 +387,20 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
assert(stream.i == stream.b.end());
stream.i = stream.b.begin() + 2;
stream.read(messageType);
+
+ //
+ // Uncompress if necessary.
+ //
+ if (messageType == compressedRequestMsg ||
+ messageType == compressedRequestBatchMsg ||
+ messageType == compressedReplyMsg)
+ {
+ BasicStream ustream(_instance);
+ uncompress(stream, ustream);
+ stream.b.swap(ustream.b);
+ comp = true;
+ }
+
stream.i = stream.b.begin() + headerSize;
switch (messageType)
@@ -343,6 +421,22 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
break;
}
+ case compressedRequestMsg:
+ {
+ if (_state == StateClosing)
+ {
+ traceRequest("received compressed request during closing\n"
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceRequest("received compressed request", stream, _logger, _traceLevels);
+ invoke = true;
+ }
+ break;
+ }
+
case requestBatchMsg:
{
if (_state == StateClosing)
@@ -360,9 +454,35 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
break;
}
+ case compressedRequestBatchMsg:
+ {
+ if (_state == StateClosing)
+ {
+ traceBatchRequest("received compressed batch request during closing\n"
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceBatchRequest("received compressed batch request", stream, _logger, _traceLevels);
+ invoke = true;
+ batch = true;
+ }
+ break;
+ }
+
case replyMsg:
+ case compressedReplyMsg:
{
- traceReply("received reply", stream, _logger, _traceLevels);
+ if (messageType == compressedReplyMsg)
+ {
+ traceReply("received compressed reply", stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceReply("received reply", stream, _logger, _traceLevels);
+ }
+
Int requestId;
stream.read(requestId);
@@ -523,18 +643,44 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
return;
}
- os->i = os->b.begin();
-
- //
- // Fill in the message size.
- //
- const Byte* p;
- Int sz = os->b.size();
- p = reinterpret_cast<Byte*>(&sz);
- copy(p, p + sizeof(Int), os->i + 3);
-
- traceReply("sending reply", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
+ if (comp)
+ {
+ //
+ // Change message type.
+ //
+ os->b[2] = compressedReplyMsg;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance);
+ compress(*os, cstream);
+
+ //
+ // Send the reply.
+ //
+ os->i = os->b.begin();
+ traceReply("sending compressed reply", *os, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ //
+ // No compression, just fill in the message size.
+ //
+ const Byte* p;
+ Int sz = os->b.size();
+ p = reinterpret_cast<const Byte*>(&sz);
+ copy(p, p + sizeof(Int), os->b.begin() + 3);
+
+ //
+ // Send the reply.
+ //
+ os->i = os->b.begin();
+ traceReply("sending reply", *os, _logger, _traceLevels);
+ _transceiver->write(*os, _endpoint->timeout());
+ }
--_responseCount;
@@ -824,3 +970,67 @@ IceInternal::Connection::unregisterWithPool()
_clientThreadPool->unregister(_transceiver->fd());
}
}
+
+void
+IceInternal::Connection::compress(BasicStream& uncompressed, BasicStream& compressed)
+{
+ const Byte* p;
+
+ //
+ // Compress the message body, but not the header.
+ //
+ unsigned int uncompressedLen = uncompressed.b.size() - headerSize;
+ unsigned int compressedLen = static_cast<int>(uncompressedLen * 1.01 + 600);
+ compressed.b.resize(headerSize + sizeof(Int) + compressedLen);
+ int bzError = BZ2_bzBuffToBuffCompress(compressed.b.begin() + headerSize + sizeof(Int), &compressedLen,
+ uncompressed.b.begin() + headerSize, uncompressedLen,
+ 1, 0, 0);
+ assert(bzError == BZ_OK); // TODO: Local exception
+ compressed.b.resize(headerSize + sizeof(Int) + compressedLen);
+
+ //
+ // Write the size of the compressed stream into the header of the
+ // uncompressed stream. Since the header will be copied, this size
+ // will also be in the header of the compressed stream.
+ //
+ Int compressedSize = compressed.b.size();
+ p = reinterpret_cast<const Byte*>(&compressedSize);
+ copy(p, p + sizeof(Int), uncompressed.b.begin() + 3);
+
+ //
+ // Add the size of the uncompressed stream before the message body
+ // of the compressed stream.
+ //
+ Int uncompressedSize = uncompressed.b.size();
+ p = reinterpret_cast<const Byte*>(&uncompressedSize);
+ copy(p, p + sizeof(Int), compressed.b.begin() + headerSize);
+
+ //
+ // Copy the header from the uncompressed stream to the compressed one.
+ //
+ copy(uncompressed.b.begin(), uncompressed.b.begin() + headerSize, compressed.b.begin());
+}
+
+void
+IceInternal::Connection::uncompress(BasicStream& compressed, BasicStream& uncompressed)
+{
+ Int uncompressedSize;
+ compressed.i = compressed.b.begin() + headerSize;
+ compressed.read(uncompressedSize);
+ if (uncompressedSize <= headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+
+ uncompressed.resize(uncompressedSize);
+ unsigned int uncompressedLen = uncompressedSize - headerSize;
+ unsigned int compressedLen = compressed.b.size() - headerSize - sizeof(Int);
+ int bzError = BZ2_bzBuffToBuffDecompress(uncompressed.b.begin() + headerSize,
+ &uncompressedLen,
+ compressed.b.begin() + headerSize + sizeof(Int),
+ compressedLen,
+ 0, 0);
+ assert(bzError == BZ_OK);
+
+ copy(compressed.b.begin(), compressed.b.begin() + headerSize, uncompressed.b.begin());
+}