diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 1695 |
1 files changed, 1695 insertions, 0 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp new file mode 100644 index 00000000000..ee201fd09cd --- /dev/null +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -0,0 +1,1695 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/ConnectionFactory.h> +#include <Ice/ConnectionI.h> +#include <Ice/Instance.h> +#include <Ice/LoggerUtil.h> +#include <Ice/TraceLevels.h> +#include <Ice/DefaultsAndOverrides.h> +#include <Ice/Properties.h> +#include <Ice/Transceiver.h> +#include <Ice/Connector.h> +#include <Ice/Acceptor.h> +#include <Ice/ThreadPool.h> +#include <Ice/ObjectAdapterI.h> // For getThreadPool(). +#include <Ice/Reference.h> +#include <Ice/EndpointI.h> +#include <Ice/RouterInfo.h> +#include <Ice/LocalException.h> +#include <Ice/Functional.h> +#include <IceUtil/Random.h> +#include <iterator> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +IceUtil::Shared* IceInternal::upCast(OutgoingConnectionFactory* p) { return p; } +IceUtil::Shared* IceInternal::upCast(IncomingConnectionFactory* p) { return p; } + +namespace +{ + +struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t> +{ + ptrdiff_t operator()(ptrdiff_t d) + { + return IceUtilInternal::random(static_cast<int>(d)); + } +}; + +template <typename K, typename V> void +remove(multimap<K, V>& m, K k, V v) +{ +#if defined(_MSC_VER) && (_MSC_VER < 1300) + pair<multimap<K, V>::iterator, multimap<K, V>::iterator> pr = m.equal_range(k); + assert(pr.first != pr.second); + for(multimap<K, V>::iterator q = pr.first; q != pr.second; ++q) +#else + pair<typename multimap<K, V>::iterator, typename multimap<K, V>::iterator> pr = m.equal_range(k); + assert(pr.first != pr.second); + for(typename multimap<K, V>::iterator q = pr.first; q != pr.second; ++q) +#endif + { + if(q->second.get() == v.get()) + { + m.erase(q); + return; + } + } + assert(false); // Nothing was removed which is an error. +} + +template <typename K, typename V> ::IceInternal::Handle<V> +find(const multimap<K,::IceInternal::Handle<V> >& m, + K k, + const ::IceUtilInternal::ConstMemFun<bool, V, ::IceInternal::Handle<V> >& predicate) +{ +#if defined(_MSC_VER) && (_MSC_VER < 1300) + pair<multimap<K, ::IceInternal::Handle<V> >::const_iterator, + multimap<K, ::IceInternal::Handle<V> >::const_iterator> pr = m.equal_range(k); + for(multimap<K, ::IceInternal::Handle<V> >::const_iterator q = pr.first; q != pr.second; ++q) +#else + pair<typename multimap<K, ::IceInternal::Handle<V> >::const_iterator, + typename multimap<K, ::IceInternal::Handle<V> >::const_iterator> pr = m.equal_range(k); + for(typename multimap<K, ::IceInternal::Handle<V> >::const_iterator q = pr.first; q != pr.second; ++q) +#endif + { + if(predicate(q->second)) + { + return q->second; + } + } + return IceInternal::Handle<V>(); +} + +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator==(const ConnectorInfo& other) const +{ + return connector == other.connector; +} + +void +IceInternal::OutgoingConnectionFactory::destroy() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_destroyed) + { + return; + } + +#ifdef _STLP_BEGIN_NAMESPACE + // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h + for_each(_connections.begin(), _connections.end(), + voidbind2nd(Ice::secondVoidMemFun1<ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> + (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); +#else + for_each(_connections.begin(), _connections.end(), + bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> + (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); +#endif + + _destroyed = true; + notifyAll(); +} + +void +IceInternal::OutgoingConnectionFactory::waitUntilFinished() +{ + multimap<ConnectorPtr, ConnectionIPtr> connections; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the factory is destroyed. We also wait + // until there are no pending connections anymore. Only then + // we can be sure the _connections contains all connections. + // + while(!_destroyed || !_pending.empty() || _pendingConnectCount > 0) + { + wait(); + } + + // + // We want to wait until all connections are finished outside the + // thread synchronization. + // + connections = _connections; + } + + for_each(connections.begin(), connections.end(), + Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // Ensure all the connections are finished and reapable at this point. + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + assert(cons.size() == _connections.size()); + cons.clear(); + _connections.clear(); + _connectionsByEndpoint.clear(); + } +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, + Ice::EndpointSelectionType selType, bool& compress) +{ + assert(!endpts.empty()); + + // + // Apply the overrides. + // + vector<EndpointIPtr> endpoints = applyOverrides(endpts); + + // + // Try to find a connection to one of the given endpoints. + // + Ice::ConnectionIPtr connection = findConnection(endpoints, compress); + if(connection) + { + return connection; + } + + auto_ptr<Ice::LocalException> exception; + + // + // If we didn't find a connection with the endpoints, we create the connectors + // for the endpoints. + // + vector<ConnectorInfo> connectors; + for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + // + // Create connectors for the endpoint. + // + try + { + vector<ConnectorPtr> cons = (*p)->connectors(); + assert(!cons.empty()); + + if(selType == Random) + { + RandomNumberGenerator rng; + random_shuffle(cons.begin(), cons.end(), rng); + } + + for(vector<ConnectorPtr>::const_iterator r = cons.begin(); r != cons.end(); ++r) + { + assert(*r); + connectors.push_back(ConnectorInfo(*r, *p)); + } + } + catch(const Ice::LocalException& ex) + { + exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + handleException(ex, hasMore || p != endpoints.end() - 1); + } + } + + if(connectors.empty()) + { + assert(exception.get()); + exception->ice_throw(); + } + + // + // Try to get a connection to one of the connectors. A null result indicates that no + // connection was found and that we should try to establish the connection (and that + // the connectors were added to _pending to prevent other threads from establishing + // the connection). + // + connection = getConnection(connectors, 0, compress); + if(connection) + { + return connection; + } + + // + // Try to establish the connection to the connectors. + // + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + vector<ConnectorInfo>::const_iterator q; + for(q = connectors.begin(); q != connectors.end(); ++q) + { + try + { + connection = createConnection(q->connector->connect(), *q); + connection->start(0); + + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = q->endpoint->compress(); + } + + connection->activate(); + break; + } + catch(const Ice::CommunicatorDestroyedException& ex) + { + exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + handleConnectionException(*exception.get(), hasMore || q != connectors.end() - 1); + connection = 0; + break; // No need to continue + } + catch(const Ice::LocalException& ex) + { + exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + handleConnectionException(*exception.get(), hasMore || q != connectors.end() - 1); + connection = 0; + } + } + + // + // Finish creating the connection (this removes the connectors from the _pending + // list and notifies any waiting threads). + // + if(connection) + { + finishGetConnection(connectors, *q, connection, 0); + } + else + { + finishGetConnection(connectors, *exception.get(), 0); + } + + if(!connection) + { + assert(exception.get()); + exception->ice_throw(); + } + + return connection; +} + +void +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, + Ice::EndpointSelectionType selType, + const CreateConnectionCallbackPtr& callback) +{ + assert(!endpts.empty()); + + // + // Apply the overrides. + // + vector<EndpointIPtr> endpoints = applyOverrides(endpts); + + // + // Try to find a connection to one of the given endpoints. + // + try + { + bool compress; + Ice::ConnectionIPtr connection = findConnection(endpoints, compress); + if(connection) + { + callback->setConnection(connection, compress); + return; + } + } + catch(const Ice::LocalException& ex) + { + callback->setException(ex); + return; + } + + ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType); + cb->getConnectors(); +} + +void +IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& routerInfo) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + + assert(routerInfo); + + // + // Search for connections to the router's client proxy endpoints, + // and update the object adapter for such connections, so that + // callbacks from the router can be received over such + // connections. + // + ObjectAdapterPtr adapter = routerInfo->getAdapter(); + vector<EndpointIPtr> endpoints = routerInfo->getClientEndpoints(); + vector<EndpointIPtr>::const_iterator p; + for(p = endpoints.begin(); p != endpoints.end(); ++p) + { + EndpointIPtr endpoint = *p; + + // + // Modify endpoints with overrides. + // + if(_instance->defaultsAndOverrides()->overrideTimeout) + { + endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + } + + // + // The Connection object does not take the compression flag of + // endpoints into account, but instead gets the information + // about whether messages should be compressed or not from + // other sources. In order to allow connection sharing for + // endpoints that differ in the value of the compression flag + // only, we always set the compression flag to false here in + // this connection factory. + // + endpoint = endpoint->compress(false); + + multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q; + for(q = _connections.begin(); q != _connections.end(); ++q) + { + if(q->second->endpoint() == endpoint) + { + q->second->setAdapter(adapter); + } + } + } +} + +void +IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& adapter) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_destroyed) + { + return; + } + + for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) + { + if(p->second->getAdapter() == adapter) + { + p->second->setAdapter(0); + } + } +} + +void +IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +{ + list<ConnectionIPtr> c; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); + ++p) + { + if(p->second->isActiveOrHolding()) + { + c.push_back(p->second); + } + } + } + + for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p) + { + try + { + outAsync->flushConnection(*p); + } + catch(const LocalException&) + { + // Ignore. + } + } +} + +IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const InstancePtr& instance) : + _instance(instance), + _reaper(new ConnectionReaper()), + _destroyed(false), + _pendingConnectCount(0) +{ +} + +IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory() +{ + assert(_destroyed); + assert(_connections.empty()); + assert(_connectionsByEndpoint.empty()); + assert(_pending.empty()); + assert(_pendingConnectCount == 0); +} + +vector<EndpointIPtr> +IceInternal::OutgoingConnectionFactory::applyOverrides(const vector<EndpointIPtr>& endpts) +{ + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + vector<EndpointIPtr> endpoints = endpts; + for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + // + // Modify endpoints with overrides. + // + if(defaultsAndOverrides->overrideTimeout) + { + *p = (*p)->timeout(defaultsAndOverrides->overrideTimeoutValue); + } + } + return endpoints; +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool& compress) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + assert(!endpoints.empty()); + for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + ConnectionIPtr connection = find(_connectionsByEndpoint, *p, Ice::constMemFun(&ConnectionI::isActiveOrHolding)); + if(connection) + { + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = (*p)->compress(); + } + return connection; + } + } + return 0; +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress) +{ + // This must be called with the mutex locked. + + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + if(_pending.find(p->connector) != _pending.end()) + { + continue; + } + + ConnectionIPtr connection = find(_connections, p->connector, Ice::constMemFun(&ConnectionI::isActiveOrHolding)); + if(connection) + { + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = p->endpoint->compress(); + } + return connection; + } + } + + return 0; +} + +void +IceInternal::OutgoingConnectionFactory::incPendingConnectCount() +{ + // + // Keep track of the number of pending connects. The outgoing connection factory + // waitUntilFinished() method waits for all the pending connects to terminate before + // to return. This ensures that the communicator client thread pool isn't destroyed + // too soon and will still be available to execute the ice_exception() callbacks for + // the asynchronous requests waiting on a connection to be established. + // + + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + ++_pendingConnectCount; +} + +void +IceInternal::OutgoingConnectionFactory::decPendingConnectCount() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + --_pendingConnectCount; + assert(_pendingConnectCount >= 0); + if(_destroyed && _pendingConnectCount == 0) + { + notifyAll(); + } +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors, + const ConnectCallbackPtr& cb, + bool& compress) +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + + // + // Reap closed connections + // + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) + { + remove(_connections, (*p)->connector(), *p); + remove(_connectionsByEndpoint, (*p)->endpoint(), *p); + remove(_connectionsByEndpoint, (*p)->endpoint()->compress(true), *p); + } + + // + // Try to get the connection. We may need to wait for other threads to + // finish if one of them is currently establishing a connection to one + // of our connectors. + // + while(true) + { + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + + // + // Search for a matching connection. If we find one, we're done. + // + Ice::ConnectionIPtr connection = findConnection(connectors, compress); + if(connection) + { + return connection; + } + + // + // Determine whether another thread/request is currently attempting to connect to + // one of our endpoints; if so we wait until it's done. + // + if(addToPending(cb, connectors)) + { + // + // If a callback is not specified we wait until another thread notifies us about a + // change to the pending list. Otherwise, if a callback is provided we're done: + // when the pending list changes the callback will be notified and will try to + // get the connection again. + // + if(!cb) + { + wait(); + } + else + { + return 0; + } + } + else + { + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; + } + } + } + + // + // At this point, we're responsible for establishing the connection to one of + // the given connectors. If it's a non-blocking connect, calling nextConnector + // will start the connection establishment. Otherwise, we return null to get + // the caller to establish the connection. + // + if(cb) + { + cb->nextConnector(); + } + + return 0; +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& transceiver, const ConnectorInfo& ci) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_pending.find(ci.connector) != _pending.end() && transceiver); + + // + // Create and add the connection to the connection map. Adding the connection to the map + // is necessary to support the interruption of the connection initialization and validation + // in case the communicator is destroyed. + // + Ice::ConnectionIPtr connection; + try + { + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + + connection = new ConnectionI(_instance, _reaper, transceiver, ci.connector, ci.endpoint->compress(false), 0); + } + catch(const Ice::LocalException&) + { + try + { + transceiver->close(); + } + catch(const Ice::LocalException&) + { + // Ignore + } + throw; + } + + _connections.insert(pair<const ConnectorPtr, ConnectionIPtr>(ci.connector, connection)); + _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection)); + _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint()->compress(true), + connection)); + return connection; +} + +void +IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, + const ConnectorInfo& ci, + const ConnectionIPtr& connection, + const ConnectCallbackPtr& cb) +{ + set<ConnectCallbackPtr> connectionCallbacks; + if(cb) + { + connectionCallbacks.insert(cb); + } + + set<ConnectCallbackPtr> callbacks; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); + if(q != _pending.end()) + { + for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) + { + if((*r)->hasConnector(ci)) + { + connectionCallbacks.insert(*r); + } + else + { + callbacks.insert(*r); + } + } + _pending.erase(q); + } + } + + set<ConnectCallbackPtr>::iterator r; + for(r = connectionCallbacks.begin(); r != connectionCallbacks.end(); ++r) + { + (*r)->removeFromPending(); + callbacks.erase(*r); + } + for(r = callbacks.begin(); r != callbacks.end(); ++r) + { + (*r)->removeFromPending(); + } + notifyAll(); + } + + bool compress; + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = ci.endpoint->compress(); + } + + set<ConnectCallbackPtr>::const_iterator p; + for(p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->getConnection(); + } + for(p = connectionCallbacks.begin(); p != connectionCallbacks.end(); ++p) + { + (*p)->setConnection(connection, compress); + } +} + +void +IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, + const Ice::LocalException& ex, + const ConnectCallbackPtr& cb) +{ + set<ConnectCallbackPtr> failedCallbacks; + if(cb) + { + failedCallbacks.insert(cb); + } + + set<ConnectCallbackPtr> callbacks; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); + if(q != _pending.end()) + { + for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) + { + if((*r)->removeConnectors(connectors)) + { + failedCallbacks.insert(*r); + } + else + { + callbacks.insert(*r); + } + } + _pending.erase(q); + } + } + + for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r) + { + assert(failedCallbacks.find(*r) == failedCallbacks.end()); + (*r)->removeFromPending(); + } + notifyAll(); + } + + set<ConnectCallbackPtr>::const_iterator p; + for(p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->getConnection(); + } + for(p = failedCallbacks.begin(); p != failedCallbacks.end(); ++p) + { + (*p)->setException(ex); + } +} + +bool +IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb, + const vector<ConnectorInfo>& connectors) +{ + // + // Add the callback to each connector pending list. + // + bool found = false; + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); + if(q != _pending.end()) + { + found = true; + if(cb) + { + q->second.insert(cb); + } + } + } + + if(found) + { + return true; + } + + // + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. + // + for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) + { + if(_pending.find(r->connector) == _pending.end()) + { + _pending.insert(pair<ConnectorPtr, set<ConnectCallbackPtr> >(r->connector, set<ConnectCallbackPtr>())); + } + } + return false; +} + +void +IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb, + const vector<ConnectorInfo>& connectors) +{ + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); + if(q != _pending.end()) + { + q->second.erase(cb); + } + } +} + +void +IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore) +{ + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->retry >= 2) + { + Trace out(_instance->initializationData().logger, traceLevels->retryCat); + + out << "couldn't resolve endpoint host"; + if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) + { + out << "\n"; + } + else + { + if(hasMore) + { + out << ", trying next endpoint\n"; + } + else + { + out << " and no more endpoints to try\n"; + } + } + out << ex; + } +} + +void +IceInternal::OutgoingConnectionFactory::handleConnectionException(const LocalException& ex, bool hasMore) +{ + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->retry >= 2) + { + Trace out(_instance->initializationData().logger, traceLevels->retryCat); + + out << "connection to endpoint failed"; + if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) + { + out << "\n"; + } + else + { + if(hasMore) + { + out << ", trying next endpoint\n"; + } + else + { + out << " and no more endpoints to try\n"; + } + } + out << ex; + } +} + +IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const OutgoingConnectionFactoryPtr& factory, + const vector<EndpointIPtr>& endpoints, + bool hasMore, + const CreateConnectionCallbackPtr& cb, + Ice::EndpointSelectionType selType) : + _factory(factory), + _endpoints(endpoints), + _hasMore(hasMore), + _callback(cb), + _selType(selType) +{ + _endpointsIter = _endpoints.begin(); +} + +// +// Methods from ConnectionI.StartCallback +// +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection) +{ + connection->activate(); + _factory->finishGetConnection(_connectors, *_iter, connection, this); +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& connection, + const LocalException& ex) +{ + assert(_iter != _connectors.end()); + + _factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1); + if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. + { + _factory->finishGetConnection(_connectors, ex, this); + } + else if(++_iter != _connectors.end()) // Try the next connector. + { + nextConnector(); + } + else + { + _factory->finishGetConnection(_connectors, ex, this); + } +} + +// +// Methods from EndpointI_connectors +// +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector<ConnectorPtr>& connectors) +{ + vector<ConnectorPtr> cons = connectors; + if(_selType == Random) + { + RandomNumberGenerator rng; + random_shuffle(cons.begin(), cons.end(), rng); + } + + for(vector<ConnectorPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) + { + _connectors.push_back(ConnectorInfo(*p, *_endpointsIter)); + } + + if(++_endpointsIter != _endpoints.end()) + { + nextEndpoint(); + } + else + { + assert(!_connectors.empty()); + + // + // We now have all the connectors for the given endpoints. We can try to obtain the + // connection. + // + _iter = _connectors.begin(); + getConnection(); + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::LocalException& ex) +{ + _factory->handleException(ex, _hasMore || _endpointsIter != _endpoints.end() - 1); + if(++_endpointsIter != _endpoints.end()) + { + nextEndpoint(); + } + else if(!_connectors.empty()) + { + // + // We now have all the connectors for the given endpoints. We can try to obtain the + // connection. + // + _iter = _connectors.begin(); + getConnection(); + } + else + { + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnectors() +{ + try + { + // + // Notify the factory that there's an async connect pending. This is necessary + // to prevent the outgoing connection factory to be destroyed before all the + // pending asynchronous connects are finished. + // + _factory->incPendingConnectCount(); + } + catch(const Ice::LocalException& ex) + { + _callback->setException(ex); + return; + } + + nextEndpoint(); +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::nextEndpoint() +{ + try + { + assert(_endpointsIter != _endpoints.end()); + (*_endpointsIter)->connectors_async(this); + } + catch(const Ice::LocalException& ex) + { + exception(ex); + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() +{ + try + { + // + // If all the connectors have been created, we ask the factory to get a + // connection. + // + bool compress; + Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, this, compress); + if(!connection) + { + // + // A null return value from getConnection indicates that the connection + // is being established and that everthing has been done to ensure that + // the callback will be notified when the connection establishment is + // done or that the callback already obtain the connection. + // + return; + } + + _callback->setConnection(connection, compress); + _factory->decPendingConnectCount(); // Must be called last. + } + catch(const Ice::LocalException& ex) + { + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() +{ + Ice::ConnectionIPtr connection; + try + { + assert(_iter != _connectors.end()); + connection = _factory->createConnection(_iter->connector->connect(), *_iter); + connection->start(this); + } + catch(const Ice::LocalException& ex) + { + connectionStartFailed(connection, ex); + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection, + bool compress) +{ + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback->setConnection(connection, compress); + _factory->decPendingConnectCount(); // Must be called last. +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::setException(const Ice::LocalException& ex) +{ + // + // Callback from the factory: connection establishment failed. + // + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::hasConnector(const ConnectorInfo& ci) +{ + return find(_connectors.begin(), _connectors.end(), ci) != _connectors.end(); +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const vector<ConnectorInfo>& connectors) +{ + // + // Callback from the factory: connecting to the given connectors + // failed, we remove the connectors and return true if there's + // no more connectors left to try. + // + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + _connectors.erase(remove(_connectors.begin(), _connectors.end(), *p), _connectors.end()); + } + return _connectors.empty(); +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending() +{ + _factory->removeFromPending(this, _connectors); +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const +{ + return this < &rhs; +} + +void +IceInternal::IncomingConnectionFactory::activate() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateActive); +} + +void +IceInternal::IncomingConnectionFactory::hold() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateHolding); +} + +void +IceInternal::IncomingConnectionFactory::destroy() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed); +} + +void +IceInternal::IncomingConnectionFactory::waitUntilHolding() const +{ + set<ConnectionIPtr> connections; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the connection factory itself is in holding + // state. + // + while(_state < StateHolding) + { + wait(); + } + + // + // We want to wait until all connections are in holding state + // outside the thread synchronization. + // + connections = _connections; + } + + // + // Now we wait until each connection is in holding state. + // + for_each(connections.begin(), connections.end(), Ice::constVoidMemFun(&ConnectionI::waitUntilHolding)); +} + +void +IceInternal::IncomingConnectionFactory::waitUntilFinished() +{ + set<ConnectionIPtr> connections; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the factory is destroyed. If we are using + // an acceptor, we also wait for it to be closed. + // + while(_state != StateFinished) + { + wait(); + } + + // + // Clear the OA. See bug 1673 for the details of why this is necessary. + // + _adapter = 0; + + // We want to wait until all connections are finished outside the + // thread synchronization. + // + connections = _connections; + } + + for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished)); + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // Ensure all the connections are finished and reapable at this point. + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + assert(cons.size() == _connections.size()); + cons.clear(); + _connections.clear(); + } +} + +EndpointIPtr +IceInternal::IncomingConnectionFactory::endpoint() const +{ + // No mutex protection necessary, _endpoint is immutable. + return _endpoint; +} + +list<ConnectionIPtr> +IceInternal::IncomingConnectionFactory::connections() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + list<ConnectionIPtr> result; + + // + // Only copy connections which have not been destroyed. + // + remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result), + not1(Ice::constMemFun(&ConnectionI::isActiveOrHolding))); + + return result; +} + +void +IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +{ + list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here. + + for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p) + { + try + { + outAsync->flushConnection(*p); + } + catch(const LocalException&) + { + // Ignore. + } + } +} + +#ifdef ICE_USE_IOCP +bool +IceInternal::IncomingConnectionFactory::startAsync(SocketOperation) +{ + if(_state >= StateClosed) + { + return false; + } + + try + { + _acceptor->startAccept(); + } + catch(const Ice::LocalException& ex) + { + { + Error out(_instance->initializationData().logger); + out << "can't accept connections:\n" << ex << '\n' << _acceptor->toString(); + } + abort(); + } + return true; +} + +bool +IceInternal::IncomingConnectionFactory::finishAsync(SocketOperation) +{ + assert(_acceptor); + try + { + _acceptor->finishAccept(); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "couldn't accept connection:\n" << ex << '\n' << _acceptor->toString(); + return false; + } + return _state < StateClosed; +} +#endif + +void +IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) +{ + ConnectionIPtr connection; + + ThreadPoolMessage<IncomingConnectionFactory> msg(current, *this); + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + ThreadPoolMessage<IncomingConnectionFactory>::IOScope io(msg); + if(!io) + { + return; + } + + if(_state >= StateClosed) + { + return; + } + else if(_state == StateHolding) + { + IceUtil::ThreadControl::yield(); + return; + } + + // + // Reap closed connections + // + vector<Ice::ConnectionIPtr> cons; + _reaper->swapConnections(cons); + for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) + { + _connections.erase(*p); + } + + // + // Now accept a new connection. + // + TransceiverPtr transceiver; + try + { + transceiver = _acceptor->accept(); + } + catch(const SocketException& ex) + { + if(noMoreFds(ex.error)) + { + { + Error out(_instance->initializationData().logger); + out << "fatal error: can't accept more connections:\n" << ex << '\n' << _acceptor->toString(); + } + abort(); + } + + // Ignore socket exceptions. + return; + } + catch(const LocalException& ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + return; + } + + assert(transceiver); + + try + { + connection = new ConnectionI(_instance, _reaper, transceiver, 0, _endpoint, _adapter); + } + catch(const LocalException& ex) + { + try + { + transceiver->close(); + } + catch(const Ice::LocalException&) + { + // Ignore. + } + + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + return; + } + + _connections.insert(connection); + } + + assert(connection); + connection->start(this); +} + +void +IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_state == StateClosed); + setState(StateFinished); +} + +string +IceInternal::IncomingConnectionFactory::toString() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_transceiver) + { + return _transceiver->toString(); + } + + assert(_acceptor); + return _acceptor->toString(); +} + +NativeInfoPtr +IceInternal::IncomingConnectionFactory::getNativeInfo() +{ + if(_transceiver) + { + return _transceiver->getNativeInfo(); + } + + assert(_acceptor); + return _acceptor->getNativeInfo(); +} + +void +IceInternal::IncomingConnectionFactory::connectionStartCompleted(const Ice::ConnectionIPtr& connection) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // Initialy, connections are in the holding state. If the factory is active + // we activate the connection. + // + if(_state == StateActive) + { + connection->activate(); + } +} + +void +IceInternal::IncomingConnectionFactory::connectionStartFailed(const Ice::ConnectionIPtr& connection, + const Ice::LocalException& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) + { + return; + } + + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } +} + +// +// COMPILERFIX: The ConnectionFactory setup is broken out into a separate initialize +// function because when it was part of the constructor C++Builder 2007 apps would +// crash if an execption was thrown from any calls within the constructor. +// +IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance, + const EndpointIPtr& endpoint, + const ObjectAdapterPtr& adapter) : + _instance(instance), + _reaper(new ConnectionReaper()), + _endpoint(endpoint), + _adapter(adapter), + _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), + _state(StateHolding) +{ +} + +void +IceInternal::IncomingConnectionFactory::initialize(const string& oaName) +{ + if(_instance->defaultsAndOverrides()->overrideTimeout) + { + const_cast<EndpointIPtr&>(_endpoint) = + _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + } + + if(_instance->defaultsAndOverrides()->overrideCompress) + { + const_cast<EndpointIPtr&>(_endpoint) = + _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue); + } + + try + { + const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(const_cast<EndpointIPtr&>(_endpoint)); + if(_transceiver) + { + ConnectionIPtr connection = new ConnectionI(_instance, _reaper, _transceiver, 0, _endpoint, _adapter); + connection->start(0); + _connections.insert(connection); + } + else + { + const_cast<AcceptorPtr&>(_acceptor) = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), oaName); + assert(_acceptor); + _acceptor->listen(); + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->initialize(this); + } + } + catch(const Ice::Exception&) + { + if(_transceiver) + { + try + { + _transceiver->close(); + } + catch(const Ice::LocalException&) + { + // Ignore + } + } + + + if(_acceptor) + { + try + { + _acceptor->close(); + } + catch(const Ice::LocalException&) + { + // Ignore + } + } + + _state = StateFinished; + _connections.clear(); + throw; + } +} + +IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory() +{ + assert(_state == StateFinished); + assert(_connections.empty()); +} + +void +IceInternal::IncomingConnectionFactory::setState(State state) +{ + if(_state == state) // Don't switch twice. + { + return; + } + + switch(state) + { + case StateActive: + { + if(_state != StateHolding) // Can only switch from holding to active. + { + return; + } + if(_acceptor) + { + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(this, SocketOperationRead); + } + for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate)); + break; + } + + case StateHolding: + { + if(_state != StateActive) // Can only switch from active to holding. + { + return; + } + if(_acceptor) + { + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(this, SocketOperationRead); + } + for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold)); + break; + } + + case StateClosed: + { + if(_acceptor) + { + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->finish(this); + } + else + { + state = StateFinished; + } + +#ifdef ICE_USE_IOCP + // + // With IOCP, we close the acceptor now to cancel all the pending asynchronous + // operations. It's important to wait for the pending asynchronous operations + // to return before ConnectionI::finished(). Otherwise, if there was a pending + // message waiting to be sent, the connection wouldn't know whether or not the + // send failed or succeeded, potentially breaking at-most-once semantics. + // + if(_acceptor) + { + _acceptor->close(); + } +#endif + +#ifdef _STLP_BEGIN_NAMESPACE + // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h + for_each(_connections.begin(), _connections.end(), + voidbind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); +#else + for_each(_connections.begin(), _connections.end(), + bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); +#endif + break; + } + + case StateFinished: + { + assert(_state == StateClosed); +#ifndef ICE_USE_IOCP + if(_acceptor) + { + _acceptor->close(); + } +#endif + break; + } + } + + _state = state; + notifyAll(); +} + |