diff options
author | Marc Laukien <marc@zeroc.com> | 2004-02-17 03:05:24 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-02-17 03:05:24 +0000 |
commit | 4f0c2d50a93b4aaf96e309bbf0795610ba137d11 (patch) | |
tree | d136ad246e474848010b5f19c1f425dfd7eae464 /cpp/src | |
parent | more AMI (diff) | |
download | ice-4f0c2d50a93b4aaf96e309bbf0795610ba137d11.tar.bz2 ice-4f0c2d50a93b4aaf96e309bbf0795610ba137d11.tar.xz ice-4f0c2d50a93b4aaf96e309bbf0795610ba137d11.zip |
more and more AMI fixes..
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 109 | ||||
-rw-r--r-- | cpp/src/Ice/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 306 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 272 |
4 files changed, 368 insertions, 322 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index e019c29d250..78e29294a75 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -58,7 +58,7 @@ IceInternal::Connection::validate() if(_adapter) { IceUtil::Mutex::Lock sendSync(_sendMutex); - assert(_threadPool); // The transceiver cannot be closed already. + assert(_transceiver); // The transceiver cannot be closed already. // // Incoming connections play the active role with respect to @@ -258,7 +258,7 @@ bool IceInternal::Connection::isFinished() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - return _threadPool == 0 && _dispatchCount == 0; + return _transceiver == 0 && _dispatchCount == 0; } void @@ -293,7 +293,7 @@ IceInternal::Connection::waitUntilFinished() // Now we must wait for connection closure. If there is a timeout, // we force the connection closure. // - while(_threadPool) + while(_transceiver) { if(_state != StateClosed && _endpoint->timeout() >= 0) { @@ -412,6 +412,8 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!(out && _endpoint->datagram())); // Twoway requests cannot be datagrams. + if(_exception.get()) { _exception->ice_throw(); @@ -427,15 +429,6 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out) requestId = _nextRequestId++; } - // - // Only add to the request map if this is a twoway call. - // - if(out) - { - assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams. - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); - } - if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); @@ -445,7 +438,7 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out) try { IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_threadPool) // Has the transceiver already been closed? + if(!_transceiver) // Has the transceiver already been closed? { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_exception.get()); @@ -524,6 +517,39 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out) assert(_exception.get()); _exception->ice_throw(); } + + // + // Only add to the request map if this was a twoway call, and if + // there was no exception above. + // + if(out) + { + auto_ptr<LocalException> exception; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // If there is already an exception set, we call + // finished() directly (below, outside the thread + // synchronization), because it's possible that finished() + // has already been called on the request map. + // + if(_exception.get()) + { + exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone())); + } + else + { + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } + } + + if(exception.get()) + { + out->finished(*exception.get()); + } + } } void @@ -534,6 +560,8 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway. + if(_exception.get()) { _exception->ice_throw(); @@ -549,10 +577,6 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt requestId = _nextRequestId++; } - assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway. - _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), - pair<const Int, OutgoingAsyncPtr>(requestId, out)); - if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); @@ -562,7 +586,7 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt try { IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_threadPool) // Has the transceiver already been closed? + if(!_transceiver) // Has the transceiver already been closed? { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_exception.get()); @@ -638,6 +662,36 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt assert(_exception.get()); _exception->ice_throw(); } + + // + // Only add to the request map if there was no exception above. + // + auto_ptr<LocalException> exception; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // If there is already an exception set, we call __finished() + // directly (below, outside the thread synchronization), + // because it's possible that __finished() has already been + // called on the request map. + // + if(_exception.get()) + { + exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone())); + } + else + { + _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), + pair<const Int, OutgoingAsyncPtr>(requestId, out)); + } + } + + if(exception.get()) + { + out->__finished(*exception.get()); + } } void @@ -758,7 +812,7 @@ IceInternal::Connection::flushBatchRequest() try { IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_threadPool) // Has the transceiver already been closed? + if(!_transceiver) // Has the transceiver already been closed? { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_exception.get()); @@ -876,7 +930,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) try { IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_threadPool) // Has the transceiver already been closed? + if(!_transceiver) // Has the transceiver already been closed? { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_exception.get()); @@ -1415,8 +1469,10 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) closeException = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); } + assert(_transceiver); + _transceiver = 0; assert(_threadPool); - _threadPool = 0; // We don't need the thread pool anymore. + _threadPool = 0; notifyAll(); } @@ -1456,8 +1512,7 @@ IceInternal::Connection::exception(const LocalException& ex) string IceInternal::Connection::toString() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - return _transceiver->toString(); + return _transceiverToString; } bool @@ -1484,6 +1539,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, const ObjectAdapterPtr& adapter) : EventHandler(instance), _transceiver(transceiver), + _transceiverToString(transceiver->toString()), _endpoint(endpoint), _adapter(adapter), _logger(_instance->logger()), // Cached for better performance. @@ -1556,6 +1612,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, IceInternal::Connection::~Connection() { assert(_state == StateClosed); + assert(!_transceiver); assert(!_threadPool); assert(_dispatchCount == 0); assert(_proxyCount == 0); @@ -1726,8 +1783,10 @@ IceInternal::Connection::setState(State state) // Here we ignore any exceptions in close(). } + assert(_transceiver); + _transceiver = 0; assert(_threadPool); - _threadPool = 0; // We don't need the thread pool anymore. + _threadPool = 0; //notifyAll(); // We notify already below. } else @@ -1766,7 +1825,7 @@ IceInternal::Connection::initiateShutdown() const if(!_endpoint->datagram()) { IceUtil::Mutex::Lock sendSync(_sendMutex); - assert(_threadPool); // The transceiver cannot be closed already. + assert(_transceiver); // The transceiver cannot be closed already. // // Before we shut down, we send a close connection message. diff --git a/cpp/src/Ice/Connection.h b/cpp/src/Ice/Connection.h index a3027721ae0..6a54d641e22 100644 --- a/cpp/src/Ice/Connection.h +++ b/cpp/src/Ice/Connection.h @@ -133,7 +133,8 @@ private: bool closingOK() const; - const TransceiverPtr _transceiver; + TransceiverPtr _transceiver; + const std::string _transceiverToString; const EndpointPtr _endpoint; Ice::ObjectAdapterPtr _adapter; diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index d546a9c6aa2..72cd38182cf 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -103,28 +103,19 @@ IceInternal::Outgoing::invoke() { case Reference::ModeTwoway: { - try - { - _connection->sendRequest(&_os, this); - } - catch(const LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - _state = StateLocalException; - _exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); - - // - // If soemthing goes wrong during sending, we can - // always retry the request without violating - // "at-most-once", and therefore do not have to wrap - // the exception in NonRepeatable. - // - _exception->ice_throw(); - } + // + // We let all exceptions raised by sending directly + // propagate to the caller, because they can be retried + // without violating "at-most-once". In case of such + // exceptions, the connection object does not call back on + // this object, so we don't need to lock the mutex, keep + // track of state, or save exceptions. + // + _connection->sendRequest(&_os, this); // - // Wait until the request has completed or for a timeout. + // Wait until the request has completed, or until the + // request times out. // bool timedOut = false; @@ -223,12 +214,11 @@ IceInternal::Outgoing::invoke() { // // For oneway and datagram requests, the connection object - // does not call back on this object. Therefore we don't - // need to lock the mutex, we don't need to set the state, - // and we also don't need to save exceptions. Furthermore, - // all exceptions from sending oneways or datagrams can be - // retried without violating "at-most-once", so we let - // exceptions simply propagate directly to the caller. + // never calls back on this object. Therefore we don't + // need to lock the mutex, keep track of state, or save + // exceptions. We simply let all exceptions from sending + // propagate to the caller, because such exceptions can be + // retried without violating "at-most-once". // _connection->sendRequest(&_os, 0); break; @@ -264,148 +254,143 @@ IceInternal::Outgoing::finished(BasicStream& is) assert(_reference->mode == Reference::ModeTwoway); // Can only be called for twoways. - // - // The state might be StateLocalException if there was a timeout - // in invoke(). - // - if(_state <= StateInProgress) - { - _is.swap(is); - Byte status; - _is.read(status); + assert(_state <= StateInProgress); - switch(static_cast<DispatchStatus>(status)) + _is.swap(is); + Byte status; + _is.read(status); + + switch(static_cast<DispatchStatus>(status)) + { + case DispatchOK: { - case DispatchOK: - { - // - // Input and output parameters are always sent in an - // encapsulation, which makes it possible to forward - // oneway requests as blobs. - // - _is.startReadEncaps(); - _state = StateOK; - break; - } - - case DispatchUserException: - { - // - // Input and output parameters are always sent in an - // encapsulation, which makes it possible to forward - // oneway requests as blobs. - // - _is.startReadEncaps(); - _state = StateUserException; - break; - } + // + // Input and output parameters are always sent in an + // encapsulation, which makes it possible to forward + // oneway requests as blobs. + // + _is.startReadEncaps(); + _state = StateOK; + break; + } + + case DispatchUserException: + { + // + // Input and output parameters are always sent in an + // encapsulation, which makes it possible to forward + // oneway requests as blobs. + // + _is.startReadEncaps(); + _state = StateUserException; + break; + } + + case DispatchObjectNotExist: + case DispatchFacetNotExist: + case DispatchOperationNotExist: + { + _state = StateLocalException; + // Don't read the exception members directly into the + // exception. Otherwise if reading fails and raises an + // exception, you will have a memory leak. + Identity ident; + ident.__read(&_is); + vector<string> facet; + _is.read(facet); + string operation; + _is.read(operation); - case DispatchObjectNotExist: - case DispatchFacetNotExist: - case DispatchOperationNotExist: + RequestFailedException* ex; + switch(static_cast<DispatchStatus>(status)) { - _state = StateLocalException; - // Don't read the exception members directly into the - // exception. Otherwise if reading fails and raises an - // exception, you will have a memory leak. - Identity ident; - ident.__read(&_is); - vector<string> facet; - _is.read(facet); - string operation; - _is.read(operation); - - RequestFailedException* ex; - switch(static_cast<DispatchStatus>(status)) + case DispatchObjectNotExist: { - case DispatchObjectNotExist: - { - ex = new ObjectNotExistException(__FILE__, __LINE__); - break; - } - - case DispatchFacetNotExist: - { - ex = new FacetNotExistException(__FILE__, __LINE__); - break; - } - - case DispatchOperationNotExist: - { - ex = new OperationNotExistException(__FILE__, __LINE__); - break; - } - - default: - { - ex = 0; // To keep the compiler from complaining. - assert(false); - break; - } + ex = new ObjectNotExistException(__FILE__, __LINE__); + break; + } + + case DispatchFacetNotExist: + { + ex = new FacetNotExistException(__FILE__, __LINE__); + break; + } + + case DispatchOperationNotExist: + { + ex = new OperationNotExistException(__FILE__, __LINE__); + break; + } + + default: + { + ex = 0; // To keep the compiler from complaining. + assert(false); + break; } - - ex->id = ident; - ex->facet = facet; - ex->operation = operation; - _exception = auto_ptr<LocalException>(ex); - break; } - case DispatchUnknownException: - case DispatchUnknownLocalException: - case DispatchUnknownUserException: + ex->id = ident; + ex->facet = facet; + ex->operation = operation; + _exception = auto_ptr<LocalException>(ex); + break; + } + + case DispatchUnknownException: + case DispatchUnknownLocalException: + case DispatchUnknownUserException: + { + _state = StateLocalException; + // Don't read the exception members directly into the + // exception. Otherwise if reading fails and raises an + // exception, you will have a memory leak. + string unknown; + _is.read(unknown); + + UnknownException* ex; + switch(static_cast<DispatchStatus>(status)) { - _state = StateLocalException; - // Don't read the exception members directly into the - // exception. Otherwise if reading fails and raises an - // exception, you will have a memory leak. - string unknown; - _is.read(unknown); - - UnknownException* ex; - switch(static_cast<DispatchStatus>(status)) + case DispatchUnknownException: { - case DispatchUnknownException: - { - ex = new UnknownException(__FILE__, __LINE__); - break; - } - - case DispatchUnknownLocalException: - { - ex = new UnknownLocalException(__FILE__, __LINE__); - break; - } - - case DispatchUnknownUserException: - { - ex = new UnknownUserException(__FILE__, __LINE__); - break; - } - - default: - { - ex = 0; // To keep the compiler from complaining. - assert(false); - break; - } + ex = new UnknownException(__FILE__, __LINE__); + break; + } + + case DispatchUnknownLocalException: + { + ex = new UnknownLocalException(__FILE__, __LINE__); + break; + } + + case DispatchUnknownUserException: + { + ex = new UnknownUserException(__FILE__, __LINE__); + break; + } + + default: + { + ex = 0; // To keep the compiler from complaining. + assert(false); + break; } - - ex->unknown = unknown; - _exception = auto_ptr<LocalException>(ex); - break; } - default: - { - _state = StateLocalException; - _exception = auto_ptr<LocalException>(new UnknownReplyStatusException(__FILE__, __LINE__)); - break; - } + ex->unknown = unknown; + _exception = auto_ptr<LocalException>(ex); + break; + } + + default: + { + _state = StateLocalException; + _exception = auto_ptr<LocalException>(new UnknownReplyStatusException(__FILE__, __LINE__)); + break; } - - notify(); } + + notify(); } void @@ -415,14 +400,9 @@ IceInternal::Outgoing::finished(const LocalException& ex) assert(_reference->mode == Reference::ModeTwoway); // Can only be called for twoways. - // - // The state might be StateLocalException if there was a timeout - // in invoke(). - // - if(_state <= StateInProgress) - { - _state = StateLocalException; - _exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); - notify(); - } + assert(_state <= StateInProgress); + + _state = StateLocalException; + _exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); + notify(); } diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 8acf39d6a6c..58b31d02626 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -21,6 +21,7 @@ #include <Ice/Properties.h> #include <Ice/LoggerUtil.h> #include <Ice/LocatorInfo.h> +#include <Ice/ProxyFactory.h> using namespace std; using namespace Ice; @@ -34,7 +35,8 @@ void IceInternal::decRef(AMI_Object_ice_invoke* p) { p->__decRef(); } IceInternal::OutgoingAsync::OutgoingAsync() : __is(0), - __os(0) + __os(0), + _cnt(0) { } @@ -47,134 +49,17 @@ IceInternal::OutgoingAsync::~OutgoingAsync() } void -IceInternal::OutgoingAsync::__finished(const LocalException& exc) -{ - if(_reference->locatorInfo) - { - _reference->locatorInfo->clearObjectCache(_reference); - } - -/* - ProxyFactoryPtr proxyFactory = _reference->instance->proxyFactory(); - if(proxyFactory) - { - proxyFactory->checkRetryAfterException(ex, cnt); - } - else - { - ex.ice_throw(); // The communicator is already destroyed, so we cannot retry. - } -*/ - - try - { - ice_exception(exc); - } - catch(const Exception& ex) - { - warning(ex); - } - catch(const std::exception& ex) - { - warning(ex); - } - catch(...) - { - warning(); - } - - assert(_connection); - _connection->decProxyCount(); - _connection = 0; - - assert(__is); - delete __is; - __is = 0; - - assert(__os); - delete __os; - __os = 0; -} - -bool -IceInternal::OutgoingAsync::__timedOut() const -{ - if(_connection->timeout() >= 0) - { - return IceUtil::Time::now() >= _absoluteTimeout; - } - else - { - return false; - } -} - -void -IceInternal::OutgoingAsync::__prepare(const ReferencePtr& ref, const string& operation, OperationMode mode, - const Context& context) -{ - assert(!_reference); - _reference = ref; - - assert(!_connection); - _connection = _reference->getConnection(); - _connection->incProxyCount(); - - assert(!__is); - __is = new BasicStream(_reference->instance.get()); - - assert(!__os); - __os = new BasicStream(_reference->instance.get()); - - _connection->prepareRequest(__os); - _reference->identity.__write(__os); - __os->write(_reference->facet); - __os->write(operation); - __os->write(static_cast<Byte>(mode)); - __os->writeSize(Int(context.size())); - Context::const_iterator p; - for(p = context.begin(); p != context.end(); ++p) - { - __os->write(p->first); - __os->write(p->second); - } - - __os->startWriteEncaps(); -} - -void -IceInternal::OutgoingAsync::__send() -{ - if(_connection->timeout() >= 0) - { - _absoluteTimeout = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_connection->timeout()); - } - - try - { - _connection->sendAsyncRequest(__os, this); - } - catch(const LocalException&) - { - // - // Twoway requests report exceptions using finished(). - // - assert(false); - } -} - -void IceInternal::OutgoingAsync::__finished(BasicStream& is) { DispatchStatus status; - + try { __is->swap(is); Byte b; __is->read(b); status = static_cast<DispatchStatus>(b); - + switch(status) { case DispatchOK: @@ -192,7 +77,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) __is->read(ex.operation); throw ex; } - + case DispatchFacetNotExist: { FacetNotExistException ex(__FILE__, __LINE__); @@ -264,20 +149,120 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) warning(); } - assert(_reference); - _reference = 0; + cleanup(); +} - assert(_connection); - _connection->decProxyCount(); - _connection = 0; +void +IceInternal::OutgoingAsync::__finished(const LocalException& exc) +{ + try + { + ice_exception(exc); + } + catch(const Exception& ex) + { + warning(ex); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + cleanup(); +} + +bool +IceInternal::OutgoingAsync::__timedOut() const +{ + if(_connection && _connection->timeout() >= 0) + { + return IceUtil::Time::now() >= _absoluteTimeout; + } + else + { + return false; + } +} + +void +IceInternal::OutgoingAsync::__prepare(const ReferencePtr& ref, const string& operation, OperationMode mode, + const Context& context) +{ + assert(!_reference); + _reference = ref; + + assert(!_connection); + _connection = _reference->getConnection(); + _connection->incProxyCount(); + + assert(_cnt == 0); - assert(__is); - delete __is; - __is = 0; - - assert(__os); - delete __os; - __os = 0; + assert(!__is); + __is = new BasicStream(_reference->instance.get()); + + assert(!__os); + __os = new BasicStream(_reference->instance.get()); + + _connection->prepareRequest(__os); + _reference->identity.__write(__os); + __os->write(_reference->facet); + __os->write(operation); + __os->write(static_cast<Byte>(mode)); + __os->writeSize(Int(context.size())); + Context::const_iterator p; + for(p = context.begin(); p != context.end(); ++p) + { + __os->write(p->first); + __os->write(p->second); + } + + __os->startWriteEncaps(); +} + +void +IceInternal::OutgoingAsync::__send() +{ + if(_connection->timeout() >= 0) + { + _absoluteTimeout = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_connection->timeout()); + } + + try + { + while(true) + { + try + { + _connection->sendAsyncRequest(__os, this); + break; + } + catch(const LocalException& ex) + { + if(_reference->locatorInfo) + { + _reference->locatorInfo->clearObjectCache(_reference); + } + + ProxyFactoryPtr proxyFactory = _reference->instance->proxyFactory(); + if(proxyFactory) + { + proxyFactory->checkRetryAfterException(ex, _cnt); + } + else + { + ex.ice_throw(); // The communicator is already destroyed, so we cannot retry. + } + } + } + } + catch(const LocalException& ex) + { + __finished(ex); + } } void @@ -311,6 +296,27 @@ IceInternal::OutgoingAsync::warning() const } void +IceInternal::OutgoingAsync::cleanup() +{ + assert(_reference); + _reference = 0; + + assert(_connection); + _connection->decProxyCount(); + _connection = 0; + + _cnt = 0; + + assert(__is); + delete __is; + __is = 0; + + assert(__os); + delete __os; + __os = 0; +} + +void Ice::AMI_Object_ice_invoke::__invoke(const IceInternal::ReferencePtr& ref, const string& operation, OperationMode mode, const vector<Byte>& inParams, const Context& context) { |