Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--    cpp/src/Ice/ConnectionFactory.cpp    1436
1 file changed, 718 insertions, 718 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 5b33b528d5a..3008dfaa485 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -45,18 +45,18 @@ IceInternal::OutgoingConnectionFactory::destroy()
if(_destroyed)
{
- return;
+ return;
}
#ifdef _STLP_BEGIN_NAMESPACE
// voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
for_each(_connections.begin(), _connections.end(),
- voidbind2nd(Ice::secondVoidMemFun1<EndpointIPtr, ConnectionI, ConnectionI::DestructionReason>
- (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
+ voidbind2nd(Ice::secondVoidMemFun1<EndpointIPtr, ConnectionI, ConnectionI::DestructionReason>
+ (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#else
for_each(_connections.begin(), _connections.end(),
- bind2nd(Ice::secondVoidMemFun1<const EndpointIPtr, ConnectionI, ConnectionI::DestructionReason>
- (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
+ bind2nd(Ice::secondVoidMemFun1<const EndpointIPtr, ConnectionI, ConnectionI::DestructionReason>
+ (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#endif
_destroyed = true;
@@ -69,27 +69,27 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
multimap<EndpointIPtr, ConnectionIPtr> connections;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // First we wait until the factory is destroyed. We also wait
- // until there are no pending connections anymore. Only then
- // we can be sure the _connections contains all connections.
- //
- while(!_destroyed || !_pending.empty())
- {
- wait();
- }
-
- //
- // We want to wait until all connections are finished outside the
- // thread synchronization.
- //
- connections.swap(_connections);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // First we wait until the factory is destroyed. We also wait
+ // until there are no pending connections anymore. Only then
+ // we can be sure the _connections contains all connections.
+ //
+ while(!_destroyed || !_pending.empty())
+ {
+ wait();
+ }
+
+ //
+ // We want to wait until all connections are finished outside the
+ // thread synchronization.
+ //
+ connections.swap(_connections);
}
for_each(connections.begin(), connections.end(),
- Ice::secondVoidMemFun<const EndpointIPtr, ConnectionI>(&ConnectionI::waitUntilFinished));
+ Ice::secondVoidMemFun<const EndpointIPtr, ConnectionI>(&ConnectionI::waitUntilFinished));
}
ConnectionIPtr
@@ -100,131 +100,131 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
vector<EndpointIPtr> endpoints = endpts;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_destroyed)
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
-
- //
- // Reap connections for which destruction has completed.
- //
- std::multimap<EndpointIPtr, ConnectionIPtr>::iterator p = _connections.begin();
- while(p != _connections.end())
- {
- if(p->second->isFinished())
- {
- _connections.erase(p++);
- }
- else
- {
- ++p;
- }
- }
-
- //
- // Modify endpoints with overrides.
- //
- vector<EndpointIPtr>::iterator q;
- for(q = endpoints.begin(); q != endpoints.end(); ++q)
- {
- if(_instance->defaultsAndOverrides()->overrideTimeout)
- {
- *q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
- }
-
- //
- // The Connection object does not take the compression flag of
- // endpoints into account, but instead gets the information
- // about whether messages should be compressed or not from
- // other sources. In order to allow connection sharing for
- // endpoints that differ in the value of the compression flag
- // only, we always set the compression flag to false here in
- // this connection factory.
- //
- *q = (*q)->compress(false);
- }
-
- //
- // Search for existing connections.
- //
- vector<EndpointIPtr>::const_iterator r;
- for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
- {
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q);
-
- while(pr.first != pr.second)
- {
- //
- // Don't return connections for which destruction has
- // been initiated. The connection must also match the
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_destroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+
+ //
+ // Reap connections for which destruction has completed.
+ //
+ std::multimap<EndpointIPtr, ConnectionIPtr>::iterator p = _connections.begin();
+ while(p != _connections.end())
+ {
+ if(p->second->isFinished())
+ {
+ _connections.erase(p++);
+ }
+ else
+ {
+ ++p;
+ }
+ }
+
+ //
+ // Modify endpoints with overrides.
+ //
+ vector<EndpointIPtr>::iterator q;
+ for(q = endpoints.begin(); q != endpoints.end(); ++q)
+ {
+ if(_instance->defaultsAndOverrides()->overrideTimeout)
+ {
+ *q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
+ }
+
+ //
+ // The Connection object does not take the compression flag of
+ // endpoints into account, but instead gets the information
+ // about whether messages should be compressed or not from
+ // other sources. In order to allow connection sharing for
+ // endpoints that differ in the value of the compression flag
+ // only, we always set the compression flag to false here in
+ // this connection factory.
+ //
+ *q = (*q)->compress(false);
+ }
+
+ //
+ // Search for existing connections.
+ //
+ vector<EndpointIPtr>::const_iterator r;
+ for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
+ {
+ pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q);
+
+ while(pr.first != pr.second)
+ {
+ //
+ // Don't return connections for which destruction has
+ // been initiated. The connection must also match the
// requested thread-per-connection setting.
- //
- if(!pr.first->second->isDestroyed() &&
+ //
+ if(!pr.first->second->isDestroyed() &&
pr.first->second->threadPerConnection() == threadPerConnection)
- {
- if(_instance->defaultsAndOverrides()->overrideCompress)
- {
- compress = _instance->defaultsAndOverrides()->overrideCompressValue;
- }
- else
- {
- compress = (*r)->compress();
- }
-
- return pr.first->second;
- }
-
- ++pr.first;
- }
- }
-
- //
- // If some other thread is currently trying to establish a
- // connection to any of our endpoints, we wait until this
- // thread is finished.
- //
- bool searchAgain = false;
- while(!_destroyed)
- {
- for(q = endpoints.begin(); q != endpoints.end(); ++q)
- {
- if(_pending.find(*q) != _pending.end())
- {
- break;
- }
- }
-
- if(q == endpoints.end())
- {
- break;
- }
-
- searchAgain = true;
-
- wait();
- }
-
- if(_destroyed)
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
-
- //
- // Search for existing connections again if we waited above,
- // as new connections might have been added in the meantime.
- //
- if(searchAgain)
- {
- for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
- {
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q);
-
- while(pr.first != pr.second)
- {
+ {
+ if(_instance->defaultsAndOverrides()->overrideCompress)
+ {
+ compress = _instance->defaultsAndOverrides()->overrideCompressValue;
+ }
+ else
+ {
+ compress = (*r)->compress();
+ }
+
+ return pr.first->second;
+ }
+
+ ++pr.first;
+ }
+ }
+
+ //
+ // If some other thread is currently trying to establish a
+ // connection to any of our endpoints, we wait until this
+ // thread is finished.
+ //
+ bool searchAgain = false;
+ while(!_destroyed)
+ {
+ for(q = endpoints.begin(); q != endpoints.end(); ++q)
+ {
+ if(_pending.find(*q) != _pending.end())
+ {
+ break;
+ }
+ }
+
+ if(q == endpoints.end())
+ {
+ break;
+ }
+
+ searchAgain = true;
+
+ wait();
+ }
+
+ if(_destroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+
+ //
+ // Search for existing connections again if we waited above,
+ // as new connections might have been added in the meantime.
+ //
+ if(searchAgain)
+ {
+ for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
+ {
+ pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q);
+
+ while(pr.first != pr.second)
+ {
//
// Don't return connections for which destruction has
// been initiated. The connection must also match the
@@ -232,31 +232,31 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
//
if(!pr.first->second->isDestroyed() &&
pr.first->second->threadPerConnection() == threadPerConnection)
- {
- if(_instance->defaultsAndOverrides()->overrideCompress)
- {
- compress = _instance->defaultsAndOverrides()->overrideCompressValue;
- }
- else
- {
- compress = (*r)->compress();
- }
-
- return pr.first->second;
- }
-
- ++pr.first;
- }
- }
- }
-
- //
- // No connection to any of our endpoints exists yet, so we
- // will try to create one. To avoid that other threads try to
- // create connections to the same endpoints, we add our
- // endpoints to _pending.
- //
- _pending.insert(endpoints.begin(), endpoints.end());
+ {
+ if(_instance->defaultsAndOverrides()->overrideCompress)
+ {
+ compress = _instance->defaultsAndOverrides()->overrideCompressValue;
+ }
+ else
+ {
+ compress = (*r)->compress();
+ }
+
+ return pr.first->second;
+ }
+
+ ++pr.first;
+ }
+ }
+ }
+
+ //
+ // No connection to any of our endpoints exists yet, so we
+ // will try to create one. To avoid that other threads try to
+ // create connections to the same endpoints, we add our
+ // endpoints to _pending.
+ //
+ _pending.insert(endpoints.begin(), endpoints.end());
}
ConnectionIPtr connection;
@@ -266,112 +266,112 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
vector<EndpointIPtr>::const_iterator r;
for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
{
- EndpointIPtr endpoint = *q;
-
- try
- {
- TransceiverPtr transceiver = endpoint->clientTransceiver();
- if(!transceiver)
- {
- ConnectorPtr connector = endpoint->connector();
- assert(connector);
-
- Int timeout;
- if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
- {
- timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
- }
- // It is not necessary to check for overrideTimeout,
- // the endpoint has already been modified with this
- // override, if set.
- else
- {
- timeout = endpoint->timeout();
- }
-
- transceiver = connector->connect(timeout);
- assert(transceiver);
- }
- connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection,
+ EndpointIPtr endpoint = *q;
+
+ try
+ {
+ TransceiverPtr transceiver = endpoint->clientTransceiver();
+ if(!transceiver)
+ {
+ ConnectorPtr connector = endpoint->connector();
+ assert(connector);
+
+ Int timeout;
+ if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
+ {
+ timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
+ }
+ // It is not necessary to check for overrideTimeout,
+ // the endpoint has already been modified with this
+ // override, if set.
+ else
+ {
+ timeout = endpoint->timeout();
+ }
+
+ transceiver = connector->connect(timeout);
+ assert(transceiver);
+ }
+ connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection,
_instance->threadPerConnectionStackSize());
- connection->validate();
-
- if(_instance->defaultsAndOverrides()->overrideCompress)
- {
- compress = _instance->defaultsAndOverrides()->overrideCompressValue;
- }
- else
- {
- compress = (*r)->compress();
- }
- break;
- }
- catch(const LocalException& ex)
- {
- exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
-
- //
- // If a connection object was constructed, then validate()
- // must have raised the exception.
- //
- if(connection)
- {
- connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
- connection = 0;
- }
- }
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->retry >= 2)
- {
- Trace out(_instance->initializationData().logger, traceLevels->retryCat);
-
- out << "connection to endpoint failed";
- if(moreEndpts || q + 1 != endpoints.end())
- {
- out << ", trying next endpoint\n";
- }
- else
- {
- out << " and no more endpoints to try\n";
- }
- out << *exception.get();
- }
+ connection->validate();
+
+ if(_instance->defaultsAndOverrides()->overrideCompress)
+ {
+ compress = _instance->defaultsAndOverrides()->overrideCompressValue;
+ }
+ else
+ {
+ compress = (*r)->compress();
+ }
+ break;
+ }
+ catch(const LocalException& ex)
+ {
+ exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+
+ //
+ // If a connection object was constructed, then validate()
+ // must have raised the exception.
+ //
+ if(connection)
+ {
+ connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
+ connection = 0;
+ }
+ }
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->retry >= 2)
+ {
+ Trace out(_instance->initializationData().logger, traceLevels->retryCat);
+
+ out << "connection to endpoint failed";
+ if(moreEndpts || q + 1 != endpoints.end())
+ {
+ out << ", trying next endpoint\n";
+ }
+ else
+ {
+ out << " and no more endpoints to try\n";
+ }
+ out << *exception.get();
+ }
}
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // Signal other threads that we are done with trying to
- // establish connections to our endpoints.
- //
- for(q = endpoints.begin(); q != endpoints.end(); ++q)
- {
- _pending.erase(*q);
- }
- notifyAll();
-
- if(!connection)
- {
- assert(exception.get());
- exception->ice_throw();
- }
- else
- {
- _connections.insert(_connections.end(),
- pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection));
-
- if(_destroyed)
- {
- connection->destroy(ConnectionI::CommunicatorDestroyed);
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
- else
- {
- connection->activate();
- }
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // Signal other threads that we are done with trying to
+ // establish connections to our endpoints.
+ //
+ for(q = endpoints.begin(); q != endpoints.end(); ++q)
+ {
+ _pending.erase(*q);
+ }
+ notifyAll();
+
+ if(!connection)
+ {
+ assert(exception.get());
+ exception->ice_throw();
+ }
+ else
+ {
+ _connections.insert(_connections.end(),
+ pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection));
+
+ if(_destroyed)
+ {
+ connection->destroy(ConnectionI::CommunicatorDestroyed);
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+ else
+ {
+ connection->activate();
+ }
+ }
}
assert(connection);
@@ -385,7 +385,7 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route
if(_destroyed)
{
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
assert(routerInfo);
@@ -401,44 +401,44 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route
vector<EndpointIPtr>::const_iterator p;
for(p = endpoints.begin(); p != endpoints.end(); ++p)
{
- EndpointIPtr endpoint = *p;
-
- //
- // Modify endpoints with overrides.
- //
- if(_instance->defaultsAndOverrides()->overrideTimeout)
- {
- endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
- }
-
- //
- // The Connection object does not take the compression flag of
- // endpoints into account, but instead gets the information
- // about whether messages should be compressed or not from
- // other sources. In order to allow connection sharing for
- // endpoints that differ in the value of the compression flag
- // only, we always set the compression flag to false here in
- // this connection factory.
- //
- endpoint = endpoint->compress(false);
-
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint);
-
- while(pr.first != pr.second)
- {
- try
- {
- pr.first->second->setAdapter(adapter);
- }
- catch(const Ice::LocalException&)
- {
- //
- // Ignore, the connection is being closed or closed.
- //
- }
- ++pr.first;
- }
+ EndpointIPtr endpoint = *p;
+
+ //
+ // Modify endpoints with overrides.
+ //
+ if(_instance->defaultsAndOverrides()->overrideTimeout)
+ {
+ endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
+ }
+
+ //
+ // The Connection object does not take the compression flag of
+ // endpoints into account, but instead gets the information
+ // about whether messages should be compressed or not from
+ // other sources. In order to allow connection sharing for
+ // endpoints that differ in the value of the compression flag
+ // only, we always set the compression flag to false here in
+ // this connection factory.
+ //
+ endpoint = endpoint->compress(false);
+
+ pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint);
+
+ while(pr.first != pr.second)
+ {
+ try
+ {
+ pr.first->second->setAdapter(adapter);
+ }
+ catch(const Ice::LocalException&)
+ {
+ //
+ // Ignore, the connection is being closed or closed.
+ //
+ }
+ ++pr.first;
+ }
}
}
@@ -449,24 +449,24 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
if(_destroyed)
{
- return;
+ return;
}
for(multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
{
- if(p->second->getAdapter() == adapter)
- {
- try
- {
- p->second->setAdapter(0);
- }
- catch(const Ice::LocalException&)
- {
- //
- // Ignore, the connection is being closed or closed.
- //
- }
- }
+ if(p->second->getAdapter() == adapter)
+ {
+ try
+ {
+ p->second->setAdapter(0);
+ }
+ catch(const Ice::LocalException&)
+ {
+ //
+ // Ignore, the connection is being closed or closed.
+ //
+ }
+ }
}
}
@@ -476,26 +476,26 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests()
list<ConnectionIPtr> c;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- for(std::multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin();
- p != _connections.end();
- ++p)
- {
- c.push_back(p->second);
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ for(std::multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin();
+ p != _connections.end();
+ ++p)
+ {
+ c.push_back(p->second);
+ }
}
for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p)
{
- try
- {
- (*p)->flushBatchRequests();
- }
- catch(const LocalException&)
- {
- // Ignore.
- }
+ try
+ {
+ (*p)->flushBatchRequests();
+ }
+ catch(const LocalException&)
+ {
+ // Ignore.
+ }
}
}
@@ -538,22 +538,22 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const
list<ConnectionIPtr> connections;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // First we wait until the connection factory itself is in holding
- // state.
- //
- while(_state < StateHolding)
- {
- wait();
- }
-
- //
- // We want to wait until all connections are in holding state
- // outside the thread synchronization.
- //
- connections = _connections;
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // First we wait until the connection factory itself is in holding
+ // state.
+ //
+ while(_state < StateHolding)
+ {
+ wait();
+ }
+
+ //
+ // We want to wait until all connections are in holding state
+ // outside the thread synchronization.
+ //
+ connections = _connections;
}
//
@@ -569,35 +569,35 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
list<ConnectionIPtr> connections;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // First we wait until the factory is destroyed. If we are using
- // an acceptor, we also wait for it to be closed.
- //
- while(_state != StateClosed || _acceptor)
- {
- wait();
- }
-
- threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory;
- _threadPerIncomingConnectionFactory = 0;
-
- //
- // Clear the OA. See bug 1673 for the details of why this is necessary.
- //
- _adapter = 0;
-
- //
- // We want to wait until all connections are finished outside the
- // thread synchronization.
- //
- connections.swap(_connections);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // First we wait until the factory is destroyed. If we are using
+ // an acceptor, we also wait for it to be closed.
+ //
+ while(_state != StateClosed || _acceptor)
+ {
+ wait();
+ }
+
+ threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory;
+ _threadPerIncomingConnectionFactory = 0;
+
+ //
+ // Clear the OA. See bug 1673 for the details of why this is necessary.
+ //
+ _adapter = 0;
+
+ //
+ // We want to wait until all connections are finished outside the
+ // thread synchronization.
+ //
+ connections.swap(_connections);
}
if(threadPerIncomingConnectionFactory)
{
- threadPerIncomingConnectionFactory->getThreadControl().join();
+ threadPerIncomingConnectionFactory->getThreadControl().join();
}
for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished));
@@ -615,7 +615,7 @@ IceInternal::IncomingConnectionFactory::equivalent(const EndpointIPtr& endp) con
{
if(_transceiver)
{
- return endp->equivalent(_transceiver);
+ return endp->equivalent(_transceiver);
}
assert(_acceptor);
@@ -633,7 +633,7 @@ IceInternal::IncomingConnectionFactory::connections() const
// Only copy connections which have not been destroyed.
//
remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result),
- Ice::constMemFun(&ConnectionI::isDestroyed));
+ Ice::constMemFun(&ConnectionI::isDestroyed));
return result;
}
@@ -645,14 +645,14 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests()
for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p)
{
- try
- {
- (*p)->flushBatchRequests();
- }
- catch(const LocalException&)
- {
- // Ignore.
- }
+ try
+ {
+ (*p)->flushBatchRequests();
+ }
+ catch(const LocalException&)
+ {
+ // Ignore.
+ }
}
}
@@ -682,13 +682,13 @@ class PromoteFollower
public:
PromoteFollower(const ThreadPoolPtr& threadPool) :
- _threadPool(threadPool)
+ _threadPool(threadPool)
{
}
~PromoteFollower()
{
- _threadPool->promoteFollower();
+ _threadPool->promoteFollower();
}
private:
@@ -704,73 +704,73 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
ConnectionIPtr connection;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // This makes sure that we promote a follower before we leave
- // the scope of the mutex above, but after we call accept()
- // (if we call it).
- //
- // If _threadPool is null, then this class doesn't do
- // anything.
- //
- PromoteFollower promote(threadPool);
-
- if(_state != StateActive)
- {
- IceUtil::ThreadControl::yield();
- return;
- }
-
- //
- // Reap connections for which destruction has completed.
- //
- _connections.erase(remove_if(_connections.begin(), _connections.end(),
- Ice::constMemFun(&ConnectionI::isFinished)),
- _connections.end());
-
- //
- // Now accept a new connection.
- //
- TransceiverPtr transceiver;
- try
- {
- transceiver = _acceptor->accept(0);
- }
- catch(const SocketException&)
- {
- // Ignore socket exceptions.
- return;
- }
- catch(const TimeoutException&)
- {
- // Ignore timeouts.
- return;
- }
- catch(const LocalException& ex)
- {
- // Warn about other Ice local exceptions.
- if(_warn)
- {
- Warning out(_instance->initializationData().logger);
- out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
- }
- return;
- }
-
- assert(transceiver);
-
- try
- {
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // This makes sure that we promote a follower before we leave
+ // the scope of the mutex above, but after we call accept()
+ // (if we call it).
+ //
+ // If _threadPool is null, then this class doesn't do
+ // anything.
+ //
+ PromoteFollower promote(threadPool);
+
+ if(_state != StateActive)
+ {
+ IceUtil::ThreadControl::yield();
+ return;
+ }
+
+ //
+ // Reap connections for which destruction has completed.
+ //
+ _connections.erase(remove_if(_connections.begin(), _connections.end(),
+ Ice::constMemFun(&ConnectionI::isFinished)),
+ _connections.end());
+
+ //
+ // Now accept a new connection.
+ //
+ TransceiverPtr transceiver;
+ try
+ {
+ transceiver = _acceptor->accept(0);
+ }
+ catch(const SocketException&)
+ {
+ // Ignore socket exceptions.
+ return;
+ }
+ catch(const TimeoutException&)
+ {
+ // Ignore timeouts.
+ return;
+ }
+ catch(const LocalException& ex)
+ {
+ // Warn about other Ice local exceptions.
+ if(_warn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ }
+ return;
+ }
+
+ assert(transceiver);
+
+ try
+ {
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
_threadPerConnectionStackSize);
- }
- catch(const LocalException&)
- {
- return;
- }
+ }
+ catch(const LocalException&)
+ {
+ return;
+ }
- _connections.push_back(connection);
+ _connections.push_back(connection);
}
assert(connection);
@@ -781,14 +781,14 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
//
try
{
- connection->validate();
+ connection->validate();
}
catch(const LocalException&)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
- _connections.remove(connection);
- return;
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
+ _connections.remove(connection);
+ return;
}
connection->activate();
@@ -808,10 +808,10 @@ IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool
if(_finishedCount == 0 && _state == StateClosed)
{
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse();
- _acceptor->close();
- _acceptor = 0;
- notifyAll();
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse();
+ _acceptor->close();
+ _acceptor = 0;
+ notifyAll();
}
}
@@ -828,7 +828,7 @@ IceInternal::IncomingConnectionFactory::toString() const
if(_transceiver)
{
- return _transceiver->toString();
+ return _transceiver->toString();
}
assert(_acceptor);
@@ -836,9 +836,9 @@ IceInternal::IncomingConnectionFactory::toString() const
}
IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance,
- const EndpointIPtr& endpoint,
- const ObjectAdapterPtr& adapter,
- const string& adapterName) :
+ const EndpointIPtr& endpoint,
+ const ObjectAdapterPtr& adapter,
+ const string& adapterName) :
EventHandler(instance),
_endpoint(endpoint),
_adapter(adapter),
@@ -849,14 +849,14 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
{
if(_instance->defaultsAndOverrides()->overrideTimeout)
{
- const_cast<EndpointIPtr&>(_endpoint) =
- _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
+ const_cast<EndpointIPtr&>(_endpoint) =
+ _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
}
if(_instance->defaultsAndOverrides()->overrideCompress)
{
- const_cast<EndpointIPtr&>(_endpoint) =
- _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue);
+ const_cast<EndpointIPtr&>(_endpoint) =
+ _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue);
}
ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get());
@@ -866,75 +866,75 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint));
if(_transceiver)
{
- ConnectionIPtr connection;
+ ConnectionIPtr connection;
- try
- {
- connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection,
+ try
+ {
+ connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection,
_threadPerConnectionStackSize);
- connection->validate();
- }
- catch(const LocalException&)
- {
- //
- // If a connection object was constructed, then validate()
- // must have raised the exception.
- //
- if(connection)
- {
- connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
- }
-
- return;
- }
-
- _connections.push_back(connection);
+ connection->validate();
+ }
+ catch(const LocalException&)
+ {
+ //
+ // If a connection object was constructed, then validate()
+ // must have raised the exception.
+ //
+ if(connection)
+ {
+ connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
+ }
+
+ return;
+ }
+
+ _connections.push_back(connection);
}
else
{
- _acceptor = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), adapterName);
- assert(_acceptor);
- _acceptor->listen();
-
- __setNoDelete(true);
- try
- {
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we also use
- // one thread per incoming connection factory, that
- // accepts new connections on this endpoint.
- //
- _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this);
- _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize);
- }
- else
- {
- adapterImpl->getThreadPool()->incFdsInUse();
- }
- }
- catch(const IceUtil::Exception& ex)
- {
- if(_threadPerConnection)
- {
- Error out(_instance->initializationData().logger);
- out << "cannot create thread for incoming connection factory:\n" << ex;
- }
-
- try
- {
- _acceptor->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- __setNoDelete(false);
- ex.ice_throw();
- }
- __setNoDelete(false);
+ _acceptor = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), adapterName);
+ assert(_acceptor);
+ _acceptor->listen();
+
+ __setNoDelete(true);
+ try
+ {
+ if(_threadPerConnection)
+ {
+ //
+ // If we are in thread per connection mode, we also use
+ // one thread per incoming connection factory, that
+ // accepts new connections on this endpoint.
+ //
+ _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this);
+ _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize);
+ }
+ else
+ {
+ adapterImpl->getThreadPool()->incFdsInUse();
+ }
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ if(_threadPerConnection)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "cannot create thread for incoming connection factory:\n" << ex;
+ }
+
+ try
+ {
+ _acceptor->close();
+ }
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+
+ __setNoDelete(false);
+ ex.ice_throw();
+ }
+ __setNoDelete(false);
}
}
@@ -951,74 +951,74 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
if(_state == state) // Don't switch twice.
{
- return;
+ return;
}
switch(state)
{
- case StateActive:
- {
- if(_state != StateHolding) // Can only switch from holding to active.
- {
- return;
- }
- if(!_threadPerConnection && _acceptor)
- {
- registerWithPool();
- }
- for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate));
- break;
- }
-
- case StateHolding:
- {
- if(_state != StateActive) // Can only switch from active to holding.
- {
- return;
- }
- if(!_threadPerConnection && _acceptor)
- {
- unregisterWithPool();
- }
- for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold));
- break;
- }
-
- case StateClosed:
- {
- if(_acceptor)
- {
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we connect
- // to our own acceptor, which unblocks our thread per
- // incoming connection factory stuck in accept().
- //
- _acceptor->connectToSelf();
- }
- else
- {
- //
- // Otherwise we first must make sure that we are
- // registered, then we unregister, and let finished()
- // do the close.
- //
- registerWithPool();
- unregisterWithPool();
- }
- }
+ case StateActive:
+ {
+ if(_state != StateHolding) // Can only switch from holding to active.
+ {
+ return;
+ }
+ if(!_threadPerConnection && _acceptor)
+ {
+ registerWithPool();
+ }
+ for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate));
+ break;
+ }
+
+ case StateHolding:
+ {
+ if(_state != StateActive) // Can only switch from active to holding.
+ {
+ return;
+ }
+ if(!_threadPerConnection && _acceptor)
+ {
+ unregisterWithPool();
+ }
+ for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold));
+ break;
+ }
+
+ case StateClosed:
+ {
+ if(_acceptor)
+ {
+ if(_threadPerConnection)
+ {
+ //
+ // If we are in thread per connection mode, we connect
+ // to our own acceptor, which unblocks our thread per
+ // incoming connection factory stuck in accept().
+ //
+ _acceptor->connectToSelf();
+ }
+ else
+ {
+ //
+ // Otherwise we first must make sure that we are
+ // registered, then we unregister, and let finished()
+ // do the close.
+ //
+ registerWithPool();
+ unregisterWithPool();
+ }
+ }
#ifdef _STLP_BEGIN_NAMESPACE
- // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
- for_each(_connections.begin(), _connections.end(),
- voidbind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated));
+ // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
+ for_each(_connections.begin(), _connections.end(),
+ voidbind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated));
#else
- for_each(_connections.begin(), _connections.end(),
- bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated));
+ for_each(_connections.begin(), _connections.end(),
+ bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated));
#endif
- break;
- }
+ break;
+ }
}
_state = state;
@@ -1033,8 +1033,8 @@ IceInternal::IncomingConnectionFactory::registerWithPool()
if(!_registeredWithPool)
{
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(_acceptor->fd(), this);
- _registeredWithPool = true;
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(_acceptor->fd(), this);
+ _registeredWithPool = true;
}
}
@@ -1046,9 +1046,9 @@ IceInternal::IncomingConnectionFactory::unregisterWithPool()
if(_registeredWithPool)
{
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(_acceptor->fd());
- _registeredWithPool = false;
- ++_finishedCount; // For each unregistration, finished() is called once.
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(_acceptor->fd());
+ _registeredWithPool = false;
+ ++_finishedCount; // For each unregistration, finished() is called once.
}
}
@@ -1059,111 +1059,111 @@ IceInternal::IncomingConnectionFactory::run()
while(true)
{
- //
- // We must accept new connections outside the thread
- // synchronization, because we use blocking accept.
- //
- TransceiverPtr transceiver;
- try
- {
- transceiver = _acceptor->accept(-1);
- }
- catch(const SocketException&)
- {
- // Ignore socket exceptions.
- }
- catch(const TimeoutException&)
- {
- // Ignore timeouts.
- }
- catch(const LocalException& ex)
- {
- // Warn about other Ice local exceptions.
- if(_warn)
- {
- Warning out(_instance->initializationData().logger);
- out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
- }
- }
-
- ConnectionIPtr connection;
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(_state == StateHolding)
- {
- wait();
- }
-
- if(_state == StateClosed)
- {
- if(transceiver)
- {
- try
- {
- transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
- }
-
- try
- {
- _acceptor->close();
- }
- catch(const LocalException& ex)
- {
- _acceptor = 0;
- notifyAll();
- ex.ice_throw();
- }
-
- _acceptor = 0;
- notifyAll();
- return;
- }
-
- assert(_state == StateActive);
-
- //
- // Reap connections for which destruction has completed.
- //
- _connections.erase(remove_if(_connections.begin(), _connections.end(),
- Ice::constMemFun(&ConnectionI::isFinished)),
- _connections.end());
-
- //
- // Create a connection object for the connection.
- //
- if(transceiver)
- {
- try
- {
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
+ //
+ // We must accept new connections outside the thread
+ // synchronization, because we use blocking accept.
+ //
+ TransceiverPtr transceiver;
+ try
+ {
+ transceiver = _acceptor->accept(-1);
+ }
+ catch(const SocketException&)
+ {
+ // Ignore socket exceptions.
+ }
+ catch(const TimeoutException&)
+ {
+ // Ignore timeouts.
+ }
+ catch(const LocalException& ex)
+ {
+ // Warn about other Ice local exceptions.
+ if(_warn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ }
+ }
+
+ ConnectionIPtr connection;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ while(_state == StateHolding)
+ {
+ wait();
+ }
+
+ if(_state == StateClosed)
+ {
+ if(transceiver)
+ {
+ try
+ {
+ transceiver->close();
+ }
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+ }
+
+ try
+ {
+ _acceptor->close();
+ }
+ catch(const LocalException& ex)
+ {
+ _acceptor = 0;
+ notifyAll();
+ ex.ice_throw();
+ }
+
+ _acceptor = 0;
+ notifyAll();
+ return;
+ }
+
+ assert(_state == StateActive);
+
+ //
+ // Reap connections for which destruction has completed.
+ //
+ _connections.erase(remove_if(_connections.begin(), _connections.end(),
+ Ice::constMemFun(&ConnectionI::isFinished)),
+ _connections.end());
+
+ //
+ // Create a connection object for the connection.
+ //
+ if(transceiver)
+ {
+ try
+ {
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
_threadPerConnectionStackSize);
- }
- catch(const LocalException&)
- {
- return;
- }
-
- _connections.push_back(connection);
- }
- }
-
- //
- // In thread per connection mode, the connection's thread will
- // take care of connection validation and activation (for
- // non-datagram connections). We don't want to block this
- // thread waiting until validation is complete, because in
- // contrast to thread pool mode, it is the only thread that
- // can accept connections with this factory's
- // acceptor. Therefore we don't call validate() and activate()
- // from the connection factory in thread per connection mode.
- //
+ }
+ catch(const LocalException&)
+ {
+ return;
+ }
+
+ _connections.push_back(connection);
+ }
+ }
+
+ //
+ // In thread per connection mode, the connection's thread will
+ // take care of connection validation and activation (for
+ // non-datagram connections). We don't want to block this
+ // thread waiting until validation is complete, because in
+ // contrast to thread pool mode, it is the only thread that
+ // can accept connections with this factory's
+ // acceptor. Therefore we don't call validate() and activate()
+ // from the connection factory in thread per connection mode.
+ //
}
}
@@ -1178,22 +1178,22 @@ IceInternal::IncomingConnectionFactory::ThreadPerIncomingConnectionFactory::run(
{
try
{
- _factory->run();
+ _factory->run();
}
catch(const Exception& ex)
- {
- Error out(_factory->_instance->initializationData().logger);
- out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex;
+ {
+ Error out(_factory->_instance->initializationData().logger);
+ out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex;
}
catch(const std::exception& ex)
{
- Error out(_factory->_instance->initializationData().logger);
- out << "std::exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what();
+ Error out(_factory->_instance->initializationData().logger);
+ out << "std::exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what();
}
catch(...)
{
- Error out(_factory->_instance->initializationData().logger);
- out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString();
+ Error out(_factory->_instance->initializationData().logger);
+ out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString();
}
_factory = 0; // Resolve cyclic dependency.