diff options
Diffstat (limited to 'cppe/src/IceE/IncomingConnectionFactory.cpp')
-rw-r--r-- | cppe/src/IceE/IncomingConnectionFactory.cpp | 452 |
1 files changed, 226 insertions, 226 deletions
diff --git a/cppe/src/IceE/IncomingConnectionFactory.cpp b/cppe/src/IceE/IncomingConnectionFactory.cpp index 8d124ad232c..d3c9cee90e3 100644 --- a/cppe/src/IceE/IncomingConnectionFactory.cpp +++ b/cppe/src/IceE/IncomingConnectionFactory.cpp @@ -53,22 +53,22 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const list<ConnectionPtr> connections; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // First we wait until the connection factory itself is in holding - // state. - // - while(_state < StateHolding) - { - wait(); - } - - // - // We want to wait until all connections are in holding state - // outside the thread synchronization. - // - connections = _connections; + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the connection factory itself is in holding + // state. + // + while(_state < StateHolding) + { + wait(); + } + + // + // We want to wait until all connections are in holding state + // outside the thread synchronization. + // + connections = _connections; } // @@ -84,30 +84,30 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() list<ConnectionPtr> connections; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // First we wait until the factory is destroyed. If we are using - // an acceptor, we also wait for it to be closed. - // - while(_state != StateClosed || _acceptor) - { - wait(); - } - - threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; - _threadPerIncomingConnectionFactory = 0; - - // - // We want to wait until all connections are finished outside the - // thread synchronization. - // - connections.swap(_connections); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the factory is destroyed. If we are using + // an acceptor, we also wait for it to be closed. + // + while(_state != StateClosed || _acceptor) + { + wait(); + } + + threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; + _threadPerIncomingConnectionFactory = 0; + + // + // We want to wait until all connections are finished outside the + // thread synchronization. + // + connections.swap(_connections); } if(threadPerIncomingConnectionFactory) { - threadPerIncomingConnectionFactory->getThreadControl().join(); + threadPerIncomingConnectionFactory->getThreadControl().join(); } for_each(connections.begin(), connections.end(), Ice::voidMemFun(&Connection::waitUntilFinished)); @@ -131,7 +131,7 @@ IceInternal::IncomingConnectionFactory::connections() const // Only copy connections which have not been destroyed. // remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result), - Ice::constMemFun(&Connection::isDestroyed)); + Ice::constMemFun(&Connection::isDestroyed)); return result; } @@ -143,14 +143,14 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests() for(list<ConnectionPtr>::const_iterator p = c.begin(); p != c.end(); ++p) { - try - { - (*p)->flushBatchRequests(); - } - catch(const LocalException&) - { - // Ignore. - } + try + { + (*p)->flushBatchRequests(); + } + catch(const LocalException&) + { + // Ignore. + } } } @@ -161,7 +161,7 @@ IceInternal::IncomingConnectionFactory::toString() const if(_transceiver) { - return _transceiver->toString(); + return _transceiver->toString(); } assert(_acceptor); @@ -169,8 +169,8 @@ IceInternal::IncomingConnectionFactory::toString() const } IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance, - const EndpointPtr& endpoint, - const ObjectAdapterPtr& adapter) : + const EndpointPtr& endpoint, + const ObjectAdapterPtr& adapter) : _instance(instance), _endpoint(endpoint), _adapter(adapter), @@ -179,8 +179,8 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance { if(_instance->defaultsAndOverrides()->overrideTimeout) { - const_cast<EndpointPtr&>(_endpoint) = - _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + const_cast<EndpointPtr&>(_endpoint) = + _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); } _acceptor = _endpoint->acceptor(const_cast<EndpointPtr&>(_endpoint)); @@ -190,36 +190,36 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance __setNoDelete(true); try { - // - // If we are in thread per connection mode, we also use one - // thread per incoming connection factory, that accepts new - // connections on this endpoint. - // - _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); - _threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize()); + // + // If we are in thread per connection mode, we also use one + // thread per incoming connection factory, that accepts new + // connections on this endpoint. + // + _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); + _threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize()); } catch(const Ice::Exception& ex) { - { - Error out(_instance->initializationData().logger); - out << "cannot create thread for incoming connection factory:\n" << ex.toString(); - } - - try - { - _acceptor->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - _state = StateClosed; - _acceptor = 0; - _threadPerIncomingConnectionFactory = 0; - - __setNoDelete(false); - ex.ice_throw(); + { + Error out(_instance->initializationData().logger); + out << "cannot create thread for incoming connection factory:\n" << ex.toString(); + } + + try + { + _acceptor->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + + _state = StateClosed; + _acceptor = 0; + _threadPerIncomingConnectionFactory = 0; + + __setNoDelete(false); + ex.ice_throw(); } __setNoDelete(false); } @@ -239,52 +239,52 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_state == state) // Don't switch twice. { - return; + return; } switch(state) { - case StateActive: - { - if(_state != StateHolding) // Can only switch from holding to active. - { - return; - } - for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&Connection::activate)); - break; - } - - case StateHolding: - { - if(_state != StateActive) // Can only switch from active to holding. - { - return; - } - for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&Connection::hold)); - break; - } - - case StateClosed: - { - if(_acceptor) - { - // - // Connect to our own acceptor, which unblocks our - // thread per incoming connection factory stuck in accept(). - // - _acceptor->connectToSelf(); - } + case StateActive: + { + if(_state != StateHolding) // Can only switch from holding to active. + { + return; + } + for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&Connection::activate)); + break; + } + + case StateHolding: + { + if(_state != StateActive) // Can only switch from active to holding. + { + return; + } + for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&Connection::hold)); + break; + } + + case StateClosed: + { + if(_acceptor) + { + // + // Connect to our own acceptor, which unblocks our + // thread per incoming connection factory stuck in accept(). + // + _acceptor->connectToSelf(); + } #ifdef _STLP_BEGIN_NAMESPACE - // voidbind2nd is an STLport extension for broken compilers in IceE/Functional.h - for_each(_connections.begin(), _connections.end(), - voidbind2nd(Ice::voidMemFun1(&Connection::destroy), Connection::ObjectAdapterDeactivated)); + // voidbind2nd is an STLport extension for broken compilers in IceE/Functional.h + for_each(_connections.begin(), _connections.end(), + voidbind2nd(Ice::voidMemFun1(&Connection::destroy), Connection::ObjectAdapterDeactivated)); #else - for_each(_connections.begin(), _connections.end(), - bind2nd(Ice::voidMemFun1(&Connection::destroy), Connection::ObjectAdapterDeactivated)); + for_each(_connections.begin(), _connections.end(), + bind2nd(Ice::voidMemFun1(&Connection::destroy), Connection::ObjectAdapterDeactivated)); #endif - break; - } + break; + } } _state = state; @@ -298,109 +298,109 @@ IceInternal::IncomingConnectionFactory::run() while(true) { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - TransceiverPtr transceiver; - try - { - transceiver = _acceptor->accept(); - } - catch(const SocketException&) - { - // Ignore socket exceptions. - } - catch(const TimeoutException&) - { - // Ignore timeouts. - } - catch(const LocalException& ex) - { - // Warn about other Ice local exceptions. - if(_warn) - { - Warning out(_instance->initializationData().logger); - out << "connection exception:\n" << ex.toString() << "\n" << _acceptor->toString(); - } - } - - ConnectionPtr connection; - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_state == StateHolding) - { - wait(); - } - - if(_state == StateClosed) - { - if(transceiver) - { - try - { - transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - } - - try - { - _acceptor->close(); - } - catch(const LocalException& ex) - { - _acceptor = 0; - notifyAll(); - ex.ice_throw(); - } - - _acceptor = 0; - notifyAll(); - return; - } - - assert(_state == StateActive); - - // - // Reap connections for which destruction has completed. - // - _connections.erase(remove_if(_connections.begin(), _connections.end(), - Ice::constMemFun(&Connection::isFinished)), - _connections.end()); - - // - // Create a connection object for the connection. - // - if(transceiver) - { - try - { - connection = new Connection(_instance, transceiver, _endpoint, _adapter); - } - catch(const LocalException&) - { - return; - } - - _connections.push_back(connection); - } - } - - // - // In thread per connection mode, the connection's thread will - // take care of connection validation and activation. We don't - // want to block this thread waiting until validation is - // complete because it is the only thread that can accept - // connections with this factory's acceptor. Therefore we - // don't call validate() and activate() from the connection - // factory in thread per connection mode. - // + // + // We must accept new connections outside the thread + // synchronization, because we use blocking accept. + // + TransceiverPtr transceiver; + try + { + transceiver = _acceptor->accept(); + } + catch(const SocketException&) + { + // Ignore socket exceptions. + } + catch(const TimeoutException&) + { + // Ignore timeouts. + } + catch(const LocalException& ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex.toString() << "\n" << _acceptor->toString(); + } + } + + ConnectionPtr connection; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + while(_state == StateHolding) + { + wait(); + } + + if(_state == StateClosed) + { + if(transceiver) + { + try + { + transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + } + + try + { + _acceptor->close(); + } + catch(const LocalException& ex) + { + _acceptor = 0; + notifyAll(); + ex.ice_throw(); + } + + _acceptor = 0; + notifyAll(); + return; + } + + assert(_state == StateActive); + + // + // Reap connections for which destruction has completed. + // + _connections.erase(remove_if(_connections.begin(), _connections.end(), + Ice::constMemFun(&Connection::isFinished)), + _connections.end()); + + // + // Create a connection object for the connection. + // + if(transceiver) + { + try + { + connection = new Connection(_instance, transceiver, _endpoint, _adapter); + } + catch(const LocalException&) + { + return; + } + + _connections.push_back(connection); + } + } + + // + // In thread per connection mode, the connection's thread will + // take care of connection validation and activation. We don't + // want to block this thread waiting until validation is + // complete because it is the only thread that can accept + // connections with this factory's acceptor. Therefore we + // don't call validate() and activate() from the connection + // factory in thread per connection mode. + // } } @@ -415,22 +415,22 @@ IceInternal::IncomingConnectionFactory::ThreadPerIncomingConnectionFactory::run( { try { - _factory->run(); + _factory->run(); } catch(const Exception& ex) - { - Error out(_factory->_instance->initializationData().logger); - out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex.toString(); + { + Error out(_factory->_instance->initializationData().logger); + out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex.toString(); } catch(const std::exception& ex) { - Error out(_factory->_instance->initializationData().logger); - out << "std::exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what(); + Error out(_factory->_instance->initializationData().logger); + out << "std::exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what(); } catch(...) { - Error out(_factory->_instance->initializationData().logger); - out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString(); + Error out(_factory->_instance->initializationData().logger); + out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString(); } _factory = 0; // Resolve cyclic dependency. |