diff options
author | Bernard Normier <bernard@zeroc.com> | 2007-02-01 17:09:49 +0000 |
---|---|---|
committer | Bernard Normier <bernard@zeroc.com> | 2007-02-01 17:09:49 +0000 |
commit | abada90e3f84dc703b8ddc9efcbed8a946fadead (patch) | |
tree | 2c6f9dccd510ea97cb927a7bd635422efaae547a /cpp/src/Ice/ConnectionFactory.cpp | |
parent | removing trace message (diff) | |
download | ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.bz2 ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.xz ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.zip |
Expanded tabs into spaces
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 1436 |
1 files changed, 718 insertions, 718 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 5b33b528d5a..3008dfaa485 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -45,18 +45,18 @@ IceInternal::OutgoingConnectionFactory::destroy() if(_destroyed) { - return; + return; } #ifdef _STLP_BEGIN_NAMESPACE // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h for_each(_connections.begin(), _connections.end(), - voidbind2nd(Ice::secondVoidMemFun1<EndpointIPtr, ConnectionI, ConnectionI::DestructionReason> - (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); + voidbind2nd(Ice::secondVoidMemFun1<EndpointIPtr, ConnectionI, ConnectionI::DestructionReason> + (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #else for_each(_connections.begin(), _connections.end(), - bind2nd(Ice::secondVoidMemFun1<const EndpointIPtr, ConnectionI, ConnectionI::DestructionReason> - (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); + bind2nd(Ice::secondVoidMemFun1<const EndpointIPtr, ConnectionI, ConnectionI::DestructionReason> + (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #endif _destroyed = true; @@ -69,27 +69,27 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() multimap<EndpointIPtr, ConnectionIPtr> connections; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // First we wait until the factory is destroyed. We also wait - // until there are no pending connections anymore. Only then - // we can be sure the _connections contains all connections. - // - while(!_destroyed || !_pending.empty()) - { - wait(); - } - - // - // We want to wait until all connections are finished outside the - // thread synchronization. - // - connections.swap(_connections); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the factory is destroyed. We also wait + // until there are no pending connections anymore. Only then + // we can be sure the _connections contains all connections. + // + while(!_destroyed || !_pending.empty()) + { + wait(); + } + + // + // We want to wait until all connections are finished outside the + // thread synchronization. + // + connections.swap(_connections); } for_each(connections.begin(), connections.end(), - Ice::secondVoidMemFun<const EndpointIPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); + Ice::secondVoidMemFun<const EndpointIPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); } ConnectionIPtr @@ -100,131 +100,131 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt vector<EndpointIPtr> endpoints = endpts; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_destroyed) - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - - // - // Reap connections for which destruction has completed. - // - std::multimap<EndpointIPtr, ConnectionIPtr>::iterator p = _connections.begin(); - while(p != _connections.end()) - { - if(p->second->isFinished()) - { - _connections.erase(p++); - } - else - { - ++p; - } - } - - // - // Modify endpoints with overrides. - // - vector<EndpointIPtr>::iterator q; - for(q = endpoints.begin(); q != endpoints.end(); ++q) - { - if(_instance->defaultsAndOverrides()->overrideTimeout) - { - *q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); - } - - // - // The Connection object does not take the compression flag of - // endpoints into account, but instead gets the information - // about whether messages should be compressed or not from - // other sources. In order to allow connection sharing for - // endpoints that differ in the value of the compression flag - // only, we always set the compression flag to false here in - // this connection factory. - // - *q = (*q)->compress(false); - } - - // - // Search for existing connections. - // - vector<EndpointIPtr>::const_iterator r; - for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) - { - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q); - - while(pr.first != pr.second) - { - // - // Don't return connections for which destruction has - // been initiated. The connection must also match the + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + + // + // Reap connections for which destruction has completed. + // + std::multimap<EndpointIPtr, ConnectionIPtr>::iterator p = _connections.begin(); + while(p != _connections.end()) + { + if(p->second->isFinished()) + { + _connections.erase(p++); + } + else + { + ++p; + } + } + + // + // Modify endpoints with overrides. + // + vector<EndpointIPtr>::iterator q; + for(q = endpoints.begin(); q != endpoints.end(); ++q) + { + if(_instance->defaultsAndOverrides()->overrideTimeout) + { + *q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + } + + // + // The Connection object does not take the compression flag of + // endpoints into account, but instead gets the information + // about whether messages should be compressed or not from + // other sources. In order to allow connection sharing for + // endpoints that differ in the value of the compression flag + // only, we always set the compression flag to false here in + // this connection factory. + // + *q = (*q)->compress(false); + } + + // + // Search for existing connections. + // + vector<EndpointIPtr>::const_iterator r; + for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) + { + pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, + multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q); + + while(pr.first != pr.second) + { + // + // Don't return connections for which destruction has + // been initiated. The connection must also match the // requested thread-per-connection setting. - // - if(!pr.first->second->isDestroyed() && + // + if(!pr.first->second->isDestroyed() && pr.first->second->threadPerConnection() == threadPerConnection) - { - if(_instance->defaultsAndOverrides()->overrideCompress) - { - compress = _instance->defaultsAndOverrides()->overrideCompressValue; - } - else - { - compress = (*r)->compress(); - } - - return pr.first->second; - } - - ++pr.first; - } - } - - // - // If some other thread is currently trying to establish a - // connection to any of our endpoints, we wait until this - // thread is finished. - // - bool searchAgain = false; - while(!_destroyed) - { - for(q = endpoints.begin(); q != endpoints.end(); ++q) - { - if(_pending.find(*q) != _pending.end()) - { - break; - } - } - - if(q == endpoints.end()) - { - break; - } - - searchAgain = true; - - wait(); - } - - if(_destroyed) - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - - // - // Search for existing connections again if we waited above, - // as new connections might have been added in the meantime. - // - if(searchAgain) - { - for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) - { - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q); - - while(pr.first != pr.second) - { + { + if(_instance->defaultsAndOverrides()->overrideCompress) + { + compress = _instance->defaultsAndOverrides()->overrideCompressValue; + } + else + { + compress = (*r)->compress(); + } + + return pr.first->second; + } + + ++pr.first; + } + } + + // + // If some other thread is currently trying to establish a + // connection to any of our endpoints, we wait until this + // thread is finished. + // + bool searchAgain = false; + while(!_destroyed) + { + for(q = endpoints.begin(); q != endpoints.end(); ++q) + { + if(_pending.find(*q) != _pending.end()) + { + break; + } + } + + if(q == endpoints.end()) + { + break; + } + + searchAgain = true; + + wait(); + } + + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + + // + // Search for existing connections again if we waited above, + // as new connections might have been added in the meantime. + // + if(searchAgain) + { + for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) + { + pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, + multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q); + + while(pr.first != pr.second) + { // // Don't return connections for which destruction has // been initiated. The connection must also match the @@ -232,31 +232,31 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // if(!pr.first->second->isDestroyed() && pr.first->second->threadPerConnection() == threadPerConnection) - { - if(_instance->defaultsAndOverrides()->overrideCompress) - { - compress = _instance->defaultsAndOverrides()->overrideCompressValue; - } - else - { - compress = (*r)->compress(); - } - - return pr.first->second; - } - - ++pr.first; - } - } - } - - // - // No connection to any of our endpoints exists yet, so we - // will try to create one. To avoid that other threads try to - // create connections to the same endpoints, we add our - // endpoints to _pending. - // - _pending.insert(endpoints.begin(), endpoints.end()); + { + if(_instance->defaultsAndOverrides()->overrideCompress) + { + compress = _instance->defaultsAndOverrides()->overrideCompressValue; + } + else + { + compress = (*r)->compress(); + } + + return pr.first->second; + } + + ++pr.first; + } + } + } + + // + // No connection to any of our endpoints exists yet, so we + // will try to create one. To avoid that other threads try to + // create connections to the same endpoints, we add our + // endpoints to _pending. + // + _pending.insert(endpoints.begin(), endpoints.end()); } ConnectionIPtr connection; @@ -266,112 +266,112 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt vector<EndpointIPtr>::const_iterator r; for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) { - EndpointIPtr endpoint = *q; - - try - { - TransceiverPtr transceiver = endpoint->clientTransceiver(); - if(!transceiver) - { - ConnectorPtr connector = endpoint->connector(); - assert(connector); - - Int timeout; - if(_instance->defaultsAndOverrides()->overrideConnectTimeout) - { - timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; - } - // It is not necessary to check for overrideTimeout, - // the endpoint has already been modified with this - // override, if set. - else - { - timeout = endpoint->timeout(); - } - - transceiver = connector->connect(timeout); - assert(transceiver); - } - connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection, + EndpointIPtr endpoint = *q; + + try + { + TransceiverPtr transceiver = endpoint->clientTransceiver(); + if(!transceiver) + { + ConnectorPtr connector = endpoint->connector(); + assert(connector); + + Int timeout; + if(_instance->defaultsAndOverrides()->overrideConnectTimeout) + { + timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; + } + // It is not necessary to check for overrideTimeout, + // the endpoint has already been modified with this + // override, if set. + else + { + timeout = endpoint->timeout(); + } + + transceiver = connector->connect(timeout); + assert(transceiver); + } + connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection, _instance->threadPerConnectionStackSize()); - connection->validate(); - - if(_instance->defaultsAndOverrides()->overrideCompress) - { - compress = _instance->defaultsAndOverrides()->overrideCompressValue; - } - else - { - compress = (*r)->compress(); - } - break; - } - catch(const LocalException& ex) - { - exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - - // - // If a connection object was constructed, then validate() - // must have raised the exception. - // - if(connection) - { - connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - connection = 0; - } - } - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->retry >= 2) - { - Trace out(_instance->initializationData().logger, traceLevels->retryCat); - - out << "connection to endpoint failed"; - if(moreEndpts || q + 1 != endpoints.end()) - { - out << ", trying next endpoint\n"; - } - else - { - out << " and no more endpoints to try\n"; - } - out << *exception.get(); - } + connection->validate(); + + if(_instance->defaultsAndOverrides()->overrideCompress) + { + compress = _instance->defaultsAndOverrides()->overrideCompressValue; + } + else + { + compress = (*r)->compress(); + } + break; + } + catch(const LocalException& ex) + { + exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + + // + // If a connection object was constructed, then validate() + // must have raised the exception. + // + if(connection) + { + connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. + connection = 0; + } + } + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->retry >= 2) + { + Trace out(_instance->initializationData().logger, traceLevels->retryCat); + + out << "connection to endpoint failed"; + if(moreEndpts || q + 1 != endpoints.end()) + { + out << ", trying next endpoint\n"; + } + else + { + out << " and no more endpoints to try\n"; + } + out << *exception.get(); + } } { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // Signal other threads that we are done with trying to - // establish connections to our endpoints. - // - for(q = endpoints.begin(); q != endpoints.end(); ++q) - { - _pending.erase(*q); - } - notifyAll(); - - if(!connection) - { - assert(exception.get()); - exception->ice_throw(); - } - else - { - _connections.insert(_connections.end(), - pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection)); - - if(_destroyed) - { - connection->destroy(ConnectionI::CommunicatorDestroyed); - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - else - { - connection->activate(); - } - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // Signal other threads that we are done with trying to + // establish connections to our endpoints. + // + for(q = endpoints.begin(); q != endpoints.end(); ++q) + { + _pending.erase(*q); + } + notifyAll(); + + if(!connection) + { + assert(exception.get()); + exception->ice_throw(); + } + else + { + _connections.insert(_connections.end(), + pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection)); + + if(_destroyed) + { + connection->destroy(ConnectionI::CommunicatorDestroyed); + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + else + { + connection->activate(); + } + } } assert(connection); @@ -385,7 +385,7 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route if(_destroyed) { - throw CommunicatorDestroyedException(__FILE__, __LINE__); + throw CommunicatorDestroyedException(__FILE__, __LINE__); } assert(routerInfo); @@ -401,44 +401,44 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route vector<EndpointIPtr>::const_iterator p; for(p = endpoints.begin(); p != endpoints.end(); ++p) { - EndpointIPtr endpoint = *p; - - // - // Modify endpoints with overrides. - // - if(_instance->defaultsAndOverrides()->overrideTimeout) - { - endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); - } - - // - // The Connection object does not take the compression flag of - // endpoints into account, but instead gets the information - // about whether messages should be compressed or not from - // other sources. In order to allow connection sharing for - // endpoints that differ in the value of the compression flag - // only, we always set the compression flag to false here in - // this connection factory. - // - endpoint = endpoint->compress(false); - - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint); - - while(pr.first != pr.second) - { - try - { - pr.first->second->setAdapter(adapter); - } - catch(const Ice::LocalException&) - { - // - // Ignore, the connection is being closed or closed. - // - } - ++pr.first; - } + EndpointIPtr endpoint = *p; + + // + // Modify endpoints with overrides. + // + if(_instance->defaultsAndOverrides()->overrideTimeout) + { + endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + } + + // + // The Connection object does not take the compression flag of + // endpoints into account, but instead gets the information + // about whether messages should be compressed or not from + // other sources. In order to allow connection sharing for + // endpoints that differ in the value of the compression flag + // only, we always set the compression flag to false here in + // this connection factory. + // + endpoint = endpoint->compress(false); + + pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, + multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint); + + while(pr.first != pr.second) + { + try + { + pr.first->second->setAdapter(adapter); + } + catch(const Ice::LocalException&) + { + // + // Ignore, the connection is being closed or closed. + // + } + ++pr.first; + } } } @@ -449,24 +449,24 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad if(_destroyed) { - return; + return; } for(multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { - if(p->second->getAdapter() == adapter) - { - try - { - p->second->setAdapter(0); - } - catch(const Ice::LocalException&) - { - // - // Ignore, the connection is being closed or closed. - // - } - } + if(p->second->getAdapter() == adapter) + { + try + { + p->second->setAdapter(0); + } + catch(const Ice::LocalException&) + { + // + // Ignore, the connection is being closed or closed. + // + } + } } } @@ -476,26 +476,26 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests() list<ConnectionIPtr> c; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - for(std::multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); - p != _connections.end(); - ++p) - { - c.push_back(p->second); - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + for(std::multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); + p != _connections.end(); + ++p) + { + c.push_back(p->second); + } } for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p) { - try - { - (*p)->flushBatchRequests(); - } - catch(const LocalException&) - { - // Ignore. - } + try + { + (*p)->flushBatchRequests(); + } + catch(const LocalException&) + { + // Ignore. + } } } @@ -538,22 +538,22 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const list<ConnectionIPtr> connections; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // First we wait until the connection factory itself is in holding - // state. - // - while(_state < StateHolding) - { - wait(); - } - - // - // We want to wait until all connections are in holding state - // outside the thread synchronization. - // - connections = _connections; + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the connection factory itself is in holding + // state. + // + while(_state < StateHolding) + { + wait(); + } + + // + // We want to wait until all connections are in holding state + // outside the thread synchronization. + // + connections = _connections; } // @@ -569,35 +569,35 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() list<ConnectionIPtr> connections; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // First we wait until the factory is destroyed. If we are using - // an acceptor, we also wait for it to be closed. - // - while(_state != StateClosed || _acceptor) - { - wait(); - } - - threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; - _threadPerIncomingConnectionFactory = 0; - - // - // Clear the OA. See bug 1673 for the details of why this is necessary. - // - _adapter = 0; - - // - // We want to wait until all connections are finished outside the - // thread synchronization. - // - connections.swap(_connections); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the factory is destroyed. If we are using + // an acceptor, we also wait for it to be closed. + // + while(_state != StateClosed || _acceptor) + { + wait(); + } + + threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; + _threadPerIncomingConnectionFactory = 0; + + // + // Clear the OA. See bug 1673 for the details of why this is necessary. + // + _adapter = 0; + + // + // We want to wait until all connections are finished outside the + // thread synchronization. + // + connections.swap(_connections); } if(threadPerIncomingConnectionFactory) { - threadPerIncomingConnectionFactory->getThreadControl().join(); + threadPerIncomingConnectionFactory->getThreadControl().join(); } for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished)); @@ -615,7 +615,7 @@ IceInternal::IncomingConnectionFactory::equivalent(const EndpointIPtr& endp) con { if(_transceiver) { - return endp->equivalent(_transceiver); + return endp->equivalent(_transceiver); } assert(_acceptor); @@ -633,7 +633,7 @@ IceInternal::IncomingConnectionFactory::connections() const // Only copy connections which have not been destroyed. // remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result), - Ice::constMemFun(&ConnectionI::isDestroyed)); + Ice::constMemFun(&ConnectionI::isDestroyed)); return result; } @@ -645,14 +645,14 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests() for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p) { - try - { - (*p)->flushBatchRequests(); - } - catch(const LocalException&) - { - // Ignore. - } + try + { + (*p)->flushBatchRequests(); + } + catch(const LocalException&) + { + // Ignore. + } } } @@ -682,13 +682,13 @@ class PromoteFollower public: PromoteFollower(const ThreadPoolPtr& threadPool) : - _threadPool(threadPool) + _threadPool(threadPool) { } ~PromoteFollower() { - _threadPool->promoteFollower(); + _threadPool->promoteFollower(); } private: @@ -704,73 +704,73 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt ConnectionIPtr connection; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // This makes sure that we promote a follower before we leave - // the scope of the mutex above, but after we call accept() - // (if we call it). - // - // If _threadPool is null, then this class doesn't do - // anything. - // - PromoteFollower promote(threadPool); - - if(_state != StateActive) - { - IceUtil::ThreadControl::yield(); - return; - } - - // - // Reap connections for which destruction has completed. - // - _connections.erase(remove_if(_connections.begin(), _connections.end(), - Ice::constMemFun(&ConnectionI::isFinished)), - _connections.end()); - - // - // Now accept a new connection. - // - TransceiverPtr transceiver; - try - { - transceiver = _acceptor->accept(0); - } - catch(const SocketException&) - { - // Ignore socket exceptions. - return; - } - catch(const TimeoutException&) - { - // Ignore timeouts. - return; - } - catch(const LocalException& ex) - { - // Warn about other Ice local exceptions. - if(_warn) - { - Warning out(_instance->initializationData().logger); - out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); - } - return; - } - - assert(transceiver); - - try - { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // This makes sure that we promote a follower before we leave + // the scope of the mutex above, but after we call accept() + // (if we call it). + // + // If _threadPool is null, then this class doesn't do + // anything. + // + PromoteFollower promote(threadPool); + + if(_state != StateActive) + { + IceUtil::ThreadControl::yield(); + return; + } + + // + // Reap connections for which destruction has completed. + // + _connections.erase(remove_if(_connections.begin(), _connections.end(), + Ice::constMemFun(&ConnectionI::isFinished)), + _connections.end()); + + // + // Now accept a new connection. + // + TransceiverPtr transceiver; + try + { + transceiver = _acceptor->accept(0); + } + catch(const SocketException&) + { + // Ignore socket exceptions. + return; + } + catch(const TimeoutException&) + { + // Ignore timeouts. + return; + } + catch(const LocalException& ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + return; + } + + assert(transceiver); + + try + { + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, _threadPerConnectionStackSize); - } - catch(const LocalException&) - { - return; - } + } + catch(const LocalException&) + { + return; + } - _connections.push_back(connection); + _connections.push_back(connection); } assert(connection); @@ -781,14 +781,14 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt // try { - connection->validate(); + connection->validate(); } catch(const LocalException&) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - _connections.remove(connection); - return; + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. + _connections.remove(connection); + return; } connection->activate(); @@ -808,10 +808,10 @@ IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool if(_finishedCount == 0 && _state == StateClosed) { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse(); - _acceptor->close(); - _acceptor = 0; - notifyAll(); + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse(); + _acceptor->close(); + _acceptor = 0; + notifyAll(); } } @@ -828,7 +828,7 @@ IceInternal::IncomingConnectionFactory::toString() const if(_transceiver) { - return _transceiver->toString(); + return _transceiver->toString(); } assert(_acceptor); @@ -836,9 +836,9 @@ IceInternal::IncomingConnectionFactory::toString() const } IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance, - const EndpointIPtr& endpoint, - const ObjectAdapterPtr& adapter, - const string& adapterName) : + const EndpointIPtr& endpoint, + const ObjectAdapterPtr& adapter, + const string& adapterName) : EventHandler(instance), _endpoint(endpoint), _adapter(adapter), @@ -849,14 +849,14 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance { if(_instance->defaultsAndOverrides()->overrideTimeout) { - const_cast<EndpointIPtr&>(_endpoint) = - _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + const_cast<EndpointIPtr&>(_endpoint) = + _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); } if(_instance->defaultsAndOverrides()->overrideCompress) { - const_cast<EndpointIPtr&>(_endpoint) = - _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue); + const_cast<EndpointIPtr&>(_endpoint) = + _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue); } ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get()); @@ -866,75 +866,75 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint)); if(_transceiver) { - ConnectionIPtr connection; + ConnectionIPtr connection; - try - { - connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection, + try + { + connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection, _threadPerConnectionStackSize); - connection->validate(); - } - catch(const LocalException&) - { - // - // If a connection object was constructed, then validate() - // must have raised the exception. - // - if(connection) - { - connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - } - - return; - } - - _connections.push_back(connection); + connection->validate(); + } + catch(const LocalException&) + { + // + // If a connection object was constructed, then validate() + // must have raised the exception. + // + if(connection) + { + connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. + } + + return; + } + + _connections.push_back(connection); } else { - _acceptor = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), adapterName); - assert(_acceptor); - _acceptor->listen(); - - __setNoDelete(true); - try - { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we also use - // one thread per incoming connection factory, that - // accepts new connections on this endpoint. - // - _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); - _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize); - } - else - { - adapterImpl->getThreadPool()->incFdsInUse(); - } - } - catch(const IceUtil::Exception& ex) - { - if(_threadPerConnection) - { - Error out(_instance->initializationData().logger); - out << "cannot create thread for incoming connection factory:\n" << ex; - } - - try - { - _acceptor->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - __setNoDelete(false); - ex.ice_throw(); - } - __setNoDelete(false); + _acceptor = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), adapterName); + assert(_acceptor); + _acceptor->listen(); + + __setNoDelete(true); + try + { + if(_threadPerConnection) + { + // + // If we are in thread per connection mode, we also use + // one thread per incoming connection factory, that + // accepts new connections on this endpoint. + // + _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); + _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize); + } + else + { + adapterImpl->getThreadPool()->incFdsInUse(); + } + } + catch(const IceUtil::Exception& ex) + { + if(_threadPerConnection) + { + Error out(_instance->initializationData().logger); + out << "cannot create thread for incoming connection factory:\n" << ex; + } + + try + { + _acceptor->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + + __setNoDelete(false); + ex.ice_throw(); + } + __setNoDelete(false); } } @@ -951,74 +951,74 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_state == state) // Don't switch twice. { - return; + return; } switch(state) { - case StateActive: - { - if(_state != StateHolding) // Can only switch from holding to active. - { - return; - } - if(!_threadPerConnection && _acceptor) - { - registerWithPool(); - } - for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate)); - break; - } - - case StateHolding: - { - if(_state != StateActive) // Can only switch from active to holding. - { - return; - } - if(!_threadPerConnection && _acceptor) - { - unregisterWithPool(); - } - for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold)); - break; - } - - case StateClosed: - { - if(_acceptor) - { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we connect - // to our own acceptor, which unblocks our thread per - // incoming connection factory stuck in accept(). - // - _acceptor->connectToSelf(); - } - else - { - // - // Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - registerWithPool(); - unregisterWithPool(); - } - } + case StateActive: + { + if(_state != StateHolding) // Can only switch from holding to active. + { + return; + } + if(!_threadPerConnection && _acceptor) + { + registerWithPool(); + } + for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate)); + break; + } + + case StateHolding: + { + if(_state != StateActive) // Can only switch from active to holding. + { + return; + } + if(!_threadPerConnection && _acceptor) + { + unregisterWithPool(); + } + for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold)); + break; + } + + case StateClosed: + { + if(_acceptor) + { + if(_threadPerConnection) + { + // + // If we are in thread per connection mode, we connect + // to our own acceptor, which unblocks our thread per + // incoming connection factory stuck in accept(). + // + _acceptor->connectToSelf(); + } + else + { + // + // Otherwise we first must make sure that we are + // registered, then we unregister, and let finished() + // do the close. + // + registerWithPool(); + unregisterWithPool(); + } + } #ifdef _STLP_BEGIN_NAMESPACE - // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h - for_each(_connections.begin(), _connections.end(), - voidbind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); + // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h + for_each(_connections.begin(), _connections.end(), + voidbind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); #else - for_each(_connections.begin(), _connections.end(), - bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); + for_each(_connections.begin(), _connections.end(), + bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); #endif - break; - } + break; + } } _state = state; @@ -1033,8 +1033,8 @@ IceInternal::IncomingConnectionFactory::registerWithPool() if(!_registeredWithPool) { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(_acceptor->fd(), this); - _registeredWithPool = true; + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(_acceptor->fd(), this); + _registeredWithPool = true; } } @@ -1046,9 +1046,9 @@ IceInternal::IncomingConnectionFactory::unregisterWithPool() if(_registeredWithPool) { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(_acceptor->fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(_acceptor->fd()); + _registeredWithPool = false; + ++_finishedCount; // For each unregistration, finished() is called once. } } @@ -1059,111 +1059,111 @@ IceInternal::IncomingConnectionFactory::run() while(true) { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - TransceiverPtr transceiver; - try - { - transceiver = _acceptor->accept(-1); - } - catch(const SocketException&) - { - // Ignore socket exceptions. - } - catch(const TimeoutException&) - { - // Ignore timeouts. - } - catch(const LocalException& ex) - { - // Warn about other Ice local exceptions. - if(_warn) - { - Warning out(_instance->initializationData().logger); - out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); - } - } - - ConnectionIPtr connection; - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_state == StateHolding) - { - wait(); - } - - if(_state == StateClosed) - { - if(transceiver) - { - try - { - transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - } - - try - { - _acceptor->close(); - } - catch(const LocalException& ex) - { - _acceptor = 0; - notifyAll(); - ex.ice_throw(); - } - - _acceptor = 0; - notifyAll(); - return; - } - - assert(_state == StateActive); - - // - // Reap connections for which destruction has completed. - // - _connections.erase(remove_if(_connections.begin(), _connections.end(), - Ice::constMemFun(&ConnectionI::isFinished)), - _connections.end()); - - // - // Create a connection object for the connection. - // - if(transceiver) - { - try - { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, + // + // We must accept new connections outside the thread + // synchronization, because we use blocking accept. + // + TransceiverPtr transceiver; + try + { + transceiver = _acceptor->accept(-1); + } + catch(const SocketException&) + { + // Ignore socket exceptions. + } + catch(const TimeoutException&) + { + // Ignore timeouts. + } + catch(const LocalException& ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + } + + ConnectionIPtr connection; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + while(_state == StateHolding) + { + wait(); + } + + if(_state == StateClosed) + { + if(transceiver) + { + try + { + transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + } + + try + { + _acceptor->close(); + } + catch(const LocalException& ex) + { + _acceptor = 0; + notifyAll(); + ex.ice_throw(); + } + + _acceptor = 0; + notifyAll(); + return; + } + + assert(_state == StateActive); + + // + // Reap connections for which destruction has completed. + // + _connections.erase(remove_if(_connections.begin(), _connections.end(), + Ice::constMemFun(&ConnectionI::isFinished)), + _connections.end()); + + // + // Create a connection object for the connection. + // + if(transceiver) + { + try + { + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, _threadPerConnectionStackSize); - } - catch(const LocalException&) - { - return; - } - - _connections.push_back(connection); - } - } - - // - // In thread per connection mode, the connection's thread will - // take care of connection validation and activation (for - // non-datagram connections). We don't want to block this - // thread waiting until validation is complete, because in - // contrast to thread pool mode, it is the only thread that - // can accept connections with this factory's - // acceptor. Therefore we don't call validate() and activate() - // from the connection factory in thread per connection mode. - // + } + catch(const LocalException&) + { + return; + } + + _connections.push_back(connection); + } + } + + // + // In thread per connection mode, the connection's thread will + // take care of connection validation and activation (for + // non-datagram connections). We don't want to block this + // thread waiting until validation is complete, because in + // contrast to thread pool mode, it is the only thread that + // can accept connections with this factory's + // acceptor. Therefore we don't call validate() and activate() + // from the connection factory in thread per connection mode. + // } } @@ -1178,22 +1178,22 @@ IceInternal::IncomingConnectionFactory::ThreadPerIncomingConnectionFactory::run( { try { - _factory->run(); + _factory->run(); } catch(const Exception& ex) - { - Error out(_factory->_instance->initializationData().logger); - out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex; + { + Error out(_factory->_instance->initializationData().logger); + out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex; } catch(const std::exception& ex) { - Error out(_factory->_instance->initializationData().logger); - out << "std::exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what(); + Error out(_factory->_instance->initializationData().logger); + out << "std::exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what(); } catch(...) { - Error out(_factory->_instance->initializationData().logger); - out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString(); + Error out(_factory->_instance->initializationData().logger); + out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString(); } _factory = 0; // Resolve cyclic dependency. |