summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-01-24 14:39:21 +0000
committerMarc Laukien <marc@zeroc.com>2004-01-24 14:39:21 +0000
commita22e2269d6b11b5f4113f5de17083ec3d77a5346 (patch)
treec4b8d01a0ececf74fb3f260d47563580eac9c568 /cpp/src/Ice/Connection.cpp
parentreorganizing transform code (diff)
downloadice-a22e2269d6b11b5f4113f5de17083ec3d77a5346.tar.bz2
ice-a22e2269d6b11b5f4113f5de17083ec3d77a5346.tar.xz
ice-a22e2269d6b11b5f4113f5de17083ec3d77a5346.zip
fixes
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp156
1 files changed, 88 insertions, 68 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index d19493b9ec9..6a2d185c3b3 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -39,7 +39,7 @@ void IceInternal::decRef(Connection* p) { p->__decRef(); }
void
IceInternal::Connection::validate()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_exception.get())
{
@@ -207,21 +207,21 @@ IceInternal::Connection::validate()
void
IceInternal::Connection::activate()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateActive);
}
void
IceInternal::Connection::hold()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateHolding);
}
void
IceInternal::Connection::destroy(DestructionReason reason)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
switch(reason)
{
@@ -242,28 +242,28 @@ IceInternal::Connection::destroy(DestructionReason reason)
bool
IceInternal::Connection::isValidated() const
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
return _state > StateNotValidated;
}
bool
IceInternal::Connection::isDestroyed() const
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
return _state >= StateClosing;
}
bool
IceInternal::Connection::isFinished() const
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
return _threadPool == 0 && _dispatchCount == 0;
}
void
IceInternal::Connection::waitUntilHolding() const
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
while(_state < StateHolding || _dispatchCount > 0)
{
@@ -274,7 +274,7 @@ IceInternal::Connection::waitUntilHolding() const
void
IceInternal::Connection::waitUntilFinished()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
// We wait indefinitely until connection closing has been
@@ -337,7 +337,7 @@ void
IceInternal::Connection::monitor()
{
- IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
if(!sync.acquired())
{
return;
@@ -379,7 +379,7 @@ IceInternal::Connection::monitor()
void
IceInternal::Connection::incProxyCount()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_proxyCount >= 0);
++_proxyCount;
}
@@ -387,7 +387,7 @@ IceInternal::Connection::incProxyCount()
void
IceInternal::Connection::decProxyCount()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_proxyCount > 0);
--_proxyCount;
@@ -409,7 +409,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
Int requestId;
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_exception.get())
{
@@ -428,28 +428,30 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
return;
}
}
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- requestId = _nextRequestId++;
- if(requestId <= 0)
+ else
{
- _nextRequestId = 1;
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
requestId = _nextRequestId++;
- }
-
- //
- // Only add to the request map if this is a twoway call.
- //
- if(!_endpoint->datagram() && !oneway)
- {
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
- }
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(!_endpoint->datagram() && !oneway)
+ {
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
}
@@ -526,7 +528,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
@@ -548,33 +550,49 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
assert(!_endpoint->datagram()); // Async requests are always twoway.
Int requestId;
+ auto_ptr<LocalException> exception;
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_exception.get())
{
- out->__finished(*_exception.get());
- return;
+ //
+ // __finished() is called outside the thread
+ // synchronization. See below.
+ //
+ exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone()));
}
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- requestId = _nextRequestId++;
- if(requestId <= 0)
+ else
{
- _nextRequestId = 1;
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
+ pair<const Int, OutgoingAsyncPtr>(requestId, out));
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
+ }
- _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
- pair<const Int, OutgoingAsyncPtr>(requestId, out));
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
- }
+ //
+ // Exceptions for asynchronous messages must be handled outside
+ // the thread synchronization, so that nested calls are possible.
+ //
+ if(exception.get())
+ {
+ out->__finished(*_exception.get());
+ return;
}
try
@@ -647,7 +665,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
}
@@ -727,7 +745,7 @@ void
IceInternal::Connection::flushBatchRequest()
{
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
// Wait if flushing is currently in progress.
@@ -836,7 +854,7 @@ IceInternal::Connection::flushBatchRequest()
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
@@ -858,7 +876,7 @@ IceInternal::Connection::flushBatchRequest()
}
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
// Reset the batch stream, and notify that flushing is over.
@@ -938,12 +956,12 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
}
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
try
{
@@ -972,7 +990,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
void
IceInternal::Connection::sendNoResponse()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
try
{
@@ -1009,7 +1027,7 @@ IceInternal::Connection::endpoint() const
void
IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
// We never change the thread pool with which we were initially
@@ -1030,7 +1048,7 @@ IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter)
ObjectAdapterPtr
IceInternal::Connection::getAdapter() const
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
return _adapter;
}
@@ -1091,7 +1109,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
threadPool->promoteFollower();
setState(StateClosed, ex);
return;
@@ -1105,7 +1123,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
//
if(messageType == closeConnectionMsg)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_state == StateClosed)
{
@@ -1150,7 +1168,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
Int requestId = 0;
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_state == StateClosed)
{
@@ -1377,7 +1395,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
}
}
@@ -1393,13 +1411,13 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
map<Int, OutgoingAsyncPtr> asyncRequests;
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_state == StateActive || _state == StateClosing)
{
registerWithPool();
}
- else if(_state == StateClosed && _transceiver)
+ else if(_state == StateClosed)
{
try
{
@@ -1410,6 +1428,7 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
closeException = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
}
+ assert(_threadPool);
_threadPool = 0; // We don't need the thread pool anymore.
notifyAll();
}
@@ -1443,14 +1462,14 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
void
IceInternal::Connection::exception(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
}
string
IceInternal::Connection::toString() const
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
return _transceiver->toString();
}
@@ -1714,6 +1733,7 @@ IceInternal::Connection::setState(State state)
// Here we ignore any exceptions in close().
}
+ assert(_threadPool);
_threadPool = 0; // We don't need the thread pool anymore.
}
else