diff options
Diffstat (limited to 'cpp/src/Ice')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 356 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 18 | ||||
-rw-r--r-- | cpp/src/Ice/Reference.cpp | 94 |
3 files changed, 312 insertions, 156 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index a275f61e292..107887e6501 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -55,6 +55,12 @@ IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const Connector return connector < other.connector; } +bool +IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator==(const ConnectorInfo& other) const +{ + return connector == other.connector; +} + void IceInternal::OutgoingConnectionFactory::destroy() { @@ -193,7 +199,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // Try to establish the connection to the connectors. // DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - for(vector<ConnectorInfo>::const_iterator q = connectors.begin(); q != connectors.end(); ++q) + vector<ConnectorInfo>::const_iterator q; + for(q = connectors.begin(); q != connectors.end(); ++q) { try { @@ -209,6 +216,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt compress = q->endpoint->compress(); } + connection->activate(); break; } catch(const Ice::CommunicatorDestroyedException& ex) @@ -230,7 +238,14 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, 0, connection); + if(connection) + { + finishGetConnection(connectors, *q, connection, 0); + } + else + { + finishGetConnection(connectors, *exception.get(), 0); + } if(!connection) { @@ -425,7 +440,7 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) { pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p); + multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p); for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q) { @@ -454,8 +469,13 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { + if(_pending.find(*p) != _pending.end()) + { + continue; + } + pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, - multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p); + multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p); if(pr.first == pr.second) { @@ -520,7 +540,8 @@ IceInternal::OutgoingConnectionFactory::decPendingConnectCount() ConnectionIPtr IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors, - const ConnectCallbackPtr& cb, bool& compress) + const ConnectCallbackPtr& cb, + bool& compress) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -563,62 +584,27 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + // // Search for a matching connection. If we find one, we're done. // Ice::ConnectionIPtr connection = findConnection(connectors, compress); if(connection) { - if(cb) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) - { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); - if(q != _pending.end()) - { - q->second.erase(cb); - } - } - } return connection; } // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. + // Determine whether another thread/request is currently attempting to connect to + // one of our endpoints; if so we wait until it's done. // - bool found = false; - for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) - { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); - if(q != _pending.end()) - { - found = true; - if(cb) - { - q->second.insert(cb); // Add the callback to each pending connector. - } - } - } - - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -635,23 +621,14 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo return 0; } } - } - - if(_destroyed) - { - throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) - { - if(_pending.find(*r) == _pending.end()) + else { - _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>())); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -709,47 +686,179 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t void IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, - const ConnectCallbackPtr& cb, - const ConnectionIPtr& connection) + const ConnectorInfo& ci, + const ConnectionIPtr& connection, + const ConnectCallbackPtr& cb) { + set<ConnectCallbackPtr> connectionCallbacks; + if(cb) + { + connectionCallbacks.insert(cb); + } + set<ConnectCallbackPtr> callbacks; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) + { + for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) + { + if((*r)->hasConnector(ci)) + { + connectionCallbacks.insert(*r); + } + else + { + callbacks.insert(*r); + } + } + _pending.erase(q); + } + } + + set<ConnectCallbackPtr>::iterator r; + for(r = connectionCallbacks.begin(); r != connectionCallbacks.end(); ++r) + { + (*r)->removeFromPending(); + callbacks.erase(*r); + } + for(r = callbacks.begin(); r != callbacks.end(); ++r) + { + (*r)->removeFromPending(); + } + notifyAll(); + } + bool compress; + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = ci.endpoint->compress(); + } + + set<ConnectCallbackPtr>::const_iterator p; + for(p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->getConnection(); + } + for(p = connectionCallbacks.begin(); p != connectionCallbacks.end(); ++p) + { + (*p)->setConnection(connection, compress); + } +} + +void +IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, + const Ice::LocalException& ex, + const ConnectCallbackPtr& cb) +{ + set<ConnectCallbackPtr> failedCallbacks; + if(cb) + { + failedCallbacks.insert(cb); + } + + set<ConnectCallbackPtr> callbacks; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); if(q != _pending.end()) { - callbacks.insert(q->second.begin(), q->second.end()); + for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) + { + if((*r)->removeConnectors(connectors)) + { + failedCallbacks.insert(*r); + } + else + { + callbacks.insert(*r); + } + } _pending.erase(q); } } + + for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r) + { + assert(failedCallbacks.find(*r) == failedCallbacks.end()); + (*r)->removeFromPending(); + } notifyAll(); + } + + set<ConnectCallbackPtr>::const_iterator p; + for(p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->getConnection(); + } + for(p = failedCallbacks.begin(); p != failedCallbacks.end(); ++p) + { + (*p)->setException(ex); + } +} - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection && !_destroyed) +bool +IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb, + const vector<ConnectorInfo>& connectors) +{ + // + // Add the callback to each connector pending list. + // + bool found = false; + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) { - connection->activate(); + found = true; + if(cb) + { + q->second.insert(cb); + } } } + if(found) + { + return true; + } + // - // Notify any waiting callbacks. + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. // - for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p) + for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) { - (*p)->getConnection(); + if(_pending.find(*r) == _pending.end()) + { + _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>())); + } + } + return false; +} + +void +IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb, + const vector<ConnectorInfo>& connectors) +{ + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) + { + q->second.erase(cb); + } } } @@ -790,7 +899,7 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, - multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci); + multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci); for(multimap<ConnectorInfo, ConnectionIPtr>::iterator p = pr.first; p != pr.second; ++p) { @@ -802,7 +911,7 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex } pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint); + multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint); for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = qr.first; q != qr.second; ++q) { @@ -864,20 +973,8 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const O void IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection) { - bool compress; - DefaultsAndOverridesPtr defaultsAndOverrides = _factory->_instance->defaultsAndOverrides(); - if(defaultsAndOverrides->overrideCompress) - { - compress = defaultsAndOverrides->overrideCompressValue; - } - else - { - compress = _iter->endpoint->compress(); - } - - _factory->finishGetConnection(_connectors, this, connection); - _callback->setConnection(connection, compress); - _factory->decPendingConnectCount(); // Must be called last. + connection->activate(); + _factory->finishGetConnection(_connectors, *_iter, connection, this); } void @@ -889,9 +986,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c _factory->handleException(ex, *_iter, connection, _hasMore || _iter != _connectors.end() - 1); if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. { - _factory->finishGetConnection(_connectors, this, 0); - _callback->setException(ex); - _factory->decPendingConnectCount(); // Must be called last. + _factory->finishGetConnection(_connectors, ex, this); } else if(++_iter != _connectors.end()) // Try the next connector. { @@ -899,9 +994,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c } else { - _factory->finishGetConnection(_connectors, this, 0); - _callback->setException(ex); - _factory->decPendingConnectCount(); // Must be called last. + _factory->finishGetConnection(_connectors, ex, this); } } @@ -1016,7 +1109,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() // A null return value from getConnection indicates that the connection // is being established and that everthing has been done to ensure that // the callback will be notified when the connection establishment is - // done. + // done or that the callback already obtain the connection. // return; } @@ -1050,6 +1143,55 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() } } +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection, + bool compress) +{ + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback->setConnection(connection, compress); + _factory->decPendingConnectCount(); // Must be called last. +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::setException(const Ice::LocalException& ex) +{ + // + // Callback from the factory: connection establishment failed. + // + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::hasConnector(const ConnectorInfo& ci) +{ + return find(_connectors.begin(), _connectors.end(), ci) != _connectors.end(); +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const vector<ConnectorInfo>& connectors) +{ + // + // Callback from the factory: connecting to the given connectors + // failed, we remove the connectors and return true if there's + // no more connectors left to try. + // + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + _connectors.erase(remove(_connectors.begin(), _connectors.end(), *p), _connectors.end()); + } + return _connectors.empty(); +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending() +{ + _factory->removeFromPending(this, _connectors); +} + bool IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const { diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 7bd97ab70e1..89ef6bc514b 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -75,6 +75,7 @@ private: } bool operator<(const ConnectorInfo& other) const; + bool operator==(const ConnectorInfo& other) const; ConnectorPtr connector; EndpointIPtr endpoint; @@ -99,8 +100,15 @@ private: void getConnection(); void nextConnector(); + void setConnection(const Ice::ConnectionIPtr&, bool); + void setException(const Ice::LocalException&); + + bool hasConnector(const ConnectorInfo&); + bool removeConnectors(const std::vector<ConnectorInfo>&); + void removeFromPending(); + bool operator<(const ConnectCallback&) const; - + private: const OutgoingConnectionFactoryPtr _factory; @@ -120,7 +128,13 @@ private: void incPendingConnectCount(); void decPendingConnectCount(); Ice::ConnectionIPtr getConnection(const std::vector<ConnectorInfo>&, const ConnectCallbackPtr&, bool&); - void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectCallbackPtr&, const Ice::ConnectionIPtr&); + void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectorInfo&, const Ice::ConnectionIPtr&, + const ConnectCallbackPtr&); + void finishGetConnection(const std::vector<ConnectorInfo>&, const Ice::LocalException&, const ConnectCallbackPtr&); + + bool addToPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&); + void removeFromPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&); + Ice::ConnectionIPtr findConnection(const std::vector<ConnectorInfo>&, bool&); Ice::ConnectionIPtr createConnection(const TransceiverPtr&, const ConnectorInfo&); diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp index d3cc01479d6..29c177567c0 100644 --- a/cpp/src/Ice/Reference.cpp +++ b/cpp/src/Ice/Reference.cpp @@ -1678,29 +1678,29 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all virtual void setConnection(const Ice::ConnectionIPtr& connection, bool compress) + { + // + // If we have a router, set the object adapter for this router + // (if any) to the new connection, so that callbacks from the + // router can be received over this new connection. + // + if(_routerInfo && _routerInfo->getAdapter()) { - // - // If we have a router, set the object adapter for this router - // (if any) to the new connection, so that callbacks from the - // router can be received over this new connection. - // - if(_routerInfo && _routerInfo->getAdapter()) - { - connection->setAdapter(_routerInfo->getAdapter()); - } - _callback->setConnection(connection, compress); + connection->setAdapter(_routerInfo->getAdapter()); } + _callback->setConnection(connection, compress); + } virtual void setException(const Ice::LocalException& ex) - { - _callback->setException(ex); - } - + { + _callback->setException(ex); + } + CB1(const RouterInfoPtr& routerInfo, const GetConnectionCallbackPtr& callback) : - _routerInfo(routerInfo), _callback(callback) - { - } + _routerInfo(routerInfo), _callback(callback) + { + } private: @@ -1723,49 +1723,49 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all virtual void setConnection(const Ice::ConnectionIPtr& connection, bool compress) + { + // + // If we have a router, set the object adapter for this router + // (if any) to the new connection, so that callbacks from the + // router can be received over this new connection. + // + if(_reference->getRouterInfo() && _reference->getRouterInfo()->getAdapter()) { - // - // If we have a router, set the object adapter for this router - // (if any) to the new connection, so that callbacks from the - // router can be received over this new connection. - // - if(_reference->getRouterInfo() && _reference->getRouterInfo()->getAdapter()) - { - connection->setAdapter(_reference->getRouterInfo()->getAdapter()); - } - _callback->setConnection(connection, compress); + connection->setAdapter(_reference->getRouterInfo()->getAdapter()); } + _callback->setConnection(connection, compress); + } virtual void setException(const Ice::LocalException& ex) + { + if(!_exception.get()) { - if(!_exception.get()) - { - _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - } - - if(++_i == _endpoints.size()) - { - _callback->setException(*_exception.get()); - return; - } - - const bool more = _i != _endpoints.size() - 1; - vector<EndpointIPtr> endpoint; - endpoint.push_back(_endpoints[_i]); - - OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory(); - factory->create(endpoint, more, _reference->getEndpointSelection(), this); + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); } - + + if(++_i == _endpoints.size()) + { + _callback->setException(*_exception.get()); + return; + } + + const bool more = _i != _endpoints.size() - 1; + vector<EndpointIPtr> endpoint; + endpoint.push_back(_endpoints[_i]); + + OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory(); + factory->create(endpoint, more, _reference->getEndpointSelection(), this); + } + CB2(const RoutableReferencePtr& reference, const vector<EndpointIPtr>& endpoints, const GetConnectionCallbackPtr& callback) : _reference(reference), _endpoints(endpoints), _callback(callback), _i(0) - { - } + { + } private: |