diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2006-02-21 16:54:03 +0000 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2006-02-21 16:54:03 +0000 |
commit | 77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a (patch) | |
tree | cf4408e91e0f56bef835a1dc07b40ad27904b017 /cppe/src/IceE/Connection.cpp | |
parent | Removed OutgoingM (diff) | |
download | ice-77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a.tar.bz2 ice-77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a.tar.xz ice-77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a.zip |
Removed OutgoingM
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rwxr-xr-x | cppe/src/IceE/Connection.cpp | 170 |
1 files changed, 120 insertions, 50 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index c4e57f4644a..89ac9eda507 100755 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -28,6 +28,8 @@ # include <IceE/Incoming.h> #endif +#include <iostream> + using namespace std; using namespace Ice; using namespace IceInternal; @@ -357,28 +359,27 @@ Ice::Connection::sendRequest(BasicStream* os) #ifdef ICEE_BLOCKING_CLIENT void -Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* out) +Ice::Connection::sendBlockingRequest(BasicStream* os, Outgoing* out) { Int requestId; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_exception.get()) - { - _exception->ice_throw(); - } + if(_exception.get()) + { + _exception->ice_throw(); + } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert(_state > StateNotValidated); + assert(_state < StateClosing); - // - // Fill in request id if it is a twoway call. - // - if(out) - { - requestId = fillRequestId(os); - } + // + // Fill in request id if it is a twoway call. + // + if(out) + { + requestId = fillRequestId(os); + } } try @@ -389,13 +390,14 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* if(out) { - readStream(*is); + os->reset(); + readStream(*os); } } if(out) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state != StateClosed) { @@ -404,9 +406,9 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* ServantManager* servantManager; ObjectAdapter* adapter; - parseMessage(*is, requestId, invokeNum, servantManager, adapter); + parseMessage(*os, requestId, invokeNum, servantManager, adapter); #else - parseMessage(*is, requestId); + parseMessage(*os, requestId); #endif } @@ -431,7 +433,7 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); _exception->ice_throw(); @@ -443,43 +445,84 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* #ifndef ICEE_PURE_BLOCKING_CLIENT void -Ice::Connection::sendRequest(BasicStream* os, OutgoingM* out) +Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) { Int requestId; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_exception.get()) - { - _exception->ice_throw(); - } + if(_exception.get()) + { + _exception->ice_throw(); + } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert(_state > StateNotValidated); + assert(_state < StateClosing); - // - // Only add to the request map if this is a twoway call. - // - if(out) - { - requestId = fillRequestId(os); + // + // Only add to the request map if this is a twoway call. + // + if(out) + { + requestId = fillRequestId(os); - // - // Add to the requests map. - // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingM*>(requestId, out)); - } + // + // Add to the requests map. + // + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } } + bool timedOut = false; try { - IceUtil::Mutex::Lock sendSync(_sendMutex); - sendRequest(os); + { + IceUtil::Mutex::Lock sendSync(_sendMutex); + sendRequest(os); + } + + if(out) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // Wait until the request has completed, or until the + // request times out. + // + Int tout = timeout(); + IceUtil::Time expireTime; + if(tout >= 0) + { + expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout); + } + while(out->state() == Outgoing::StateInProgress && !timedOut) + { + if(tout >= 0) + { + IceUtil::Time now = IceUtil::Time::now(); + if(now < expireTime) + { + timedWait(expireTime - now); + } + + // + // Make sure we woke up because of timeout and not another response. + // + if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime) + { + timedOut = true; + } + } + else + { + wait(); + } + } + } } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); @@ -499,11 +542,11 @@ Ice::Connection::sendRequest(BasicStream* os, OutgoingM* out) // very elaborate and complex design, which would be bad // for performance. // - map<Int, OutgoingM*>::iterator p = _requests.find(requestId); + map<Int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { if(p == _requestsHint) - { + { _requests.erase(p++); _requestsHint = p; } @@ -520,6 +563,26 @@ Ice::Connection::sendRequest(BasicStream* os, OutgoingM* out) _exception->ice_throw(); } } + + if(timedOut) + { + // + // Must be called outside the synchronization of this + // object. + // + exception(TimeoutException(__FILE__, __LINE__)); + + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // We must wait until the exception has propagted + // back to the Outgoing object. + // + while(out->state() == Outgoing::StateInProgress) + { + wait(); + } + } } #endif @@ -1575,7 +1638,7 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId stream.read(requestId); - map<Int, OutgoingM*>::iterator p = _requests.end(); + map<Int, Outgoing*>::iterator p = _requests.end(); if(_requestsHint != _requests.end()) { @@ -1608,6 +1671,7 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId { _requests.erase(p); } + notifyAll(); // Wake up threads waiting in sendRequest() } break; @@ -1867,7 +1931,7 @@ Ice::Connection::run() readStream(stream); auto_ptr<LocalException> exception; - map<Int, OutgoingM*> requests; + map<Int, Outgoing*> requests; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1934,9 +1998,15 @@ Ice::Connection::run() #ifndef ICEE_PURE_CLIENT invokeAll(in, invokeNum, requestId, servantManager, adapter); #endif - for(map<Int, OutgoingM*>::iterator p = requests.begin(); p != requests.end(); ++p) + if(requests.size() != 0) { - p->second->finished(*_exception.get()); // The exception is immutable at this point. + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) + { + p->second->finished(*_exception.get()); // The exception is immutable at this point. + } + notifyAll(); // Wake up threads waiting in sendRequest() } if(exception.get()) |