diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
commit | 47f800495093fd7679a315e2d730fea22f6135b7 (patch) | |
tree | a7b8d3488f3841367dd03d10cae293f36fd10481 /cpp/src/Ice/Proxy.cpp | |
parent | Fixed SystemException to no longer derive from LocalException (diff) | |
download | ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2 ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz ice-47f800495093fd7679a315e2d730fea22f6135b7.zip |
- Added support for non-blocking AMI/batch requests, connection
creation.
- Added support for AMI oneway requests.
- Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'cpp/src/Ice/Proxy.cpp')
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 177 |
1 files changed, 130 insertions, 47 deletions
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 4d986ef0abc..845d40ffee5 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -14,6 +14,8 @@ #include <Ice/ObjectAdapterFactory.h> #include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> +#include <Ice/ConnectRequestHandler.h> +#include <Ice/ConnectionRequestHandler.h> #include <Ice/Direct.h> #include <Ice/Reference.h> #include <Ice/EndpointI.h> @@ -123,7 +125,7 @@ IceProxy::Ice::Object::ice_isA(const string& typeId, const Context* context) try { __checkTwowayOnly("ice_isA"); - __del = __getDelegate(); + __del = __getDelegate(false); return __del->ice_isA(typeId, context); } catch(const LocalExceptionWrapper& __ex) @@ -146,7 +148,7 @@ IceProxy::Ice::Object::ice_ping(const Context* context) Handle< ::IceDelegate::Ice::Object> __del; try { - __del = __getDelegate(); + __del = __getDelegate(false); __del->ice_ping(context); return; } @@ -171,7 +173,7 @@ IceProxy::Ice::Object::ice_ids(const Context* context) try { __checkTwowayOnly("ice_ids"); - __del = __getDelegate(); + __del = __getDelegate(false); return __del->ice_ids(context); } catch(const LocalExceptionWrapper& __ex) @@ -195,7 +197,7 @@ IceProxy::Ice::Object::ice_id(const Context* context) try { __checkTwowayOnly("ice_id"); - __del = __getDelegate(); + __del = __getDelegate(false); return __del->ice_id(context); } catch(const LocalExceptionWrapper& __ex) @@ -244,7 +246,7 @@ IceProxy::Ice::Object::ice_invoke(const string& operation, Handle< ::IceDelegate::Ice::Object> __del; try { - __del = __getDelegate(); + __del = __getDelegate(false); return __del->ice_invoke(operation, mode, inParams, outParams, context); } catch(const LocalExceptionWrapper& __ex) @@ -824,9 +826,8 @@ IceProxy::Ice::Object::ice_getConnection() Handle< ::IceDelegate::Ice::Object> __del; try { - __del = __getDelegate(); - bool compress; - return __del->__getConnection(compress); + __del = __getDelegate(false); + return __del->__getRequestHandler()->getConnection(true); } catch(const LocalException& __ex) { @@ -848,8 +849,7 @@ IceProxy::Ice::Object::ice_getCachedConnection() const { try { - bool compress; - return __del->__getConnection(compress); + return __del->__getRequestHandler()->getConnection(false); } catch(const CollocationOptimizationException&) { @@ -858,6 +858,36 @@ IceProxy::Ice::Object::ice_getCachedConnection() const return 0; } +void +IceProxy::Ice::Object::ice_flushBatchRequests() +{ + int __cnt; + while(true) + { + Handle< ::IceDelegate::Ice::Object> __del; + try + { + __del = __getDelegate(false); + __del->ice_flushBatchRequests(); + return; + } + catch(const LocalExceptionWrapper& __ex) + { + __handleExceptionWrapper(__del, __ex); + } + catch(const LocalException& __ex) + { + __handleException(__del, __ex, __cnt); + } + } +} + +void +IceProxy::Ice::Object::ice_flushBatchRequests_async(const AMI_Object_ice_flushBatchRequestsPtr& cb) +{ + cb->__invoke(this); +} + ReferencePtr IceProxy::Ice::Object::__reference() const { @@ -1024,7 +1054,7 @@ operator<<(ostream& os, const ::IceProxy::Ice::Object& p) } Handle< ::IceDelegate::Ice::Object> -IceProxy::Ice::Object::__getDelegate() +IceProxy::Ice::Object::__getDelegate(bool async) { IceUtil::Mutex::Lock sync(*this); @@ -1048,19 +1078,8 @@ IceProxy::Ice::Object::__getDelegate() if(!delegate) { Handle< ::IceDelegateM::Ice::Object> d = __createDelegateM(); - d->setup(_reference); + d->setup(_reference, this, async); delegate = d; - - // - // If this proxy is for a non-local object, and we are - // using a router, then add this proxy to the router info - // object. - // - RouterInfoPtr ri = _reference->getRouterInfo(); - if(ri) - { - ri->addProxy(this); - } } if(_reference->getCacheConnection()) @@ -1076,6 +1095,26 @@ IceProxy::Ice::Object::__getDelegate() return delegate; } +void +IceProxy::Ice::Object::__setRequestHandler(const Handle< ::IceDelegate::Ice::Object>& delegate, + const ::IceInternal::RequestHandlerPtr& handler) +{ + IceUtil::Mutex::Lock sync(*this); + if(_delegate.get() == delegate.get()) + { + if(dynamic_cast< ::IceDelegateM::Ice::Object*>(_delegate.get())) + { + _delegate = __createDelegateM(); + _delegate->__setRequestHandler(handler); + } + else if(dynamic_cast< ::IceDelegateD::Ice::Object*>(_delegate.get())) + { + _delegate = __createDelegateD(); + _delegate->__setRequestHandler(handler); + } + } +} + Handle< ::IceDelegateM::Ice::Object> IceProxy::Ice::Object::__createDelegateM() { @@ -1116,7 +1155,7 @@ bool IceDelegateM::Ice::Object::ice_isA(const string& __id, const Context* context) { static const string __operation("ice_isA"); - Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress); + Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context); try { BasicStream* __os = __og.os(); @@ -1155,7 +1194,7 @@ void IceDelegateM::Ice::Object::ice_ping(const Context* context) { static const string __operation("ice_ping"); - Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress); + Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context); bool __ok = __og.invoke(); try { @@ -1182,7 +1221,7 @@ vector<string> IceDelegateM::Ice::Object::ice_ids(const Context* context) { static const string __operation("ice_ids"); - Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress); + Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context); vector<string> __ret; bool __ok = __og.invoke(); try @@ -1212,7 +1251,7 @@ string IceDelegateM::Ice::Object::ice_id(const Context* context) { static const string __operation("ice_id"); - Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress); + Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context); string __ret; bool __ok = __og.invoke(); try @@ -1245,7 +1284,7 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation, vector<Byte>& outParams, const Context* context) { - Outgoing __og(__connection.get(), __reference.get(), operation, mode, context, __compress); + Outgoing __og(__handler.get(), operation, mode, context); try { BasicStream* __os = __og.os(); @@ -1256,7 +1295,7 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation, __og.abort(__ex); } bool ok = __og.invoke(); - if(__reference->getMode() == Reference::ModeTwoway) + if(__handler->getReference()->getMode() == Reference::ModeTwoway) { try { @@ -1272,11 +1311,35 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation, return ok; } -ConnectionIPtr -IceDelegateM::Ice::Object::__getConnection(bool& compress) const +void +IceDelegateM::Ice::Object::ice_flushBatchRequests() { - compress = __compress; - return __connection; + BatchOutgoing __og(__handler.get()); + try + { + __og.invoke(); + } + catch(const ::Ice::LocalException& __ex) + { + // + // We never retry flusing the batch requests as the connection batched + // requests were discarded and the caller needs to be notified of the + // failure. + // + throw ::IceInternal::LocalExceptionWrapper(__ex, false); + } +} + +RequestHandlerPtr +IceDelegateM::Ice::Object::__getRequestHandler() const +{ + return __handler; +} + +void +IceDelegateM::Ice::Object::__setRequestHandler(const RequestHandlerPtr& handler) +{ + __handler = handler; } void @@ -1291,28 +1354,36 @@ IceDelegateM::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegate // No need to synchronize "*this", as this operation is only // called upon initialization. // - - assert(!__reference); - assert(!__connection); - - __reference = from->__reference; - __connection = from->__connection; - __compress = from->__compress; + + assert(!__handler); + + __handler = from->__handler; } void -IceDelegateM::Ice::Object::setup(const ReferencePtr& ref) +IceDelegateM::Ice::Object::setup(const ReferencePtr& ref, const ::Ice::ObjectPrx& proxy, bool async) { // // No need to synchronize "*this", as this operation is only // called upon initialization. // - assert(!__reference); - assert(!__connection); + assert(!__handler); - __reference = ref; - __connection = __reference->getConnection(__compress); + // + // If the delegate is created as a result of an AMI call or if the proxy is + // a batch proxy we use the connect request handler to connect the in the + // background. + // + if(async || ref->getMode() == Reference::ModeBatchOneway || ref->getMode() == Reference::ModeBatchDatagram) + { + IceInternal::ConnectRequestHandlerPtr handler = new ::IceInternal::ConnectRequestHandler(ref, proxy, this); + __handler = handler->connect(); + } + else + { + __handler = new ::IceInternal::ConnectionRequestHandler(ref, proxy); + } } bool @@ -1577,14 +1648,26 @@ IceDelegateD::Ice::Object::ice_invoke(const string&, return false; } -ConnectionIPtr -IceDelegateD::Ice::Object::__getConnection(bool&) const +void +IceDelegateD::Ice::Object::ice_flushBatchRequests() +{ + throw CollocationOptimizationException(__FILE__, __LINE__); +} + +RequestHandlerPtr +IceDelegateD::Ice::Object::__getRequestHandler() const { throw CollocationOptimizationException(__FILE__, __LINE__); return 0; } void +IceDelegateD::Ice::Object::__setRequestHandler(const RequestHandlerPtr&) +{ + throw CollocationOptimizationException(__FILE__, __LINE__); +} + +void IceDelegateD::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateD::Ice::Object>& from) { // |