diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/Proxy.cpp | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/Proxy.cpp')
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 1212 |
1 files changed, 260 insertions, 952 deletions
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index fa5c73b6088..8f4273a58ab 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -15,8 +15,8 @@ #include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/ConnectRequestHandler.h> +#include <Ice/CollocatedRequestHandler.h> #include <Ice/ConnectionRequestHandler.h> -#include <Ice/Direct.h> #include <Ice/Reference.h> #include <Ice/EndpointI.h> #include <Ice/Instance.h> @@ -135,26 +135,34 @@ IceProxy::Ice::Object::ice_toString() const bool IceProxy::Ice::Object::ice_isA(const string& typeId, const Context* context) { - InvocationObserver __observer(this, ice_isA_name, context); - int __cnt = 0; - while(true) + __checkTwowayOnly(ice_isA_name); + Outgoing __og(this, ice_isA_name, ::Ice::Nonmutating, context); + try + { + BasicStream* __os = __og.startWriteParams(DefaultFormat); + __os->write(typeId, false); + __og.endWriteParams(); + } + catch(const ::Ice::LocalException& __ex) + { + __og.abort(__ex); + } + if(!__og.invoke()) { - Handle< ::IceDelegate::Ice::Object> __del; try { - __checkTwowayOnly(ice_isA_name); - __del = __getDelegate(false); - return __del->ice_isA(typeId, context, __observer); + __og.throwUserException(); } - catch(const LocalExceptionWrapper& __ex) + catch(const ::Ice::UserException& __ex) { - __handleExceptionWrapperRelaxed(__del, __ex, true, __cnt, __observer); - } - catch(const LocalException& __ex) - { - __handleException(__del, __ex, true, __cnt, __observer); + throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); } } + bool __ret; + BasicStream* __is = __og.startReadParams(); + __is->read(__ret); + __og.endReadParams(); + return __ret; } Ice::AsyncResultPtr @@ -163,8 +171,8 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId, const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie) { - OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_isA_name, del, cookie); __checkAsyncTwowayOnly(ice_isA_name); + OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_isA_name, del, cookie); try { __result->__prepare(ice_isA_name, Nonmutating, ctx); @@ -173,7 +181,7 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId, __result->__endWriteParams(); __result->__invoke(true); } - catch(const LocalException& __ex) + catch(const Exception& __ex) { __result->__invokeExceptionAsync(__ex); } @@ -185,54 +193,44 @@ IceProxy::Ice::Object::end_ice_isA(const AsyncResultPtr& __result) { AsyncResult::__check(__result, this, ice_isA_name); bool __ok = __result->__wait(); - try + if(!__ok) { - if(!__ok) + try { - try - { - __result->__throwUserException(); - } - catch(const UserException& __ex) - { - throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); - } + __result->__throwUserException(); + } + catch(const UserException& __ex) + { + throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); } - bool __ret; - IceInternal::BasicStream* __is = __result->__startReadParams(); - __is->read(__ret); - __result->__endReadParams(); - return __ret; - } - catch(const ::Ice::LocalException& ex) - { - __result->__getObserver().failed(ex.ice_name()); - throw; } + bool __ret; + IceInternal::BasicStream* __is = __result->__startReadParams(); + __is->read(__ret); + __result->__endReadParams(); + return __ret; } void IceProxy::Ice::Object::ice_ping(const Context* context) { - InvocationObserver __observer(this, ice_ping_name, context); - int __cnt = 0; - while(true) + Outgoing __og(this, ice_ping_name, ::Ice::Nonmutating, context); + __og.writeEmptyParams(); + bool __ok = __og.invoke(); + if(__og.hasResponse()) { - Handle<IceDelegate::Ice::Object> __del; - try - { - __del = __getDelegate(false); - __del->ice_ping(context, __observer); - return; - } - catch(const LocalExceptionWrapper& __ex) - { - __handleExceptionWrapperRelaxed(__del, __ex, true, __cnt, __observer); - } - catch(const LocalException& __ex) + if(!__ok) { - __handleException(__del, __ex, true, __cnt, __observer); + try + { + __og.throwUserException(); + } + catch(const ::Ice::UserException& __ex) + { + throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); + } } + __og.readEmptyParams(); } } @@ -248,7 +246,7 @@ IceProxy::Ice::Object::begin_ice_ping(const Context* ctx, __result->__writeEmptyParams(); __result->__invoke(true); } - catch(const LocalException& __ex) + catch(const Exception& __ex) { __result->__invokeExceptionAsync(__ex); } @@ -264,51 +262,49 @@ IceProxy::Ice::Object::end_ice_ping(const AsyncResultPtr& __result) vector<string> IceProxy::Ice::Object::ice_ids(const Context* context) { - InvocationObserver __observer(this, ice_ids_name, context); - int __cnt = 0; - while(true) + __checkTwowayOnly(ice_ids_name); + Outgoing __og(this, ice_ids_name, ::Ice::Nonmutating, context); + __og.writeEmptyParams(); + if(!__og.invoke()) { - Handle<IceDelegate::Ice::Object> __del; try { - __checkTwowayOnly(ice_ids_name); - __del = __getDelegate(false); - return __del->ice_ids(context, __observer); + __og.throwUserException(); } - catch(const LocalExceptionWrapper& __ex) + catch(const ::Ice::UserException& __ex) { - __handleExceptionWrapperRelaxed(__del, __ex, true, __cnt, __observer); - } - catch(const LocalException& __ex) - { - __handleException(__del, __ex, true, __cnt, __observer); + throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); } } + vector<string> __ret; + BasicStream* __is = __og.startReadParams(); + __is->read(__ret, false); + __og.endReadParams(); + return __ret; } string IceProxy::Ice::Object::ice_id(const Context* context) { - InvocationObserver __observer(this, ice_id_name, context); - int __cnt = 0; - while(true) + __checkTwowayOnly(ice_id_name); + Outgoing __og(this, ice_id_name, ::Ice::Nonmutating, context); + __og.writeEmptyParams(); + if(!__og.invoke()) { - Handle<IceDelegate::Ice::Object> __del; try { - __checkTwowayOnly(ice_id_name); - __del = __getDelegate(false); - return __del->ice_id(context, __observer); - } - catch(const LocalExceptionWrapper& __ex) - { - __handleExceptionWrapperRelaxed(__del, __ex, true, __cnt, __observer); + __og.throwUserException(); } - catch(const LocalException& __ex) + catch(const ::Ice::UserException& __ex) { - __handleException(__del, __ex, true, __cnt, __observer); + throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); } } + string __ret; + BasicStream* __is = __og.startReadParams(); + __is->read(__ret, false); + __og.endReadParams(); + return __ret; } AsyncResultPtr @@ -316,15 +312,15 @@ IceProxy::Ice::Object::begin_ice_ids(const Context* ctx, const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie) { - OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ids_name, del, cookie); __checkAsyncTwowayOnly(ice_ids_name); + OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ids_name, del, cookie); try { __result->__prepare(ice_ids_name, Nonmutating, ctx); __result->__writeEmptyParams(); __result->__invoke(true); } - catch(const LocalException& __ex) + catch(const Exception& __ex) { __result->__invokeExceptionAsync(__ex); } @@ -336,30 +332,22 @@ IceProxy::Ice::Object::end_ice_ids(const AsyncResultPtr& __result) { AsyncResult::__check(__result, this, ice_ids_name); bool __ok = __result->__wait(); - try + if(!__ok) { - if(!__ok) + try { - try - { - __result->__throwUserException(); - } - catch(const UserException& __ex) - { - throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); - } + __result->__throwUserException(); + } + catch(const UserException& __ex) + { + throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); } - vector<string> __ret; - IceInternal::BasicStream* __is = __result->__startReadParams(); - __is->read(__ret); - __result->__endReadParams(); - return __ret; - } - catch(const ::Ice::LocalException& ex) - { - __result->__getObserver().failed(ex.ice_name()); - throw; } + vector<string> __ret; + IceInternal::BasicStream* __is = __result->__startReadParams(); + __is->read(__ret); + __result->__endReadParams(); + return __ret; } AsyncResultPtr @@ -367,15 +355,15 @@ IceProxy::Ice::Object::begin_ice_id(const Context* ctx, const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie) { - OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_id_name, del, cookie); __checkAsyncTwowayOnly(ice_id_name); + OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_id_name, del, cookie); try { __result->__prepare(ice_id_name, Nonmutating, ctx); __result->__writeEmptyParams(); __result->__invoke(true); } - catch(const LocalException& __ex) + catch(const Exception& __ex) { __result->__invokeExceptionAsync(__ex); } @@ -387,31 +375,22 @@ IceProxy::Ice::Object::end_ice_id(const AsyncResultPtr& __result) { AsyncResult::__check(__result, this, ice_id_name); bool __ok = __result->__wait(); - try + if(!__ok) { - if(!__ok) + try { - try - { - __result->__throwUserException(); - } - catch(const UserException& __ex) - { - throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); - } + __result->__throwUserException(); + } + catch(const UserException& __ex) + { + throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); } - string __ret; - IceInternal::BasicStream* __is = __result->__startReadParams(); - __is->read(__ret); - __result->__endReadParams(); - return __ret; - - } - catch(const ::Ice::LocalException& ex) - { - __result->__getObserver().failed(ex.ice_name()); - throw; } + string __ret; + IceInternal::BasicStream* __is = __result->__startReadParams(); + __is->read(__ret); + __result->__endReadParams(); + return __ret; } bool @@ -512,18 +491,10 @@ IceProxy::Ice::Object::end_ice_invoke(vector<Byte>& outEncaps, const AsyncResult bool ok = __result->__wait(); if(_reference->getMode() == Reference::ModeTwoway) { - try - { - const Byte* v; - Int sz; - __result->__readParamEncaps(v, sz); - vector<Byte>(v, v + sz).swap(outEncaps); - } - catch(const ::Ice::LocalException& ex) - { - __result->__getObserver().failed(ex.ice_name()); - throw; - } + const Byte* v; + Int sz; + __result->__readParamEncaps(v, sz); + vector<Byte>(v, v + sz).swap(outEncaps); } return ok; } @@ -535,33 +506,24 @@ IceProxy::Ice::Object::ice_invoke(const string& operation, vector<Byte>& outEncaps, const Context* context) { - InvocationObserver __observer(this, operation, context); - int __cnt = 0; - while(true) + Outgoing __og(this, operation, mode, context); + try { - Handle< ::IceDelegate::Ice::Object> __del; - try - { - __del = __getDelegate(false); - return __del->ice_invoke(operation, mode, inEncaps, outEncaps, context, __observer); - } - catch(const LocalExceptionWrapper& __ex) - { - bool canRetry = mode == Nonmutating || mode == Idempotent; - if(canRetry) - { - __handleExceptionWrapperRelaxed(__del, __ex, true, __cnt, __observer); - } - else - { - __handleExceptionWrapper(__del, __ex, __observer); - } - } - catch(const LocalException& __ex) - { - __handleException(__del, __ex, true, __cnt, __observer); - } + __og.writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); + } + catch(const ::Ice::LocalException& __ex) + { + __og.abort(__ex); } + bool ok = __og.invoke(); + if(_reference->getMode() == Reference::ModeTwoway) + { + const Byte* v; + Int sz; + __og.readParamEncaps(v, sz); + vector<Byte>(v, v + sz).swap(outEncaps); + } + return ok; } bool @@ -628,7 +590,7 @@ IceProxy::Ice::Object::begin_ice_invoke(const string& operation, __result->__writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); __result->__invoke(true); } - catch(const LocalException& __ex) + catch(const Exception& __ex) { __result->__invokeExceptionAsync(__ex); } @@ -642,17 +604,9 @@ IceProxy::Ice::Object::___end_ice_invoke(pair<const Byte*, const Byte*>& outEnca bool ok = __result->__wait(); if(_reference->getMode() == Reference::ModeTwoway) { - try - { - Int sz; - __result->__readParamEncaps(outEncaps.first, sz); - outEncaps.second = outEncaps.first + sz; - } - catch(const ::Ice::LocalException& ex) - { - __result->__getObserver().failed(ex.ice_name()); - throw; - } + Int sz; + __result->__readParamEncaps(outEncaps.first, sz); + outEncaps.second = outEncaps.first + sz; } return ok; } @@ -1147,20 +1101,36 @@ IceProxy::Ice::Object::ice_getConnectionId() const ConnectionPtr IceProxy::Ice::Object::ice_getConnection() { - InvocationObserver __observer(this, "ice_getConnection", 0); - int __cnt = 0; + InvocationObserver observer(this, "ice_getConnection", 0); + int cnt = 0; while(true) { - Handle< ::IceDelegate::Ice::Object> __del; + RequestHandlerPtr handler; try { - __del = __getDelegate(false); - return __del->__getRequestHandler()->getConnection(true); // Wait for the connection to be established. - + handler = __getRequestHandler(false); + return handler->getConnection(true); // Wait for the connection to be established. } - catch(const LocalException& __ex) + catch(const IceInternal::RetryException&) { - __handleException(__del, __ex, true, __cnt, __observer); + __setRequestHandler(handler, 0); // Clear request handler and retry. + } + catch(const Exception& ex) + { + try + { + int interval = __handleException(ex, handler, Idempotent, false, cnt); + observer.retried(); + if(interval > 0) + { + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); + } + } + catch(const Exception& exc) + { + observer.failed(exc.ice_name()); + throw; + } } } } @@ -1168,17 +1138,17 @@ IceProxy::Ice::Object::ice_getConnection() ConnectionPtr IceProxy::Ice::Object::ice_getCachedConnection() const { - Handle< ::IceDelegate::Ice::Object> __del; + RequestHandlerPtr __handler; { IceUtil::Mutex::Lock sync(_mutex); - __del = _delegate; + __handler = _requestHandler; } - if(__del) + if(__handler) { try { - return __del->__getRequestHandler()->getConnection(false); + return __handler->getConnection(false); } catch(const LocalException&) { @@ -1190,22 +1160,8 @@ IceProxy::Ice::Object::ice_getCachedConnection() const void IceProxy::Ice::Object::ice_flushBatchRequests() { - // - // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch - // requests were queued with the connection, they would be lost without being noticed. - // - InvocationObserver __observer(this, ice_flushBatchRequests_name, 0); - Handle< ::IceDelegate::Ice::Object> __del; - int __cnt = -1; // Don't retry. - try - { - __del = __getDelegate(false); - __del->ice_flushBatchRequests(__observer); - } - catch(const LocalException& __ex) - { - __handleException(__del, __ex, true, __cnt, __observer); - } + BatchOutgoing __og(this, ice_flushBatchRequests_name); + __og.invoke(); } bool @@ -1236,7 +1192,7 @@ IceProxy::Ice::Object::begin_ice_flushBatchRequestsInternal(const ::IceInternal: { __result->__invoke(); } - catch(const LocalException& __ex) + catch(const Exception& __ex) { __result->__invokeExceptionAsync(__ex); } @@ -1259,80 +1215,44 @@ IceProxy::Ice::Object::__hash() const void IceProxy::Ice::Object::__copyFrom(const ObjectPrx& from) { - ReferencePtr ref; - Handle< ::IceDelegateD::Ice::Object> delegateD; - Handle< ::IceDelegateM::Ice::Object> delegateM; - - { - IceUtil::Mutex::Lock sync(from->_mutex); - - ref = from->_reference; - delegateD = dynamic_cast< ::IceDelegateD::Ice::Object*>(from->_delegate.get()); - delegateM = dynamic_cast< ::IceDelegateM::Ice::Object*>(from->_delegate.get()); - } - - // - // No need to synchronize "*this", as this operation is only - // called upon initialization. - // - - assert(!_reference); - assert(!_delegate); - - _reference = ref; - - if(_reference->getCacheConnection()) - { - // - // The _delegate attribute is only used if "cache connection" - // is enabled. If it's not enabled, we don't keep track of the - // delegate -- a new delegate is created for each invocations. - // - - if(delegateD) - { - Handle< ::IceDelegateD::Ice::Object> delegate = __createDelegateD(); - delegate->__copyFrom(delegateD); - _delegate = delegate; - } - else if(delegateM) - { - Handle< ::IceDelegateM::Ice::Object> delegate = __createDelegateM(); - delegate->__copyFrom(delegateM); - _delegate = delegate; - } - } + IceUtil::Mutex::Lock sync(from->_mutex); + _reference = from->_reference; + _requestHandler = from->_requestHandler; } int -IceProxy::Ice::Object::__handleException(const ::IceInternal::Handle< ::IceDelegate::Ice::Object>& delegate, - const LocalException& ex, - bool sleep, - int& cnt, - InvocationObserver& observer) +IceProxy::Ice::Object::__handleException(const Exception& ex, + const RequestHandlerPtr& handler, + OperationMode mode, + bool sent, + int& cnt) { + __setRequestHandler(handler, 0); // Clear the request handler + // - // Only _delegate needs to be mutex protected here. + // We only retry local exception, system exceptions aren't retried. // + // A CloseConnectionException indicates graceful server shutdown, and is therefore + // always repeatable without violating "at-most-once". That's because by sending a + // close connection message, the server guarantees that all outstanding requests + // can safely be repeated. + // + // An ObjectNotExistException can always be retried as well without violating + // "at-most-once" (see the implementation of the checkRetryAfterException method + // of the ProxyFactory class for the reasons why it can be useful). + // + // If the request didn't get sent or if it's non-mutating or idempotent it can + // also always be retried if the retry count isn't reached. + // + const LocalException* localEx = dynamic_cast<const LocalException*>(&ex); + if(localEx && (!sent || + mode == Nonmutating || mode == Idempotent || + dynamic_cast<const CloseConnectionException*>(&ex) || + dynamic_cast<const ObjectNotExistException*>(&ex))) { - IceUtil::Mutex::Lock sync(_mutex); - if(delegate.get() == _delegate.get()) - { - _delegate = 0; - } - } - - try - { - if(cnt == -1) // Don't retry if the retry count is -1. - { - ex.ice_throw(); - } - - int interval = 0; try { - interval = _reference->getInstance()->proxyFactory()->checkRetryAfterException(ex, _reference, sleep, cnt); + return _reference->getInstance()->proxyFactory()->checkRetryAfterException(*localEx, _reference, cnt); } catch(const CommunicatorDestroyedException&) { @@ -1341,61 +1261,12 @@ IceProxy::Ice::Object::__handleException(const ::IceInternal::Handle< ::IceDeleg // ex.ice_throw(); } - observer.retried(); - return interval; - } - catch(const ::Ice::LocalException& ex) - { - observer.failed(ex.ice_name()); - throw; - } - return 0; // Keep the compiler happy. -} - -int -IceProxy::Ice::Object::__handleExceptionWrapper(const ::IceInternal::Handle< ::IceDelegate::Ice::Object>& delegate, - const LocalExceptionWrapper& ex, - InvocationObserver& observer) -{ - { - IceUtil::Mutex::Lock sync(_mutex); - if(delegate.get() == _delegate.get()) - { - _delegate = 0; - } - } - - if(!ex.retry()) - { - observer.failed(ex.get()->ice_name()); - ex.get()->ice_throw(); - } - - return 0; -} - -int -IceProxy::Ice::Object::__handleExceptionWrapperRelaxed(const ::IceInternal::Handle< ::IceDelegate::Ice::Object>& del, - const LocalExceptionWrapper& ex, - bool sleep, - int& cnt, - InvocationObserver& observer) -{ - if(!ex.retry()) - { - return __handleException(del, *ex.get(), sleep, cnt, observer); } else { - { - IceUtil::Mutex::Lock sync(_mutex); - if(del.get() == _delegate.get()) - { - _delegate = 0; - } - } - return 0; + ex.ice_throw(); // Retry could break at-most-once semantics, don't retry. } + return 0; // Keep the compiler happy. } void @@ -1427,166 +1298,15 @@ IceProxy::Ice::Object::__checkAsyncTwowayOnly(const string& name) const } void -IceProxy::Ice::Object::__end(const ::Ice::AsyncResultPtr& __result, const std::string& operation) const -{ - AsyncResult::__check(__result, this, operation); - bool __ok = __result->__wait(); - if(_reference->getMode() == Reference::ModeTwoway) - { - try - { - if(!__ok) - { - try - { - __result->__throwUserException(); - } - catch(const UserException& __ex) - { - throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); - } - } - __result->__readEmptyParams(); - } - catch(const ::Ice::LocalException& ex) - { - __result->__getObserver().failed(ex.ice_name()); - throw; - } - } -} - -namespace IceProxy -{ - -namespace Ice -{ - -ostream& -operator<<(ostream& os, const ::IceProxy::Ice::Object& p) -{ - return os << p.ice_toString(); -} - -} - -} - -Handle< ::IceDelegate::Ice::Object> -IceProxy::Ice::Object::__getDelegate(bool ami) -{ - if(_reference->getCacheConnection()) - { - IceUtil::Mutex::Lock sync(_mutex); - if(_delegate) - { - return _delegate; - } - _delegate = createDelegate(true); // Connect asynchrously to avoid blocking with the proxy mutex locked. - return _delegate; - } - else - { - const Reference::Mode mode = _reference->getMode(); - return createDelegate(ami || mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram); - } -} - -void -IceProxy::Ice::Object::__setRequestHandler(const Handle< ::IceDelegate::Ice::Object>& delegate, - const ::IceInternal::RequestHandlerPtr& handler) -{ - if(_reference->getCacheConnection()) - { - IceUtil::Mutex::Lock sync(_mutex); - if(_delegate.get() == delegate.get()) - { - if(dynamic_cast< ::IceDelegateM::Ice::Object*>(_delegate.get())) - { - _delegate = __createDelegateM(); - _delegate->__setRequestHandler(handler); - } - else if(dynamic_cast< ::IceDelegateD::Ice::Object*>(_delegate.get())) - { - _delegate = __createDelegateD(); - _delegate->__setRequestHandler(handler); - } - } - } -} - -Handle< ::IceDelegateM::Ice::Object> -IceProxy::Ice::Object::__createDelegateM() -{ - return Handle< ::IceDelegateM::Ice::Object>(new ::IceDelegateM::Ice::Object); -} - -Handle< ::IceDelegateD::Ice::Object> -IceProxy::Ice::Object::__createDelegateD() -{ - return Handle< ::IceDelegateD::Ice::Object>(new ::IceDelegateD::Ice::Object); -} - -IceProxy::Ice::Object* -IceProxy::Ice::Object::__newInstance() const -{ - return new Object; -} - -Handle< ::IceDelegate::Ice::Object> -IceProxy::Ice::Object::createDelegate(bool async) -{ - if(_reference->getCollocationOptimized()) - { - ObjectAdapterPtr adapter = _reference->getInstance()->objectAdapterFactory()->findObjectAdapter(this); - if(adapter) - { - Handle< ::IceDelegateD::Ice::Object> d = __createDelegateD(); - d->setup(_reference, adapter); - return d; - } - } - - Handle< ::IceDelegateM::Ice::Object> d = __createDelegateM(); - d->setup(_reference, this, async); - return d; -} - -void -IceProxy::Ice::Object::setup(const ReferencePtr& ref) +IceProxy::Ice::Object::__invoke(Outgoing& __og) const { // - // No need to synchronize "*this", as this operation is only - // called upon initialization. + // Helper for operations without out/return parameters and user + // exceptions. // - assert(!_reference); - assert(!_delegate); - - _reference = ref; -} - -IceDelegateM::Ice::Object::~Object() -{ -} - -bool -IceDelegateM::Ice::Object::ice_isA(const string& __id, const Context* context, InvocationObserver& observer) -{ - Outgoing __og(__handler.get(), ice_isA_name, ::Ice::Nonmutating, context, observer); - try - { - BasicStream* __os = __og.startWriteParams(DefaultFormat); - __os->write(__id, false); - __og.endWriteParams(); - } - catch(const ::Ice::LocalException& __ex) - { - __og.abort(__ex); - } - bool __ret; bool __ok = __og.invoke(); - try + if(__og.hasResponse()) { if(!__ok) { @@ -1596,556 +1316,144 @@ IceDelegateM::Ice::Object::ice_isA(const string& __id, const Context* context, I } catch(const ::Ice::UserException& __ex) { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); + ::Ice::UnknownUserException __uue(__FILE__, __LINE__, __ex.ice_name()); + throw __uue; } } - BasicStream* __is = __og.startReadParams(); - __is->read(__ret); - __og.endReadParams(); + __og.readEmptyParams(); } - catch(const ::Ice::LocalException& __ex) - { - throw ::IceInternal::LocalExceptionWrapper(__ex, false); - } - return __ret; } void -IceDelegateM::Ice::Object::ice_ping(const Context* context, InvocationObserver& observer) -{ - Outgoing __og(__handler.get(), ice_ping_name, ::Ice::Nonmutating, context, observer); - __og.writeEmptyParams(); - bool __ok = __og.invoke(); - if(__og.hasResponse()) - { - try - { - if(!__ok) - { - try - { - __og.throwUserException(); - } - catch(const ::Ice::UserException& __ex) - { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); - } - } - __og.readEmptyParams(); - } - catch(const ::Ice::LocalException& __ex) - { - throw ::IceInternal::LocalExceptionWrapper(__ex, false); - } - } -} - -vector<string> -IceDelegateM::Ice::Object::ice_ids(const Context* context, InvocationObserver& observer) +IceProxy::Ice::Object::__end(const ::Ice::AsyncResultPtr& __result, const std::string& operation) const { - Outgoing __og(__handler.get(), ice_ids_name, ::Ice::Nonmutating, context, observer); - __og.writeEmptyParams(); - vector<string> __ret; - bool __ok = __og.invoke(); - try + AsyncResult::__check(__result, this, operation); + bool __ok = __result->__wait(); + if(_reference->getMode() == Reference::ModeTwoway) { if(!__ok) { try { - __og.throwUserException(); + __result->__throwUserException(); } - catch(const ::Ice::UserException& __ex) + catch(const UserException& __ex) { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); + throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); } } - BasicStream* __is = __og.startReadParams(); - __is->read(__ret, false); - __og.endReadParams(); + __result->__readEmptyParams(); } - catch(const ::Ice::LocalException& __ex) - { - throw ::IceInternal::LocalExceptionWrapper(__ex, false); - } - return __ret; } -string -IceDelegateM::Ice::Object::ice_id(const Context* context, InvocationObserver& observer) +namespace IceProxy { - Outgoing __og(__handler.get(), ice_id_name, ::Ice::Nonmutating, context, observer); - __og.writeEmptyParams(); - string __ret; - bool __ok = __og.invoke(); - try - { - if(!__ok) - { - try - { - __og.throwUserException(); - } - catch(const ::Ice::UserException& __ex) - { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); - } - } - BasicStream* __is = __og.startReadParams(); - __is->read(__ret, false); - __og.endReadParams(); - } - catch(const ::Ice::LocalException& __ex) - { - throw ::IceInternal::LocalExceptionWrapper(__ex, false); - } - return __ret; -} - -bool -IceDelegateM::Ice::Object::ice_invoke(const string& operation, - OperationMode mode, - const pair<const Byte*, const Byte*>& inEncaps, - vector<Byte>& outEncaps, - const Context* context, - InvocationObserver& observer) -{ - Outgoing __og(__handler.get(), operation, mode, context, observer); - try - { - __og.writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); - } - catch(const ::Ice::LocalException& __ex) - { - __og.abort(__ex); - } - bool ok = __og.invoke(); - if(__handler->getReference()->getMode() == Reference::ModeTwoway) - { - try - { - const Byte* v; - Int sz; - __og.readParamEncaps(v, sz); - vector<Byte>(v, v + sz).swap(outEncaps); - } - catch(const ::Ice::LocalException& __ex) - { - throw ::IceInternal::LocalExceptionWrapper(__ex, false); - } - } - return ok; -} -void -IceDelegateM::Ice::Object::ice_flushBatchRequests(InvocationObserver& observer) +namespace Ice { - BatchOutgoing __og(__handler.get(), observer); - __og.invoke(); -} -RequestHandlerPtr -IceDelegateM::Ice::Object::__getRequestHandler() const +ostream& +operator<<(ostream& os, const ::IceProxy::Ice::Object& p) { - return __handler; + return os << p.ice_toString(); } -void -IceDelegateM::Ice::Object::__setRequestHandler(const RequestHandlerPtr& handler) -{ - __handler = handler; } -void -IceDelegateM::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateM::Ice::Object>& from) -{ - // - // No need to synchronize "from", as the delegate is immutable - // after creation. - // - - // - // No need to synchronize "*this", as this operation is only - // called upon initialization. - // - - assert(!__handler); - - __handler = from->__handler; } -void -IceDelegateM::Ice::Object::setup(const ReferencePtr& ref, const ::Ice::ObjectPrx& proxy, bool async) +::IceInternal::RequestHandlerPtr +IceProxy::Ice::Object::__getRequestHandler(bool async) { - // - // No need to synchronize "*this", as this operation is only - // called upon initialization. - // - - assert(!__handler); - - if(async) - { - IceInternal::ConnectRequestHandlerPtr handler = new ::IceInternal::ConnectRequestHandler(ref, proxy, this); - __handler = handler->connect(); - } - else - { - __handler = new ::IceInternal::ConnectionRequestHandler(ref, proxy); - } -} - -bool -IceDelegateD::Ice::Object::ice_isA(const string& __id, const Context* context, InvocationObserver& /*observer*/) -{ - class DirectI : public Direct - { - public: - - DirectI(bool& __result, const string& __id, const Current& __current) : - Direct(__current), - _result(__result), - _id(__id) - { - } - - virtual ::Ice::DispatchStatus - run(::Ice::Object* object) - { - _result = object->ice_isA(_id, _current); - return DispatchOK; - } - - private: - - bool& _result; - const string& _id; - }; - - Current __current; - __initCurrent(__current, "ice_isA", ::Ice::Nonmutating, context); - bool __result; - - try + if(_reference->getCacheConnection()) { - DirectI __direct(__result, __id, __current); - - try - { - __direct.getServant()->__collocDispatch(__direct); - } - catch(const ::std::exception& __ex) - { - __direct.destroy(); - LocalExceptionWrapper::throwWrapper(__ex); - } - catch(...) + IceUtil::Mutex::Lock sync(_mutex); + if(_requestHandler) { - __direct.destroy(); - throw UnknownException(__FILE__, __LINE__, "unknown c++ exception"); + return _requestHandler; } - __direct.destroy(); - } - catch(const LocalExceptionWrapper&) - { - throw; + _requestHandler = createRequestHandler(true); // async = true to avoid blocking with the proxy mutex locked. + return _requestHandler; } - catch(const ::std::exception& __ex) - { - LocalExceptionWrapper::throwWrapper(__ex); - } - catch(...) - { - throw LocalExceptionWrapper(UnknownException(__FILE__, __LINE__, "unknown c++ exception"), false); - } - return __result; + + const Reference::Mode mode = _reference->getMode(); + return createRequestHandler(async || mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram); } void -IceDelegateD::Ice::Object::ice_ping(const ::Ice::Context* context, InvocationObserver&) +IceProxy::Ice::Object::__setRequestHandler(const ::IceInternal::RequestHandlerPtr& previous, + const ::IceInternal::RequestHandlerPtr& handler) { - class DirectI : public Direct - { - public: - - DirectI(const Current& __current) : - Direct(__current) - { - } - - virtual ::Ice::DispatchStatus - run(::Ice::Object* object) - { - object->ice_ping(_current); - return DispatchOK; - } - }; - - Current __current; - __initCurrent(__current, "ice_ping", ::Ice::Nonmutating, context); - - try + if(_reference->getCacheConnection()) { - DirectI __direct(__current); - - try - { - __direct.getServant()->__collocDispatch(__direct); - } - catch(const ::std::exception& __ex) + IceUtil::Mutex::Lock sync(_mutex); + if(previous.get() == _requestHandler.get()) { - __direct.destroy(); - LocalExceptionWrapper::throwWrapper(__ex); + _requestHandler = handler; } - catch(...) + else if(previous && _requestHandler) { - __direct.destroy(); - throw UnknownException(__FILE__, __LINE__, "unknown c++ exception"); + try + { + // + // If both request handlers point to the same connection, we also + // update the request handler. See bug ICE-5489 for reasons why + // this can be useful. + // + if(previous->getConnection(false) == _requestHandler->getConnection(false)) + { + _requestHandler = handler; + } + } + catch(const Exception&) + { + // Ignore + } } - __direct.destroy(); - } - catch(const LocalExceptionWrapper&) - { - throw; - } - catch(const ::std::exception& __ex) - { - LocalExceptionWrapper::throwWrapper(__ex); - } - catch(...) - { - throw LocalExceptionWrapper(UnknownException(__FILE__, __LINE__, "unknown c++ exception"), false); } } -vector<string> -IceDelegateD::Ice::Object::ice_ids(const ::Ice::Context* context, InvocationObserver&) +IceProxy::Ice::Object* +IceProxy::Ice::Object::__newInstance() const { - class DirectI : public Direct - { - public: - - DirectI(vector<string>& __result, const Current& __current) : - Direct(__current), - _result(__result) - { - } - - virtual ::Ice::DispatchStatus - run(::Ice::Object* object) - { - _result = object->ice_ids(_current); - return DispatchOK; - } - - private: - - vector<string>& _result; - }; - - Current __current; - __initCurrent(__current, "ice_ids", ::Ice::Nonmutating, context); - vector<string> __result; - - try - { - DirectI __direct(__result, __current); - - try - { - __direct.getServant()->__collocDispatch(__direct); - } - catch(const ::std::exception& __ex) - { - __direct.destroy(); - LocalExceptionWrapper::throwWrapper(__ex); - } - catch(...) - { - __direct.destroy(); - throw UnknownException(__FILE__, __LINE__, "unknown c++ exception"); - } - __direct.destroy(); - } - catch(const LocalExceptionWrapper&) - { - throw; - } - catch(const ::std::exception& __ex) - { - LocalExceptionWrapper::throwWrapper(__ex); - } - catch(...) - { - throw LocalExceptionWrapper(UnknownException(__FILE__, __LINE__, "unknown c++ exception"), false); - } - return __result; + return new Object; } -string -IceDelegateD::Ice::Object::ice_id(const ::Ice::Context* context, InvocationObserver&) +RequestHandlerPtr +IceProxy::Ice::Object::createRequestHandler(bool async) { - class DirectI : public Direct - { - public: - - DirectI(string& __result, const Current& __current) : - Direct(__current), - _result(__result) - { - } - - virtual ::Ice::DispatchStatus - run(::Ice::Object* object) - { - _result = object->ice_id(_current); - return DispatchOK; - } - - private: - - string& _result; - }; - - Current __current; - __initCurrent(__current, "ice_id", ::Ice::Nonmutating, context); - string __result; - - try + if(_reference->getCollocationOptimized()) { - DirectI __direct(__result, __current); - - try - { - __direct.getServant()->__collocDispatch(__direct); - } - catch(const ::std::exception& __ex) - { - __direct.destroy(); - LocalExceptionWrapper::throwWrapper(__ex); - } - catch(...) + ObjectAdapterPtr adapter = _reference->getInstance()->objectAdapterFactory()->findObjectAdapter(this); + if(adapter) { - __direct.destroy(); - throw UnknownException(__FILE__, __LINE__, "unknown c++ exception"); + return new ::IceInternal::CollocatedRequestHandler(_reference, adapter); } - __direct.destroy(); - } - catch(const LocalExceptionWrapper&) - { - throw; } - catch(const ::std::exception& __ex) - { - LocalExceptionWrapper::throwWrapper(__ex); - } - catch(...) - { - throw LocalExceptionWrapper(UnknownException(__FILE__, __LINE__, "unknown c++ exception"), false); - } - return __result; -} - -bool -IceDelegateD::Ice::Object::ice_invoke(const string&, - OperationMode, - const pair<const Byte*, const Byte*>& /*inEncaps*/, - vector<Byte>&, - const Context*, - InvocationObserver&) -{ - throw CollocationOptimizationException(__FILE__, __LINE__); - return false; -} - -void -IceDelegateD::Ice::Object::ice_flushBatchRequests(InvocationObserver& /*observer*/) -{ - throw CollocationOptimizationException(__FILE__, __LINE__); -} - -RequestHandlerPtr -IceDelegateD::Ice::Object::__getRequestHandler() const -{ - throw CollocationOptimizationException(__FILE__, __LINE__); - return 0; -} - -void -IceDelegateD::Ice::Object::__setRequestHandler(const RequestHandlerPtr&) -{ - throw CollocationOptimizationException(__FILE__, __LINE__); -} - -void -IceDelegateD::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateD::Ice::Object>& from) -{ - // - // No need to synchronize "from", as the delegate is immutable - // after creation. - // - // - // No need to synchronize "*this", as this operation is only - // called upon initialization. - // - - assert(!__reference); - assert(!__adapter); - - __reference = from->__reference; - __adapter = from->__adapter; -} - -void -IceDelegateD::Ice::Object::__initCurrent(Current& current, const string& op, OperationMode mode, - const Context* context) -{ - current.adapter = __adapter; - current.id = __reference->getIdentity(); - current.facet = __reference->getFacet(); - current.operation = op; - current.mode = mode; - if(context != 0) + if(async) { - // - // Explicit context - // - current.ctx = *context; + ConnectRequestHandlerPtr handler = new ::IceInternal::ConnectRequestHandler(_reference, this); + return handler->connect(); } else { - // - // Implicit context - // - const ImplicitContextIPtr& implicitContext = __reference->getInstance()->getImplicitContext(); - const Context& prxContext = __reference->getContext()->getValue(); - - if(implicitContext == 0) - { - current.ctx = prxContext; - } - else - { - implicitContext->combine(prxContext, current.ctx); - } + return new ::IceInternal::ConnectionRequestHandler(_reference, this); } - current.requestId = -1; } void -IceDelegateD::Ice::Object::setup(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) +IceProxy::Ice::Object::setup(const ReferencePtr& ref) { // // No need to synchronize "*this", as this operation is only // called upon initialization. // - assert(!__reference); - assert(!__adapter); + assert(!_reference); + assert(!_requestHandler); - __reference = ref; - __adapter = adapter; + _reference = ref; } bool |