summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp205
1 files changed, 138 insertions, 67 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index ff635001516..9c29f05c431 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -40,10 +40,61 @@ void IceInternal::decRef(OutgoingConnectionFactory* p) { p->__decRef(); }
void IceInternal::incRef(IncomingConnectionFactory* p) { p->__incRef(); }
void IceInternal::decRef(IncomingConnectionFactory* p) { p->__decRef(); }
+void
+IceInternal::OutgoingConnectionFactory::destroy()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(!_instance)
+ {
+ return;
+ }
+
+#ifdef _STLP_BEGIN_NAMESPACE
+ // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
+ for_each(_connections.begin(), _connections.end(),
+ voidbind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason>
+ (&Connection::destroy), Connection::CommunicatorDestroyed));
+#else
+ for_each(_connections.begin(), _connections.end(),
+ bind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason>
+ (&Connection::destroy), Connection::CommunicatorDestroyed));
+#endif
+
+ _instance = 0;
+ notifyAll();
+}
+
+void
+IceInternal::OutgoingConnectionFactory::waitUntilFinished()
+{
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // First we wait until the factory is destroyed.
+ //
+ while(_instance)
+ {
+ wait();
+ }
+
+ //
+ // Now we wait for until the destruction of each connection is
+ // finished.
+ //
+ for_each(_connections.begin(), _connections.end(),
+ Ice::secondConstVoidMemFun<EndpointPtr, Connection>(&Connection::waitUntilFinished));
+
+ //
+ // We're done, now we can throw away all connections.
+ //
+ _connections.clear();
+}
+
ConnectionPtr
IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoints)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(!_instance)
{
@@ -53,12 +104,12 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoi
assert(!endpoints.empty());
//
- // Reap destroyed connections.
+ // Reap connections for which destruction has completed.
//
std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin();
while(p != _connections.end())
{
- if(p->second->destroyed())
+ if(p->second->isFinished())
{
_connections.erase(p++);
}
@@ -84,7 +135,14 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoi
map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint);
if(r != _connections.end())
{
- return r->second;
+ //
+ // Don't return connections for which destruction has been
+ // initiated.
+ //
+ if(!r->second->isDestroyed())
+ {
+ return r->second;
+ }
}
}
@@ -156,7 +214,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoi
void
IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(!_instance)
{
@@ -196,7 +254,7 @@ IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router)
void
IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& adapter)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(!_instance)
{
@@ -220,45 +278,73 @@ IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const Instance
IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory()
{
assert(!_instance);
+ assert(_connections.empty());
}
void
-IceInternal::OutgoingConnectionFactory::destroy()
+IceInternal::IncomingConnectionFactory::activate()
{
- IceUtil::Mutex::Lock sync(*this);
-
- if(!_instance)
- {
- return;
- }
-
-#ifdef _STLP_BEGIN_NAMESPACE
- // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
- for_each(_connections.begin(), _connections.end(),
- voidbind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason>
- (&Connection::destroy), Connection::CommunicatorDestroyed));
-#else
- for_each(_connections.begin(), _connections.end(),
- bind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason>
- (&Connection::destroy), Connection::CommunicatorDestroyed));
-#endif
-
- _connections.clear();
- _instance = 0;
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
+ setState(StateActive);
}
void
IceInternal::IncomingConnectionFactory::hold()
{
- ::IceUtil::Mutex::Lock sync(*this);
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
setState(StateHolding);
}
void
-IceInternal::IncomingConnectionFactory::activate()
+IceInternal::IncomingConnectionFactory::destroy()
{
- ::IceUtil::Mutex::Lock sync(*this);
- setState(StateActive);
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed);
+}
+
+void
+IceInternal::IncomingConnectionFactory::waitUntilHolding() const
+{
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // First we wait until the connection factory itself is in holding
+ // state.
+ //
+ while(_state < StateHolding)
+ {
+ wait();
+ }
+
+ //
+ // Now we wait until each connection is in holding state.
+ //
+ for_each(_connections.begin(), _connections.end(), Ice::constVoidMemFun(&Connection::waitUntilHolding));
+}
+
+void
+IceInternal::IncomingConnectionFactory::waitUntilFinished()
+{
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // First we wait until the factory is destroyed.
+ //
+ while(_acceptor)
+ {
+ wait();
+ }
+
+ //
+ // Now we wait for until the destruction of each connection is
+ // finished.
+ //
+ for_each(_connections.begin(), _connections.end(), Ice::constVoidMemFun(&Connection::waitUntilFinished));
+
+ //
+ // We're done, now we can throw away all connections.
+ //
+ _connections.clear();
}
EndpointPtr
@@ -283,16 +369,17 @@ IceInternal::IncomingConnectionFactory::equivalent(const EndpointPtr& endp) cons
list<ConnectionPtr>
IceInternal::IncomingConnectionFactory::connections() const
{
- ::IceUtil::Mutex::Lock sync(*this);
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
+
+ list<ConnectionPtr> result;
//
- // Reap destroyed connections.
+ // Only copy connections which have not been destroyed.
//
- list<ConnectionPtr>& connections = const_cast<list<ConnectionPtr>& >(_connections);
- connections.erase(remove_if(connections.begin(), connections.end(), ::Ice::constMemFun(&Connection::destroyed)),
- connections.end());
+ remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result),
+ ::Ice::constMemFun(&Connection::isDestroyed));
- return _connections;
+ return result;
}
bool
@@ -310,7 +397,7 @@ IceInternal::IncomingConnectionFactory::read(BasicStream&)
void
IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool)
{
- ::IceUtil::Mutex::Lock sync(*this);
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
if(_state != StateActive)
{
@@ -320,10 +407,11 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
}
//
- // Reap destroyed connections.
+ // Reap connections for which destruction has completed.
//
- _connections.erase(remove_if(_connections.begin(), _connections.end(), ::Ice::constMemFun(&Connection::destroyed)),
- _connections.end());
+ _connections.erase(remove_if(_connections.begin(), _connections.end(),
+ ::Ice::constMemFun(&Connection::isFinished)),
+ _connections.end());
//
// Now accept a new connection.
@@ -394,7 +482,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
void
IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool)
{
- ::IceUtil::Mutex::Lock sync(*this);
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
threadPool->promoteFollower();
@@ -405,12 +493,8 @@ IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool
else if(_state == StateClosed)
{
_acceptor->close();
-
- //
- // We don't need the adapter anymore after we closed the
- // acceptor.
- //
- _adapter = 0;
+ _acceptor = 0;
+ notifyAll();
}
}
@@ -438,9 +522,9 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
EventHandler(instance),
_endpoint(endpoint),
_adapter(adapter),
+ _registeredWithPool(false),
_warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0),
- _state(StateHolding),
- _registeredWithPool(false)
+ _state(StateHolding)
{
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
if(defaultsAndOverrides->overrideTimeout)
@@ -456,16 +540,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
ConnectionPtr connection = new Connection(_instance, _transceiver, _endpoint, _adapter);
connection->validate();
_connections.push_back(connection);
-
- //
- // We don't need an adapter anymore if we don't use an
- // acceptor.
- //
- _adapter = 0;
}
else
{
- const_cast<AcceptorPtr&>(_acceptor) = _endpoint->acceptor(const_cast<EndpointPtr&>(_endpoint));
+ _acceptor = _endpoint->acceptor(const_cast<EndpointPtr&>(_endpoint));
assert(_acceptor);
_acceptor->listen();
}
@@ -480,14 +558,8 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory()
{
assert(_state == StateClosed);
- assert(!_adapter);
-}
-
-void
-IceInternal::IncomingConnectionFactory::destroy()
-{
- ::IceUtil::Mutex::Lock sync(*this);
- setState(StateClosed);
+ assert(!_acceptor);
+ assert(_connections.empty());
}
void
@@ -542,13 +614,12 @@ IceInternal::IncomingConnectionFactory::setState(State state)
for_each(_connections.begin(), _connections.end(),
bind2nd(Ice::voidMemFun1(&Connection::destroy), Connection::ObjectAdapterDeactivated));
#endif
- _connections.clear();
-
break;
}
}
_state = state;
+ notifyAll();
}
void