summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp97
1 files changed, 67 insertions, 30 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 103296f1fe3..98cd946cf83 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -287,16 +287,17 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpts
connection = new ConnectionI(_instance, transceiver, endpoint, 0);
//
- // In thread per acceptor mode, the thread per connection (in
- // ConnectionI) will take care of connection validation.
+ // In thread per connection mode, the thread per
+ // connection (in ConnectionI) will take care of
+ // connection validation.
//
- if(!_instance->threadPerConnection())
+ if(_instance->threadPerConnection())
{
- connection->validate();
+ connection->waitUntilValidated();
}
else
{
- connection->waitUntilValidated();
+ connection->validate();
}
if(_instance->defaultsAndOverrides()->overrideCompress)
@@ -542,6 +543,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const
void
IceInternal::IncomingConnectionFactory::waitUntilFinished()
{
+ IceUtil::ThreadPtr threadPerAcceptor;
list<ConnectionIPtr> connections;
{
@@ -555,6 +557,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
wait();
}
+ assert(_state == StateClosed);
+
+ threadPerAcceptor = _threadPerAcceptor;
+ _threadPerAcceptor = 0;
+
//
// We want to wait until all connections are finished outside the
// thread synchronization.
@@ -562,6 +569,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
connections.swap(_connections);
}
+ if(threadPerAcceptor)
+ {
+ threadPerAcceptor->getThreadControl().join();
+ }
+
for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished));
}
@@ -722,22 +734,24 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
}
return;
}
-
+
+ assert(transceiver);
+
//
// Create a connection object for the connection.
//
- assert(transceiver);
connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
_connections.push_back(connection);
}
-
+
+ assert(connection);
+
//
// We validate outside the thread synchronization, to not block
// the factory.
//
try
{
- assert(connection);
connection->validate();
}
catch(const LocalException&)
@@ -819,18 +833,18 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
if(_transceiver)
{
ConnectionIPtr connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter);
-
+
//
- // In thread per acceptor mode, the thread per connection (in
- // ConnectionI) will take care of connection validation.
+ // In thread per connection mode, the thread per connection
+ // (in ConnectionI) will take care of connection validation.
//
- if(!_instance->threadPerConnection())
+ if(_instance->threadPerConnection())
{
- connection->validate();
+ connection->waitUntilValidated();
}
else
{
- connection->waitUntilValidated();
+ connection->validate();
}
_connections.push_back(connection);
@@ -858,18 +872,20 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
// thread per acceptor, that accepts new connections on this
// endpoint.
//
- IceUtil::ThreadPtr thread = new ThreadPerAcceptor(this);
- thread->start();
- thread->getThreadControl().detach();
+ _threadPerAcceptor = new ThreadPerAcceptor(this);
+ _threadPerAcceptor->start();
}
}
}
IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory()
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
assert(_state == StateClosed);
assert(!_acceptor);
assert(_connections.empty());
+ assert(!_threadPerAcceptor);
}
void
@@ -914,10 +930,11 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
if(_instance->threadPerConnection())
{
- // TODO: This is not correct! We cannot close the
- // acceptor before our thread per acceptor terminates.
- _acceptor->close();
- _acceptor = 0;
+ //
+ // Connect to our own acceptor, which unblocks our
+ // thread per acceptor stuch in accept().
+ //
+ _acceptor->connectToSelf();
}
else
{
@@ -994,12 +1011,10 @@ IceInternal::IncomingConnectionFactory::run()
catch(const SocketException&)
{
// Ignore socket exceptions.
- continue;
}
catch(const TimeoutException&)
{
// Ignore timeouts.
- continue;
}
catch(const LocalException& ex)
{
@@ -1009,11 +1024,10 @@ IceInternal::IncomingConnectionFactory::run()
Warning out(_instance->logger());
out << "connection exception:\n" << ex << '\n' << acceptor->toString();
}
- continue;
}
ConnectionIPtr connection;
-
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1024,9 +1038,32 @@ IceInternal::IncomingConnectionFactory::run()
if(_state == StateClosed)
{
- assert(transceiver);
- transceiver->close();
- break;
+ if(transceiver)
+ {
+ try
+ {
+ transceiver->close();
+ }
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+ }
+
+ try
+ {
+ _acceptor->close();
+ }
+ catch(const LocalException& ex)
+ {
+ _acceptor = 0;
+ notifyAll();
+ ex.ice_throw();
+ }
+
+ _acceptor = 0;
+ notifyAll();
+ return;
}
assert(_state == StateActive);
@@ -1037,7 +1074,7 @@ IceInternal::IncomingConnectionFactory::run()
_connections.erase(remove_if(_connections.begin(), _connections.end(),
Ice::constMemFun(&ConnectionI::isFinished)),
_connections.end());
-
+
//
// Create a connection object for the connection.
//