diff options
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 406 |
1 files changed, 238 insertions, 168 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 7cd4aae0847..1128835a881 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -36,17 +36,83 @@ using namespace IceInternal; void IceInternal::incRef(Connection* p) { p->__incRef(); } void IceInternal::decRef(Connection* p) { p->__decRef(); } +void +IceInternal::Connection::activate() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + setState(StateActive); +} + +void +IceInternal::Connection::hold() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + setState(StateHolding); +} + +void +IceInternal::Connection::destroy(DestructionReason reason) +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + switch(reason) + { + case ObjectAdapterDeactivated: + { + setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); + break; + } + + case CommunicatorDestroyed: + { + setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); + break; + } + } +} + bool -IceInternal::Connection::destroyed() const +IceInternal::Connection::isDestroyed() const { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + return _state >= StateClosing; } +bool +IceInternal::Connection::isFinished() const +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + return _transceiver == 0; +} + +void +IceInternal::Connection::waitUntilHolding() const +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + while(_state < StateHolding || _dispatchCount > 0) + { + wait(); + } +} + +void +IceInternal::Connection::waitUntilFinished() const +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + while(_transceiver) + { + wait(); + } +} + void IceInternal::Connection::validate() { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); if(_endpoint->datagram()) { @@ -122,34 +188,20 @@ IceInternal::Connection::validate() } void -IceInternal::Connection::hold() +IceInternal::Connection::incProxyCount() { - IceUtil::RecMutex::Lock sync(*this); - setState(StateHolding); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + assert(_proxyCount >= 0); + ++_proxyCount; } void -IceInternal::Connection::activate() +IceInternal::Connection::decProxyCount() { - IceUtil::RecMutex::Lock sync(*this); - setState(StateActive); -} - -void -IceInternal::Connection::incUsageCount() -{ - IceUtil::RecMutex::Lock sync(*this); - assert(_usageCount >= 0); - ++_usageCount; -} - -void -IceInternal::Connection::decUsageCount() -{ - IceUtil::RecMutex::Lock sync(*this); - assert(_usageCount > 0); - --_usageCount; - if(_usageCount == 0 && !_adapter) + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + assert(_proxyCount > 0); + --_proxyCount; + if(_proxyCount == 0 && !_adapter) { assert(_requests.empty()); assert(_asyncRequests.empty()); @@ -164,9 +216,9 @@ IceInternal::Connection::prepareRequest(BasicStream* os) } void -IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp) +IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool compress) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); if(_exception.get()) { @@ -198,17 +250,17 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp) if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { - comp = false; + compress = false; } else { if(_defaultsAndOverrides->overrideCompress) { - comp = _defaultsAndOverrides->overrideCompressValue; + compress = _defaultsAndOverrides->overrideCompressValue; } } - if(comp) + if(compress) { // // Change message type. @@ -219,7 +271,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp) // Do compression. // BasicStream cstream(_instance); - compress(*os, cstream); + doCompress(*os, cstream); // // Send the request. @@ -265,9 +317,9 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp) } void -IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp) +IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); if(_exception.get()) { @@ -296,17 +348,17 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { - comp = false; + compress = false; } else { if(_defaultsAndOverrides->overrideCompress) { - comp = _defaultsAndOverrides->overrideCompressValue; + compress = _defaultsAndOverrides->overrideCompressValue; } } - if(comp) + if(compress) { // // Change message type. @@ -317,7 +369,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp // Do compression. // BasicStream cstream(_instance); - compress(*os, cstream); + doCompress(*os, cstream); // // Send the request. @@ -398,6 +450,7 @@ IceInternal::Connection::finishBatchRequest(BasicStream* os) assert(_state < StateClosing); _batchStream.swap(*os); // Get the batch stream back. + ++_batchRequestNum; // Increment the number of requests in the batch. unlock(); // Give the Connection back. } @@ -409,9 +462,9 @@ IceInternal::Connection::abortBatchRequest() } void -IceInternal::Connection::flushBatchRequest(bool comp) +IceInternal::Connection::flushBatchRequest(bool compress) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); if(_exception.get()) { @@ -428,19 +481,26 @@ IceInternal::Connection::flushBatchRequest(bool comp) _batchStream.i = _batchStream.b.begin(); + // + // Fill in the number of requests in the batch. + // + const Byte* p; + p = reinterpret_cast<const Byte*>(&_batchRequestNum); + copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); + if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { - comp = false; + compress = false; } else { if(_defaultsAndOverrides->overrideCompress) { - comp = _defaultsAndOverrides->overrideCompressValue; + compress = _defaultsAndOverrides->overrideCompressValue; } } - if(comp) + if(compress) { // // Change message type. @@ -451,7 +511,7 @@ IceInternal::Connection::flushBatchRequest(bool comp) // Do compression. // BasicStream cstream(_instance); - compress(_batchStream, cstream); + doCompress(_batchStream, cstream); // // Send the batch request. @@ -480,11 +540,13 @@ IceInternal::Connection::flushBatchRequest(bool comp) } // - // Reset _batchStream so that new batch messages can be sent. + // Reset _batchStream and _batchRequestNum, so that new batch + // messages can be sent. // BasicStream dummy(_instance); _batchStream.swap(dummy); assert(_batchStream.b.empty()); + _batchRequestNum = 0; } catch(const LocalException& ex) { @@ -495,9 +557,9 @@ IceInternal::Connection::flushBatchRequest(bool comp) } void -IceInternal::Connection::sendResponse(BasicStream* os, bool comp) +IceInternal::Connection::sendResponse(BasicStream* os, bool compress) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); try { @@ -508,17 +570,17 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool comp) if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { - comp = false; + compress = false; } else { if(_defaultsAndOverrides->overrideCompress) { - comp = _defaultsAndOverrides->overrideCompressValue; + compress = _defaultsAndOverrides->overrideCompressValue; } } - if(comp) + if(compress) { // // Change message type. @@ -529,7 +591,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool comp) // Do compression. // BasicStream cstream(_instance); - compress(*os, cstream); + doCompress(*os, cstream); // // Send the reply. @@ -557,17 +619,47 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool comp) _transceiver->write(*os, _endpoint->timeout()); } - --_responseCount; + if(--_dispatchCount == 0) + { + notifyAll(); + } - if(_state == StateClosing && _responseCount == 0 && !_endpoint->datagram()) + if(_state == StateClosing && _dispatchCount == 0) { - closeConnection(); + initiateShutdown(); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } +} + +void +IceInternal::Connection::sendNoResponse() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + try + { + if(_state == StateClosed) + { + return; + } + + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); } } catch(const LocalException& ex) { setState(StateClosed, ex); - return; } } @@ -588,7 +680,7 @@ IceInternal::Connection::endpoint() const void IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); // // We are registered with a thread pool in active and closing @@ -620,7 +712,7 @@ IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter) ObjectAdapterPtr IceInternal::Connection::getAdapter() const { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); return _adapter; } @@ -640,12 +732,13 @@ void IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { OutgoingAsyncPtr outAsync; - bool invoke = false; - bool comp = false; - bool batch = false; + + Int invoke = 0; + Int requestId = 0; + bool compress = false; { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); threadPool->promoteFollower(); @@ -671,9 +764,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa messageType == compressedReplyMsg) { BasicStream ustream(_instance); - uncompress(stream, ustream); + doUncompress(stream, ustream); stream.b.swap(ustream.b); - comp = true; + compress = true; } stream.i = stream.b.begin() + headerSize; @@ -691,7 +784,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa else { traceRequest("received request", stream, _logger, _traceLevels); - invoke = true; + stream.read(requestId); + invoke = 1; + ++_dispatchCount; } break; } @@ -707,7 +802,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa else { traceRequest("received compressed request", stream, _logger, _traceLevels); - invoke = true; + stream.read(requestId); + invoke = 1; + ++_dispatchCount; } break; } @@ -723,8 +820,12 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa else { traceBatchRequest("received batch request", stream, _logger, _traceLevels); - invoke = true; - batch = true; + stream.read(invoke); + if(invoke < 0) + { + throw NegativeSizeException(__FILE__, __LINE__); + } + _dispatchCount += invoke; } break; } @@ -740,8 +841,12 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa else { traceBatchRequest("received compressed batch request", stream, _logger, _traceLevels); - invoke = true; - batch = true; + stream.read(invoke); + if(invoke < 0) + { + throw NegativeSizeException(__FILE__, __LINE__); + } + _dispatchCount += invoke; } break; } @@ -758,7 +863,6 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa traceReply("received reply", stream, _logger, _traceLevels); } - Int requestId; stream.read(requestId); map<Int, Outgoing*>::iterator p = _requests.end(); @@ -893,22 +997,13 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa // Method invocation must be done outside the thread // synchronization, so that nested calls are possible. // - if(invoke) + if(invoke > 0) { // - // If this is not a batch request, get the request id. - // - Int requestId = 0; - if(!batch) - { - stream.read(requestId); - } - bool response = !_endpoint->datagram() && requestId != 0; - - // // Prepare the invocation. // - Incoming in(_instance, _adapter, response ? this : 0, comp); + bool response = !_endpoint->datagram() && requestId != 0; + Incoming in(_instance, _adapter, this, response, compress); BasicStream* is = in.is(); stream.swap(*is); BasicStream* os = in.os(); @@ -920,53 +1015,39 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa // if(response) { - ++_responseCount; - os->write(protocolVersion); - os->write(encodingVersion); - os->write(replyMsg); - os->write(Int(0)); // Message size (placeholder). - os->write(requestId); + assert(invoke == 1); + os->writeBlob(_replyHdr); + + // + // Fill in the request ID. + // + const Byte* p; + p = reinterpret_cast<const Byte*>(&requestId); + copy(p, p + sizeof(Int), os->b.begin() + headerSize); } // // Do the invocation, or multiple invocations for batch // messages. // - do + while(invoke-- > 0) { - if(in.invoke()) - { - // - // If invoke() returned true, the operation was - // dispatched asynchronously, meaning that we - // don't send a response below. - // - response = false; - } + in.invoke(); } - while(batch && is->i < is->b.end()); } catch(const LocalException& ex) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); setState(StateClosed, ex); return; } - - // - // Send a response if necessary. - // - if(response) - { - sendResponse(os, comp); - } } } void IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); threadPool->promoteFollower(); @@ -977,13 +1058,15 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) else if(_state == StateClosed) { _transceiver->close(); + _transceiver = 0; + notifyAll(); } } void IceInternal::Connection::exception(const LocalException& ex) { - IceUtil::RecMutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); setState(StateClosed, ex); } @@ -1004,62 +1087,42 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _logger(_instance->logger()), _traceLevels(_instance->traceLevels()), _defaultsAndOverrides(_instance->defaultsAndOverrides()), + _registeredWithPool(false), _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0), + _requestHdr(headerSize + 4, 0), + _requestBatchHdr(headerSize + 4, 0), + _replyHdr(headerSize + 4, 0), _nextRequestId(1), _requestsHint(_requests.end()), _asyncRequestsHint(_asyncRequests.end()), _batchStream(_instance), - _responseCount(0), - _usageCount(0), - _state(StateHolding), - _registeredWithPool(false) + _batchRequestNum(0), + _dispatchCount(0), + _proxyCount(0), + _state(StateHolding) { vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr); - requestHdr.reserve(headerSize + 4); - requestHdr.push_back(protocolVersion); - requestHdr.push_back(encodingVersion); - requestHdr.push_back(requestMsg); - requestHdr.push_back(0); // Message size (placeholder). - requestHdr.push_back(0); // Message size (placeholder). - requestHdr.push_back(0); // Message size (placeholder). - requestHdr.push_back(0); // Message size (placeholder). - requestHdr.push_back(0); // Request ID (placeholder). - requestHdr.push_back(0); // Request ID (placeholder). - requestHdr.push_back(0); // Request ID (placeholder). - requestHdr.push_back(0); // Request ID (placeholder). - assert(_requestHdr.size() == headerSize + 4); + requestHdr[0] = protocolVersion; + requestHdr[1] = encodingVersion; + requestHdr[2] = requestMsg; vector<Byte>& requestBatchHdr = const_cast<vector<Byte>&>(_requestBatchHdr); - requestBatchHdr.resize(_requestHdr.size() - 4); - copy(_requestHdr.begin(), _requestHdr.end() - 4, requestBatchHdr.begin()); + requestBatchHdr[0] = protocolVersion; + requestBatchHdr[1] = encodingVersion; requestBatchHdr[2] = requestBatchMsg; + + vector<Byte>& replyHdr = const_cast<vector<Byte>&>(_replyHdr); + replyHdr[0] = protocolVersion; + replyHdr[1] = encodingVersion; + replyHdr[2] = replyMsg; } IceInternal::Connection::~Connection() { - assert(_usageCount == 0); assert(_state == StateClosed); -} - -void -IceInternal::Connection::destroy(DestructionReason reason) -{ - RecMutex::Lock sync(*this); - - switch(reason) - { - case ObjectAdapterDeactivated: - { - setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); - break; - } - - case CommunicatorDestroyed: - { - setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); - break; - } - } + assert(!_transceiver); + assert(_dispatchCount == 0); + assert(_proxyCount == 0); } void @@ -1174,17 +1237,19 @@ IceInternal::Connection::setState(State state) registerWithPool(); } unregisterWithPool(); + _dispatchCount = 0; break; } } _state = state; + notifyAll(); - if(_state == StateClosing && _responseCount == 0 && !_endpoint->datagram()) + if(_state == StateClosing && _dispatchCount == 0) { try { - closeConnection(); + initiateShutdown(); } catch(const LocalException& ex) { @@ -1194,21 +1259,26 @@ IceInternal::Connection::setState(State state) } void -IceInternal::Connection::closeConnection() const +IceInternal::Connection::initiateShutdown() const { - BasicStream os(_instance); - os.write(protocolVersion); - os.write(encodingVersion); - os.write(closeConnectionMsg); - os.write(headerSize); // Message size. - os.i = os.b.begin(); - traceHeader("sending close connection", os, _logger, _traceLevels); - _transceiver->write(os, _endpoint->timeout()); + assert(_state == StateClosing); + assert(_dispatchCount == 0); - // - // A close connection is always followed by a connection shutdown. - // - _transceiver->shutdown(); + if(!_endpoint->datagram()) + { + // + // Before we shut down, we send a close connection message. + // + BasicStream os(_instance); + os.write(protocolVersion); + os.write(encodingVersion); + os.write(closeConnectionMsg); + os.write(headerSize); // Message size. + os.i = os.b.begin(); + traceHeader("sending close connection", os, _logger, _traceLevels); + _transceiver->write(os, _endpoint->timeout()); + _transceiver->shutdown(); + } } void @@ -1321,7 +1391,7 @@ getBZ2Error(int bzError) } void -IceInternal::Connection::compress(BasicStream& uncompressed, BasicStream& compressed) +IceInternal::Connection::doCompress(BasicStream& uncompressed, BasicStream& compressed) { const Byte* p; @@ -1366,7 +1436,7 @@ IceInternal::Connection::compress(BasicStream& uncompressed, BasicStream& compre } void -IceInternal::Connection::uncompress(BasicStream& compressed, BasicStream& uncompressed) +IceInternal::Connection::doUncompress(BasicStream& compressed, BasicStream& uncompressed) { Int uncompressedSize; compressed.i = compressed.b.begin() + headerSize; |