diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 49 | ||||
-rw-r--r-- | cpp/src/Ice/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 317 |
3 files changed, 211 insertions, 156 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 7396104b38d..d2590e86ef8 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -160,6 +160,16 @@ IceInternal::Connection::validate() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + if(_exception.get()) + { + _exception->ice_throw(); + } + + if(_state != StateNotValidated) + { + return; + } + if(!_endpoint->datagram()) // Datagram connections are always implicitly validated. { try @@ -246,6 +256,11 @@ IceInternal::Connection::validate() _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); } } + + // + // We start out in holding state. + // + setState(StateHolding); } void @@ -291,7 +306,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool compress) { _exception->ice_throw(); } - assert(_state < StateClosing); + assert(_state > StateNotValidated && _state < StateClosing); Int requestId; @@ -397,7 +412,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp { _exception->ice_throw(); } - assert(_state < StateClosing); + assert(_state > StateNotValidated && _state < StateClosing); Int requestId; @@ -497,7 +512,7 @@ IceInternal::Connection::prepareBatchRequest(BasicStream* os) unlock(); _exception->ice_throw(); } - assert(_state < StateClosing); + assert(_state > StateNotValidated && _state < StateClosing); if(_batchStream.b.empty()) { @@ -529,7 +544,7 @@ IceInternal::Connection::finishBatchRequest(BasicStream* os) unlock(); _exception->ice_throw(); } - assert(_state < StateClosing); + assert(_state > StateNotValidated && _state < StateClosing); _batchStream.swap(*os); // Get the batch stream back. ++_batchRequestNum; // Increment the number of requests in the batch. @@ -557,7 +572,7 @@ IceInternal::Connection::flushBatchRequest(bool compress) { _exception->ice_throw(); } - assert(_state < StateClosing); + assert(_state > StateNotValidated && _state < StateClosing); try { @@ -1227,7 +1242,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _batchRequestNum(0), _dispatchCount(0), _proxyCount(0), - _state(StateHolding) + _state(StateNotValidated) { vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr); requestHdr[0] = protocolVersion; @@ -1318,9 +1333,18 @@ IceInternal::Connection::setState(State state) switch(state) { + case StateNotValidated: + { + assert(false); + break; + } + case StateActive: { - if(_state != StateHolding) // Can only switch from holding to active. + // + // Can only switch from holding to active. + // + if(_state != StateHolding) { return; } @@ -1330,7 +1354,11 @@ IceInternal::Connection::setState(State state) case StateHolding: { - if(_state != StateActive) // Can only switch from active to holding. + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) { return; } @@ -1340,7 +1368,10 @@ IceInternal::Connection::setState(State state) case StateClosing: { - if(_state == StateClosed) // Can't change back from closed. + // + // Can't change back from closed. + // + if(_state == StateClosed) { return; } diff --git a/cpp/src/Ice/Connection.h b/cpp/src/Ice/Connection.h index 8454538953a..ce1c135872f 100644 --- a/cpp/src/Ice/Connection.h +++ b/cpp/src/Ice/Connection.h @@ -113,6 +113,7 @@ private: enum State { + StateNotValidated, StateActive, StateHolding, StateClosing, diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index d9ea5f2849f..7088fe259a4 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -94,119 +94,132 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() ConnectionPtr IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoints) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + ConnectionPtr connection; - if(!_instance) { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - - assert(!endpoints.empty()); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // Reap connections for which destruction has completed. - // - std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin(); - while(p != _connections.end()) - { - if(p->second->isFinished()) - { - _connections.erase(p++); - } - else + if(!_instance) { - ++p; + throw CommunicatorDestroyedException(__FILE__, __LINE__); } - } - // - // Search for existing connections. - // - DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - vector<EndpointPtr>::const_iterator q; - for(q = endpoints.begin(); q != endpoints.end(); ++q) - { - EndpointPtr endpoint = *q; - if(defaultsAndOverrides->overrideTimeout) - { - endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); - } + assert(!endpoints.empty()); - map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint); - if(r != _connections.end()) + // + // Reap connections for which destruction has completed. + // + std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin(); + while(p != _connections.end()) { - // - // Don't return connections for which destruction has been - // initiated. - // - if(!r->second->isDestroyed()) + if(p->second->isFinished()) { - return r->second; + _connections.erase(p++); + } + else + { + ++p; } } - } - // - // No connections exist, try to create one. - // - TraceLevelsPtr traceLevels = _instance->traceLevels(); - LoggerPtr logger = _instance->logger(); - - ConnectionPtr connection; - auto_ptr<LocalException> exception; - q = endpoints.begin(); - while(q != endpoints.end()) - { - EndpointPtr endpoint = *q; - if(defaultsAndOverrides->overrideTimeout) + // + // Search for existing connections. + // + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + vector<EndpointPtr>::const_iterator q; + for(q = endpoints.begin(); q != endpoints.end(); ++q) { - endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); - } + EndpointPtr endpoint = *q; + if(defaultsAndOverrides->overrideTimeout) + { + endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); + } - try - { - TransceiverPtr transceiver = endpoint->clientTransceiver(); - if(!transceiver) + map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint); + if(r != _connections.end()) { - ConnectorPtr connector = endpoint->connector(); - assert(connector); - transceiver = connector->connect(endpoint->timeout()); - assert(transceiver); - } - connection = new Connection(_instance, transceiver, endpoint, 0); - connection->validate(); - connection->activate(); - _connections.insert(make_pair(endpoint, connection)); - break; - } - catch(const LocalException& ex) - { - exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); + // + // Don't return connections for which destruction has been + // initiated. + // + if(!r->second->isDestroyed()) + { + connection = r->second; + break; + } + } } - ++q; - - if(traceLevels->retry >= 2) + // + // No connections exist, try to create one. + // + if(!connection) { - Trace out(logger, traceLevels->retryCat); - out << "connection to endpoint failed"; - if(q != endpoints.end()) + TraceLevelsPtr traceLevels = _instance->traceLevels(); + LoggerPtr logger = _instance->logger(); + + auto_ptr<LocalException> exception; + q = endpoints.begin(); + while(q != endpoints.end()) { - out << ", trying next endpoint\n"; + EndpointPtr endpoint = *q; + if(defaultsAndOverrides->overrideTimeout) + { + endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); + } + + try + { + TransceiverPtr transceiver = endpoint->clientTransceiver(); + if(!transceiver) + { + ConnectorPtr connector = endpoint->connector(); + assert(connector); + transceiver = connector->connect(endpoint->timeout()); + assert(transceiver); + } + connection = new Connection(_instance, transceiver, endpoint, 0); + _connections.insert(make_pair(endpoint, connection)); + break; + } + catch(const LocalException& ex) + { + exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); + } + + ++q; + + if(traceLevels->retry >= 2) + { + Trace out(logger, traceLevels->retryCat); + out << "connection to endpoint failed"; + if(q != endpoints.end()) + { + out << ", trying next endpoint\n"; + } + else + { + out << " and no more endpoints to try\n"; + } + out << *exception.get(); + } } - else + + if(!connection) { - out << " and no more endpoints to try\n"; + assert(exception.get()); + exception->ice_throw(); } - out << *exception.get(); } } - if(!connection) - { - assert(exception.get()); - exception->ice_throw(); - } + // + // We validate and activate outside the thread synchronization, to + // not block the factory. + // + assert(connection); + connection->validate(); + connection->activate(); return connection; } @@ -397,80 +410,90 @@ IceInternal::IncomingConnectionFactory::read(BasicStream&) void IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool) { - ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); - - if(_state != StateActive) - { - IceUtil::ThreadControl::yield(); - threadPool->promoteFollower(); - return; - } - - // - // Reap connections for which destruction has completed. - // - _connections.erase(remove_if(_connections.begin(), _connections.end(), - ::Ice::constMemFun(&Connection::isFinished)), - _connections.end()); + ConnectionPtr connection; - // - // Now accept a new connection. - // - TransceiverPtr transceiver; - try - { - transceiver = _acceptor->accept(0); - } - catch(const SocketException&) - { - // TODO: bandaid. Takes care of SSL Handshake problems during - // creation of a Transceiver. Ignore, nothing we can do here. - threadPool->promoteFollower(); - return; - } - catch(const TimeoutException&) - { - // Ignore timeouts. - threadPool->promoteFollower(); - return; - } - catch(const LocalException& ex) { - // Warn about other Ice local exceptions. - if(_warn) + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + + if(_state != StateActive) { - Warning out(_instance->logger()); - out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + IceUtil::ThreadControl::yield(); + threadPool->promoteFollower(); + return; } + + // + // Reap connections for which destruction has completed. + // + _connections.erase(remove_if(_connections.begin(), _connections.end(), + ::Ice::constMemFun(&Connection::isFinished)), + _connections.end()); + + // + // Now accept a new connection. + // + TransceiverPtr transceiver; + try + { + transceiver = _acceptor->accept(0); + } + catch(const SocketException&) + { + // TODO: bandaid. Takes care of SSL Handshake problems during + // creation of a Transceiver. Ignore, nothing we can do here. + threadPool->promoteFollower(); + return; + } + catch(const TimeoutException&) + { + // Ignore timeouts. + threadPool->promoteFollower(); + return; + } + catch(const LocalException& ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + Warning out(_instance->logger()); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + threadPool->promoteFollower(); + return; + } + catch(...) + { + threadPool->promoteFollower(); + throw; + } + + // + // We must promote a follower after we accepted a new connection. + // threadPool->promoteFollower(); - return; - } - catch(...) - { - threadPool->promoteFollower(); - throw; + + // + // Create a connection object for the connection. + // + assert(transceiver); + connection = new Connection(_instance, transceiver, _endpoint, _adapter); + _connections.push_back(connection); } - - // - // We must promote a follower after we accepted a new connection. - // - threadPool->promoteFollower(); - + // - // Create and activate a connection object for the connection. + // We validate and activate outside the thread synchronization, to + // not block the factory. // try { - assert(transceiver); - ConnectionPtr connection = new Connection(_instance, transceiver, _endpoint, _adapter); + assert(connection); connection->validate(); - connection->activate(); - _connections.push_back(connection); + connection->activate(); // The factory must be active at this point, so we activate the connection, too. } catch(const LocalException&) { // - // Ignore all exceptions while creating or activating the + // Ignore all exceptions while activating or validating the // connection object. Warning or error messages for such // exceptions must be printed directly in the connection // object code. |