diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 210 |
1 files changed, 136 insertions, 74 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 5650b5b0908..9e59b77380c 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -25,6 +25,7 @@ #include <Ice/LocalException.h> #include <Ice/Functional.h> #include <Ice/OutgoingAsync.h> +#include <Ice/CommunicatorI.h> #include <IceUtil/Random.h> #include <iterator> @@ -32,8 +33,8 @@ namespace IceInternal { -bool registerForBackgroundNotification(IceInternal::IncomingConnectionFactory*); -void unregisterForBackgroundNotification(IceInternal::IncomingConnectionFactory*); +bool registerForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&); +void unregisterForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&); } #endif @@ -44,7 +45,10 @@ using namespace Ice::Instrumentation; using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(OutgoingConnectionFactory* p) { return p; } + +#ifndef ICE_CPP11_MAPPING IceUtil::Shared* IceInternal::upCast(IncomingConnectionFactory* p) { return p; } +#endif namespace { @@ -57,6 +61,38 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t> } }; +#ifdef ICE_CPP11_MAPPING +template <typename Map> void +remove(Map& m, const typename Map::key_type& k, const typename Map::mapped_type& v) +{ + auto pr = m.equal_range(k); + assert(pr.first != pr.second); + for(auto q = pr.first; q != pr.second; ++q) + { + if(q->second.get() == v.get()) + { + m.erase(q); + return; + } + } + assert(false); // Nothing was removed which is an error. +} + +template<typename Map, typename Predicate> typename Map::mapped_type +find(const Map& m, const typename Map::key_type& k, Predicate predicate) +{ + auto pr = m.equal_range(k); + for(auto q = pr.first; q != pr.second; ++q) + { + if(predicate(q->second)) + { + return q->second; + } + } + return nullptr; +} + +#else template <typename K, typename V> void remove(multimap<K, V>& m, K k, V v) { @@ -89,6 +125,26 @@ find(const multimap<K,::IceInternal::Handle<V> >& m, } return IceInternal::Handle<V>(); } +#endif + +class StartAcceptor : public IceUtil::TimerTask +{ +public: + + StartAcceptor(const IncomingConnectionFactoryPtr& factory) : _factory(factory) + { + } + + void + runTimerTask() + { + _factory->startAcceptor(); + } + +private: + + IncomingConnectionFactoryPtr _factory; +}; } @@ -168,7 +224,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() } void -IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, + bool hasMore, Ice::EndpointSelectionType selType, const CreateConnectionCallbackPtr& callback) { @@ -198,7 +255,11 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt return; } +#ifdef ICE_CPP11_MAPPING + auto cb = make_shared<ConnectCallback>(_instance, this, endpoints, hasMore, callback, selType); +#else ConnectCallbackPtr cb = new ConnectCallback(_instance, this, endpoints, hasMore, callback, selType); +#endif cb->getConnectors(); } @@ -276,7 +337,8 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad } void -IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) +IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, + Ice::CompressBatch compress) { list<ConnectionIPtr> c; @@ -296,7 +358,7 @@ IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const Communicat { try { - outAsync->flushConnection(*p); + outAsync->flushConnection(*p, compress); } catch(const LocalException&) { @@ -545,8 +607,8 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); } - connection = new ConnectionI(_communicator, _instance, _monitor, transceiver, ci.connector, - ci.endpoint->compress(false), 0); + connection = ConnectionI::create(_communicator, _instance, _monitor, transceiver, ci.connector, + ci.endpoint->compress(false), ICE_NULLPTR); } catch(const Ice::LocalException&) { @@ -827,7 +889,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartComplete } connection->activate(); - _factory->finishGetConnection(_connectors, *_iter, connection, this); + _factory->finishGetConnection(_connectors, *_iter, connection, ICE_SHARED_FROM_THIS); } void @@ -920,7 +982,8 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextEndpoint() try { assert(_endpointsIter != _endpoints.end()); - (*_endpointsIter)->connectors_async(_selType, this); + (*_endpointsIter)->connectors_async(_selType, ICE_SHARED_FROM_THIS); + } catch(const Ice::LocalException& ex) { @@ -938,7 +1001,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() // connection. // bool compress; - Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, this, compress); + Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, ICE_SHARED_FROM_THIS, compress); if(!connection) { // @@ -986,7 +1049,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() << _iter->connector->toString(); } Ice::ConnectionIPtr connection = _factory->createConnection(_iter->connector->connect(), *_iter); - connection->start(this); + connection->start(ICE_SHARED_FROM_THIS); } catch(const Ice::LocalException& ex) { @@ -1052,7 +1115,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const void IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending() { - _factory->removeFromPending(this, _connectors); + _factory->removeFromPending(ICE_SHARED_FROM_THIS, _connectors); } bool @@ -1066,14 +1129,14 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailedIm { if(_observer) { - _observer->failed(ex.ice_name()); + _observer->failed(ex.ice_id()); _observer->detach(); } _factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1); if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. { - _factory->finishGetConnection(_connectors, ex, this); + _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS); } else if(++_iter != _connectors.end()) // Try the next connector. { @@ -1081,7 +1144,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailedIm } else { - _factory->finishGetConnection(_connectors, ex, this); + _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS); } return false; } @@ -1216,7 +1279,8 @@ IceInternal::IncomingConnectionFactory::connections() const } void -IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) +IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, + Ice::CompressBatch compress) { list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here. @@ -1224,7 +1288,7 @@ IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const Communicat { try { - outAsync->flushConnection(*p); + outAsync->flushConnection(*p, compress); } catch(const LocalException&) { @@ -1233,7 +1297,7 @@ IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const Communicat } } -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) +#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP) bool IceInternal::IncomingConnectionFactory::startAsync(SocketOperation) { @@ -1249,11 +1313,8 @@ IceInternal::IncomingConnectionFactory::startAsync(SocketOperation) } catch(const Ice::LocalException& ex) { - { - Error out(_instance->initializationData().logger); - out << "can't accept connections:\n" << ex << '\n' << _acceptor->toString(); - } - abort(); + ICE_SET_EXCEPTION_FROM_CLONE(_acceptorException, ex.ice_clone()); + _acceptor->getNativeInfo()->completed(SocketOperationRead); } return true; } @@ -1264,13 +1325,22 @@ IceInternal::IncomingConnectionFactory::finishAsync(SocketOperation) assert(_acceptor); try { + if(_acceptorException) + { + _acceptorException->ice_throw(); + } _acceptor->finishAccept(); } catch(const LocalException& ex) { + _acceptorException.reset(ICE_NULLPTR); + Error out(_instance->initializationData().logger); out << "couldn't accept connection:\n" << ex << '\n' << _acceptor->toString(); - return false; + if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) + { + closeAcceptor(); + } } return _state < StateClosed; } @@ -1312,7 +1382,7 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) _connections.erase(*p); } - if(!_acceptor) + if(!_acceptorStarted) { return; } @@ -1335,11 +1405,13 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) { if(noMoreFds(ex.error)) { + Error out(_instance->initializationData().logger); + out << "can't accept more connections:\n" << ex << '\n' << _acceptor->toString(); + + if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) { - Error out(_instance->initializationData().logger); - out << "fatal error: can't accept more connections:\n" << ex << '\n' << _acceptor->toString(); + closeAcceptor(); } - abort(); } // Ignore socket exceptions. @@ -1360,8 +1432,8 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) try { - connection = new ConnectionI(_adapter->getCommunicator(), _instance, _monitor, transceiver, 0, _endpoint, - _adapter); + connection = ConnectionI::create(_adapter->getCommunicator(), _instance, _monitor, transceiver, 0, + _endpoint, _adapter); } catch(const LocalException& ex) { @@ -1386,41 +1458,34 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) } assert(connection); - connection->start(this); + + connection->start(ICE_SHARED_FROM_THIS); } void IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&, bool close) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - -#if TARGET_OS_IPHONE != 0 if(_state < StateClosed) { - // - // Finished has been called by stopAcceptor if the state isn't - // closed. - // if(_acceptorStarted && close) { - _acceptorStarted = false; closeAcceptor(); } return; } -#endif assert(_state == StateClosed); setState(StateFinished); - if(close) + if(_acceptorStarted && close) { closeAcceptor(); } #if TARGET_OS_IPHONE != 0 sync.release(); - unregisterForBackgroundNotification(this); + unregisterForBackgroundNotification(ICE_SHARED_FROM_THIS); #endif } @@ -1428,7 +1493,6 @@ string IceInternal::IncomingConnectionFactory::toString() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_transceiver) { return _transceiver->toString(); @@ -1501,13 +1565,14 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _instance(instance), _monitor(new FactoryACMMonitor(instance, dynamic_cast<ObjectAdapterI*>(adapter.get())->getACM())), _endpoint(endpoint), + _acceptorStarted(false), + _acceptorStopped(false), _adapter(adapter), _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), _state(StateHolding) { } -#if TARGET_OS_IPHONE != 0 void IceInternal::IncomingConnectionFactory::startAcceptor() { @@ -1517,18 +1582,17 @@ IceInternal::IncomingConnectionFactory::startAcceptor() return; } + _acceptorStopped = false; + try { createAcceptor(); - _acceptorStarted = true; } catch(const Ice::Exception& ex) { - if(_warn) - { - Warning out(_instance->initializationData().logger); - out << "unable to create acceptor:\n" << ex; - } + Error out(_instance->initializationData().logger); + out << "acceptor creation failed:\n" << ex << '\n' << _acceptor->toString(); + _instance->timer()->schedule(ICE_MAKE_SHARED(StartAcceptor, ICE_SHARED_FROM_THIS), IceUtil::Time::seconds(1)); } } @@ -1541,13 +1605,13 @@ IceInternal::IncomingConnectionFactory::stopAcceptor() return; } - if(_adapter->getThreadPool()->finish(this, true)) + _acceptorStopped = true; + + if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) { - _acceptorStarted = false; closeAcceptor(); } } -#endif void IceInternal::IncomingConnectionFactory::initialize() @@ -1561,7 +1625,6 @@ IceInternal::IncomingConnectionFactory::initialize() { _endpoint = _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue); } - try { const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(); @@ -1573,9 +1636,8 @@ IceInternal::IncomingConnectionFactory::initialize() out << "attempting to bind to " << _endpoint->protocol() << " socket\n" << _transceiver->toString(); } const_cast<EndpointIPtr&>(_endpoint) = _transceiver->bind(); - - ConnectionIPtr connection = new ConnectionI(_adapter->getCommunicator(), _instance, 0, _transceiver, 0, - _endpoint, _adapter); + ConnectionIPtr connection(ConnectionI::create(_adapter->getCommunicator(), _instance, 0, _transceiver, 0, + _endpoint, _adapter)); connection->start(0); _connections.insert(connection); } @@ -1586,8 +1648,7 @@ IceInternal::IncomingConnectionFactory::initialize() // The notification center will call back on the factory to // start the acceptor if necessary. // - _acceptorStarted = false; - registerForBackgroundNotification(this); + registerForBackgroundNotification(ICE_SHARED_FROM_THIS); #else createAcceptor(); #endif @@ -1643,7 +1704,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); out << "accepting " << _endpoint->protocol() << " connections at " << _acceptor->toString(); } - _adapter->getThreadPool()->_register(this, SocketOperationRead); + _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead); } for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate)); break; @@ -1662,7 +1723,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); out << "holding " << _endpoint->protocol() << " connections at " << _acceptor->toString(); } - _adapter->getThreadPool()->unregister(this, SocketOperationRead); + _adapter->getThreadPool()->unregister(ICE_SHARED_FROM_THIS, SocketOperationRead); } for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold)); break; @@ -1670,7 +1731,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) case StateClosed: { - if(_acceptor) + if(_acceptorStarted) { // // If possible, close the acceptor now to prevent new connections from @@ -1679,7 +1740,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) // the finish() call. Not all selector implementations do support this // however. // - if(_adapter->getThreadPool()->finish(this, true)) + if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) { closeAcceptor(); } @@ -1710,9 +1771,9 @@ IceInternal::IncomingConnectionFactory::createAcceptor() { try { + assert(!_acceptorStarted); _acceptor = _endpoint->acceptor(_adapter->getName()); assert(_acceptor); - if(_instance->traceLevels()->network >= 2) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); @@ -1720,26 +1781,25 @@ IceInternal::IncomingConnectionFactory::createAcceptor() } _endpoint = _acceptor->listen(); - if(_instance->traceLevels()->network >= 1) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); out << "listening for " << _endpoint->protocol() << " connections\n" << _acceptor->toDetailedString(); } - _adapter->getThreadPool()->initialize(this); - + _adapter->getThreadPool()->initialize(ICE_SHARED_FROM_THIS); if(_state == StateActive) { - _adapter->getThreadPool()->_register(this, SocketOperationRead); + _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead); } + + _acceptorStarted = true; } catch(const Ice::Exception&) { if(_acceptor) { _acceptor->close(); - _acceptor = 0; } throw; } @@ -1756,14 +1816,16 @@ IceInternal::IncomingConnectionFactory::closeAcceptor() out << "stopping to accept " << _endpoint->protocol() << " connections at " << _acceptor->toString(); } + _acceptorStarted = false; _acceptor->close(); -#if TARGET_OS_IPHONE != 0 // - // Only clear the acceptor on iOS where it can be destroyed/re-created during the lifetime of the incoming - // connection factory. On other platforms, we keep it set. This is in particular import for IOCP/WinRT where - // finishAsync can be called after the acceptor is closed. + // If the acceptor hasn't been explicitly stopped (which is the case if the acceptor got closed + // because of an unexpected error), try to restart the acceptor in 5 seconds. // - _acceptor = 0; -#endif + if(!_acceptorStopped && (_state == StateHolding || _state == StateActive)) + { + _instance->timer()->schedule(ICE_MAKE_SHARED(StartAcceptor, ICE_SHARED_FROM_THIS), IceUtil::Time::seconds(1)); + } } + |