summaryrefslogtreecommitdiff
path: root/cppe/src/IceE/Connection.cpp
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2006-02-21 16:54:03 +0000
committerDwayne Boone <dwayne@zeroc.com>2006-02-21 16:54:03 +0000
commit77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a (patch)
treecf4408e91e0f56bef835a1dc07b40ad27904b017 /cppe/src/IceE/Connection.cpp
parentRemoved OutgoingM (diff)
downloadice-77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a.tar.bz2
ice-77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a.tar.xz
ice-77f1dc4b62c2d50040ecf8a76dfce1d979c9d15a.zip
Removed OutgoingM
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rwxr-xr-xcppe/src/IceE/Connection.cpp170
1 files changed, 120 insertions, 50 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index c4e57f4644a..89ac9eda507 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -28,6 +28,8 @@
# include <IceE/Incoming.h>
#endif
+#include <iostream>
+
using namespace std;
using namespace Ice;
using namespace IceInternal;
@@ -357,28 +359,27 @@ Ice::Connection::sendRequest(BasicStream* os)
#ifdef ICEE_BLOCKING_CLIENT
void
-Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* out)
+Ice::Connection::sendBlockingRequest(BasicStream* os, Outgoing* out)
{
Int requestId;
-
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_exception.get())
- {
- _exception->ice_throw();
- }
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
- //
- // Fill in request id if it is a twoway call.
- //
- if(out)
- {
- requestId = fillRequestId(os);
- }
+ //
+ // Fill in request id if it is a twoway call.
+ //
+ if(out)
+ {
+ requestId = fillRequestId(os);
+ }
}
try
@@ -389,13 +390,14 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing*
if(out)
{
- readStream(*is);
+ os->reset();
+ readStream(*os);
}
}
if(out)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_state != StateClosed)
{
@@ -404,9 +406,9 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing*
ServantManager* servantManager;
ObjectAdapter* adapter;
- parseMessage(*is, requestId, invokeNum, servantManager, adapter);
+ parseMessage(*os, requestId, invokeNum, servantManager, adapter);
#else
- parseMessage(*is, requestId);
+ parseMessage(*os, requestId);
#endif
}
@@ -431,7 +433,7 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing*
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
_exception->ice_throw();
@@ -443,43 +445,84 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing*
#ifndef ICEE_PURE_BLOCKING_CLIENT
void
-Ice::Connection::sendRequest(BasicStream* os, OutgoingM* out)
+Ice::Connection::sendRequest(BasicStream* os, Outgoing* out)
{
Int requestId;
-
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_exception.get())
- {
- _exception->ice_throw();
- }
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
- //
- // Only add to the request map if this is a twoway call.
- //
- if(out)
- {
- requestId = fillRequestId(os);
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(out)
+ {
+ requestId = fillRequestId(os);
- //
- // Add to the requests map.
- //
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingM*>(requestId, out));
- }
+ //
+ // Add to the requests map.
+ //
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ }
}
+ bool timedOut = false;
try
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
- sendRequest(os);
+ {
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ sendRequest(os);
+ }
+
+ if(out)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // Wait until the request has completed, or until the
+ // request times out.
+ //
+ Int tout = timeout();
+ IceUtil::Time expireTime;
+ if(tout >= 0)
+ {
+ expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout);
+ }
+ while(out->state() == Outgoing::StateInProgress && !timedOut)
+ {
+ if(tout >= 0)
+ {
+ IceUtil::Time now = IceUtil::Time::now();
+ if(now < expireTime)
+ {
+ timedWait(expireTime - now);
+ }
+
+ //
+ // Make sure we woke up because of timeout and not another response.
+ //
+ if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime)
+ {
+ timedOut = true;
+ }
+ }
+ else
+ {
+ wait();
+ }
+ }
+ }
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
@@ -499,11 +542,11 @@ Ice::Connection::sendRequest(BasicStream* os, OutgoingM* out)
// very elaborate and complex design, which would be bad
// for performance.
//
- map<Int, OutgoingM*>::iterator p = _requests.find(requestId);
+ map<Int, Outgoing*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
if(p == _requestsHint)
- {
+ {
_requests.erase(p++);
_requestsHint = p;
}
@@ -520,6 +563,26 @@ Ice::Connection::sendRequest(BasicStream* os, OutgoingM* out)
_exception->ice_throw();
}
}
+
+ if(timedOut)
+ {
+ //
+ // Must be called outside the synchronization of this
+ // object.
+ //
+ exception(TimeoutException(__FILE__, __LINE__));
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // We must wait until the exception has propagted
+ // back to the Outgoing object.
+ //
+ while(out->state() == Outgoing::StateInProgress)
+ {
+ wait();
+ }
+ }
}
#endif
@@ -1575,7 +1638,7 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
stream.read(requestId);
- map<Int, OutgoingM*>::iterator p = _requests.end();
+ map<Int, Outgoing*>::iterator p = _requests.end();
if(_requestsHint != _requests.end())
{
@@ -1608,6 +1671,7 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
{
_requests.erase(p);
}
+ notifyAll(); // Wake up threads waiting in sendRequest()
}
break;
@@ -1867,7 +1931,7 @@ Ice::Connection::run()
readStream(stream);
auto_ptr<LocalException> exception;
- map<Int, OutgoingM*> requests;
+ map<Int, Outgoing*> requests;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1934,9 +1998,15 @@ Ice::Connection::run()
#ifndef ICEE_PURE_CLIENT
invokeAll(in, invokeNum, requestId, servantManager, adapter);
#endif
- for(map<Int, OutgoingM*>::iterator p = requests.begin(); p != requests.end(); ++p)
+ if(requests.size() != 0)
{
- p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
+ {
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ }
+ notifyAll(); // Wake up threads waiting in sendRequest()
}
if(exception.get())