summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp254
-rw-r--r--cpp/src/Ice/ConnectionFactory.h17
-rw-r--r--cpp/src/Ice/ConnectionI.cpp56
-rw-r--r--cpp/src/Ice/ConnectionI.h22
4 files changed, 186 insertions, 163 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index d2af952f75c..bfbf01ab454 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -47,12 +47,39 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t>
}
};
+template <typename K, typename V> void
+remove(multimap<K, V>& map, K k, V v)
+{
+ pair<typename multimap<K, V>::iterator, typename multimap<K, V>::iterator> pr = map.equal_range(k);
+ assert(pr.first != pr.second);
+ for(typename multimap<K, V>::iterator q = pr.first; q != pr.second; ++q)
+ {
+ if(q->second.get() == v.get())
+ {
+ map.erase(q);
+ return;
+ }
+ }
+ assert(false); // Nothing was removed which is an error.
}
-bool
-IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const ConnectorInfo& other) const
+template <typename K, typename V> ::IceInternal::Handle<V>
+find(multimap<K,::IceInternal::Handle<V> >& map,
+ K k,
+ const ::IceUtilInternal::ConstMemFun<bool, V, ::IceInternal::Handle<V> >& predicate)
{
- return connector < other.connector;
+ pair<typename multimap<K, ::IceInternal::Handle<V> >::const_iterator,
+ typename multimap<K, ::IceInternal::Handle<V> >::const_iterator> pr = map.equal_range(k);
+ for(typename multimap<K, ::IceInternal::Handle<V> >::const_iterator q = pr.first; q != pr.second; ++q)
+ {
+ if(predicate(q->second))
+ {
+ return q->second;
+ }
+ }
+ return IceInternal::Handle<V>();
+}
+
}
bool
@@ -74,11 +101,11 @@ IceInternal::OutgoingConnectionFactory::destroy()
#ifdef _STLP_BEGIN_NAMESPACE
// voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
for_each(_connections.begin(), _connections.end(),
- voidbind2nd(Ice::secondVoidMemFun1<ConnectorInfo, ConnectionI, ConnectionI::DestructionReason>
+ voidbind2nd(Ice::secondVoidMemFun1<ConnectorPtr, ConnectionI, ConnectionI::DestructionReason>
(&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#else
for_each(_connections.begin(), _connections.end(),
- bind2nd(Ice::secondVoidMemFun1<const ConnectorInfo, ConnectionI, ConnectionI::DestructionReason>
+ bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason>
(&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#endif
@@ -89,7 +116,7 @@ IceInternal::OutgoingConnectionFactory::destroy()
void
IceInternal::OutgoingConnectionFactory::waitUntilFinished()
{
- multimap<ConnectorInfo, ConnectionIPtr> connections;
+ multimap<ConnectorPtr, ConnectionIPtr> connections;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -112,10 +139,15 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
}
for_each(connections.begin(), connections.end(),
- Ice::secondVoidMemFun<const ConnectorInfo, ConnectionI>(&ConnectionI::waitUntilFinished));
+ Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished));
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ // Ensure all the connections are finished and reapable at this point.
+ vector<Ice::ConnectionIPtr> cons;
+ _reaper->swapConnections(cons);
+ assert(cons.size() == _connections.size());
+ cons.clear();
_connections.clear();
_connectionsByEndpoint.clear();
}
@@ -222,14 +254,14 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
catch(const Ice::CommunicatorDestroyedException& ex)
{
exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
- handleException(*exception.get(), *q, connection, hasMore || q != connectors.end() - 1);
+ handleConnectionException(*exception.get(), hasMore || q != connectors.end() - 1);
connection = 0;
break; // No need to continue
}
catch(const Ice::LocalException& ex)
{
exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
- handleException(*exception.get(), *q, connection, hasMore || q != connectors.end() - 1);
+ handleConnectionException(*exception.get(), hasMore || q != connectors.end() - 1);
connection = 0;
}
}
@@ -335,7 +367,7 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route
//
endpoint = endpoint->compress(false);
- multimap<ConnectorInfo, ConnectionIPtr>::const_iterator q;
+ multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q;
for(q = _connections.begin(); q != _connections.end(); ++q)
{
if(q->second->endpoint() == endpoint)
@@ -356,7 +388,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
return;
}
- for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
+ for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
{
if(p->second->getAdapter() == adapter)
{
@@ -372,7 +404,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end();
+ for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end();
++p)
{
c.push_back(p->second);
@@ -394,6 +426,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests()
IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const InstancePtr& instance) :
_instance(instance),
+ _reaper(new ConnectionReaper()),
_destroyed(false),
_pendingConnectCount(0)
{
@@ -439,23 +472,18 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr
assert(!endpoints.empty());
for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
{
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p);
-
- for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q)
+ ConnectionIPtr connection = find(_connectionsByEndpoint, *p, Ice::constMemFun(&ConnectionI::isActiveOrHolding));
+ if(connection)
{
- if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections
+ if(defaultsAndOverrides->overrideCompress)
{
- if(defaultsAndOverrides->overrideCompress)
- {
- compress = defaultsAndOverrides->overrideCompressValue;
- }
- else
- {
- compress = (*p)->compress();
- }
- return q->second;
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else
+ {
+ compress = (*p)->compress();
}
+ return connection;
}
}
return 0;
@@ -469,38 +497,23 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
- if(_pending.find(*p) != _pending.end())
+ if(_pending.find(p->connector) != _pending.end())
{
continue;
}
- pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator,
- multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p);
-
- if(pr.first == pr.second)
- {
- continue;
- }
-
- for(multimap<ConnectorInfo, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q)
+ ConnectionIPtr connection = find(_connections, p->connector, Ice::constMemFun(&ConnectionI::isActiveOrHolding));
+ if(connection)
{
- if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections
+ if(defaultsAndOverrides->overrideCompress)
+ {
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else
{
- if(q->second->endpoint() != p->endpoint)
- {
- _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(p->endpoint, q->second));
- }
-
- if(defaultsAndOverrides->overrideCompress)
- {
- compress = defaultsAndOverrides->overrideCompressValue;
- }
- else
- {
compress = p->endpoint->compress();
- }
- return q->second;
}
+ return connection;
}
}
@@ -551,32 +564,15 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo
}
//
- // Reap connections for which destruction has completed.
+ // Reap closed connections
//
- multimap<ConnectorInfo, ConnectionIPtr>::iterator p = _connections.begin();
- while(p != _connections.end())
- {
- if(p->second->isFinished())
- {
- _connections.erase(p++);
- }
- else
- {
- ++p;
- }
- }
-
- multimap<EndpointIPtr, ConnectionIPtr>::iterator q = _connectionsByEndpoint.begin();
- while(q != _connectionsByEndpoint.end())
+ vector<Ice::ConnectionIPtr> cons;
+ _reaper->swapConnections(cons);
+ for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p)
{
- if(q->second->isFinished())
- {
- _connectionsByEndpoint.erase(q++);
- }
- else
- {
- ++q;
- }
+ remove(_connections, (*p)->connector(), *p);
+ remove(_connectionsByEndpoint, (*p)->endpoint(), *p);
+ remove(_connectionsByEndpoint, (*p)->endpoint()->compress(true), *p);
}
//
@@ -651,7 +647,7 @@ ConnectionIPtr
IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& transceiver, const ConnectorInfo& ci)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(_pending.find(ci) != _pending.end() && transceiver);
+ assert(_pending.find(ci.connector) != _pending.end() && transceiver);
//
// Create and add the connection to the connection map. Adding the connection to the map
@@ -666,7 +662,7 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t
throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
}
- connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false), 0);
+ connection = new ConnectionI(_instance, _reaper, transceiver, ci.connector, ci.endpoint->compress(false), 0);
}
catch(const Ice::LocalException&)
{
@@ -681,8 +677,10 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t
throw;
}
- _connections.insert(pair<const ConnectorInfo, ConnectionIPtr>(ci, connection));
- _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(ci.endpoint, connection));
+ _connections.insert(pair<const ConnectorPtr, ConnectionIPtr>(ci.connector, connection));
+ _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection));
+ _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint()->compress(true),
+ connection));
return connection;
}
@@ -703,7 +701,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
- map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
if(q != _pending.end())
{
for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r)
@@ -772,7 +770,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
- map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
if(q != _pending.end())
{
for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r)
@@ -819,7 +817,7 @@ IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& c
bool found = false;
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
- map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
if(q != _pending.end())
{
found = true;
@@ -842,9 +840,9 @@ IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& c
//
for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r)
{
- if(_pending.find(*r) == _pending.end())
+ if(_pending.find(r->connector) == _pending.end())
{
- _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>()));
+ _pending.insert(pair<ConnectorPtr, set<ConnectCallbackPtr> >(r->connector, set<ConnectCallbackPtr>()));
}
}
return false;
@@ -856,7 +854,7 @@ IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackP
{
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
- map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
if(q != _pending.end())
{
q->second.erase(cb);
@@ -865,15 +863,14 @@ IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackP
}
void
-IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, const ConnectorInfo& ci,
- const ConnectionIPtr& connection, bool hasMore)
+IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore)
{
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->retry >= 2)
{
Trace out(_instance->initializationData().logger, traceLevels->retryCat);
- out << "connection to endpoint failed";
+ out << "couldn't resolve endpoint host";
if(dynamic_cast<const CommunicatorDestroyedException*>(&ex))
{
out << "\n";
@@ -891,51 +888,17 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex
}
out << ex;
}
-
- if(connection && connection->isFinished())
- {
- //
- // If the connection is finished, we remove it right away instead of
- // waiting for the reaping.
- //
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator,
- multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci);
-
- for(multimap<ConnectorInfo, ConnectionIPtr>::iterator p = pr.first; p != pr.second; ++p)
- {
- if(p->second == connection)
- {
- _connections.erase(p);
- break;
- }
- }
-
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint);
-
- for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = qr.first; q != qr.second; ++q)
- {
- if(q->second == connection)
- {
- _connectionsByEndpoint.erase(q);
- break;
- }
- }
- }
- }
}
void
-IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore)
+IceInternal::OutgoingConnectionFactory::handleConnectionException(const LocalException& ex, bool hasMore)
{
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->retry >= 2)
{
Trace out(_instance->initializationData().logger, traceLevels->retryCat);
- out << "couldn't resolve endpoint host";
+ out << "connection to endpoint failed";
if(dynamic_cast<const CommunicatorDestroyedException*>(&ex))
{
out << "\n";
@@ -985,7 +948,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c
{
assert(_iter != _connectors.end());
- _factory->handleException(ex, *_iter, connection, _hasMore || _iter != _connectors.end() - 1);
+ _factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1);
if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue.
{
_factory->finishGetConnection(_connectors, ex, this);
@@ -1224,7 +1187,7 @@ IceInternal::IncomingConnectionFactory::destroy()
void
IceInternal::IncomingConnectionFactory::waitUntilHolding() const
{
- list<ConnectionIPtr> connections;
+ set<ConnectionIPtr> connections;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1254,7 +1217,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const
void
IceInternal::IncomingConnectionFactory::waitUntilFinished()
{
- list<ConnectionIPtr> connections;
+ set<ConnectionIPtr> connections;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1282,6 +1245,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ // Ensure all the connections are finished and reapable at this point.
+ vector<Ice::ConnectionIPtr> cons;
+ _reaper->swapConnections(cons);
+ assert(cons.size() == _connections.size());
+ cons.clear();
_connections.clear();
}
}
@@ -1394,14 +1362,17 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
IceUtil::ThreadControl::yield();
return;
}
-
+
//
- // Reap connections for which destruction has completed.
+ // Reap closed connections
//
- _connections.erase(remove_if(_connections.begin(), _connections.end(),
- Ice::constMemFun(&ConnectionI::isFinished)),
- _connections.end());
-
+ vector<Ice::ConnectionIPtr> cons;
+ _reaper->swapConnections(cons);
+ for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p)
+ {
+ _connections.erase(*p);
+ }
+
//
// Now accept a new connection.
//
@@ -1439,7 +1410,7 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
try
{
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, _reaper, transceiver, 0, _endpoint, _adapter);
}
catch(const LocalException& ex)
{
@@ -1460,7 +1431,7 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
return;
}
- _connections.push_back(connection);
+ _connections.insert(connection);
}
assert(connection);
@@ -1531,16 +1502,6 @@ IceInternal::IncomingConnectionFactory::connectionStartFailed(const Ice::Connect
Warning out(_instance->initializationData().logger);
out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
}
-
- //
- // If the connection is finished, remove it right away from
- // the connection map. Otherwise, we keep it in the map, it
- // will eventually be reaped.
- //
- if(connection->isFinished())
- {
- _connections.remove(connection);
- }
}
//
@@ -1552,6 +1513,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
const EndpointIPtr& endpoint,
const ObjectAdapterPtr& adapter) :
_instance(instance),
+ _reaper(new ConnectionReaper()),
_endpoint(endpoint),
_adapter(adapter),
_warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0),
@@ -1581,7 +1543,7 @@ IceInternal::IncomingConnectionFactory::initialize(const string& adapterName)
try
{
- connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, _reaper, _transceiver, 0, _endpoint, _adapter);
}
catch(const LocalException&)
{
@@ -1598,7 +1560,7 @@ IceInternal::IncomingConnectionFactory::initialize(const string& adapterName)
connection->start(0);
- _connections.push_back(connection);
+ _connections.insert(connection);
}
else
{
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 417d8b30d43..34603d11530 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -73,10 +73,9 @@ private:
ConnectorInfo(const ConnectorPtr& c, const EndpointIPtr& e) : connector(c), endpoint(e)
{
}
-
- bool operator<(const ConnectorInfo& other) const;
+
bool operator==(const ConnectorInfo& other) const;
-
+
ConnectorPtr connector;
EndpointIPtr endpoint;
};
@@ -139,13 +138,14 @@ private:
Ice::ConnectionIPtr createConnection(const TransceiverPtr&, const ConnectorInfo&);
void handleException(const Ice::LocalException&, bool);
- void handleException(const Ice::LocalException&, const ConnectorInfo&, const Ice::ConnectionIPtr&, bool);
+ void handleConnectionException(const Ice::LocalException&, bool);
const InstancePtr _instance;
+ const ConnectionReaperPtr _reaper;
bool _destroyed;
- std::multimap<ConnectorInfo, Ice::ConnectionIPtr> _connections;
- std::map<ConnectorInfo, std::set<ConnectCallbackPtr> > _pending;
+ std::multimap<ConnectorPtr, Ice::ConnectionIPtr> _connections;
+ std::map<ConnectorPtr, std::set<ConnectCallbackPtr> > _pending;
std::multimap<EndpointIPtr, Ice::ConnectionIPtr> _connectionsByEndpoint;
int _pendingConnectCount;
@@ -168,7 +168,7 @@ public:
EndpointIPtr endpoint() const;
std::list<Ice::ConnectionIPtr> connections() const;
void flushBatchRequests();
-
+
//
// Operations from EventHandler
//
@@ -203,6 +203,7 @@ private:
void setState(State);
const InstancePtr _instance;
+ const ConnectionReaperPtr _reaper;
AcceptorPtr _acceptor;
const TransceiverPtr _transceiver;
@@ -212,7 +213,7 @@ private:
const bool _warn;
- std::list<Ice::ConnectionIPtr> _connections;
+ std::set<Ice::ConnectionIPtr> _connections;
State _state;
};
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index bea6f4ed5f5..96952c5bf3a 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -58,6 +58,20 @@ private:
}
void
+IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection)
+{
+ Lock sync(*this);
+ _connections.push_back(connection);
+}
+
+void
+IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connections)
+{
+ Lock sync(*this);
+ _connections.swap(connections);
+}
+
+void
Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
{
if(adopted)
@@ -899,27 +913,33 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
-
+
try
{
if(--_dispatchCount == 0)
{
+ if(_state == StateFinished)
+ {
+ _reaper->add(this);
+ }
notifyAll();
}
-
+
if(_state >= StateClosed)
{
assert(_exception.get());
_exception->ice_throw();
}
-
+
OutgoingMessage message(os, compressFlag > 0);
sendMessage(message);
-
+
if(_state == StateClosing && _dispatchCount == 0)
{
initiateShutdown();
}
+
+ return;
}
catch(const LocalException& ex)
{
@@ -932,20 +952,24 @@ Ice::ConnectionI::sendNoResponse()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
-
+
try
{
if(--_dispatchCount == 0)
{
+ if(_state == StateFinished)
+ {
+ _reaper->add(this);
+ }
notifyAll();
}
-
+
if(_state >= StateClosed)
{
assert(_exception.get());
_exception->ice_throw();
}
-
+
if(_state == StateClosing && _dispatchCount == 0)
{
initiateShutdown();
@@ -963,6 +987,12 @@ Ice::ConnectionI::endpoint() const
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
+ConnectorPtr
+Ice::ConnectionI::connector() const
+{
+ return _connector; // No mutex protection necessary, _connector is immutable.
+}
+
void
Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
{
@@ -1390,6 +1420,10 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateFinished);
+ if(_dispatchCount == 0)
+ {
+ _reaper->add(this);
+ }
}
}
@@ -1468,19 +1502,27 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum)
assert(_dispatchCount >= 0);
if(_dispatchCount == 0)
{
+ if(_state == StateFinished)
+ {
+ _reaper->add(this);
+ }
notifyAll();
}
}
}
Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
+ const ConnectionReaperPtr& reaper,
const TransceiverPtr& transceiver,
+ const ConnectorPtr& connector,
const EndpointIPtr& endpoint,
const ObjectAdapterPtr& adapter) :
_transceiver(transceiver),
_instance(instance),
+ _reaper(reaper),
_desc(transceiver->toString()),
_type(transceiver->type()),
+ _connector(connector),
_endpoint(endpoint),
_adapter(adapter),
_logger(_instance->initializationData().logger), // Cached for better performance.
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index fb068c6352b..d0f7d908a16 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -23,6 +23,7 @@
#include <Ice/ObjectAdapterF.h>
#include <Ice/ServantManagerF.h>
#include <Ice/EndpointIF.h>
+#include <Ice/ConnectorF.h>
#include <Ice/LoggerF.h>
#include <Ice/TraceLevelsF.h>
#include <Ice/OutgoingAsyncF.h>
@@ -38,6 +39,19 @@ class Outgoing;
class BatchOutgoing;
class OutgoingMessageCallback;
+class ConnectionReaper : public IceUtil::Mutex, public IceUtil::Shared
+{
+public:
+
+ void add(const Ice::ConnectionIPtr&);
+ void swapConnections(std::vector<Ice::ConnectionIPtr>&);
+
+private:
+
+ std::vector<Ice::ConnectionIPtr> _connections;
+};
+typedef IceUtil::Handle<ConnectionReaper> ConnectionReaperPtr;
+
}
namespace Ice
@@ -96,6 +110,7 @@ public:
void sendNoResponse();
IceInternal::EndpointIPtr endpoint() const;
+ IceInternal::ConnectorPtr connector() const;
virtual void setAdapter(const ObjectAdapterPtr&); // From Connection.
virtual ObjectAdapterPtr getAdapter() const; // From Connection.
@@ -168,8 +183,9 @@ private:
bool adopted;
};
- ConnectionI(const IceInternal::InstancePtr&, const IceInternal::TransceiverPtr&, const IceInternal::EndpointIPtr&,
- const ObjectAdapterPtr&);
+ ConnectionI(const IceInternal::InstancePtr&, const IceInternal::ConnectionReaperPtr&,
+ const IceInternal::TransceiverPtr&, const IceInternal::ConnectorPtr&,
+ const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&);
virtual ~ConnectionI();
friend class IceInternal::IncomingConnectionFactory;
@@ -237,8 +253,10 @@ private:
const IceInternal::TransceiverPtr _transceiver;
const IceInternal::InstancePtr _instance;
+ const IceInternal::ConnectionReaperPtr _reaper;
const std::string _desc;
const std::string _type;
+ const IceInternal::ConnectorPtr _connector;
const IceInternal::EndpointIPtr _endpoint;
ObjectAdapterPtr _adapter;