summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp122
1 files changed, 37 insertions, 85 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 13640e70677..a4ac2f3e7ae 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -35,42 +35,34 @@ void IceInternal::decRef(AMI_Object_ice_invoke* p) { p->__decRef(); }
IceInternal::OutgoingAsync::OutgoingAsync() :
__is(0),
- __os(0),
- _state(StateUnsent)
+ __os(0)
{
}
IceInternal::OutgoingAsync::~OutgoingAsync()
{
- assert(!_reference);
- assert(!_connection);
- assert(!__is);
- assert(!__os);
+ delete __is;
+ delete __os;
}
void
IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
//
- // Wait until sending has completed.
+ // No mutex protection necessary, this function can only be called
+ // after __send() and __prepare() have completed.
//
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(_state < StateSent)
- {
- _monitor.wait();
- }
- }
assert(_reference);
assert(_connection);
- assert(__is);
- assert(__os);
DispatchStatus status;
try
{
+ delete __is;
+ __is = new BasicStream(_reference->instance.get());
+
__is->swap(is);
Byte b;
@@ -171,20 +163,12 @@ void
IceInternal::OutgoingAsync::__finished(const LocalException& exc)
{
//
- // Wait until sending has completed.
+ // No mutex protection necessary, this function can only be called
+ // after __send() and __prepare() have completed.
//
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(_state < StateSent)
- {
- _monitor.wait();
- }
- }
assert(_reference);
- assert(_connection);
- assert(__is);
- assert(__os);
+ //assert(_connection); // Might be null, if getConnection() failed.
if(_reference->locatorInfo)
{
@@ -224,11 +208,6 @@ IceInternal::OutgoingAsync::__finished(const LocalException& exc)
if(doRetry)
{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_state == StateSent);
- _state = StateUnsent;
- }
_connection = 0;
__send();
}
@@ -236,7 +215,6 @@ IceInternal::OutgoingAsync::__finished(const LocalException& exc)
{
try
{
- __cleanup();
ice_exception(exc);
}
catch(const Exception& ex)
@@ -257,7 +235,14 @@ IceInternal::OutgoingAsync::__finished(const LocalException& exc)
bool
IceInternal::OutgoingAsync::__timedOut() const
{
- if(_connection && _connection->timeout() >= 0)
+ //
+ // No mutex protection necessary, this function can only be called
+ // after __send() and __prepare() have completed.
+ //
+
+ assert(_connection);
+
+ if(_connection->timeout() >= 0)
{
return IceUtil::Time::now() >= _absoluteTimeout;
}
@@ -271,20 +256,20 @@ void
IceInternal::OutgoingAsync::__prepare(const ReferencePtr& ref, const string& operation, OperationMode mode,
const Context& context)
{
- assert(!_reference);
- assert(!_connection);
- assert(!__is);
- assert(!__os);
- assert(_state == StateUnsent);
+ //
+ // No mutex protection necessary, using this object for a new AMI
+ // call while another one is in progress is not allowed and leads
+ // to undefined behavior.
+ //
_reference = ref;
_connection = _reference->getConnection();
- __is = new BasicStream(_reference->instance.get());
- __os = new BasicStream(_reference->instance.get());
-
_cnt = 0;
_mode = mode;
+ delete __os;
+ __os = new BasicStream(_reference->instance.get());
+
_connection->prepareRequest(__os);
_reference->identity.__write(__os);
__os->write(_reference->facet);
@@ -305,10 +290,7 @@ void
IceInternal::OutgoingAsync::__send()
{
assert(_reference);
- //assert(_connection); // Might be 0, in case we retry from __finished().
- assert(__is);
- assert(__os);
- assert(_state == StateUnsent);
+ //assert(_connection); // Might be null, if called from __finished() for retry.
try
{
@@ -327,7 +309,15 @@ IceInternal::OutgoingAsync::__send()
try
{
_connection->sendAsyncRequest(__os, this);
- break;
+
+ //
+ // Don't do anything after sendAsyncRequest() returned
+ // without an exception. I such case, there will be
+ // callbacks, i.e., calls to the __finished()
+ // functions. Since there is no mutex protection, we
+ // cannot modify state here and in such callbacks.
+ //
+ return;
}
catch(const LocalException& ex)
{
@@ -352,44 +342,7 @@ IceInternal::OutgoingAsync::__send()
}
catch(const LocalException& ex)
{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_state == StateUnsent);
- _state = StateSent;
- _monitor.notify();
- }
-
__finished(ex);
- return;
- }
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_state == StateUnsent);
- _state = StateSent;
- _monitor.notify();
- }
-}
-
-void
-IceInternal::OutgoingAsync::__cleanup()
-{
- assert(_reference);
- assert(_connection);
- assert(__is);
- assert(__os);
-
- _reference = 0;
- _connection = 0;
- delete __is;
- __is = 0;
- delete __os;
- __os = 0;
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_state == StateSent);
- _state = StateUnsent;
}
}
@@ -455,6 +408,5 @@ Ice::AMI_Object_ice_invoke::__response(bool ok) // ok == true means no user exce
__finished(ex);
return;
}
- __cleanup();
ice_response(ok, outParams);
}