diff options
author | Marc Laukien <marc@zeroc.com> | 2004-02-23 14:39:39 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-02-23 14:39:39 +0000 |
commit | 12a3bcc93abb7fc436ccd80599fd731d2a477474 (patch) | |
tree | 5c80ec39793bae084e7b29bad08d2f0c40eed666 /cpp/src/Ice/OutgoingAsync.cpp | |
parent | Added some AMI tests. (diff) | |
download | ice-12a3bcc93abb7fc436ccd80599fd731d2a477474.tar.bz2 ice-12a3bcc93abb7fc436ccd80599fd731d2a477474.tar.xz ice-12a3bcc93abb7fc436ccd80599fd731d2a477474.zip |
mutex for OutgoingAsync
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 170 |
1 files changed, 97 insertions, 73 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index ba255519aff..3a5d6d41025 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -41,27 +41,19 @@ IceInternal::OutgoingAsync::OutgoingAsync() : IceInternal::OutgoingAsync::~OutgoingAsync() { - delete __is; - delete __os; + assert(!__is); + assert(!__os); } void IceInternal::OutgoingAsync::__finished(BasicStream& is) { - // - // No mutex protection necessary, this function can only be called - // after __send() and __prepare() have completed. - // - - assert(_reference); - assert(_connection); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor); DispatchStatus status; try { - delete __is; - __is = new BasicStream(_reference->instance.get()); __is->swap(is); Byte b; @@ -155,18 +147,14 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) { warning(); } + + cleanup(); } void IceInternal::OutgoingAsync::__finished(const LocalException& exc) { - // - // No mutex protection necessary, this function can only be called - // after __send() and __prepare() have completed. - // - - assert(_reference); - //assert(_connection); // Might be null, if getConnection() failed. + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor); if(_reference->locatorInfo) { @@ -208,39 +196,35 @@ IceInternal::OutgoingAsync::__finished(const LocalException& exc) { _connection = 0; __send(); + return; } - else + + try { - try - { - ice_exception(exc); - } - catch(const Exception& ex) - { - warning(ex); - } - catch(const std::exception& ex) - { - warning(ex); - } - catch(...) - { - warning(); - } + ice_exception(exc); + } + catch(const Exception& ex) + { + warning(ex); + } + catch(const std::exception& ex) + { + warning(ex); } + catch(...) + { + warning(); + } + + cleanup(); } bool IceInternal::OutgoingAsync::__timedOut() const { - // - // No mutex protection necessary, this function can only be called - // after __send() and __prepare() have completed. - // - - assert(_connection); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor); - if(_connection->timeout() >= 0) + if(_connection && _connection->timeout() >= 0) { return IceUtil::Time::now() >= _absoluteTimeout; } @@ -254,42 +238,56 @@ void IceInternal::OutgoingAsync::__prepare(const ReferencePtr& ref, const string& operation, OperationMode mode, const Context& context) { - // - // No mutex protection necessary, using this object for a new AMI - // call while another one is in progress is not allowed and leads - // to undefined behavior. - // + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor); - _reference = ref; - _connection = _reference->getConnection(); - _cnt = 0; - _mode = mode; - - delete __os; - __os = new BasicStream(_reference->instance.get()); - - _connection->prepareRequest(__os); - _reference->identity.__write(__os); - __os->write(_reference->facet); - __os->write(operation); - __os->write(static_cast<Byte>(_mode)); - __os->writeSize(Int(context.size())); - Context::const_iterator p; - for(p = context.begin(); p != context.end(); ++p) + try + { + // + // We must first wait for other requests to finish. + // + while(_reference) + { + wait(); + } + + _reference = ref; + assert(!_connection); + _connection = _reference->getConnection(); + _cnt = 0; + _mode = mode; + assert(!__is); + __is = new BasicStream(_reference->instance.get()); + assert(!__os); + __os = new BasicStream(_reference->instance.get()); + + _connection->prepareRequest(__os); + + _reference->identity.__write(__os); + __os->write(_reference->facet); + __os->write(operation); + __os->write(static_cast<Byte>(_mode)); + __os->writeSize(Int(context.size())); + Context::const_iterator p; + for(p = context.begin(); p != context.end(); ++p) + { + __os->write(p->first); + __os->write(p->second); + } + + __os->startWriteEncaps(); + } + catch(const LocalException& ex) { - __os->write(p->first); - __os->write(p->second); + cleanup(); + ex.ice_throw(); } - - __os->startWriteEncaps(); } void IceInternal::OutgoingAsync::__send() { - assert(_reference); - //assert(_connection); // Might be null, if called from __finished() for retry. - + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor); + try { while(true) @@ -298,16 +296,16 @@ IceInternal::OutgoingAsync::__send() { _connection = _reference->getConnection(); } - + if(_connection->timeout() >= 0) { _absoluteTimeout = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_connection->timeout()); } - + try { _connection->sendAsyncRequest(__os, this); - + // // Don't do anything after sendAsyncRequest() returned // without an exception. I such case, there will be @@ -375,6 +373,32 @@ IceInternal::OutgoingAsync::warning() const } void +IceInternal::OutgoingAsync::cleanup() +{ + if(_reference) + { + _reference = 0; + } + + if(_connection) + { + _connection = 0; + } + + if(__is) + { + delete __is; + __is = 0; + } + + if(__os) + { + delete __os; + __os = 0; + } +} + +void Ice::AMI_Object_ice_invoke::__invoke(const IceInternal::ReferencePtr& ref, const string& operation, OperationMode mode, const vector<Byte>& inParams, const Context& context) { |