summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/config/TestUtil.py4
-rw-r--r--cpp/src/Ice/Acceptor.h1
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp97
-rw-r--r--cpp/src/Ice/ConnectionFactory.h3
-rw-r--r--cpp/src/Ice/ConnectionI.cpp177
-rw-r--r--cpp/src/Ice/ConnectionI.h2
-rw-r--r--cpp/src/Ice/TcpAcceptor.cpp9
-rw-r--r--cpp/src/Ice/TcpAcceptor.h1
-rw-r--r--cpp/src/IceSSL/SslAcceptor.cpp9
-rw-r--r--cpp/src/IceSSL/SslAcceptor.h1
-rw-r--r--cpp/test/Ice/operations/Client.cpp5
11 files changed, 220 insertions, 89 deletions
diff --git a/cpp/config/TestUtil.py b/cpp/config/TestUtil.py
index e70e9888832..52b6c00d380 100644
--- a/cpp/config/TestUtil.py
+++ b/cpp/config/TestUtil.py
@@ -29,8 +29,8 @@ compress = 1
# thread per connection mode.
#
-threadPerConnection = 0
-#threadPerConnection = 1
+#threadPerConnection = 0
+threadPerConnection = 1
#
# If you don't set "host" below, then the Ice library will try to find
diff --git a/cpp/src/Ice/Acceptor.h b/cpp/src/Ice/Acceptor.h
index 05342bb5154..9403208706d 100644
--- a/cpp/src/Ice/Acceptor.h
+++ b/cpp/src/Ice/Acceptor.h
@@ -31,6 +31,7 @@ public:
virtual void close() = 0;
virtual void listen() = 0;
virtual TransceiverPtr accept(int) = 0;
+ virtual void connectToSelf() = 0;
virtual std::string toString() const = 0;
};
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 103296f1fe3..98cd946cf83 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -287,16 +287,17 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpts
connection = new ConnectionI(_instance, transceiver, endpoint, 0);
//
- // In thread per acceptor mode, the thread per connection (in
- // ConnectionI) will take care of connection validation.
+ // In thread per connection mode, the thread per
+ // connection (in ConnectionI) will take care of
+ // connection validation.
//
- if(!_instance->threadPerConnection())
+ if(_instance->threadPerConnection())
{
- connection->validate();
+ connection->waitUntilValidated();
}
else
{
- connection->waitUntilValidated();
+ connection->validate();
}
if(_instance->defaultsAndOverrides()->overrideCompress)
@@ -542,6 +543,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const
void
IceInternal::IncomingConnectionFactory::waitUntilFinished()
{
+ IceUtil::ThreadPtr threadPerAcceptor;
list<ConnectionIPtr> connections;
{
@@ -555,6 +557,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
wait();
}
+ assert(_state == StateClosed);
+
+ threadPerAcceptor = _threadPerAcceptor;
+ _threadPerAcceptor = 0;
+
//
// We want to wait until all connections are finished outside the
// thread synchronization.
@@ -562,6 +569,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
connections.swap(_connections);
}
+ if(threadPerAcceptor)
+ {
+ threadPerAcceptor->getThreadControl().join();
+ }
+
for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished));
}
@@ -722,22 +734,24 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
}
return;
}
-
+
+ assert(transceiver);
+
//
// Create a connection object for the connection.
//
- assert(transceiver);
connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
_connections.push_back(connection);
}
-
+
+ assert(connection);
+
//
// We validate outside the thread synchronization, to not block
// the factory.
//
try
{
- assert(connection);
connection->validate();
}
catch(const LocalException&)
@@ -819,18 +833,18 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
if(_transceiver)
{
ConnectionIPtr connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter);
-
+
//
- // In thread per acceptor mode, the thread per connection (in
- // ConnectionI) will take care of connection validation.
+ // In thread per connection mode, the thread per connection
+ // (in ConnectionI) will take care of connection validation.
//
- if(!_instance->threadPerConnection())
+ if(_instance->threadPerConnection())
{
- connection->validate();
+ connection->waitUntilValidated();
}
else
{
- connection->waitUntilValidated();
+ connection->validate();
}
_connections.push_back(connection);
@@ -858,18 +872,20 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
// thread per acceptor, that accepts new connections on this
// endpoint.
//
- IceUtil::ThreadPtr thread = new ThreadPerAcceptor(this);
- thread->start();
- thread->getThreadControl().detach();
+ _threadPerAcceptor = new ThreadPerAcceptor(this);
+ _threadPerAcceptor->start();
}
}
}
IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory()
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
assert(_state == StateClosed);
assert(!_acceptor);
assert(_connections.empty());
+ assert(!_threadPerAcceptor);
}
void
@@ -914,10 +930,11 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
if(_instance->threadPerConnection())
{
- // TODO: This is not correct! We cannot close the
- // acceptor before our thread per acceptor terminates.
- _acceptor->close();
- _acceptor = 0;
+ //
+ // Connect to our own acceptor, which unblocks our
+ // thread per acceptor stuch in accept().
+ //
+ _acceptor->connectToSelf();
}
else
{
@@ -994,12 +1011,10 @@ IceInternal::IncomingConnectionFactory::run()
catch(const SocketException&)
{
// Ignore socket exceptions.
- continue;
}
catch(const TimeoutException&)
{
// Ignore timeouts.
- continue;
}
catch(const LocalException& ex)
{
@@ -1009,11 +1024,10 @@ IceInternal::IncomingConnectionFactory::run()
Warning out(_instance->logger());
out << "connection exception:\n" << ex << '\n' << acceptor->toString();
}
- continue;
}
ConnectionIPtr connection;
-
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1024,9 +1038,32 @@ IceInternal::IncomingConnectionFactory::run()
if(_state == StateClosed)
{
- assert(transceiver);
- transceiver->close();
- break;
+ if(transceiver)
+ {
+ try
+ {
+ transceiver->close();
+ }
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+ }
+
+ try
+ {
+ _acceptor->close();
+ }
+ catch(const LocalException& ex)
+ {
+ _acceptor = 0;
+ notifyAll();
+ ex.ice_throw();
+ }
+
+ _acceptor = 0;
+ notifyAll();
+ return;
}
assert(_state == StateActive);
@@ -1037,7 +1074,7 @@ IceInternal::IncomingConnectionFactory::run()
_connections.erase(remove_if(_connections.begin(), _connections.end(),
Ice::constMemFun(&ConnectionI::isFinished)),
_connections.end());
-
+
//
// Create a connection object for the connection.
//
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index dc34fab9e80..ff6a4813c17 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -119,7 +119,8 @@ private:
IncomingConnectionFactoryPtr _factory;
};
friend class ThreadPerAcceptor;
-
+ IceUtil::ThreadPtr _threadPerAcceptor;
+
AcceptorPtr _acceptor;
const TransceiverPtr _transceiver;
const EndpointPtr _endpoint;
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index aaa99590ff8..e6782af3802 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -199,8 +199,28 @@ Ice::ConnectionI::isDestroyed() const
bool
Ice::ConnectionI::isFinished() const
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- return _transceiver == 0 && _dispatchCount == 0;
+ IceUtil::ThreadPtr threadPerConnection;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_transceiver != 0 || _dispatchCount != 0)
+ {
+ return false;
+ }
+
+ assert(_state == StateClosed);
+
+ threadPerConnection = _threadPerConnection;
+ _threadPerConnection = 0;
+ }
+
+ if(threadPerConnection)
+ {
+ threadPerConnection->getThreadControl().join();
+ }
+
+ return true;
}
void
@@ -228,70 +248,82 @@ Ice::ConnectionI::waitUntilHolding() const
void
Ice::ConnectionI::waitUntilFinished()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We wait indefinitely until connection closing has been
- // initiated. We also wait indefinitely until all outstanding
- // requests are completed. Otherwise we couldn't guarantee that
- // there are no outstanding calls when deactivate() is called on
- // the servant locators.
- //
- while(_state < StateClosing || _dispatchCount > 0)
+ IceUtil::ThreadPtr threadPerConnection;
+
{
- wait();
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // Now we must wait until close() has been called on the
- // transceiver.
- //
- while(_transceiver)
- {
- if(_state != StateClosed && _endpoint->timeout() >= 0)
+ //
+ // We wait indefinitely until connection closing has been
+ // initiated. We also wait indefinitely until all outstanding
+ // requests are completed. Otherwise we couldn't guarantee
+ // that there are no outstanding calls when deactivate() is
+ // called on the servant locators.
+ //
+ while(_state < StateClosing || _dispatchCount > 0)
{
- IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
- IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now();
-
- if(waitTime > IceUtil::Time())
+ wait();
+ }
+
+ //
+ // Now we must wait until close() has been called on the
+ // transceiver.
+ //
+ while(_transceiver)
+ {
+ if(_state != StateClosed && _endpoint->timeout() >= 0)
{
- //
- // We must wait a bit longer until we close this
- // connection.
- //
- if(!timedWait(waitTime))
+ IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
+ IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now();
+
+ if(waitTime > IceUtil::Time())
+ {
+ //
+ // We must wait a bit longer until we close this
+ // connection.
+ //
+ if(!timedWait(waitTime))
+ {
+ setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ }
+ }
+ else
{
+ //
+ // We already waited long enough, so let's close this
+ // connection!
+ //
setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
}
+
+ //
+ // No return here, we must still wait until close() is
+ // called on the _transceiver.
+ //
}
else
{
- //
- // We already waited long enough, so let's close this
- // connection!
- //
- setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ wait();
}
-
- //
- // No return here, we must still wait until close() is
- // called on the _transceiver.
- //
- }
- else
- {
- wait();
}
- }
- assert(_state == StateClosed);
+ assert(_state == StateClosed);
+
+ threadPerConnection = _threadPerConnection;
+ _threadPerConnection = 0;
+ }
+
+ if(threadPerConnection)
+ {
+ threadPerConnection->getThreadControl().join();
+ }
}
void
Ice::ConnectionI::monitor()
{
IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
-
+
if(!sync.acquired())
{
return;
@@ -1311,9 +1343,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
// If we are in thread per connection mode, create the thread
// for this connection.
//
- IceUtil::ThreadPtr thread = new ThreadPerConnection(this);
- thread->start();
- thread->getThreadControl().detach();
+ _threadPerConnection = new ThreadPerConnection(this);
+ _threadPerConnection->start();
}
vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr);
@@ -1355,9 +1386,12 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
Ice::ConnectionI::~ConnectionI()
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
assert(_state == StateClosed);
assert(!_transceiver);
assert(_dispatchCount == 0);
+ assert(!_threadPerConnection);
}
void
@@ -2070,7 +2104,9 @@ Ice::ConnectionI::run()
const bool warnUdp = _instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0;
- while(true)
+ bool closed = false;
+
+ while(!closed)
{
//
// We must accept new connections outside the thread
@@ -2176,6 +2212,11 @@ Ice::ConnectionI::run()
ObjectAdapterPtr adapter;
OutgoingAsyncPtr outAsync;
+ auto_ptr<LocalException> exception;
+
+ map<Int, Outgoing*> requests;
+ map<Int, AsyncRequest> asyncRequests;
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -2207,17 +2248,25 @@ Ice::ConnectionI::run()
}
catch(const LocalException& ex)
{
- _transceiver = 0;
- notifyAll();
- ex.ice_throw();
+ exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
}
_transceiver = 0;
notifyAll();
- return;
+
+ closed = true;
+ }
+
+ if(_state == StateClosed || _state == StateClosing)
+ {
+ requests.swap(_requests);
+ _requestsHint = _requests.end();
+
+ asyncRequests.swap(_asyncRequests);
+ _asyncRequestsHint = _asyncRequests.end();
}
}
-
+
//
// Asynchronous replies must be handled outside the thread
// synchronization, so that nested calls are possible.
@@ -2233,6 +2282,22 @@ Ice::ConnectionI::run()
// calls are possible.
//
invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
+
+ for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
+ {
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ }
+
+ for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
+ {
+ q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
+ }
+
+ if(exception.get())
+ {
+ assert(closed);
+ exception->ice_throw();
+ }
}
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 9e6b462b135..725daf6f9bc 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -147,6 +147,8 @@ private:
ConnectionIPtr _connection;
};
friend class ThreadPerConnection;
+ // Defined as mutable because "isFinished() const" sets this to 0.
+ mutable IceUtil::ThreadPtr _threadPerConnection;
IceInternal::TransceiverPtr _transceiver;
const std::string _desc;
diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp
index 8cc3feae491..3493fc3998d 100644
--- a/cpp/src/Ice/TcpAcceptor.cpp
+++ b/cpp/src/Ice/TcpAcceptor.cpp
@@ -74,6 +74,15 @@ IceInternal::TcpAcceptor::accept(int timeout)
return new TcpTransceiver(_instance, fd);
}
+void
+IceInternal::TcpAcceptor::connectToSelf()
+{
+ SOCKET fd = createSocket(false);
+ setBlock(fd, false);
+ doConnect(fd, _addr, -1);
+ closeSocket(fd);
+}
+
string
IceInternal::TcpAcceptor::toString() const
{
diff --git a/cpp/src/Ice/TcpAcceptor.h b/cpp/src/Ice/TcpAcceptor.h
index 8526e112b0b..a128aba529a 100644
--- a/cpp/src/Ice/TcpAcceptor.h
+++ b/cpp/src/Ice/TcpAcceptor.h
@@ -33,6 +33,7 @@ public:
virtual void close();
virtual void listen();
virtual TransceiverPtr accept(int);
+ virtual void connectToSelf();
virtual std::string toString() const;
bool equivalent(const std::string&, int) const;
diff --git a/cpp/src/IceSSL/SslAcceptor.cpp b/cpp/src/IceSSL/SslAcceptor.cpp
index da0fbae72bf..a72e2f6e00f 100644
--- a/cpp/src/IceSSL/SslAcceptor.cpp
+++ b/cpp/src/IceSSL/SslAcceptor.cpp
@@ -74,6 +74,15 @@ IceSSL::SslAcceptor::accept(int timeout)
return _plugin->createTransceiver(IceSSL::Server, fd, timeout);
}
+void
+IceSSL::SslAcceptor::connectToSelf()
+{
+ SOCKET fd = createSocket(false);
+ setBlock(fd, false);
+ doConnect(fd, _addr, -1);
+ closeSocket(fd);
+}
+
string
IceSSL::SslAcceptor::toString() const
{
diff --git a/cpp/src/IceSSL/SslAcceptor.h b/cpp/src/IceSSL/SslAcceptor.h
index 9cc3e08cc18..47f7f52fd5c 100644
--- a/cpp/src/IceSSL/SslAcceptor.h
+++ b/cpp/src/IceSSL/SslAcceptor.h
@@ -33,6 +33,7 @@ public:
virtual void close();
virtual void listen();
virtual IceInternal::TransceiverPtr accept(int);
+ virtual void connectToSelf();
virtual std::string toString() const;
bool equivalent(const std::string&, int) const;
diff --git a/cpp/test/Ice/operations/Client.cpp b/cpp/test/Ice/operations/Client.cpp
index d43deacf936..c1d4e1d58e7 100644
--- a/cpp/test/Ice/operations/Client.cpp
+++ b/cpp/test/Ice/operations/Client.cpp
@@ -20,14 +20,19 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator)
Test::MyClassPrx myClass = allTests(communicator, false);
cout << "testing server shutdown... " << flush;
+ cout << "--- 1 ---" << endl;
myClass->shutdown();
+ cout << "--- 2 ---" << endl;
try
{
+ cout << "--- 3 ---" << endl;
myClass->opVoid();
+ cout << "--- 3.5 ---" << endl;
test(false);
}
catch(const Ice::LocalException&)
{
+ cout << "--- 4 ---" << endl;
cout << "ok" << endl;
}