summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp177
1 files changed, 121 insertions, 56 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index aaa99590ff8..e6782af3802 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -199,8 +199,28 @@ Ice::ConnectionI::isDestroyed() const
bool
Ice::ConnectionI::isFinished() const
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- return _transceiver == 0 && _dispatchCount == 0;
+ IceUtil::ThreadPtr threadPerConnection;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_transceiver != 0 || _dispatchCount != 0)
+ {
+ return false;
+ }
+
+ assert(_state == StateClosed);
+
+ threadPerConnection = _threadPerConnection;
+ _threadPerConnection = 0;
+ }
+
+ if(threadPerConnection)
+ {
+ threadPerConnection->getThreadControl().join();
+ }
+
+ return true;
}
void
@@ -228,70 +248,82 @@ Ice::ConnectionI::waitUntilHolding() const
void
Ice::ConnectionI::waitUntilFinished()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We wait indefinitely until connection closing has been
- // initiated. We also wait indefinitely until all outstanding
- // requests are completed. Otherwise we couldn't guarantee that
- // there are no outstanding calls when deactivate() is called on
- // the servant locators.
- //
- while(_state < StateClosing || _dispatchCount > 0)
+ IceUtil::ThreadPtr threadPerConnection;
+
{
- wait();
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // Now we must wait until close() has been called on the
- // transceiver.
- //
- while(_transceiver)
- {
- if(_state != StateClosed && _endpoint->timeout() >= 0)
+ //
+ // We wait indefinitely until connection closing has been
+ // initiated. We also wait indefinitely until all outstanding
+ // requests are completed. Otherwise we couldn't guarantee
+ // that there are no outstanding calls when deactivate() is
+ // called on the servant locators.
+ //
+ while(_state < StateClosing || _dispatchCount > 0)
{
- IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
- IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now();
-
- if(waitTime > IceUtil::Time())
+ wait();
+ }
+
+ //
+ // Now we must wait until close() has been called on the
+ // transceiver.
+ //
+ while(_transceiver)
+ {
+ if(_state != StateClosed && _endpoint->timeout() >= 0)
{
- //
- // We must wait a bit longer until we close this
- // connection.
- //
- if(!timedWait(waitTime))
+ IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
+ IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now();
+
+ if(waitTime > IceUtil::Time())
+ {
+ //
+ // We must wait a bit longer until we close this
+ // connection.
+ //
+ if(!timedWait(waitTime))
+ {
+ setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ }
+ }
+ else
{
+ //
+ // We already waited long enough, so let's close this
+ // connection!
+ //
setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
}
+
+ //
+ // No return here, we must still wait until close() is
+ // called on the _transceiver.
+ //
}
else
{
- //
- // We already waited long enough, so let's close this
- // connection!
- //
- setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ wait();
}
-
- //
- // No return here, we must still wait until close() is
- // called on the _transceiver.
- //
- }
- else
- {
- wait();
}
- }
- assert(_state == StateClosed);
+ assert(_state == StateClosed);
+
+ threadPerConnection = _threadPerConnection;
+ _threadPerConnection = 0;
+ }
+
+ if(threadPerConnection)
+ {
+ threadPerConnection->getThreadControl().join();
+ }
}
void
Ice::ConnectionI::monitor()
{
IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
-
+
if(!sync.acquired())
{
return;
@@ -1311,9 +1343,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
// If we are in thread per connection mode, create the thread
// for this connection.
//
- IceUtil::ThreadPtr thread = new ThreadPerConnection(this);
- thread->start();
- thread->getThreadControl().detach();
+ _threadPerConnection = new ThreadPerConnection(this);
+ _threadPerConnection->start();
}
vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr);
@@ -1355,9 +1386,12 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
Ice::ConnectionI::~ConnectionI()
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
assert(_state == StateClosed);
assert(!_transceiver);
assert(_dispatchCount == 0);
+ assert(!_threadPerConnection);
}
void
@@ -2070,7 +2104,9 @@ Ice::ConnectionI::run()
const bool warnUdp = _instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0;
- while(true)
+ bool closed = false;
+
+ while(!closed)
{
//
// We must accept new connections outside the thread
@@ -2176,6 +2212,11 @@ Ice::ConnectionI::run()
ObjectAdapterPtr adapter;
OutgoingAsyncPtr outAsync;
+ auto_ptr<LocalException> exception;
+
+ map<Int, Outgoing*> requests;
+ map<Int, AsyncRequest> asyncRequests;
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -2207,17 +2248,25 @@ Ice::ConnectionI::run()
}
catch(const LocalException& ex)
{
- _transceiver = 0;
- notifyAll();
- ex.ice_throw();
+ exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
}
_transceiver = 0;
notifyAll();
- return;
+
+ closed = true;
+ }
+
+ if(_state == StateClosed || _state == StateClosing)
+ {
+ requests.swap(_requests);
+ _requestsHint = _requests.end();
+
+ asyncRequests.swap(_asyncRequests);
+ _asyncRequestsHint = _asyncRequests.end();
}
}
-
+
//
// Asynchronous replies must be handled outside the thread
// synchronization, so that nested calls are possible.
@@ -2233,6 +2282,22 @@ Ice::ConnectionI::run()
// calls are possible.
//
invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
+
+ for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
+ {
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ }
+
+ for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
+ {
+ q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
+ }
+
+ if(exception.get())
+ {
+ assert(closed);
+ exception->ice_throw();
+ }
}
}