summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp66
1 files changed, 53 insertions, 13 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 420bdb38427..ca9710cc6b7 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -1294,24 +1294,48 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
void
IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ std::map<Ice::Int, Outgoing*> requests;
+ std::map<Ice::Int, OutgoingAsyncPtr> asyncRequests;
+
+ {
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ threadPool->promoteFollower();
+
+ if(_state == StateActive || _state == StateClosing)
+ {
+ registerWithPool();
+ }
+ else if(_state == StateClosed && _transceiver)
+ {
+ _transceiver->close();
+ {
+ // See _queryMutex comment in header file.
+ IceUtil::Mutex::Lock s(_queryMutex);
+ _transceiver = 0;
+ }
+ _threadPool = 0; // We don't need the thread pool anymore.
+ notifyAll();
+ }
- threadPool->promoteFollower();
+ if(_state == StateClosed || _state == StateClosing)
+ {
+ requests.swap(_requests);
+ _requestsHint = _requests.end();
- if(_state == StateActive || _state == StateClosing)
+ asyncRequests.swap(_asyncRequests);
+ _asyncRequestsHint = _asyncRequests.end();
+ }
+ }
+
+ for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
{
- registerWithPool();
+ p->second->finished(*_exception.get()); // Exception is immutable at this point.
}
- else if(_state == StateClosed && _transceiver)
+
+ for(map<Int, OutgoingAsyncPtr>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
{
- _transceiver->close();
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock s(_queryMutex);
- _transceiver = 0;
- }
- _threadPool = 0; // We don't need the thread pool anymore.
- notifyAll();
+ q->second->__finished(*_exception.get()); // Exception is immutable at this point.
}
}
@@ -1434,6 +1458,12 @@ IceInternal::Connection::~Connection()
void
IceInternal::Connection::setState(State state, const LocalException& ex)
{
+ //
+ // If setState() is called with an exception, then only closed and
+ // closing states are permissible.
+ //
+ assert(state == StateClosing || state == StateClosed);
+
if(_state == state) // Don't switch twice.
{
return;
@@ -1467,6 +1497,15 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
//
setState(state);
+ //
+ // We can't call __finished() on async requests here, because
+ // it's possible that the callback will trigger another call,
+ // which then might block on this connection. Calling
+ // finished() at this place works for non-async calls, but in
+ // order to keep the logic of sending exceptions to requests
+ // in one place, we don't do this here either. Instead, we
+ // move all the code into the finished() operation.
+ /*
for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
p->second->finished(*_exception.get());
@@ -1480,6 +1519,7 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
}
_asyncRequests.clear();
_asyncRequestsHint = _asyncRequests.end();
+ */
}
void