diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 254 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 17 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 56 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 22 |
4 files changed, 186 insertions, 163 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index d2af952f75c..bfbf01ab454 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -47,12 +47,39 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t> } }; +template <typename K, typename V> void +remove(multimap<K, V>& map, K k, V v) +{ + pair<typename multimap<K, V>::iterator, typename multimap<K, V>::iterator> pr = map.equal_range(k); + assert(pr.first != pr.second); + for(typename multimap<K, V>::iterator q = pr.first; q != pr.second; ++q) + { + if(q->second.get() == v.get()) + { + map.erase(q); + return; + } + } + assert(false); // Nothing was removed which is an error. } -bool -IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const ConnectorInfo& other) const +template <typename K, typename V> ::IceInternal::Handle<V> +find(multimap<K,::IceInternal::Handle<V> >& map, + K k, + const ::IceUtilInternal::ConstMemFun<bool, V, ::IceInternal::Handle<V> >& predicate) { - return connector < other.connector; + pair<typename multimap<K, ::IceInternal::Handle<V> >::const_iterator, + typename multimap<K, ::IceInternal::Handle<V> >::const_iterator> pr = map.equal_range(k); + for(typename multimap<K, ::IceInternal::Handle<V> >::const_iterator q = pr.first; q != pr.second; ++q) + { + if(predicate(q->second)) + { + return q->second; + } + } + return IceInternal::Handle<V>(); +} + } bool @@ -74,11 +101,11 @@ IceInternal::OutgoingConnectionFactory::destroy() #ifdef _STLP_BEGIN_NAMESPACE // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h for_each(_connections.begin(), _connections.end(), - voidbind2nd(Ice::secondVoidMemFun1<ConnectorInfo, ConnectionI, ConnectionI::DestructionReason> + voidbind2nd(Ice::secondVoidMemFun1<ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #else for_each(_connections.begin(), _connections.end(), - bind2nd(Ice::secondVoidMemFun1<const ConnectorInfo, ConnectionI, ConnectionI::DestructionReason> + bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #endif @@ -89,7 +116,7 @@ IceInternal::OutgoingConnectionFactory::destroy() void IceInternal::OutgoingConnectionFactory::waitUntilFinished() { - multimap<ConnectorInfo, ConnectionIPtr> connections; + multimap<ConnectorPtr, ConnectionIPtr> connections; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -112,10 +139,15 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() } for_each(connections.begin(), connections.end(), - Ice::secondVoidMemFun<const ConnectorInfo, ConnectionI>(&ConnectionI::waitUntilFinished)); + Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // Ensure all the connections are finished and reapable at this point. + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + assert(cons.size() == _connections.size()); + cons.clear(); _connections.clear(); _connectionsByEndpoint.clear(); } @@ -222,14 +254,14 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt catch(const Ice::CommunicatorDestroyedException& ex) { exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - handleException(*exception.get(), *q, connection, hasMore || q != connectors.end() - 1); + handleConnectionException(*exception.get(), hasMore || q != connectors.end() - 1); connection = 0; break; // No need to continue } catch(const Ice::LocalException& ex) { exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - handleException(*exception.get(), *q, connection, hasMore || q != connectors.end() - 1); + handleConnectionException(*exception.get(), hasMore || q != connectors.end() - 1); connection = 0; } } @@ -335,7 +367,7 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route // endpoint = endpoint->compress(false); - multimap<ConnectorInfo, ConnectionIPtr>::const_iterator q; + multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q; for(q = _connections.begin(); q != _connections.end(); ++q) { if(q->second->endpoint() == endpoint) @@ -356,7 +388,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad return; } - for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) + for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { if(p->second->getAdapter() == adapter) { @@ -372,7 +404,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); + for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { c.push_back(p->second); @@ -394,6 +426,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests() IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const InstancePtr& instance) : _instance(instance), + _reaper(new ConnectionReaper()), _destroyed(false), _pendingConnectCount(0) { @@ -439,23 +472,18 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr assert(!endpoints.empty()); for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) { - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p); - - for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q) + ConnectionIPtr connection = find(_connectionsByEndpoint, *p, Ice::constMemFun(&ConnectionI::isActiveOrHolding)); + if(connection) { - if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections + if(defaultsAndOverrides->overrideCompress) { - if(defaultsAndOverrides->overrideCompress) - { - compress = defaultsAndOverrides->overrideCompressValue; - } - else - { - compress = (*p)->compress(); - } - return q->second; + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = (*p)->compress(); } + return connection; } } return 0; @@ -469,38 +497,23 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { - if(_pending.find(*p) != _pending.end()) + if(_pending.find(p->connector) != _pending.end()) { continue; } - pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, - multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p); - - if(pr.first == pr.second) - { - continue; - } - - for(multimap<ConnectorInfo, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q) + ConnectionIPtr connection = find(_connections, p->connector, Ice::constMemFun(&ConnectionI::isActiveOrHolding)); + if(connection) { - if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else { - if(q->second->endpoint() != p->endpoint) - { - _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(p->endpoint, q->second)); - } - - if(defaultsAndOverrides->overrideCompress) - { - compress = defaultsAndOverrides->overrideCompressValue; - } - else - { compress = p->endpoint->compress(); - } - return q->second; } + return connection; } } @@ -551,32 +564,15 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo } // - // Reap connections for which destruction has completed. + // Reap closed connections // - multimap<ConnectorInfo, ConnectionIPtr>::iterator p = _connections.begin(); - while(p != _connections.end()) - { - if(p->second->isFinished()) - { - _connections.erase(p++); - } - else - { - ++p; - } - } - - multimap<EndpointIPtr, ConnectionIPtr>::iterator q = _connectionsByEndpoint.begin(); - while(q != _connectionsByEndpoint.end()) + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) { - if(q->second->isFinished()) - { - _connectionsByEndpoint.erase(q++); - } - else - { - ++q; - } + remove(_connections, (*p)->connector(), *p); + remove(_connectionsByEndpoint, (*p)->endpoint(), *p); + remove(_connectionsByEndpoint, (*p)->endpoint()->compress(true), *p); } // @@ -651,7 +647,7 @@ ConnectionIPtr IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& transceiver, const ConnectorInfo& ci) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_pending.find(ci) != _pending.end() && transceiver); + assert(_pending.find(ci.connector) != _pending.end() && transceiver); // // Create and add the connection to the connection map. Adding the connection to the map @@ -666,7 +662,7 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); } - connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false), 0); + connection = new ConnectionI(_instance, _reaper, transceiver, ci.connector, ci.endpoint->compress(false), 0); } catch(const Ice::LocalException&) { @@ -681,8 +677,10 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t throw; } - _connections.insert(pair<const ConnectorInfo, ConnectionIPtr>(ci, connection)); - _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(ci.endpoint, connection)); + _connections.insert(pair<const ConnectorPtr, ConnectionIPtr>(ci.connector, connection)); + _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection)); + _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint()->compress(true), + connection)); return connection; } @@ -703,7 +701,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); if(q != _pending.end()) { for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) @@ -772,7 +770,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); if(q != _pending.end()) { for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) @@ -819,7 +817,7 @@ IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& c bool found = false; for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); if(q != _pending.end()) { found = true; @@ -842,9 +840,9 @@ IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& c // for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) { - if(_pending.find(*r) == _pending.end()) + if(_pending.find(r->connector) == _pending.end()) { - _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>())); + _pending.insert(pair<ConnectorPtr, set<ConnectCallbackPtr> >(r->connector, set<ConnectCallbackPtr>())); } } return false; @@ -856,7 +854,7 @@ IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackP { for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); if(q != _pending.end()) { q->second.erase(cb); @@ -865,15 +863,14 @@ IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackP } void -IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, const ConnectorInfo& ci, - const ConnectionIPtr& connection, bool hasMore) +IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore) { TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->retry >= 2) { Trace out(_instance->initializationData().logger, traceLevels->retryCat); - out << "connection to endpoint failed"; + out << "couldn't resolve endpoint host"; if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) { out << "\n"; @@ -891,51 +888,17 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex } out << ex; } - - if(connection && connection->isFinished()) - { - // - // If the connection is finished, we remove it right away instead of - // waiting for the reaping. - // - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, - multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci); - - for(multimap<ConnectorInfo, ConnectionIPtr>::iterator p = pr.first; p != pr.second; ++p) - { - if(p->second == connection) - { - _connections.erase(p); - break; - } - } - - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint); - - for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = qr.first; q != qr.second; ++q) - { - if(q->second == connection) - { - _connectionsByEndpoint.erase(q); - break; - } - } - } - } } void -IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore) +IceInternal::OutgoingConnectionFactory::handleConnectionException(const LocalException& ex, bool hasMore) { TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->retry >= 2) { Trace out(_instance->initializationData().logger, traceLevels->retryCat); - out << "couldn't resolve endpoint host"; + out << "connection to endpoint failed"; if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) { out << "\n"; @@ -985,7 +948,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c { assert(_iter != _connectors.end()); - _factory->handleException(ex, *_iter, connection, _hasMore || _iter != _connectors.end() - 1); + _factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1); if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. { _factory->finishGetConnection(_connectors, ex, this); @@ -1224,7 +1187,7 @@ IceInternal::IncomingConnectionFactory::destroy() void IceInternal::IncomingConnectionFactory::waitUntilHolding() const { - list<ConnectionIPtr> connections; + set<ConnectionIPtr> connections; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1254,7 +1217,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const void IceInternal::IncomingConnectionFactory::waitUntilFinished() { - list<ConnectionIPtr> connections; + set<ConnectionIPtr> connections; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1282,6 +1245,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // Ensure all the connections are finished and reapable at this point. + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + assert(cons.size() == _connections.size()); + cons.clear(); _connections.clear(); } } @@ -1394,14 +1362,17 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) IceUtil::ThreadControl::yield(); return; } - + // - // Reap connections for which destruction has completed. + // Reap closed connections // - _connections.erase(remove_if(_connections.begin(), _connections.end(), - Ice::constMemFun(&ConnectionI::isFinished)), - _connections.end()); - + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) + { + _connections.erase(*p); + } + // // Now accept a new connection. // @@ -1439,7 +1410,7 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) try { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, _reaper, transceiver, 0, _endpoint, _adapter); } catch(const LocalException& ex) { @@ -1460,7 +1431,7 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) return; } - _connections.push_back(connection); + _connections.insert(connection); } assert(connection); @@ -1531,16 +1502,6 @@ IceInternal::IncomingConnectionFactory::connectionStartFailed(const Ice::Connect Warning out(_instance->initializationData().logger); out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); } - - // - // If the connection is finished, remove it right away from - // the connection map. Otherwise, we keep it in the map, it - // will eventually be reaped. - // - if(connection->isFinished()) - { - _connections.remove(connection); - } } // @@ -1552,6 +1513,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance const EndpointIPtr& endpoint, const ObjectAdapterPtr& adapter) : _instance(instance), + _reaper(new ConnectionReaper()), _endpoint(endpoint), _adapter(adapter), _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), @@ -1581,7 +1543,7 @@ IceInternal::IncomingConnectionFactory::initialize(const string& adapterName) try { - connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, _reaper, _transceiver, 0, _endpoint, _adapter); } catch(const LocalException&) { @@ -1598,7 +1560,7 @@ IceInternal::IncomingConnectionFactory::initialize(const string& adapterName) connection->start(0); - _connections.push_back(connection); + _connections.insert(connection); } else { diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 417d8b30d43..34603d11530 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -73,10 +73,9 @@ private: ConnectorInfo(const ConnectorPtr& c, const EndpointIPtr& e) : connector(c), endpoint(e) { } - - bool operator<(const ConnectorInfo& other) const; + bool operator==(const ConnectorInfo& other) const; - + ConnectorPtr connector; EndpointIPtr endpoint; }; @@ -139,13 +138,14 @@ private: Ice::ConnectionIPtr createConnection(const TransceiverPtr&, const ConnectorInfo&); void handleException(const Ice::LocalException&, bool); - void handleException(const Ice::LocalException&, const ConnectorInfo&, const Ice::ConnectionIPtr&, bool); + void handleConnectionException(const Ice::LocalException&, bool); const InstancePtr _instance; + const ConnectionReaperPtr _reaper; bool _destroyed; - std::multimap<ConnectorInfo, Ice::ConnectionIPtr> _connections; - std::map<ConnectorInfo, std::set<ConnectCallbackPtr> > _pending; + std::multimap<ConnectorPtr, Ice::ConnectionIPtr> _connections; + std::map<ConnectorPtr, std::set<ConnectCallbackPtr> > _pending; std::multimap<EndpointIPtr, Ice::ConnectionIPtr> _connectionsByEndpoint; int _pendingConnectCount; @@ -168,7 +168,7 @@ public: EndpointIPtr endpoint() const; std::list<Ice::ConnectionIPtr> connections() const; void flushBatchRequests(); - + // // Operations from EventHandler // @@ -203,6 +203,7 @@ private: void setState(State); const InstancePtr _instance; + const ConnectionReaperPtr _reaper; AcceptorPtr _acceptor; const TransceiverPtr _transceiver; @@ -212,7 +213,7 @@ private: const bool _warn; - std::list<Ice::ConnectionIPtr> _connections; + std::set<Ice::ConnectionIPtr> _connections; State _state; }; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index bea6f4ed5f5..96952c5bf3a 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -58,6 +58,20 @@ private: } void +IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection) +{ + Lock sync(*this); + _connections.push_back(connection); +} + +void +IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connections) +{ + Lock sync(*this); + _connections.swap(connections); +} + +void Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) { if(adopted) @@ -899,27 +913,33 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) { + if(_state == StateFinished) + { + _reaper->add(this); + } notifyAll(); } - + if(_state >= StateClosed) { assert(_exception.get()); _exception->ice_throw(); } - + OutgoingMessage message(os, compressFlag > 0); sendMessage(message); - + if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); } + + return; } catch(const LocalException& ex) { @@ -932,20 +952,24 @@ Ice::ConnectionI::sendNoResponse() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) { + if(_state == StateFinished) + { + _reaper->add(this); + } notifyAll(); } - + if(_state >= StateClosed) { assert(_exception.get()); _exception->ice_throw(); } - + if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); @@ -963,6 +987,12 @@ Ice::ConnectionI::endpoint() const return _endpoint; // No mutex protection necessary, _endpoint is immutable. } +ConnectorPtr +Ice::ConnectionI::connector() const +{ + return _connector; // No mutex protection necessary, _connector is immutable. +} + void Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter) { @@ -1390,6 +1420,10 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateFinished); + if(_dispatchCount == 0) + { + _reaper->add(this); + } } } @@ -1468,19 +1502,27 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum) assert(_dispatchCount >= 0); if(_dispatchCount == 0) { + if(_state == StateFinished) + { + _reaper->add(this); + } notifyAll(); } } } Ice::ConnectionI::ConnectionI(const InstancePtr& instance, + const ConnectionReaperPtr& reaper, const TransceiverPtr& transceiver, + const ConnectorPtr& connector, const EndpointIPtr& endpoint, const ObjectAdapterPtr& adapter) : _transceiver(transceiver), _instance(instance), + _reaper(reaper), _desc(transceiver->toString()), _type(transceiver->type()), + _connector(connector), _endpoint(endpoint), _adapter(adapter), _logger(_instance->initializationData().logger), // Cached for better performance. diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index fb068c6352b..d0f7d908a16 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -23,6 +23,7 @@ #include <Ice/ObjectAdapterF.h> #include <Ice/ServantManagerF.h> #include <Ice/EndpointIF.h> +#include <Ice/ConnectorF.h> #include <Ice/LoggerF.h> #include <Ice/TraceLevelsF.h> #include <Ice/OutgoingAsyncF.h> @@ -38,6 +39,19 @@ class Outgoing; class BatchOutgoing; class OutgoingMessageCallback; +class ConnectionReaper : public IceUtil::Mutex, public IceUtil::Shared +{ +public: + + void add(const Ice::ConnectionIPtr&); + void swapConnections(std::vector<Ice::ConnectionIPtr>&); + +private: + + std::vector<Ice::ConnectionIPtr> _connections; +}; +typedef IceUtil::Handle<ConnectionReaper> ConnectionReaperPtr; + } namespace Ice @@ -96,6 +110,7 @@ public: void sendNoResponse(); IceInternal::EndpointIPtr endpoint() const; + IceInternal::ConnectorPtr connector() const; virtual void setAdapter(const ObjectAdapterPtr&); // From Connection. virtual ObjectAdapterPtr getAdapter() const; // From Connection. @@ -168,8 +183,9 @@ private: bool adopted; }; - ConnectionI(const IceInternal::InstancePtr&, const IceInternal::TransceiverPtr&, const IceInternal::EndpointIPtr&, - const ObjectAdapterPtr&); + ConnectionI(const IceInternal::InstancePtr&, const IceInternal::ConnectionReaperPtr&, + const IceInternal::TransceiverPtr&, const IceInternal::ConnectorPtr&, + const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&); virtual ~ConnectionI(); friend class IceInternal::IncomingConnectionFactory; @@ -237,8 +253,10 @@ private: const IceInternal::TransceiverPtr _transceiver; const IceInternal::InstancePtr _instance; + const IceInternal::ConnectionReaperPtr _reaper; const std::string _desc; const std::string _type; + const IceInternal::ConnectorPtr _connector; const IceInternal::EndpointIPtr _endpoint; ObjectAdapterPtr _adapter; |