summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp210
1 files changed, 136 insertions, 74 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 5650b5b0908..9e59b77380c 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -25,6 +25,7 @@
#include <Ice/LocalException.h>
#include <Ice/Functional.h>
#include <Ice/OutgoingAsync.h>
+#include <Ice/CommunicatorI.h>
#include <IceUtil/Random.h>
#include <iterator>
@@ -32,8 +33,8 @@
namespace IceInternal
{
-bool registerForBackgroundNotification(IceInternal::IncomingConnectionFactory*);
-void unregisterForBackgroundNotification(IceInternal::IncomingConnectionFactory*);
+bool registerForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&);
+void unregisterForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&);
}
#endif
@@ -44,7 +45,10 @@ using namespace Ice::Instrumentation;
using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(OutgoingConnectionFactory* p) { return p; }
+
+#ifndef ICE_CPP11_MAPPING
IceUtil::Shared* IceInternal::upCast(IncomingConnectionFactory* p) { return p; }
+#endif
namespace
{
@@ -57,6 +61,38 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t>
}
};
+#ifdef ICE_CPP11_MAPPING
+template <typename Map> void
+remove(Map& m, const typename Map::key_type& k, const typename Map::mapped_type& v)
+{
+ auto pr = m.equal_range(k);
+ assert(pr.first != pr.second);
+ for(auto q = pr.first; q != pr.second; ++q)
+ {
+ if(q->second.get() == v.get())
+ {
+ m.erase(q);
+ return;
+ }
+ }
+ assert(false); // Nothing was removed which is an error.
+}
+
+template<typename Map, typename Predicate> typename Map::mapped_type
+find(const Map& m, const typename Map::key_type& k, Predicate predicate)
+{
+ auto pr = m.equal_range(k);
+ for(auto q = pr.first; q != pr.second; ++q)
+ {
+ if(predicate(q->second))
+ {
+ return q->second;
+ }
+ }
+ return nullptr;
+}
+
+#else
template <typename K, typename V> void
remove(multimap<K, V>& m, K k, V v)
{
@@ -89,6 +125,26 @@ find(const multimap<K,::IceInternal::Handle<V> >& m,
}
return IceInternal::Handle<V>();
}
+#endif
+
+class StartAcceptor : public IceUtil::TimerTask
+{
+public:
+
+ StartAcceptor(const IncomingConnectionFactoryPtr& factory) : _factory(factory)
+ {
+ }
+
+ void
+ runTimerTask()
+ {
+ _factory->startAcceptor();
+ }
+
+private:
+
+ IncomingConnectionFactoryPtr _factory;
+};
}
@@ -168,7 +224,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
}
void
-IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore,
+IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts,
+ bool hasMore,
Ice::EndpointSelectionType selType,
const CreateConnectionCallbackPtr& callback)
{
@@ -198,7 +255,11 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
return;
}
+#ifdef ICE_CPP11_MAPPING
+ auto cb = make_shared<ConnectCallback>(_instance, this, endpoints, hasMore, callback, selType);
+#else
ConnectCallbackPtr cb = new ConnectCallback(_instance, this, endpoints, hasMore, callback, selType);
+#endif
cb->getConnectors();
}
@@ -276,7 +337,8 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
}
void
-IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ Ice::CompressBatch compress)
{
list<ConnectionIPtr> c;
@@ -296,7 +358,7 @@ IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const Communicat
{
try
{
- outAsync->flushConnection(*p);
+ outAsync->flushConnection(*p, compress);
}
catch(const LocalException&)
{
@@ -545,8 +607,8 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t
throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
}
- connection = new ConnectionI(_communicator, _instance, _monitor, transceiver, ci.connector,
- ci.endpoint->compress(false), 0);
+ connection = ConnectionI::create(_communicator, _instance, _monitor, transceiver, ci.connector,
+ ci.endpoint->compress(false), ICE_NULLPTR);
}
catch(const Ice::LocalException&)
{
@@ -827,7 +889,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartComplete
}
connection->activate();
- _factory->finishGetConnection(_connectors, *_iter, connection, this);
+ _factory->finishGetConnection(_connectors, *_iter, connection, ICE_SHARED_FROM_THIS);
}
void
@@ -920,7 +982,8 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextEndpoint()
try
{
assert(_endpointsIter != _endpoints.end());
- (*_endpointsIter)->connectors_async(_selType, this);
+ (*_endpointsIter)->connectors_async(_selType, ICE_SHARED_FROM_THIS);
+
}
catch(const Ice::LocalException& ex)
{
@@ -938,7 +1001,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection()
// connection.
//
bool compress;
- Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, this, compress);
+ Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, ICE_SHARED_FROM_THIS, compress);
if(!connection)
{
//
@@ -986,7 +1049,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
<< _iter->connector->toString();
}
Ice::ConnectionIPtr connection = _factory->createConnection(_iter->connector->connect(), *_iter);
- connection->start(this);
+ connection->start(ICE_SHARED_FROM_THIS);
}
catch(const Ice::LocalException& ex)
{
@@ -1052,7 +1115,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const
void
IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending()
{
- _factory->removeFromPending(this, _connectors);
+ _factory->removeFromPending(ICE_SHARED_FROM_THIS, _connectors);
}
bool
@@ -1066,14 +1129,14 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailedIm
{
if(_observer)
{
- _observer->failed(ex.ice_name());
+ _observer->failed(ex.ice_id());
_observer->detach();
}
_factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1);
if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue.
{
- _factory->finishGetConnection(_connectors, ex, this);
+ _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS);
}
else if(++_iter != _connectors.end()) // Try the next connector.
{
@@ -1081,7 +1144,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailedIm
}
else
{
- _factory->finishGetConnection(_connectors, ex, this);
+ _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS);
}
return false;
}
@@ -1216,7 +1279,8 @@ IceInternal::IncomingConnectionFactory::connections() const
}
void
-IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ Ice::CompressBatch compress)
{
list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here.
@@ -1224,7 +1288,7 @@ IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const Communicat
{
try
{
- outAsync->flushConnection(*p);
+ outAsync->flushConnection(*p, compress);
}
catch(const LocalException&)
{
@@ -1233,7 +1297,7 @@ IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const Communicat
}
}
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
bool
IceInternal::IncomingConnectionFactory::startAsync(SocketOperation)
{
@@ -1249,11 +1313,8 @@ IceInternal::IncomingConnectionFactory::startAsync(SocketOperation)
}
catch(const Ice::LocalException& ex)
{
- {
- Error out(_instance->initializationData().logger);
- out << "can't accept connections:\n" << ex << '\n' << _acceptor->toString();
- }
- abort();
+ ICE_SET_EXCEPTION_FROM_CLONE(_acceptorException, ex.ice_clone());
+ _acceptor->getNativeInfo()->completed(SocketOperationRead);
}
return true;
}
@@ -1264,13 +1325,22 @@ IceInternal::IncomingConnectionFactory::finishAsync(SocketOperation)
assert(_acceptor);
try
{
+ if(_acceptorException)
+ {
+ _acceptorException->ice_throw();
+ }
_acceptor->finishAccept();
}
catch(const LocalException& ex)
{
+ _acceptorException.reset(ICE_NULLPTR);
+
Error out(_instance->initializationData().logger);
out << "couldn't accept connection:\n" << ex << '\n' << _acceptor->toString();
- return false;
+ if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
+ {
+ closeAcceptor();
+ }
}
return _state < StateClosed;
}
@@ -1312,7 +1382,7 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
_connections.erase(*p);
}
- if(!_acceptor)
+ if(!_acceptorStarted)
{
return;
}
@@ -1335,11 +1405,13 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
{
if(noMoreFds(ex.error))
{
+ Error out(_instance->initializationData().logger);
+ out << "can't accept more connections:\n" << ex << '\n' << _acceptor->toString();
+
+ if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
{
- Error out(_instance->initializationData().logger);
- out << "fatal error: can't accept more connections:\n" << ex << '\n' << _acceptor->toString();
+ closeAcceptor();
}
- abort();
}
// Ignore socket exceptions.
@@ -1360,8 +1432,8 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
try
{
- connection = new ConnectionI(_adapter->getCommunicator(), _instance, _monitor, transceiver, 0, _endpoint,
- _adapter);
+ connection = ConnectionI::create(_adapter->getCommunicator(), _instance, _monitor, transceiver, 0,
+ _endpoint, _adapter);
}
catch(const LocalException& ex)
{
@@ -1386,41 +1458,34 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
}
assert(connection);
- connection->start(this);
+
+ connection->start(ICE_SHARED_FROM_THIS);
}
void
IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&, bool close)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
-#if TARGET_OS_IPHONE != 0
if(_state < StateClosed)
{
- //
- // Finished has been called by stopAcceptor if the state isn't
- // closed.
- //
if(_acceptorStarted && close)
{
- _acceptorStarted = false;
closeAcceptor();
}
return;
}
-#endif
assert(_state == StateClosed);
setState(StateFinished);
- if(close)
+ if(_acceptorStarted && close)
{
closeAcceptor();
}
#if TARGET_OS_IPHONE != 0
sync.release();
- unregisterForBackgroundNotification(this);
+ unregisterForBackgroundNotification(ICE_SHARED_FROM_THIS);
#endif
}
@@ -1428,7 +1493,6 @@ string
IceInternal::IncomingConnectionFactory::toString() const
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
if(_transceiver)
{
return _transceiver->toString();
@@ -1501,13 +1565,14 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
_instance(instance),
_monitor(new FactoryACMMonitor(instance, dynamic_cast<ObjectAdapterI*>(adapter.get())->getACM())),
_endpoint(endpoint),
+ _acceptorStarted(false),
+ _acceptorStopped(false),
_adapter(adapter),
_warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0),
_state(StateHolding)
{
}
-#if TARGET_OS_IPHONE != 0
void
IceInternal::IncomingConnectionFactory::startAcceptor()
{
@@ -1517,18 +1582,17 @@ IceInternal::IncomingConnectionFactory::startAcceptor()
return;
}
+ _acceptorStopped = false;
+
try
{
createAcceptor();
- _acceptorStarted = true;
}
catch(const Ice::Exception& ex)
{
- if(_warn)
- {
- Warning out(_instance->initializationData().logger);
- out << "unable to create acceptor:\n" << ex;
- }
+ Error out(_instance->initializationData().logger);
+ out << "acceptor creation failed:\n" << ex << '\n' << _acceptor->toString();
+ _instance->timer()->schedule(ICE_MAKE_SHARED(StartAcceptor, ICE_SHARED_FROM_THIS), IceUtil::Time::seconds(1));
}
}
@@ -1541,13 +1605,13 @@ IceInternal::IncomingConnectionFactory::stopAcceptor()
return;
}
- if(_adapter->getThreadPool()->finish(this, true))
+ _acceptorStopped = true;
+
+ if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
{
- _acceptorStarted = false;
closeAcceptor();
}
}
-#endif
void
IceInternal::IncomingConnectionFactory::initialize()
@@ -1561,7 +1625,6 @@ IceInternal::IncomingConnectionFactory::initialize()
{
_endpoint = _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue);
}
-
try
{
const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver();
@@ -1573,9 +1636,8 @@ IceInternal::IncomingConnectionFactory::initialize()
out << "attempting to bind to " << _endpoint->protocol() << " socket\n" << _transceiver->toString();
}
const_cast<EndpointIPtr&>(_endpoint) = _transceiver->bind();
-
- ConnectionIPtr connection = new ConnectionI(_adapter->getCommunicator(), _instance, 0, _transceiver, 0,
- _endpoint, _adapter);
+ ConnectionIPtr connection(ConnectionI::create(_adapter->getCommunicator(), _instance, 0, _transceiver, 0,
+ _endpoint, _adapter));
connection->start(0);
_connections.insert(connection);
}
@@ -1586,8 +1648,7 @@ IceInternal::IncomingConnectionFactory::initialize()
// The notification center will call back on the factory to
// start the acceptor if necessary.
//
- _acceptorStarted = false;
- registerForBackgroundNotification(this);
+ registerForBackgroundNotification(ICE_SHARED_FROM_THIS);
#else
createAcceptor();
#endif
@@ -1643,7 +1704,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
out << "accepting " << _endpoint->protocol() << " connections at " << _acceptor->toString();
}
- _adapter->getThreadPool()->_register(this, SocketOperationRead);
+ _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead);
}
for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate));
break;
@@ -1662,7 +1723,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
out << "holding " << _endpoint->protocol() << " connections at " << _acceptor->toString();
}
- _adapter->getThreadPool()->unregister(this, SocketOperationRead);
+ _adapter->getThreadPool()->unregister(ICE_SHARED_FROM_THIS, SocketOperationRead);
}
for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold));
break;
@@ -1670,7 +1731,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
case StateClosed:
{
- if(_acceptor)
+ if(_acceptorStarted)
{
//
// If possible, close the acceptor now to prevent new connections from
@@ -1679,7 +1740,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
// the finish() call. Not all selector implementations do support this
// however.
//
- if(_adapter->getThreadPool()->finish(this, true))
+ if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
{
closeAcceptor();
}
@@ -1710,9 +1771,9 @@ IceInternal::IncomingConnectionFactory::createAcceptor()
{
try
{
+ assert(!_acceptorStarted);
_acceptor = _endpoint->acceptor(_adapter->getName());
assert(_acceptor);
-
if(_instance->traceLevels()->network >= 2)
{
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
@@ -1720,26 +1781,25 @@ IceInternal::IncomingConnectionFactory::createAcceptor()
}
_endpoint = _acceptor->listen();
-
if(_instance->traceLevels()->network >= 1)
{
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
out << "listening for " << _endpoint->protocol() << " connections\n" << _acceptor->toDetailedString();
}
- _adapter->getThreadPool()->initialize(this);
-
+ _adapter->getThreadPool()->initialize(ICE_SHARED_FROM_THIS);
if(_state == StateActive)
{
- _adapter->getThreadPool()->_register(this, SocketOperationRead);
+ _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead);
}
+
+ _acceptorStarted = true;
}
catch(const Ice::Exception&)
{
if(_acceptor)
{
_acceptor->close();
- _acceptor = 0;
}
throw;
}
@@ -1756,14 +1816,16 @@ IceInternal::IncomingConnectionFactory::closeAcceptor()
out << "stopping to accept " << _endpoint->protocol() << " connections at " << _acceptor->toString();
}
+ _acceptorStarted = false;
_acceptor->close();
-#if TARGET_OS_IPHONE != 0
//
- // Only clear the acceptor on iOS where it can be destroyed/re-created during the lifetime of the incoming
- // connection factory. On other platforms, we keep it set. This is in particular import for IOCP/WinRT where
- // finishAsync can be called after the acceptor is closed.
+ // If the acceptor hasn't been explicitly stopped (which is the case if the acceptor got closed
+ // because of an unexpected error), try to restart the acceptor in 5 seconds.
//
- _acceptor = 0;
-#endif
+ if(!_acceptorStopped && (_state == StateHolding || _state == StateActive))
+ {
+ _instance->timer()->schedule(ICE_MAKE_SHARED(StartAcceptor, ICE_SHARED_FROM_THIS), IceUtil::Time::seconds(1));
+ }
}
+