summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp221
1 files changed, 93 insertions, 128 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 6bcc4c9ca20..9bba761af6d 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -60,11 +60,11 @@ IceInternal::OutgoingConnectionFactory::destroy()
#ifdef _STLP_BEGIN_NAMESPACE
// voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
for_each(_connections.begin(), _connections.end(),
- voidbind2nd(Ice::secondVoidMemFun1<EndpointIPtr, ConnectionI, ConnectionI::DestructionReason>
+ voidbind2nd(Ice::secondVoidMemFun1<ConnectorPtr, ConnectionI, ConnectionI::DestructionReason>
(&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#else
for_each(_connections.begin(), _connections.end(),
- bind2nd(Ice::secondVoidMemFun1<const EndpointIPtr, ConnectionI, ConnectionI::DestructionReason>
+ bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason>
(&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#endif
@@ -75,7 +75,7 @@ IceInternal::OutgoingConnectionFactory::destroy()
void
IceInternal::OutgoingConnectionFactory::waitUntilFinished()
{
- multimap<EndpointIPtr, ConnectionIPtr> connections;
+ multimap<ConnectorPtr, ConnectionIPtr> connections;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -98,7 +98,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
}
for_each(connections.begin(), connections.end(),
- Ice::secondVoidMemFun<const EndpointIPtr, ConnectionI>(&ConnectionI::waitUntilFinished));
+ Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished));
}
ConnectionIPtr
@@ -107,7 +107,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
bool& compress)
{
assert(!endpts.empty());
- vector<EndpointIPtr> endpoints = endpts;
+ vector<pair<ConnectorPtr, EndpointIPtr> > connectors;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -120,7 +120,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
//
// Reap connections for which destruction has completed.
//
- std::multimap<EndpointIPtr, ConnectionIPtr>::iterator p = _connections.begin();
+ std::multimap<ConnectorPtr, ConnectionIPtr>::iterator p = _connections.begin();
while(p != _connections.end())
{
if(p->second->isFinished())
@@ -133,37 +133,48 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
}
}
- //
- // Modify endpoints with overrides.
- //
+ vector<EndpointIPtr> endpoints = endpts;
vector<EndpointIPtr>::iterator q;
for(q = endpoints.begin(); q != endpoints.end(); ++q)
{
+ //
+ // Modify endpoints with overrides.
+ //
if(_instance->defaultsAndOverrides()->overrideTimeout)
{
*q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
}
//
- // The Connection object does not take the compression flag of
- // endpoints into account, but instead gets the information
- // about whether messages should be compressed or not from
- // other sources. In order to allow connection sharing for
- // endpoints that differ in the value of the compression flag
- // only, we always set the compression flag to false here in
- // this connection factory.
+ // Create connectors for the endpoints.
//
- *q = (*q)->compress(false);
+ vector<ConnectorPtr> cons = (*q)->connectors();
+ assert(cons.size() > 0);
+
+ //
+ // Shuffle connectors is endpoint selection type is Random.
+ //
+ if(selType == Random)
+ {
+ RandomNumberGenerator rng;
+ random_shuffle(cons.begin(), cons.end(), rng);
+ }
+
+ vector<ConnectorPtr>::const_iterator r;
+ for(r = cons.begin(); r != cons.end(); ++r)
+ {
+ connectors.push_back(make_pair(*r, *q));
+ }
}
//
// Search for existing connections.
//
- vector<EndpointIPtr>::const_iterator r;
- for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
+ vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator r;
+ for(r = connectors.begin(); r != connectors.end(); ++r)
{
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q);
+ pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator,
+ multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first);
while(pr.first != pr.second)
{
@@ -181,7 +192,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
}
else
{
- compress = (*r)->compress();
+ compress = (*r).second->compress();
}
return pr.first->second;
@@ -199,15 +210,15 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
bool searchAgain = false;
while(!_destroyed)
{
- for(q = endpoints.begin(); q != endpoints.end(); ++q)
+ for(r = connectors.begin(); r != connectors.end(); ++r)
{
- if(_pending.find(*q) != _pending.end())
+ if(_pending.find((*r).first) != _pending.end())
{
break;
}
}
- if(q == endpoints.end())
+ if(r == connectors.end())
{
break;
}
@@ -228,10 +239,10 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
//
if(searchAgain)
{
- for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
+ for(r = connectors.begin(); r != connectors.end(); ++r)
{
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q);
+ pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator,
+ multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first);
while(pr.first != pr.second)
{
@@ -249,7 +260,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
}
else
{
- compress = (*r)->compress();
+ compress = (*r).second->compress();
}
return pr.first->second;
@@ -266,100 +277,45 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
// create connections to the same endpoints, we add our
// endpoints to _pending.
//
- _pending.insert(endpoints.begin(), endpoints.end());
+ for(r = connectors.begin(); r != connectors.end(); ++r)
+ {
+ _pending.insert((*r).first);
+ }
}
+ ConnectorPtr connector;
ConnectionIPtr connection;
auto_ptr<LocalException> exception;
- vector<EndpointIPtr>::const_iterator q;
- vector<EndpointIPtr>::const_iterator r;
- for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r)
+ vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator q;
+ for(q = connectors.begin(); q != connectors.end(); ++q)
{
- EndpointIPtr endpoint = *q;
+ connector = (*q).first;
+ EndpointIPtr endpoint = (*q).second;
try
{
- vector<ConnectorPtr> connectors;
- unsigned int size;
- Int timeout;
- vector<TransceiverPtr> transceivers = endpoint->clientTransceivers();
- if(transceivers.size() == 0)
+ int timeout;
+ if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
{
- connectors = endpoint->connectors();
- size = connectors.size();
- assert(size > 0);
-
- if(selType == Random)
- {
- RandomNumberGenerator rng;
- random_shuffle(connectors.begin(), connectors.end(), rng);
- }
-
- if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
- {
- timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
- }
- // It is not necessary to check for overrideTimeout,
- // the endpoint has already been modified with this
- // override, if set.
- else
- {
- timeout = endpoint->timeout();
- }
- }
+ timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
+ }
+ // It is not necessary to check for overrideTimeout,
+ // the endpoint has already been modified with this
+ // override, if set.
else
{
- size = transceivers.size();
- if(selType == Random)
- {
- RandomNumberGenerator rng;
- random_shuffle(transceivers.begin(), transceivers.end(), rng);
- }
+ timeout = endpoint->timeout();
}
- for(unsigned int i = 0; i < size; ++i)
- {
- try
- {
- TransceiverPtr transceiver;
- if(transceivers.size() == size)
- {
- transceiver = transceivers[i];
- }
- else
- {
- transceiver = connectors[i]->connect(timeout);
- assert(transceiver);
- }
+ TransceiverPtr transceiver = connector->connect(timeout);
+ assert(transceiver);
- connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection,
+ connection = new ConnectionI(_instance, transceiver, endpoint->compress(false), 0, threadPerConnection,
_instance->threadPerConnectionStackSize());
- connection->start();
- connection->validate();
- }
- catch(const LocalException& ex)
- {
- //
- // If a connection object was constructed, then validate()
- // must have raised the exception.
- //
- if(connection)
- {
- connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
- connection = 0;
- }
-
- //
- // Throw exception if this is last transceiver in list.
- //
- if(i == size - 1)
- {
- ex.ice_throw();
- }
- }
- }
+ connection->start();
+ connection->validate();
if(_instance->defaultsAndOverrides()->overrideCompress)
{
@@ -367,13 +323,23 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
}
else
{
- compress = (*r)->compress();
+ compress = endpoint->compress();
}
break;
}
catch(const LocalException& ex)
{
exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+
+ //
+ // If a connection object was constructed, then validate()
+ // must have raised the exception.
+ //
+ if(connection)
+ {
+ connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
+ connection = 0;
+ }
}
TraceLevelsPtr traceLevels = _instance->traceLevels();
@@ -382,7 +348,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
Trace out(_instance->initializationData().logger, traceLevels->retryCat);
out << "connection to endpoint failed";
- if(moreEndpts || q + 1 != endpoints.end())
+ if(moreEndpts || q + 1 != connectors.end())
{
out << ", trying next endpoint\n";
}
@@ -401,9 +367,9 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
// Signal other threads that we are done with trying to
// establish connections to our endpoints.
//
- for(q = endpoints.begin(); q != endpoints.end(); ++q)
+ for(q = connectors.begin(); q != connectors.end(); ++q)
{
- _pending.erase(*q);
+ _pending.erase((*q).first);
}
notifyAll();
@@ -414,8 +380,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
}
else
{
- _connections.insert(_connections.end(),
- pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection));
+ _connections.insert(_connections.end(), pair<const ConnectorPtr, ConnectionIPtr>(connector, connection));
if(_destroyed)
{
@@ -477,22 +442,22 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route
//
endpoint = endpoint->compress(false);
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint);
-
- while(pr.first != pr.second)
+ multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q;
+ for(q = _connections.begin(); q != _connections.end(); ++q)
{
- try
+ if((*q).second->endpoint() == endpoint)
{
- pr.first->second->setAdapter(adapter);
- }
- catch(const Ice::LocalException&)
- {
- //
- // Ignore, the connection is being closed or closed.
- //
+ try
+ {
+ (*q).second->setAdapter(adapter);
+ }
+ catch(const Ice::LocalException&)
+ {
+ //
+ // Ignore, the connection is being closed or closed.
+ //
+ }
}
- ++pr.first;
}
}
}
@@ -507,7 +472,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
return;
}
- for(multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
+ for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
{
if(p->second->getAdapter() == adapter)
{
@@ -533,7 +498,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- for(std::multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin();
+ for(std::multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin();
p != _connections.end();
++p)
{
@@ -919,7 +884,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
_threadPerConnection = adapterImpl->getThreadPerConnection();
_threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize();
- const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint));
+ const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(const_cast<EndpointIPtr&>(_endpoint));
if(_transceiver)
{
ConnectionIPtr connection;