summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Proxy.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2007-11-27 11:58:35 +0100
committerBenoit Foucher <benoit@zeroc.com>2007-11-27 11:58:35 +0100
commit47f800495093fd7679a315e2d730fea22f6135b7 (patch)
treea7b8d3488f3841367dd03d10cae293f36fd10481 /cpp/src/Ice/Proxy.cpp
parentFixed SystemException to no longer derive from LocalException (diff)
downloadice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2
ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz
ice-47f800495093fd7679a315e2d730fea22f6135b7.zip
- Added support for non-blocking AMI/batch requests, connection
creation. - Added support for AMI oneway requests. - Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'cpp/src/Ice/Proxy.cpp')
-rw-r--r--cpp/src/Ice/Proxy.cpp177
1 files changed, 130 insertions, 47 deletions
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index 4d986ef0abc..845d40ffee5 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -14,6 +14,8 @@
#include <Ice/ObjectAdapterFactory.h>
#include <Ice/Outgoing.h>
#include <Ice/OutgoingAsync.h>
+#include <Ice/ConnectRequestHandler.h>
+#include <Ice/ConnectionRequestHandler.h>
#include <Ice/Direct.h>
#include <Ice/Reference.h>
#include <Ice/EndpointI.h>
@@ -123,7 +125,7 @@ IceProxy::Ice::Object::ice_isA(const string& typeId, const Context* context)
try
{
__checkTwowayOnly("ice_isA");
- __del = __getDelegate();
+ __del = __getDelegate(false);
return __del->ice_isA(typeId, context);
}
catch(const LocalExceptionWrapper& __ex)
@@ -146,7 +148,7 @@ IceProxy::Ice::Object::ice_ping(const Context* context)
Handle< ::IceDelegate::Ice::Object> __del;
try
{
- __del = __getDelegate();
+ __del = __getDelegate(false);
__del->ice_ping(context);
return;
}
@@ -171,7 +173,7 @@ IceProxy::Ice::Object::ice_ids(const Context* context)
try
{
__checkTwowayOnly("ice_ids");
- __del = __getDelegate();
+ __del = __getDelegate(false);
return __del->ice_ids(context);
}
catch(const LocalExceptionWrapper& __ex)
@@ -195,7 +197,7 @@ IceProxy::Ice::Object::ice_id(const Context* context)
try
{
__checkTwowayOnly("ice_id");
- __del = __getDelegate();
+ __del = __getDelegate(false);
return __del->ice_id(context);
}
catch(const LocalExceptionWrapper& __ex)
@@ -244,7 +246,7 @@ IceProxy::Ice::Object::ice_invoke(const string& operation,
Handle< ::IceDelegate::Ice::Object> __del;
try
{
- __del = __getDelegate();
+ __del = __getDelegate(false);
return __del->ice_invoke(operation, mode, inParams, outParams, context);
}
catch(const LocalExceptionWrapper& __ex)
@@ -824,9 +826,8 @@ IceProxy::Ice::Object::ice_getConnection()
Handle< ::IceDelegate::Ice::Object> __del;
try
{
- __del = __getDelegate();
- bool compress;
- return __del->__getConnection(compress);
+ __del = __getDelegate(false);
+ return __del->__getRequestHandler()->getConnection(true);
}
catch(const LocalException& __ex)
{
@@ -848,8 +849,7 @@ IceProxy::Ice::Object::ice_getCachedConnection() const
{
try
{
- bool compress;
- return __del->__getConnection(compress);
+ return __del->__getRequestHandler()->getConnection(false);
}
catch(const CollocationOptimizationException&)
{
@@ -858,6 +858,36 @@ IceProxy::Ice::Object::ice_getCachedConnection() const
return 0;
}
+void
+IceProxy::Ice::Object::ice_flushBatchRequests()
+{
+ int __cnt;
+ while(true)
+ {
+ Handle< ::IceDelegate::Ice::Object> __del;
+ try
+ {
+ __del = __getDelegate(false);
+ __del->ice_flushBatchRequests();
+ return;
+ }
+ catch(const LocalExceptionWrapper& __ex)
+ {
+ __handleExceptionWrapper(__del, __ex);
+ }
+ catch(const LocalException& __ex)
+ {
+ __handleException(__del, __ex, __cnt);
+ }
+ }
+}
+
+void
+IceProxy::Ice::Object::ice_flushBatchRequests_async(const AMI_Object_ice_flushBatchRequestsPtr& cb)
+{
+ cb->__invoke(this);
+}
+
ReferencePtr
IceProxy::Ice::Object::__reference() const
{
@@ -1024,7 +1054,7 @@ operator<<(ostream& os, const ::IceProxy::Ice::Object& p)
}
Handle< ::IceDelegate::Ice::Object>
-IceProxy::Ice::Object::__getDelegate()
+IceProxy::Ice::Object::__getDelegate(bool async)
{
IceUtil::Mutex::Lock sync(*this);
@@ -1048,19 +1078,8 @@ IceProxy::Ice::Object::__getDelegate()
if(!delegate)
{
Handle< ::IceDelegateM::Ice::Object> d = __createDelegateM();
- d->setup(_reference);
+ d->setup(_reference, this, async);
delegate = d;
-
- //
- // If this proxy is for a non-local object, and we are
- // using a router, then add this proxy to the router info
- // object.
- //
- RouterInfoPtr ri = _reference->getRouterInfo();
- if(ri)
- {
- ri->addProxy(this);
- }
}
if(_reference->getCacheConnection())
@@ -1076,6 +1095,26 @@ IceProxy::Ice::Object::__getDelegate()
return delegate;
}
+void
+IceProxy::Ice::Object::__setRequestHandler(const Handle< ::IceDelegate::Ice::Object>& delegate,
+ const ::IceInternal::RequestHandlerPtr& handler)
+{
+ IceUtil::Mutex::Lock sync(*this);
+ if(_delegate.get() == delegate.get())
+ {
+ if(dynamic_cast< ::IceDelegateM::Ice::Object*>(_delegate.get()))
+ {
+ _delegate = __createDelegateM();
+ _delegate->__setRequestHandler(handler);
+ }
+ else if(dynamic_cast< ::IceDelegateD::Ice::Object*>(_delegate.get()))
+ {
+ _delegate = __createDelegateD();
+ _delegate->__setRequestHandler(handler);
+ }
+ }
+}
+
Handle< ::IceDelegateM::Ice::Object>
IceProxy::Ice::Object::__createDelegateM()
{
@@ -1116,7 +1155,7 @@ bool
IceDelegateM::Ice::Object::ice_isA(const string& __id, const Context* context)
{
static const string __operation("ice_isA");
- Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress);
+ Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context);
try
{
BasicStream* __os = __og.os();
@@ -1155,7 +1194,7 @@ void
IceDelegateM::Ice::Object::ice_ping(const Context* context)
{
static const string __operation("ice_ping");
- Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress);
+ Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context);
bool __ok = __og.invoke();
try
{
@@ -1182,7 +1221,7 @@ vector<string>
IceDelegateM::Ice::Object::ice_ids(const Context* context)
{
static const string __operation("ice_ids");
- Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress);
+ Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context);
vector<string> __ret;
bool __ok = __og.invoke();
try
@@ -1212,7 +1251,7 @@ string
IceDelegateM::Ice::Object::ice_id(const Context* context)
{
static const string __operation("ice_id");
- Outgoing __og(__connection.get(), __reference.get(), __operation, ::Ice::Nonmutating, context, __compress);
+ Outgoing __og(__handler.get(), __operation, ::Ice::Nonmutating, context);
string __ret;
bool __ok = __og.invoke();
try
@@ -1245,7 +1284,7 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation,
vector<Byte>& outParams,
const Context* context)
{
- Outgoing __og(__connection.get(), __reference.get(), operation, mode, context, __compress);
+ Outgoing __og(__handler.get(), operation, mode, context);
try
{
BasicStream* __os = __og.os();
@@ -1256,7 +1295,7 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation,
__og.abort(__ex);
}
bool ok = __og.invoke();
- if(__reference->getMode() == Reference::ModeTwoway)
+ if(__handler->getReference()->getMode() == Reference::ModeTwoway)
{
try
{
@@ -1272,11 +1311,35 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation,
return ok;
}
-ConnectionIPtr
-IceDelegateM::Ice::Object::__getConnection(bool& compress) const
+void
+IceDelegateM::Ice::Object::ice_flushBatchRequests()
{
- compress = __compress;
- return __connection;
+ BatchOutgoing __og(__handler.get());
+ try
+ {
+ __og.invoke();
+ }
+ catch(const ::Ice::LocalException& __ex)
+ {
+ //
+ // We never retry flusing the batch requests as the connection batched
+ // requests were discarded and the caller needs to be notified of the
+ // failure.
+ //
+ throw ::IceInternal::LocalExceptionWrapper(__ex, false);
+ }
+}
+
+RequestHandlerPtr
+IceDelegateM::Ice::Object::__getRequestHandler() const
+{
+ return __handler;
+}
+
+void
+IceDelegateM::Ice::Object::__setRequestHandler(const RequestHandlerPtr& handler)
+{
+ __handler = handler;
}
void
@@ -1291,28 +1354,36 @@ IceDelegateM::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegate
// No need to synchronize "*this", as this operation is only
// called upon initialization.
//
-
- assert(!__reference);
- assert(!__connection);
-
- __reference = from->__reference;
- __connection = from->__connection;
- __compress = from->__compress;
+
+ assert(!__handler);
+
+ __handler = from->__handler;
}
void
-IceDelegateM::Ice::Object::setup(const ReferencePtr& ref)
+IceDelegateM::Ice::Object::setup(const ReferencePtr& ref, const ::Ice::ObjectPrx& proxy, bool async)
{
//
// No need to synchronize "*this", as this operation is only
// called upon initialization.
//
- assert(!__reference);
- assert(!__connection);
+ assert(!__handler);
- __reference = ref;
- __connection = __reference->getConnection(__compress);
+ //
+ // If the delegate is created as a result of an AMI call or if the proxy is
+ // a batch proxy we use the connect request handler to connect the in the
+ // background.
+ //
+ if(async || ref->getMode() == Reference::ModeBatchOneway || ref->getMode() == Reference::ModeBatchDatagram)
+ {
+ IceInternal::ConnectRequestHandlerPtr handler = new ::IceInternal::ConnectRequestHandler(ref, proxy, this);
+ __handler = handler->connect();
+ }
+ else
+ {
+ __handler = new ::IceInternal::ConnectionRequestHandler(ref, proxy);
+ }
}
bool
@@ -1577,14 +1648,26 @@ IceDelegateD::Ice::Object::ice_invoke(const string&,
return false;
}
-ConnectionIPtr
-IceDelegateD::Ice::Object::__getConnection(bool&) const
+void
+IceDelegateD::Ice::Object::ice_flushBatchRequests()
+{
+ throw CollocationOptimizationException(__FILE__, __LINE__);
+}
+
+RequestHandlerPtr
+IceDelegateD::Ice::Object::__getRequestHandler() const
{
throw CollocationOptimizationException(__FILE__, __LINE__);
return 0;
}
void
+IceDelegateD::Ice::Object::__setRequestHandler(const RequestHandlerPtr&)
+{
+ throw CollocationOptimizationException(__FILE__, __LINE__);
+}
+
+void
IceDelegateD::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateD::Ice::Object>& from)
{
//