summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/Connection.cpp49
-rw-r--r--cpp/src/Ice/Connection.h1
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp317
3 files changed, 211 insertions, 156 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 7396104b38d..d2590e86ef8 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -160,6 +160,16 @@ IceInternal::Connection::validate()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ if(_state != StateNotValidated)
+ {
+ return;
+ }
+
if(!_endpoint->datagram()) // Datagram connections are always implicitly validated.
{
try
@@ -246,6 +256,11 @@ IceInternal::Connection::validate()
_acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
}
}
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
}
void
@@ -291,7 +306,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool compress)
{
_exception->ice_throw();
}
- assert(_state < StateClosing);
+ assert(_state > StateNotValidated && _state < StateClosing);
Int requestId;
@@ -397,7 +412,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp
{
_exception->ice_throw();
}
- assert(_state < StateClosing);
+ assert(_state > StateNotValidated && _state < StateClosing);
Int requestId;
@@ -497,7 +512,7 @@ IceInternal::Connection::prepareBatchRequest(BasicStream* os)
unlock();
_exception->ice_throw();
}
- assert(_state < StateClosing);
+ assert(_state > StateNotValidated && _state < StateClosing);
if(_batchStream.b.empty())
{
@@ -529,7 +544,7 @@ IceInternal::Connection::finishBatchRequest(BasicStream* os)
unlock();
_exception->ice_throw();
}
- assert(_state < StateClosing);
+ assert(_state > StateNotValidated && _state < StateClosing);
_batchStream.swap(*os); // Get the batch stream back.
++_batchRequestNum; // Increment the number of requests in the batch.
@@ -557,7 +572,7 @@ IceInternal::Connection::flushBatchRequest(bool compress)
{
_exception->ice_throw();
}
- assert(_state < StateClosing);
+ assert(_state > StateNotValidated && _state < StateClosing);
try
{
@@ -1227,7 +1242,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_batchRequestNum(0),
_dispatchCount(0),
_proxyCount(0),
- _state(StateHolding)
+ _state(StateNotValidated)
{
vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr);
requestHdr[0] = protocolVersion;
@@ -1318,9 +1333,18 @@ IceInternal::Connection::setState(State state)
switch(state)
{
+ case StateNotValidated:
+ {
+ assert(false);
+ break;
+ }
+
case StateActive:
{
- if(_state != StateHolding) // Can only switch from holding to active.
+ //
+ // Can only switch from holding to active.
+ //
+ if(_state != StateHolding)
{
return;
}
@@ -1330,7 +1354,11 @@ IceInternal::Connection::setState(State state)
case StateHolding:
{
- if(_state != StateActive) // Can only switch from active to holding.
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
{
return;
}
@@ -1340,7 +1368,10 @@ IceInternal::Connection::setState(State state)
case StateClosing:
{
- if(_state == StateClosed) // Can't change back from closed.
+ //
+ // Can't change back from closed.
+ //
+ if(_state == StateClosed)
{
return;
}
diff --git a/cpp/src/Ice/Connection.h b/cpp/src/Ice/Connection.h
index 8454538953a..ce1c135872f 100644
--- a/cpp/src/Ice/Connection.h
+++ b/cpp/src/Ice/Connection.h
@@ -113,6 +113,7 @@ private:
enum State
{
+ StateNotValidated,
StateActive,
StateHolding,
StateClosing,
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index d9ea5f2849f..7088fe259a4 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -94,119 +94,132 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
ConnectionPtr
IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoints)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ ConnectionPtr connection;
- if(!_instance)
{
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
-
- assert(!endpoints.empty());
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // Reap connections for which destruction has completed.
- //
- std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin();
- while(p != _connections.end())
- {
- if(p->second->isFinished())
- {
- _connections.erase(p++);
- }
- else
+ if(!_instance)
{
- ++p;
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
- }
- //
- // Search for existing connections.
- //
- DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
- vector<EndpointPtr>::const_iterator q;
- for(q = endpoints.begin(); q != endpoints.end(); ++q)
- {
- EndpointPtr endpoint = *q;
- if(defaultsAndOverrides->overrideTimeout)
- {
- endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue);
- }
+ assert(!endpoints.empty());
- map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint);
- if(r != _connections.end())
+ //
+ // Reap connections for which destruction has completed.
+ //
+ std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin();
+ while(p != _connections.end())
{
- //
- // Don't return connections for which destruction has been
- // initiated.
- //
- if(!r->second->isDestroyed())
+ if(p->second->isFinished())
{
- return r->second;
+ _connections.erase(p++);
+ }
+ else
+ {
+ ++p;
}
}
- }
- //
- // No connections exist, try to create one.
- //
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- LoggerPtr logger = _instance->logger();
-
- ConnectionPtr connection;
- auto_ptr<LocalException> exception;
- q = endpoints.begin();
- while(q != endpoints.end())
- {
- EndpointPtr endpoint = *q;
- if(defaultsAndOverrides->overrideTimeout)
+ //
+ // Search for existing connections.
+ //
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ vector<EndpointPtr>::const_iterator q;
+ for(q = endpoints.begin(); q != endpoints.end(); ++q)
{
- endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue);
- }
+ EndpointPtr endpoint = *q;
+ if(defaultsAndOverrides->overrideTimeout)
+ {
+ endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue);
+ }
- try
- {
- TransceiverPtr transceiver = endpoint->clientTransceiver();
- if(!transceiver)
+ map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint);
+ if(r != _connections.end())
{
- ConnectorPtr connector = endpoint->connector();
- assert(connector);
- transceiver = connector->connect(endpoint->timeout());
- assert(transceiver);
- }
- connection = new Connection(_instance, transceiver, endpoint, 0);
- connection->validate();
- connection->activate();
- _connections.insert(make_pair(endpoint, connection));
- break;
- }
- catch(const LocalException& ex)
- {
- exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
+ //
+ // Don't return connections for which destruction has been
+ // initiated.
+ //
+ if(!r->second->isDestroyed())
+ {
+ connection = r->second;
+ break;
+ }
+ }
}
- ++q;
-
- if(traceLevels->retry >= 2)
+ //
+ // No connections exist, try to create one.
+ //
+ if(!connection)
{
- Trace out(logger, traceLevels->retryCat);
- out << "connection to endpoint failed";
- if(q != endpoints.end())
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ LoggerPtr logger = _instance->logger();
+
+ auto_ptr<LocalException> exception;
+ q = endpoints.begin();
+ while(q != endpoints.end())
{
- out << ", trying next endpoint\n";
+ EndpointPtr endpoint = *q;
+ if(defaultsAndOverrides->overrideTimeout)
+ {
+ endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue);
+ }
+
+ try
+ {
+ TransceiverPtr transceiver = endpoint->clientTransceiver();
+ if(!transceiver)
+ {
+ ConnectorPtr connector = endpoint->connector();
+ assert(connector);
+ transceiver = connector->connect(endpoint->timeout());
+ assert(transceiver);
+ }
+ connection = new Connection(_instance, transceiver, endpoint, 0);
+ _connections.insert(make_pair(endpoint, connection));
+ break;
+ }
+ catch(const LocalException& ex)
+ {
+ exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
+ }
+
+ ++q;
+
+ if(traceLevels->retry >= 2)
+ {
+ Trace out(logger, traceLevels->retryCat);
+ out << "connection to endpoint failed";
+ if(q != endpoints.end())
+ {
+ out << ", trying next endpoint\n";
+ }
+ else
+ {
+ out << " and no more endpoints to try\n";
+ }
+ out << *exception.get();
+ }
}
- else
+
+ if(!connection)
{
- out << " and no more endpoints to try\n";
+ assert(exception.get());
+ exception->ice_throw();
}
- out << *exception.get();
}
}
- if(!connection)
- {
- assert(exception.get());
- exception->ice_throw();
- }
+ //
+ // We validate and activate outside the thread synchronization, to
+ // not block the factory.
+ //
+ assert(connection);
+ connection->validate();
+ connection->activate();
return connection;
}
@@ -397,80 +410,90 @@ IceInternal::IncomingConnectionFactory::read(BasicStream&)
void
IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool)
{
- ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
-
- if(_state != StateActive)
- {
- IceUtil::ThreadControl::yield();
- threadPool->promoteFollower();
- return;
- }
-
- //
- // Reap connections for which destruction has completed.
- //
- _connections.erase(remove_if(_connections.begin(), _connections.end(),
- ::Ice::constMemFun(&Connection::isFinished)),
- _connections.end());
+ ConnectionPtr connection;
- //
- // Now accept a new connection.
- //
- TransceiverPtr transceiver;
- try
- {
- transceiver = _acceptor->accept(0);
- }
- catch(const SocketException&)
- {
- // TODO: bandaid. Takes care of SSL Handshake problems during
- // creation of a Transceiver. Ignore, nothing we can do here.
- threadPool->promoteFollower();
- return;
- }
- catch(const TimeoutException&)
- {
- // Ignore timeouts.
- threadPool->promoteFollower();
- return;
- }
- catch(const LocalException& ex)
{
- // Warn about other Ice local exceptions.
- if(_warn)
+ ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this);
+
+ if(_state != StateActive)
{
- Warning out(_instance->logger());
- out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ IceUtil::ThreadControl::yield();
+ threadPool->promoteFollower();
+ return;
}
+
+ //
+ // Reap connections for which destruction has completed.
+ //
+ _connections.erase(remove_if(_connections.begin(), _connections.end(),
+ ::Ice::constMemFun(&Connection::isFinished)),
+ _connections.end());
+
+ //
+ // Now accept a new connection.
+ //
+ TransceiverPtr transceiver;
+ try
+ {
+ transceiver = _acceptor->accept(0);
+ }
+ catch(const SocketException&)
+ {
+ // TODO: bandaid. Takes care of SSL Handshake problems during
+ // creation of a Transceiver. Ignore, nothing we can do here.
+ threadPool->promoteFollower();
+ return;
+ }
+ catch(const TimeoutException&)
+ {
+ // Ignore timeouts.
+ threadPool->promoteFollower();
+ return;
+ }
+ catch(const LocalException& ex)
+ {
+ // Warn about other Ice local exceptions.
+ if(_warn)
+ {
+ Warning out(_instance->logger());
+ out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ }
+ threadPool->promoteFollower();
+ return;
+ }
+ catch(...)
+ {
+ threadPool->promoteFollower();
+ throw;
+ }
+
+ //
+ // We must promote a follower after we accepted a new connection.
+ //
threadPool->promoteFollower();
- return;
- }
- catch(...)
- {
- threadPool->promoteFollower();
- throw;
+
+ //
+ // Create a connection object for the connection.
+ //
+ assert(transceiver);
+ connection = new Connection(_instance, transceiver, _endpoint, _adapter);
+ _connections.push_back(connection);
}
-
- //
- // We must promote a follower after we accepted a new connection.
- //
- threadPool->promoteFollower();
-
+
//
- // Create and activate a connection object for the connection.
+ // We validate and activate outside the thread synchronization, to
+ // not block the factory.
//
try
{
- assert(transceiver);
- ConnectionPtr connection = new Connection(_instance, transceiver, _endpoint, _adapter);
+ assert(connection);
connection->validate();
- connection->activate();
- _connections.push_back(connection);
+ connection->activate(); // The factory must be active at this point, so we activate the connection, too.
}
catch(const LocalException&)
{
//
- // Ignore all exceptions while creating or activating the
+ // Ignore all exceptions while activating or validating the
// connection object. Warning or error messages for such
// exceptions must be printed directly in the connection
// object code.