diff options
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 280 |
1 files changed, 245 insertions, 35 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 2c82d6eaa58..3cc5c9a757c 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -21,6 +21,7 @@ #include <Ice/Incoming.h> #include <Ice/LocalException.h> #include <Ice/Protocol.h> +#include <bzlib.h> using namespace std; using namespace Ice; @@ -83,7 +84,7 @@ IceInternal::Connection::prepareRequest(Outgoing* out) } void -IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) +IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp) { IceUtil::RecMutex::Lock sync(*this); @@ -98,15 +99,10 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) try { BasicStream* os = out->os(); - os->i = os->b.begin(); // - // Fill in the message size and request ID. + // Fill in the request ID. // - const Byte* p; - Int sz = os->b.size(); - p = reinterpret_cast<Byte*>(&sz); - copy(p, p + sizeof(Int), os->i + 3); if (!_endpoint->datagram() && !oneway) { requestId = _nextRequestId++; @@ -114,12 +110,50 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) { _nextRequestId = 1; requestId = _nextRequestId++; - } - p = reinterpret_cast<Byte*>(&requestId); - copy(p, p + sizeof(Int), os->i + headerSize); + } + const Byte* p; + p = reinterpret_cast<const Byte*>(&requestId); + copy(p, p + sizeof(Int), os->b.begin() + headerSize); + } + + if (comp) + { + // + // Change message type. + // + os->b[2] = compressedRequestMsg; + + // + // Do compression. + // + BasicStream cstream(_instance); + compress(*os, cstream); + + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending compressed request", *os, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + // + // No compression, just fill in the message size. + // + const Byte* p; + Int sz = os->b.size(); + p = reinterpret_cast<const Byte*>(&sz); + copy(p, p + sizeof(Int), os->b.begin() + 3); + + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending request", *os, _logger, _traceLevels); + _transceiver->write(*os, _endpoint->timeout()); } - traceRequest("sending request", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); } catch (const LocalException& ex) { @@ -191,7 +225,7 @@ IceInternal::Connection::abortBatchRequest() } void -IceInternal::Connection::flushBatchRequest() +IceInternal::Connection::flushBatchRequest(bool comp) { IceUtil::RecMutex::Lock sync(*this); @@ -210,15 +244,44 @@ IceInternal::Connection::flushBatchRequest() _batchStream.i = _batchStream.b.begin(); - // - // Fill in the message size. - // - const Byte* p; - Int sz = _batchStream.b.size(); - p = reinterpret_cast<Byte*>(&sz); - copy(p, p + sizeof(Int), _batchStream.i + 3); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver->write(_batchStream, _endpoint->timeout()); + if (comp) + { + // + // Change message type. + // + _batchStream.b[2] = compressedRequestMsg; + + // + // Do compression. + // + BasicStream cstream(_instance); + compress(_batchStream, cstream); + + // + // Send the batch request. + // + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending compressed batch request", _batchStream, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + // + // No compression, just fill in the message size. + // + const Byte* p; + Int sz = _batchStream.b.size(); + p = reinterpret_cast<const Byte*>(&sz); + copy(p, p + sizeof(Int), _batchStream.b.begin() + 3); + + // + // Send the batch request. + // + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver->write(_batchStream, _endpoint->timeout()); + } // // Reset _batchStream so that new batch messages can be sent. @@ -303,6 +366,7 @@ void IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { bool invoke = false; + bool comp = false; bool batch = false; { @@ -323,6 +387,20 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa assert(stream.i == stream.b.end()); stream.i = stream.b.begin() + 2; stream.read(messageType); + + // + // Uncompress if necessary. + // + if (messageType == compressedRequestMsg || + messageType == compressedRequestBatchMsg || + messageType == compressedReplyMsg) + { + BasicStream ustream(_instance); + uncompress(stream, ustream); + stream.b.swap(ustream.b); + comp = true; + } + stream.i = stream.b.begin() + headerSize; switch (messageType) @@ -343,6 +421,22 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa break; } + case compressedRequestMsg: + { + if (_state == StateClosing) + { + traceRequest("received compressed request during closing\n" + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); + } + else + { + traceRequest("received compressed request", stream, _logger, _traceLevels); + invoke = true; + } + break; + } + case requestBatchMsg: { if (_state == StateClosing) @@ -360,9 +454,35 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa break; } + case compressedRequestBatchMsg: + { + if (_state == StateClosing) + { + traceBatchRequest("received compressed batch request during closing\n" + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); + } + else + { + traceBatchRequest("received compressed batch request", stream, _logger, _traceLevels); + invoke = true; + batch = true; + } + break; + } + case replyMsg: + case compressedReplyMsg: { - traceReply("received reply", stream, _logger, _traceLevels); + if (messageType == compressedReplyMsg) + { + traceReply("received compressed reply", stream, _logger, _traceLevels); + } + else + { + traceReply("received reply", stream, _logger, _traceLevels); + } + Int requestId; stream.read(requestId); @@ -523,18 +643,44 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa return; } - os->i = os->b.begin(); - - // - // Fill in the message size. - // - const Byte* p; - Int sz = os->b.size(); - p = reinterpret_cast<Byte*>(&sz); - copy(p, p + sizeof(Int), os->i + 3); - - traceReply("sending reply", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); + if (comp) + { + // + // Change message type. + // + os->b[2] = compressedReplyMsg; + + // + // Do compression. + // + BasicStream cstream(_instance); + compress(*os, cstream); + + // + // Send the reply. + // + os->i = os->b.begin(); + traceReply("sending compressed reply", *os, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + // + // No compression, just fill in the message size. + // + const Byte* p; + Int sz = os->b.size(); + p = reinterpret_cast<const Byte*>(&sz); + copy(p, p + sizeof(Int), os->b.begin() + 3); + + // + // Send the reply. + // + os->i = os->b.begin(); + traceReply("sending reply", *os, _logger, _traceLevels); + _transceiver->write(*os, _endpoint->timeout()); + } --_responseCount; @@ -824,3 +970,67 @@ IceInternal::Connection::unregisterWithPool() _clientThreadPool->unregister(_transceiver->fd()); } } + +void +IceInternal::Connection::compress(BasicStream& uncompressed, BasicStream& compressed) +{ + const Byte* p; + + // + // Compress the message body, but not the header. + // + unsigned int uncompressedLen = uncompressed.b.size() - headerSize; + unsigned int compressedLen = static_cast<int>(uncompressedLen * 1.01 + 600); + compressed.b.resize(headerSize + sizeof(Int) + compressedLen); + int bzError = BZ2_bzBuffToBuffCompress(compressed.b.begin() + headerSize + sizeof(Int), &compressedLen, + uncompressed.b.begin() + headerSize, uncompressedLen, + 1, 0, 0); + assert(bzError == BZ_OK); // TODO: Local exception + compressed.b.resize(headerSize + sizeof(Int) + compressedLen); + + // + // Write the size of the compressed stream into the header of the + // uncompressed stream. Since the header will be copied, this size + // will also be in the header of the compressed stream. + // + Int compressedSize = compressed.b.size(); + p = reinterpret_cast<const Byte*>(&compressedSize); + copy(p, p + sizeof(Int), uncompressed.b.begin() + 3); + + // + // Add the size of the uncompressed stream before the message body + // of the compressed stream. + // + Int uncompressedSize = uncompressed.b.size(); + p = reinterpret_cast<const Byte*>(&uncompressedSize); + copy(p, p + sizeof(Int), compressed.b.begin() + headerSize); + + // + // Copy the header from the uncompressed stream to the compressed one. + // + copy(uncompressed.b.begin(), uncompressed.b.begin() + headerSize, compressed.b.begin()); +} + +void +IceInternal::Connection::uncompress(BasicStream& compressed, BasicStream& uncompressed) +{ + Int uncompressedSize; + compressed.i = compressed.b.begin() + headerSize; + compressed.read(uncompressedSize); + if (uncompressedSize <= headerSize) + { + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + + uncompressed.resize(uncompressedSize); + unsigned int uncompressedLen = uncompressedSize - headerSize; + unsigned int compressedLen = compressed.b.size() - headerSize - sizeof(Int); + int bzError = BZ2_bzBuffToBuffDecompress(uncompressed.b.begin() + headerSize, + &uncompressedLen, + compressed.b.begin() + headerSize + sizeof(Int), + compressedLen, + 0, 0); + assert(bzError == BZ_OK); + + copy(compressed.b.begin(), compressed.b.begin() + headerSize, uncompressed.b.begin()); +} |