summaryrefslogtreecommitdiff
path: root/cppe/src
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2005-12-15 14:32:47 +0000
committerDwayne Boone <dwayne@zeroc.com>2005-12-15 14:32:47 +0000
commit31abdd9f614f38a3b52fadf0a6121a11c50fd064 (patch)
treee696781ce3e7d6537e8e55f0b46f3deef1aca22b /cppe/src
parentOnly removed Monitor for pure blocking for now (diff)
downloadice-31abdd9f614f38a3b52fadf0a6121a11c50fd064.tar.bz2
ice-31abdd9f614f38a3b52fadf0a6121a11c50fd064.tar.xz
ice-31abdd9f614f38a3b52fadf0a6121a11c50fd064.zip
Some cleanup to make code prettier
Diffstat (limited to 'cppe/src')
-rwxr-xr-xcppe/src/IceE/Connection.cpp201
-rw-r--r--cppe/src/IceE/Outgoing.cpp15
2 files changed, 129 insertions, 87 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index a926dfe5fb1..8899ce2bee7 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -301,8 +301,62 @@ Ice::Connection::prepareRequest(BasicStream* os)
os->writeBlob(_requestHdr);
}
+
+Int
+Ice::Connection::fillRequestId(BasicStream* os)
+{
+ //
+ // Create a new unique request ID.
+ //
+ Int requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+
+ return requestId;
+}
+
+void
+Ice::Connection::sendRequest(BasicStream* os)
+{
+ if(!_transceiver) // Has the transceiver already been closed?
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
+ Int sz = static_cast<Int>(os->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + 10);
+#endif
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending request", *os, _logger, _traceLevels);
+ _transceiver->write(*os, _endpoint->timeout());
+}
+
+#ifdef ICEE_BLOCKING_CLIENT
+
void
-Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out)
+Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* out)
{
Int requestId;
@@ -318,78 +372,20 @@ Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out)
assert(_state < StateClosing);
//
- // Only add to the request map if this is a twoway call.
+ // Fill in request id if it is a twoway call.
//
if(out)
{
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
-
- //
- // Add to the requests map if not blocking.
- //
-#ifndef ICEE_PURE_BLOCKING_CLIENT
-# ifdef ICEE_BLOCKING_CLIENT
- if(!_blocking)
-# endif
- {
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
- }
-#endif
+ requestId = fillRequestId(os);
}
}
try
{
IceUtil::Mutex::Lock sendSync(_sendMutex);
+ sendRequest(os);
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
- Int sz = static_cast<Int>(os->b.size());
- const Byte* p = reinterpret_cast<const Byte*>(&sz);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + 10);
-#endif
-
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending request", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
-
-#ifdef ICEE_BLOCKING_CLIENT
- //
- // If blocking client, we wait for the response from the server.
- //
- if(out
-#ifndef ICEE_PURE_BLOCKING_CLIENT
- && _blocking
-#endif
- )
+ if(out)
{
readStream(*is);
@@ -430,20 +426,62 @@ Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out)
}
}
}
-#endif
}
catch(const LocalException& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
+ _exception->ice_throw();
+ }
+}
+
+#endif
#ifndef ICEE_PURE_BLOCKING_CLIENT
- if(out
-# ifdef ICEE_BLOCKING_CLIENT
- && !_blocking
-# endif
- )
+
+void
+Ice::Connection::sendRequest(BasicStream* os, Outgoing* out)
+{
+ Int requestId;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(out)
+ {
+ requestId = fillRequestId(os);
+
+ //
+ // Add to the requests map.
+ //
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ }
+ }
+
+ try
+ {
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ sendRequest(os);
+ }
+ catch(const LocalException& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ assert(_exception.get());
+
+ if(out)
{
//
// If the request has already been removed from the
@@ -476,13 +514,14 @@ Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out)
}
}
else
-#endif
{
_exception->ice_throw();
}
}
}
+#endif
+
#ifdef ICEE_HAS_BATCH
void
@@ -887,13 +926,11 @@ Ice::Connection::Connection(const InstancePtr& instance,
_state(StateNotValidated),
_stateTime(IceUtil::Time::now())
{
-#ifndef ICEE_PURE_BLOCKING_CLIENT
-# ifdef ICEE_BLOCKING_CLIENT
- _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0
-# ifndef ICEE_PURE_CLIENT
- && !_adapter
-# endif
- ;
+#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
+# ifdef ICEE_PURE_CLIENT
+ _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0;
+# else
+ _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0 && !_adapter;
# endif
#endif
@@ -942,18 +979,16 @@ Ice::Connection::Connection(const InstancePtr& instance,
}
#endif
-#ifdef ICEE_BLOCKING_CLIENT
-# ifndef ICEE_PURE_BLOCKING_CLIENT
+#ifdef ICEE_PURE_BLOCKING_CLIENT
+ validate();
+#else
+# ifdef ICEE_BLOCKING_CLIENT
if(_blocking)
{
-# endif
validate();
-# ifndef ICEE_PURE_BLOCKING_CLIENT
}
else
# endif
-#endif
-#ifndef ICEE_PURE_BLOCKING_CLIENT
{
__setNoDelete(true);
try
diff --git a/cppe/src/IceE/Outgoing.cpp b/cppe/src/IceE/Outgoing.cpp
index e2222b71ca0..b8a0c0f5deb 100644
--- a/cppe/src/IceE/Outgoing.cpp
+++ b/cppe/src/IceE/Outgoing.cpp
@@ -126,7 +126,7 @@ IceInternal::Outgoing::invoke()
// this object, so we don't need to lock the mutex, keep
// track of state, or save exceptions.
//
- _connection->sendRequest(&_os, 0, this);
+ _connection->sendRequest(&_os, this);
//
// Wait until the request has completed, or until the
@@ -200,7 +200,7 @@ IceInternal::Outgoing::invoke()
// For blocking sends the reply is written directly
// into the incoming stream.
//
- _connection->sendRequest(&_os, &_is, this);
+ _connection->sendBlockingRequest(&_os, &_is, this);
if(!_exception.get())
{
finishedInternal();
@@ -254,7 +254,11 @@ IceInternal::Outgoing::invoke()
// violating "at-most-once".
//
_state = StateInProgress;
- _connection->sendRequest(&_os, 0, 0);
+#ifdef ICEE_BLOCKING_CLIENT
+ _connection->sendBlockingRequest(&_os, 0, 0);
+#else
+ _connection->sendRequest(&_os, 0);
+#endif
break;
}
@@ -496,6 +500,9 @@ IceInternal::Outgoing::finished(const LocalException& ex)
_state = StateLocalException;
_exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
#ifndef ICEE_PURE_BLOCKING_CLIENT
- notify();
+ if(!_connection->blocking())
+ {
+ notify();
+ }
#endif
}