summaryrefslogtreecommitdiff
path: root/cppe/src
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2005-12-13 17:03:20 +0000
committerDwayne Boone <dwayne@zeroc.com>2005-12-13 17:03:20 +0000
commit60858a33bbce82f8af38fa70d26fc46d771aece4 (patch)
treeccd38b1dd37803de6dffbf889053fe894db62188 /cppe/src
parentAdded replica group tests (diff)
downloadice-60858a33bbce82f8af38fa70d26fc46d771aece4.tar.bz2
ice-60858a33bbce82f8af38fa70d26fc46d771aece4.tar.xz
ice-60858a33bbce82f8af38fa70d26fc46d771aece4.zip
Removed sync from Outgoing for blocking sends
Diffstat (limited to 'cppe/src')
-rwxr-xr-xcppe/src/IceE/Connection.cpp26
-rw-r--r--cppe/src/IceE/Instance.cpp21
-rw-r--r--cppe/src/IceE/Instance.h12
-rw-r--r--cppe/src/IceE/Outgoing.cpp143
4 files changed, 107 insertions, 95 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index f6df199a2c3..ef329c27daa 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -302,7 +302,7 @@ Ice::Connection::prepareRequest(BasicStream* os)
}
void
-Ice::Connection::sendRequest(BasicStream* os, Outgoing* out)
+Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out)
{
Int requestId;
@@ -391,8 +391,7 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out)
#endif
)
{
- BasicStream stream(_instance.get());
- readStream(stream);
+ readStream(*is);
#ifndef ICEE_PURE_CLIENT
Int invokeNum = 0;
@@ -406,9 +405,9 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out)
if(_state != StateClosed)
{
#ifndef ICEE_PURE_CLIENT
- parseMessage(stream, requestId, out, invokeNum, servantManager, adapter);
+ parseMessage(*is, requestId, invokeNum, servantManager, adapter);
#else
- parseMessage(stream, requestId, out);
+ parseMessage(*is, requestId);
#endif
}
@@ -762,6 +761,14 @@ Ice::Connection::endpoint() const
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
+#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
+bool
+Ice::Connection::blocking() const
+{
+ return _blocking;
+}
+#endif
+
#ifndef ICEE_PURE_CLIENT
void
@@ -882,7 +889,7 @@ Ice::Connection::Connection(const InstancePtr& instance,
{
#ifndef ICEE_PURE_BLOCKING_CLIENT
# ifdef ICEE_BLOCKING_CLIENT
- _blocking = _instance->blocking()
+ _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0
# ifndef ICEE_PURE_CLIENT
&& !_adapter
# endif
@@ -1356,7 +1363,7 @@ Ice::Connection::initiateShutdown() const
}
void
-Ice::Connection::parseMessage(BasicStream& stream, Int& requestId, Outgoing* out
+Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
#ifndef ICEE_PURE_CLIENT
,Int& invokeNum, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter
#endif
@@ -1410,7 +1417,6 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId, Outgoing* out
{
throw UnknownRequestIdException(__FILE__, __LINE__);
}
- out->finished(stream);
break;
}
@@ -1813,9 +1819,9 @@ Ice::Connection::run()
if(_state != StateClosed)
{
#ifndef ICEE_PURE_CLIENT
- parseMessage(stream, requestId, 0, invokeNum, servantManager, adapter);
+ parseMessage(stream, requestId, invokeNum, servantManager, adapter);
#else
- parseMessage(stream, requestId, 0);
+ parseMessage(stream, requestId);
#endif
}
diff --git a/cppe/src/IceE/Instance.cpp b/cppe/src/IceE/Instance.cpp
index 0f3f59a89a2..c30b41df662 100644
--- a/cppe/src/IceE/Instance.cpp
+++ b/cppe/src/IceE/Instance.cpp
@@ -202,20 +202,14 @@ IceInternal::Instance::objectAdapterFactory() const
}
#endif
-#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
-bool
-IceInternal::Instance::blocking() const
-{
- return _blocking;
-}
-#endif
-
+#ifndef ICEE_PURE_BLOCKING_CLIENT
size_t
IceInternal::Instance::threadPerConnectionStackSize() const
{
// No mutex lock, immutable.
return _threadPerConnectionStackSize;
}
+#endif
EndpointFactoryPtr
IceInternal::Instance::endpointFactory() const
@@ -298,10 +292,9 @@ IceInternal::Instance::getDefaultContext() const
IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const PropertiesPtr& properties) :
_state(StateActive),
_properties(properties),
- _messageSizeMax(0),
- _threadPerConnectionStackSize(0)
-#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
- , _blocking(false)
+ _messageSizeMax(0)
+#ifndef ICEE_PURE_BLOCKING_CLIENT
+ , _threadPerConnectionStackSize(0)
#endif
{
try
@@ -450,10 +443,6 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Prope
}
#ifndef ICEE_PURE_BLOCKING_CLIENT
-# ifdef ICEE_BLOCKING_CLIENT
- const_cast<bool&>(_blocking) = _properties->getPropertyAsInt("Ice.Blocking") > 0;
-# endif
-
{
Int stackSize = _properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize");
if(stackSize < 0)
diff --git a/cppe/src/IceE/Instance.h b/cppe/src/IceE/Instance.h
index a9af7a54819..5971b5b94ca 100644
--- a/cppe/src/IceE/Instance.h
+++ b/cppe/src/IceE/Instance.h
@@ -58,16 +58,14 @@ public:
#endif
void setDefaultContext(const ::Ice::Context&);
::Ice::Context getDefaultContext() const;
+#ifndef ICEE_PURE_BLOCKING_CLIENT
size_t threadPerConnectionStackSize() const;
+#endif
#ifndef ICEE_PURE_CLIENT
ObjectAdapterFactoryPtr objectAdapterFactory() const;
#endif
-#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
- bool blocking() const;
-#endif
-
private:
Instance(const Ice::CommunicatorPtr&, const Ice::PropertiesPtr&);
@@ -89,7 +87,9 @@ private:
const TraceLevelsPtr _traceLevels; // Immutable, not reset by destroy().
const DefaultsAndOverridesPtr _defaultsAndOverrides; // Immutable, not reset by destroy().
const size_t _messageSizeMax; // Immutable, not reset by destroy().
+#ifndef ICEE_PURE_BLOCKING_CLIENT
const size_t _threadPerConnectionStackSize;
+#endif
#ifdef ICEE_HAS_ROUTER
RouterManagerPtr _routerManager;
#endif
@@ -105,10 +105,6 @@ private:
#ifndef ICEE_PURE_CLIENT
ObjectAdapterFactoryPtr _objectAdapterFactory;
#endif
-
-#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
- const bool _blocking;
-#endif
};
}
diff --git a/cppe/src/IceE/Outgoing.cpp b/cppe/src/IceE/Outgoing.cpp
index c75ded2a4cc..c5561b62c7d 100644
--- a/cppe/src/IceE/Outgoing.cpp
+++ b/cppe/src/IceE/Outgoing.cpp
@@ -113,78 +113,94 @@ IceInternal::Outgoing::invoke()
{
case Reference::ModeTwoway:
{
- //
- // We let all exceptions raised by sending directly
- // propagate to the caller, because they can be retried
- // without violating "at-most-once". In case of such
- // exceptions, the connection object does not call back on
- // this object, so we don't need to lock the mutex, keep
- // track of state, or save exceptions.
- //
- _connection->sendRequest(&_os, this);
+#ifndef ICEE_PURE_BLOCKING_CLIENT
+#ifdef ICEE_BLOCKING_CLIENT
+ if(!_connection->blocking())
+ {
+#endif
+ //
+ // We let all exceptions raised by sending directly
+ // propagate to the caller, because they can be retried
+ // without violating "at-most-once". In case of such
+ // exceptions, the connection object does not call back on
+ // this object, so we don't need to lock the mutex, keep
+ // track of state, or save exceptions.
+ //
+ _connection->sendRequest(&_os, 0, this);
- //
- // Wait until the request has completed, or until the
- // request times out.
- //
+ //
+ // Wait until the request has completed, or until the
+ // request times out.
+ //
- bool timedOut = false;
+ bool timedOut = false;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // It's possible that the request has already
- // completed, due to a regular response, or because of
- // an exception. So we only change the state to "in
- // progress" if it is still "unsent".
- //
- if(_state == StateUnsent)
- {
- _state = StateInProgress;
- }
+ //
+ // It's possible that the request has already
+ // completed, due to a regular response, or because of
+ // an exception. So we only change the state to "in
+ // progress" if it is still "unsent".
+ //
+ if(_state == StateUnsent)
+ {
+ _state = StateInProgress;
+ }
- Int timeout = _connection->timeout();
- while(_state == StateInProgress && !timedOut)
- {
- if(timeout >= 0)
- {
- timedWait(IceUtil::Time::milliSeconds(timeout));
+ Int timeout = _connection->timeout();
+ while(_state == StateInProgress && !timedOut)
+ {
+ if(timeout >= 0)
+ {
+ timedWait(IceUtil::Time::milliSeconds(timeout));
- if(_state == StateInProgress)
- {
- timedOut = true;
- }
+ if(_state == StateInProgress)
+ {
+ timedOut = true;
+ }
+ }
+ else
+ {
+ wait();
+ }
}
- else
+ }
+
+ if(timedOut)
+ {
+ //
+ // Must be called outside the synchronization of this
+ // object.
+ //
+ _connection->exception(TimeoutException(__FILE__, __LINE__));
+
+ //
+ // We must wait until the exception set above has
+ // propagated to this Outgoing object.
+ //
{
- wait();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ while(_state == StateInProgress)
+ {
+ wait();
+ }
}
- }
+ }
+#ifdef ICEE_BLOCKING_CLIENT
}
-
- if(timedOut)
+ else
{
- //
- // Must be called outside the synchronization of this
- // object.
- //
- _connection->exception(TimeoutException(__FILE__, __LINE__));
-
- //
- // We must wait until the exception set above has
- // propagated to this Outgoing object.
- //
+ _connection->sendRequest(&_os, &_is, this);
+ if(!_exception.get())
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(_state == StateInProgress)
- {
- wait();
- }
+ finishedInternal();
}
}
-
+#endif
+#endif
if(_exception.get())
{
//
@@ -232,7 +248,7 @@ IceInternal::Outgoing::invoke()
// violating "at-most-once".
//
_state = StateInProgress;
- _connection->sendRequest(&_os, 0);
+ _connection->sendRequest(&_os, 0, 0);
break;
}
@@ -297,6 +313,13 @@ IceInternal::Outgoing::finished(BasicStream& is)
assert(_state <= StateInProgress);
_is.swap(is);
+ finishedInternal();
+ notify();
+}
+
+void
+IceInternal::Outgoing::finishedInternal()
+{
Byte status;
_is.read(status);
@@ -449,8 +472,6 @@ IceInternal::Outgoing::finished(BasicStream& is)
break;
}
}
-
- notify();
}
void