summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-02-23 14:39:39 +0000
committerMarc Laukien <marc@zeroc.com>2004-02-23 14:39:39 +0000
commit12a3bcc93abb7fc436ccd80599fd731d2a477474 (patch)
tree5c80ec39793bae084e7b29bad08d2f0c40eed666 /cpp/src/Ice/OutgoingAsync.cpp
parentAdded some AMI tests. (diff)
downloadice-12a3bcc93abb7fc436ccd80599fd731d2a477474.tar.bz2
ice-12a3bcc93abb7fc436ccd80599fd731d2a477474.tar.xz
ice-12a3bcc93abb7fc436ccd80599fd731d2a477474.zip
mutex for OutgoingAsync
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp170
1 files changed, 97 insertions, 73 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index ba255519aff..3a5d6d41025 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -41,27 +41,19 @@ IceInternal::OutgoingAsync::OutgoingAsync() :
IceInternal::OutgoingAsync::~OutgoingAsync()
{
- delete __is;
- delete __os;
+ assert(!__is);
+ assert(!__os);
}
void
IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
- //
- // No mutex protection necessary, this function can only be called
- // after __send() and __prepare() have completed.
- //
-
- assert(_reference);
- assert(_connection);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
DispatchStatus status;
try
{
- delete __is;
- __is = new BasicStream(_reference->instance.get());
__is->swap(is);
Byte b;
@@ -155,18 +147,14 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
warning();
}
+
+ cleanup();
}
void
IceInternal::OutgoingAsync::__finished(const LocalException& exc)
{
- //
- // No mutex protection necessary, this function can only be called
- // after __send() and __prepare() have completed.
- //
-
- assert(_reference);
- //assert(_connection); // Might be null, if getConnection() failed.
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
if(_reference->locatorInfo)
{
@@ -208,39 +196,35 @@ IceInternal::OutgoingAsync::__finished(const LocalException& exc)
{
_connection = 0;
__send();
+ return;
}
- else
+
+ try
{
- try
- {
- ice_exception(exc);
- }
- catch(const Exception& ex)
- {
- warning(ex);
- }
- catch(const std::exception& ex)
- {
- warning(ex);
- }
- catch(...)
- {
- warning();
- }
+ ice_exception(exc);
+ }
+ catch(const Exception& ex)
+ {
+ warning(ex);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
}
+ catch(...)
+ {
+ warning();
+ }
+
+ cleanup();
}
bool
IceInternal::OutgoingAsync::__timedOut() const
{
- //
- // No mutex protection necessary, this function can only be called
- // after __send() and __prepare() have completed.
- //
-
- assert(_connection);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
- if(_connection->timeout() >= 0)
+ if(_connection && _connection->timeout() >= 0)
{
return IceUtil::Time::now() >= _absoluteTimeout;
}
@@ -254,42 +238,56 @@ void
IceInternal::OutgoingAsync::__prepare(const ReferencePtr& ref, const string& operation, OperationMode mode,
const Context& context)
{
- //
- // No mutex protection necessary, using this object for a new AMI
- // call while another one is in progress is not allowed and leads
- // to undefined behavior.
- //
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
- _reference = ref;
- _connection = _reference->getConnection();
- _cnt = 0;
- _mode = mode;
-
- delete __os;
- __os = new BasicStream(_reference->instance.get());
-
- _connection->prepareRequest(__os);
- _reference->identity.__write(__os);
- __os->write(_reference->facet);
- __os->write(operation);
- __os->write(static_cast<Byte>(_mode));
- __os->writeSize(Int(context.size()));
- Context::const_iterator p;
- for(p = context.begin(); p != context.end(); ++p)
+ try
+ {
+ //
+ // We must first wait for other requests to finish.
+ //
+ while(_reference)
+ {
+ wait();
+ }
+
+ _reference = ref;
+ assert(!_connection);
+ _connection = _reference->getConnection();
+ _cnt = 0;
+ _mode = mode;
+ assert(!__is);
+ __is = new BasicStream(_reference->instance.get());
+ assert(!__os);
+ __os = new BasicStream(_reference->instance.get());
+
+ _connection->prepareRequest(__os);
+
+ _reference->identity.__write(__os);
+ __os->write(_reference->facet);
+ __os->write(operation);
+ __os->write(static_cast<Byte>(_mode));
+ __os->writeSize(Int(context.size()));
+ Context::const_iterator p;
+ for(p = context.begin(); p != context.end(); ++p)
+ {
+ __os->write(p->first);
+ __os->write(p->second);
+ }
+
+ __os->startWriteEncaps();
+ }
+ catch(const LocalException& ex)
{
- __os->write(p->first);
- __os->write(p->second);
+ cleanup();
+ ex.ice_throw();
}
-
- __os->startWriteEncaps();
}
void
IceInternal::OutgoingAsync::__send()
{
- assert(_reference);
- //assert(_connection); // Might be null, if called from __finished() for retry.
-
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
+
try
{
while(true)
@@ -298,16 +296,16 @@ IceInternal::OutgoingAsync::__send()
{
_connection = _reference->getConnection();
}
-
+
if(_connection->timeout() >= 0)
{
_absoluteTimeout = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_connection->timeout());
}
-
+
try
{
_connection->sendAsyncRequest(__os, this);
-
+
//
// Don't do anything after sendAsyncRequest() returned
// without an exception. I such case, there will be
@@ -375,6 +373,32 @@ IceInternal::OutgoingAsync::warning() const
}
void
+IceInternal::OutgoingAsync::cleanup()
+{
+ if(_reference)
+ {
+ _reference = 0;
+ }
+
+ if(_connection)
+ {
+ _connection = 0;
+ }
+
+ if(__is)
+ {
+ delete __is;
+ __is = 0;
+ }
+
+ if(__os)
+ {
+ delete __os;
+ __os = 0;
+ }
+}
+
+void
Ice::AMI_Object_ice_invoke::__invoke(const IceInternal::ReferencePtr& ref, const string& operation, OperationMode mode,
const vector<Byte>& inParams, const Context& context)
{