diff options
39 files changed, 1879 insertions, 477 deletions
@@ -27,6 +27,10 @@ Changes since version 3.5.1 General Changes =============== +- Added methods to proxies to asynchronously get the connection + associated with the proxy. The current synchronous methods can block + as they may cause the connection to be established. + - Added support for HTTP proxies to the Ice core in C++, C# and Java. This allows outgoing TCP & SSL connections to be mediated by an HTTP network proxy service that supports HTTP CONNECT tunneling. diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index b7a14f2dd39..507ead6df62 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -122,7 +122,7 @@ public: virtual void __invokeException(const Exception&); // Required to be public for AsynchronousException void __invokeSent(); // Required to be public for AsynchronousSent - void __attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, + void __attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, Ice::Int requestId, Ice::Int sz) { _childObserver.attach(_observer.getRemoteObserver(c, endpt, requestId, sz)); @@ -130,9 +130,9 @@ public: void __attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) { - _childObserver.attach(_observer.getCollocatedObserver(adapter, - requestId, - static_cast<Ice::Int>(_os.b.size() - + _childObserver.attach(_observer.getCollocatedObserver(adapter, + requestId, + static_cast<Ice::Int>(_os.b.size() - IceInternal::headerSize - 4))); } @@ -213,9 +213,9 @@ public: // // Called by the connection when the message is confirmed sent. The connection is locked - // when this is called so this method can call the sent callback. Instead, this method + // when this is called so this method can't call the sent callback. Instead, this method // returns true if there's a sent callback and false otherwise. If true is returned, the - // connection will call the __invokeSentCallback() method bellow (which in turn should + // connection will call the __invokeSentCallback() method bellow (which in turn should // call the sent callback). // virtual bool __sent() = 0; @@ -294,14 +294,16 @@ public: protected: Ice::ObjectPrx _proxy; + RequestHandlerPtr _handler; + int _cnt; private: void handleException(const Ice::Exception&); - RequestHandlerPtr _handler; + Ice::EncodingVersion _encoding; - int _cnt; + bool _sent; Ice::OperationMode _mode; }; @@ -336,7 +338,7 @@ public: } private: - + Ice::ObjectPrx _proxy; }; @@ -373,6 +375,26 @@ private: int _useCount; }; +class ICE_API GetConnectionOutgoingAsync : public OutgoingAsync +{ +public: + + GetConnectionOutgoingAsync(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, + const Ice::LocalObjectPtr&); + + void __invoke(); + + virtual AsyncStatus __send(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus __invokeCollocated(CollocatedRequestHandler*); + virtual bool __sent(); + virtual void __invokeSent(); + virtual void __finished(const Ice::Exception&); + +private: + + void handleException(const Ice::Exception&); +}; + // // Base class for all callbacks. // @@ -467,7 +489,7 @@ template<typename Callable, typename = void> struct callable_type static const int value = 1; }; -template<class Callable> struct callable_type<Callable, typename ::std::enable_if< ::std::is_class<Callable>::value && +template<class Callable> struct callable_type<Callable, typename ::std::enable_if< ::std::is_class<Callable>::value && !::std::is_bind_expression<Callable>::value>::type> { template<typename T, T> struct TypeCheck; @@ -491,7 +513,7 @@ template<class Callable> struct callable_type<Callable, typename ::std::enable_i enum { value = sizeof(check<Callable>(0)) }; }; -template<> struct callable_type<void(*)(const ::Ice::AsyncResultPtr&)> +template<> struct callable_type<void(*)(const ::Ice::AsyncResultPtr&)> { static const int value = 2; }; @@ -514,11 +536,11 @@ public: template<typename T> Function(T f, typename ::std::enable_if<is_callable<T, S>::value>::type* = 0) : std::function<S>(f) { } - + Function() { } - + Function(::std::nullptr_t) : ::std::function<S>(nullptr) { } @@ -553,7 +575,7 @@ newCallback(T* instance, ICE_API CallbackPtr newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>&, - const ::IceInternal::Function<void (const AsyncResultPtr&)>& = + const ::IceInternal::Function<void (const AsyncResultPtr&)>& = ::IceInternal::Function<void (const AsyncResultPtr&)>()); #endif @@ -565,7 +587,7 @@ newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>&, // Interfaces for the deprecated AMI mapping. // -class ICE_API AMISentCallback +class ICE_API AMISentCallback { public: diff --git a/cpp/include/Ice/OutgoingAsyncF.h b/cpp/include/Ice/OutgoingAsyncF.h index 3e709ccc366..9675fa6c468 100644 --- a/cpp/include/Ice/OutgoingAsyncF.h +++ b/cpp/include/Ice/OutgoingAsyncF.h @@ -50,6 +50,10 @@ class CommunicatorBatchOutgoingAsync; ICE_API IceUtil::Shared* upCast(CommunicatorBatchOutgoingAsync*); typedef IceInternal::Handle<CommunicatorBatchOutgoingAsync> CommunicatorBatchOutgoingAsyncPtr; +class GetConnectionOutgoingAsync; +ICE_API IceUtil::Shared* upCast(GetConnectionOutgoingAsync*); +typedef IceInternal::Handle<GetConnectionOutgoingAsync> GetConnectionOutgoingAsyncPtr; + } #endif diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h index 8ef8469537b..8a17329b44b 100644 --- a/cpp/include/Ice/Proxy.h +++ b/cpp/include/Ice/Proxy.h @@ -82,6 +82,9 @@ typedef ::IceUtil::Handle< Callback_Object_ice_invoke_Base> Callback_Object_ice_ class Callback_Object_ice_flushBatchRequests_Base : virtual public ::IceInternal::CallbackBase { }; typedef ::IceUtil::Handle< Callback_Object_ice_flushBatchRequests_Base> Callback_Object_ice_flushBatchRequestsPtr; +class Callback_Object_ice_getConnection_Base : virtual public ::IceInternal::CallbackBase { }; +typedef ::IceUtil::Handle< Callback_Object_ice_getConnection_Base> Callback_Object_ice_getConnectionPtr; + // // Deprecated AMI callbacks // @@ -101,7 +104,7 @@ public: { ice_exception(ex); } - + void __sent(bool sentSynchronously) { AMICallbackBase::__sent(sentSynchronously); @@ -124,7 +127,7 @@ public: { ice_exception(ex); } - + void __sent(bool sentSynchronously) { AMICallbackBase::__sent(sentSynchronously); @@ -140,7 +143,7 @@ public: { ice_exception(ex); } - + void __sent(bool sentSynchronously) { AMICallbackBase::__sent(sentSynchronously); @@ -164,7 +167,7 @@ public: virtual CallbackBasePtr verify(const ::Ice::LocalObjectPtr&); virtual void sent(const ::Ice::AsyncResultPtr&) const; - + virtual bool hasSentCallback() const; protected: @@ -178,16 +181,16 @@ protected: class ICE_API Cpp11FnOnewayCallbackNC : public Cpp11FnCallbackNC { public: - + Cpp11FnOnewayCallbackNC(const ::std::function<void ()>&, const ::std::function<void (const ::Ice::Exception&)>&, const ::std::function<void (bool)>&); virtual void completed(const ::Ice::AsyncResultPtr&) const; - + private: - + ::std::function<void ()> _cb; }; @@ -200,7 +203,7 @@ namespace IceProxy { namespace Ice class ICE_API Object : public ::IceUtil::Shared { public: - + bool operator==(const Object&) const; bool operator!=(const Object&) const; bool operator<(const Object&) const; @@ -217,42 +220,42 @@ public: { return ice_isA(typeId, &context); } - + #ifdef ICE_CPP11 ::Ice::AsyncResultPtr - begin_ice_isA(const ::std::string& typeId, - const ::Ice::Context& ctx, - const ::IceInternal::Function<void (bool)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + begin_ice_isA(const ::std::string& typeId, + const ::Ice::Context& ctx, + const ::IceInternal::Function<void (bool)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_isA(typeId, &ctx, response, exception, sent); } - + ::Ice::AsyncResultPtr - begin_ice_isA(const ::std::string& typeId, - const ::IceInternal::Function<void (bool)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + begin_ice_isA(const ::std::string& typeId, + const ::IceInternal::Function<void (bool)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_isA(typeId, 0, response, exception, sent); } - + ::Ice::AsyncResultPtr - begin_ice_isA(const ::std::string& typeId, + begin_ice_isA(const ::std::string& typeId, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>()) { return begin_ice_isA(typeId, 0, ::Ice::newCallback(completed, sent), 0); } - + ::Ice::AsyncResultPtr - begin_ice_isA(const ::std::string& typeId, - const ::Ice::Context& ctx, - const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, + begin_ice_isA(const ::std::string& typeId, + const ::Ice::Context& ctx, + const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>()) { @@ -301,7 +304,7 @@ public: } bool end_ice_isA(const ::Ice::AsyncResultPtr&); - + void ice_ping() { ice_ping(0); @@ -310,38 +313,38 @@ public: { ice_ping(&context); } - + #ifdef ICE_CPP11 ::Ice::AsyncResultPtr - begin_ice_ping(const ::IceInternal::Function<void ()>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + begin_ice_ping(const ::IceInternal::Function<void ()>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_ping(0, response, exception, sent); } - + ::Ice::AsyncResultPtr begin_ice_ping(const ::Ice::Context& ctx, - const ::IceInternal::Function<void ()>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void ()>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_ping(&ctx, response, exception, sent); } - + ::Ice::AsyncResultPtr - begin_ice_ping(const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, + begin_ice_ping(const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>()) { return begin_ice_ping(0, ::Ice::newCallback(completed, sent), 0); } - + ::Ice::AsyncResultPtr - begin_ice_ping(const ::Ice::Context& ctx, - const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, + begin_ice_ping(const ::Ice::Context& ctx, + const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>()) { @@ -394,39 +397,39 @@ public: { return ice_ids(&context); } - + #ifdef ICE_CPP11 ::Ice::AsyncResultPtr - begin_ice_ids(const ::IceInternal::Function<void (const ::std::vector< ::std::string>&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + begin_ice_ids(const ::IceInternal::Function<void (const ::std::vector< ::std::string>&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_ids(0, response, exception, sent); } - + ::Ice::AsyncResultPtr begin_ice_ids(const ::Ice::Context& ctx, const ::IceInternal::Function<void (const ::std::vector< ::std::string>&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_ids(&ctx, response, exception, sent); } - + ::Ice::AsyncResultPtr - begin_ice_ids(const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, - const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = + begin_ice_ids(const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, + const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>()) { return begin_ice_ids(0, ::Ice::newCallback(completed, sent), 0); } - + ::Ice::AsyncResultPtr - begin_ice_ids(const ::Ice::Context& ctx, + begin_ice_ids(const ::Ice::Context& ctx, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, - const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = + const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>()) { return begin_ice_ids(&ctx, ::Ice::newCallback(completed, sent), 0); @@ -470,7 +473,7 @@ public: } ::std::vector< ::std::string> end_ice_ids(const ::Ice::AsyncResultPtr&); - + ::std::string ice_id() { return ice_id(0); @@ -479,27 +482,27 @@ public: { return ice_id(&context); } - + #ifdef ICE_CPP11 ::Ice::AsyncResultPtr - begin_ice_id(const ::IceInternal::Function<void (const ::std::string&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + begin_ice_id(const ::IceInternal::Function<void (const ::std::string&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_id(0, response, exception, sent); } - + ::Ice::AsyncResultPtr begin_ice_id(const ::Ice::Context& ctx, - const ::IceInternal::Function<void (const ::std::string&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void (const ::std::string&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_id(&ctx, response, exception, sent); } - + ::Ice::AsyncResultPtr begin_ice_id(const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = @@ -507,11 +510,11 @@ public: { return begin_ice_id(0, ::Ice::newCallback(completed, sent), 0); } - + ::Ice::AsyncResultPtr begin_ice_id(const ::Ice::Context& ctx, const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed, - const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = + const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& sent = ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>()) { return begin_ice_id(&ctx, ::Ice::newCallback(completed, sent), 0); @@ -560,20 +563,20 @@ public: static const ::std::string& ice_staticId() { return ::Ice::Object::ice_staticId(); - } + } // Returns true if ok, false if user exception. - bool ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + bool ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, ::std::vector< ::Ice::Byte>& outParams) { return ice_invoke(operation, mode, inParams, outParams, 0); } - bool ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + bool ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, ::std::vector< ::Ice::Byte>& outParams, const ::Ice::Context& context) @@ -588,50 +591,50 @@ public: #ifdef ICE_CPP11 ::Ice::AsyncResultPtr begin_ice_invoke( - const ::std::string& operation, - ::Ice::OperationMode mode, + const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, - const ::IceInternal::Function<void (bool, const ::std::vector< ::Ice::Byte>&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void (bool, const ::std::vector< ::Ice::Byte>&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_invoke(operation, mode, inParams, 0, response, exception, sent); } - + ::Ice::AsyncResultPtr begin_ice_invoke( - const ::std::string& operation, - ::Ice::OperationMode mode, + const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, const ::Ice::Context& ctx, const ::IceInternal::Function<void (bool, const ::std::vector< ::Ice::Byte>&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_invoke(operation, mode, inParams, &ctx, response, exception, sent); } - + ::Ice::AsyncResultPtr begin_ice_invoke( const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, const ::IceInternal::Function<void (bool, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return __begin_ice_invoke(operation, mode, inParams, 0, response, exception, sent); } - + ::Ice::AsyncResultPtr begin_ice_invoke( const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, const ::Ice::Context& ctx, const ::IceInternal::Function<void (bool, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { @@ -639,23 +642,23 @@ public: } #endif - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams) { return begin_ice_invoke(operation, mode, inParams, 0, ::IceInternal::__dummyCallback, 0); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, const ::Ice::Context& __ctx) { return begin_ice_invoke(operation, mode, inParams, &__ctx, ::IceInternal::__dummyCallback, 0); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, const ::Ice::CallbackPtr& __del, const ::Ice::LocalObjectPtr& __cookie = 0) @@ -663,8 +666,8 @@ public: return begin_ice_invoke(operation, mode, inParams, 0, __del, __cookie); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, const ::Ice::Context& __ctx, const ::Ice::CallbackPtr& __del, @@ -673,8 +676,8 @@ public: return begin_ice_invoke(operation, mode, inParams, &__ctx, __del, __cookie); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, const ::Ice::Callback_Object_ice_invokePtr& __del, const ::Ice::LocalObjectPtr& __cookie = 0) @@ -682,8 +685,8 @@ public: return begin_ice_invoke(operation, mode, inParams, 0, __del, __cookie); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::vector< ::Ice::Byte>& inParams, const ::Ice::Context& __ctx, const ::Ice::Callback_Object_ice_invokePtr& __del, @@ -694,15 +697,15 @@ public: bool end_ice_invoke(::std::vector< ::Ice::Byte>&, const ::Ice::AsyncResultPtr&); - bool ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, - const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, + bool ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, + const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, ::std::vector< ::Ice::Byte>& outParams) { return ice_invoke(operation, mode, inParams, outParams, 0); } - bool ice_invoke(const ::std::string& operation, + bool ice_invoke(const ::std::string& operation, ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, ::std::vector< ::Ice::Byte>& outParams, @@ -713,18 +716,18 @@ public: bool ice_invoke_async(const ::Ice::AMI_Array_Object_ice_invokePtr&, const ::std::string&, ::Ice::OperationMode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&); - bool ice_invoke_async(const ::Ice::AMI_Array_Object_ice_invokePtr&, const ::std::string&, ::Ice::OperationMode, + bool ice_invoke_async(const ::Ice::AMI_Array_Object_ice_invokePtr&, const ::std::string&, ::Ice::OperationMode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&, const ::Ice::Context&); - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams) { return begin_ice_invoke(operation, mode, inParams, 0, ::IceInternal::__dummyCallback, 0); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, const ::Ice::Context& __ctx, const ::Ice::LocalObjectPtr& __cookie = 0) @@ -732,8 +735,8 @@ public: return begin_ice_invoke(operation, mode, inParams, &__ctx, ::IceInternal::__dummyCallback, __cookie); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, const ::Ice::CallbackPtr& __del, const ::Ice::LocalObjectPtr& __cookie = 0) @@ -741,8 +744,8 @@ public: return begin_ice_invoke(operation, mode, inParams, 0, __del, __cookie); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, const ::Ice::Context& __ctx, const ::Ice::CallbackPtr& __del, @@ -751,8 +754,8 @@ public: return begin_ice_invoke(operation, mode, inParams, &__ctx, __del, __cookie); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, const ::Ice::Callback_Object_ice_invokePtr& __del, const ::Ice::LocalObjectPtr& __cookie = 0) @@ -760,8 +763,8 @@ public: return begin_ice_invoke(operation, mode, inParams, 0, __del, __cookie); } - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, - ::Ice::OperationMode mode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string& operation, + ::Ice::OperationMode mode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>& inParams, const ::Ice::Context& __ctx, const ::Ice::Callback_Object_ice_invokePtr& __del, @@ -835,14 +838,47 @@ public: ::std::string ice_getConnectionId() const; ::Ice::ConnectionPtr ice_getConnection(); + +#ifdef ICE_CPP11 + ::Ice::AsyncResultPtr begin_ice_getConnection( + const ::IceInternal::Function<void (const ::Ice::AsyncResultPtr&)>& completed) + { + return begin_ice_getConnectionInternal(::Ice::newCallback(completed, 0), 0); + } + + ::Ice::AsyncResultPtr begin_ice_getConnection( + const ::IceInternal::Function<void (const ::Ice::ConnectionPtr&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + ::IceInternal::Function<void (const ::Ice::Exception&)>()); +#endif + + ::Ice::AsyncResultPtr begin_ice_getConnection() + { + return begin_ice_getConnectionInternal(::IceInternal::__dummyCallback, 0); + } + + ::Ice::AsyncResultPtr begin_ice_getConnection(const ::Ice::CallbackPtr& __del, + const ::Ice::LocalObjectPtr& __cookie = 0) + { + return begin_ice_getConnectionInternal(__del, __cookie); + } + + ::Ice::AsyncResultPtr begin_ice_getConnection(const ::Ice::Callback_Object_ice_getConnectionPtr& __del, + const ::Ice::LocalObjectPtr& __cookie = 0) + { + return begin_ice_getConnectionInternal(__del, __cookie); + } + + ::Ice::ConnectionPtr end_ice_getConnection(const ::Ice::AsyncResultPtr&); + ::Ice::ConnectionPtr ice_getCachedConnection() const; void ice_flushBatchRequests(); bool ice_flushBatchRequests_async(const ::Ice::AMI_Object_ice_flushBatchRequestsPtr&); - + #ifdef ICE_CPP11 ::Ice::AsyncResultPtr begin_ice_flushBatchRequests( - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { return begin_ice_flushBatchRequestsInternal( @@ -875,7 +911,7 @@ public: void __copyFrom(const ::Ice::ObjectPrx&); - int __handleException(const ::Ice::Exception&, const ::IceInternal::RequestHandlerPtr&, ::Ice::OperationMode, + int __handleException(const ::Ice::Exception&, const ::IceInternal::RequestHandlerPtr&, ::Ice::OperationMode, bool, int&); void __checkTwowayOnly(const ::std::string&) const; @@ -890,7 +926,7 @@ public: protected: virtual Object* __newInstance() const; - + private: ::IceInternal::RequestHandlerPtr createRequestHandler(); @@ -903,10 +939,10 @@ private: #ifdef ICE_CPP11 ::Ice::AsyncResultPtr __begin_ice_isA( - const ::std::string&, + const ::std::string&, const ::Ice::Context*, - const ::IceInternal::Function<void (bool)>&, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& = + const ::IceInternal::Function<void (bool)>&, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& = ::IceInternal::Function<void (bool)>()); #endif @@ -920,8 +956,8 @@ private: #ifdef ICE_CPP11 ::Ice::AsyncResultPtr __begin_ice_ping( const ::Ice::Context* ctx, - const ::IceInternal::Function<void ()>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = + const ::IceInternal::Function<void ()>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()) { @@ -938,7 +974,7 @@ private: ::Ice::AsyncResultPtr __begin_ice_ids( const ::Ice::Context*, const ::IceInternal::Function<void (const ::std::vector< ::std::string>&)>&, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& = + const ::IceInternal::Function<void (const ::Ice::Exception&)>& = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& = ::IceInternal::Function<void (bool)>()); @@ -951,21 +987,21 @@ private: #ifdef ICE_CPP11 ::Ice::AsyncResultPtr __begin_ice_id( const ::Ice::Context*, - const ::IceInternal::Function<void (const ::std::string&)>&, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& = + const ::IceInternal::Function<void (const ::std::string&)>&, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& = ::IceInternal::Function<void (const ::Ice::Exception&)>(), - const ::IceInternal::Function<void (bool)>& sent = + const ::IceInternal::Function<void (bool)>& sent = ::IceInternal::Function<void (bool)>()); #endif - bool ice_invoke(const ::std::string&, - ::Ice::OperationMode, + bool ice_invoke(const ::std::string&, + ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&, ::std::vector< ::Ice::Byte>&, const ::Ice::Context*); - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string&, - ::Ice::OperationMode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string&, + ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&, const ::Ice::Context*, const ::IceInternal::CallbackBasePtr&, @@ -973,42 +1009,45 @@ private: #ifdef ICE_CPP11 ::Ice::AsyncResultPtr __begin_ice_invoke( - const ::std::string&, - ::Ice::OperationMode, + const ::std::string&, + ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&, const ::Ice::Context*, const ::IceInternal::Function<void (bool, const ::std::vector< ::Ice::Byte>&)>&, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& = + const ::IceInternal::Function<void (const ::Ice::Exception&)>& = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& = ::IceInternal::Function<void (bool)>()); - + #endif - bool ice_invoke(const ::std::string&, + bool ice_invoke(const ::std::string&, ::Ice::OperationMode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&, ::std::vector< ::Ice::Byte>&, const ::Ice::Context*); - ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string&, - ::Ice::OperationMode, + ::Ice::AsyncResultPtr begin_ice_invoke(const ::std::string&, + ::Ice::OperationMode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&, const ::Ice::Context*, const ::IceInternal::CallbackBasePtr&, const ::Ice::LocalObjectPtr&); - + #ifdef ICE_CPP11 ::Ice::AsyncResultPtr __begin_ice_invoke( const ::std::string&, - ::Ice::OperationMode, + ::Ice::OperationMode, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&, const ::Ice::Context*, const ::IceInternal::Function<void (bool, const ::std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&)>&, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& = + const ::IceInternal::Function<void (const ::Ice::Exception&)>& = ::IceInternal::Function<void (const ::Ice::Exception&)>(), const ::IceInternal::Function<void (bool)>& = ::IceInternal::Function<void (bool)>()); #endif + ::Ice::AsyncResultPtr begin_ice_getConnectionInternal(const ::IceInternal::CallbackBasePtr&, + const ::Ice::LocalObjectPtr&); + ::Ice::AsyncResultPtr begin_ice_flushBatchRequestsInternal(const ::IceInternal::CallbackBasePtr&, const ::Ice::LocalObjectPtr&); @@ -1085,7 +1124,7 @@ inline bool operator==(const ProxyHandle<T>& lhs, const ProxyHandle<U>& rhs) else { return !l && !r; - } + } } template<typename T, typename U> @@ -1131,7 +1170,7 @@ inline bool operator>=(const ProxyHandle<T>& lhs, const ProxyHandle<U>& rhs) // // checkedCast and uncheckedCast functions without facet: // -template<typename P> P +template<typename P> P checkedCastImpl(const ::Ice::ObjectPrx& b, const ::Ice::Context* context) { P d = 0; @@ -1140,7 +1179,7 @@ checkedCastImpl(const ::Ice::ObjectPrx& b, const ::Ice::Context* context) typedef typename P::element_type T; d = dynamic_cast<T*>(b.get()); - if(!d && (context == 0 ? + if(!d && (context == 0 ? b->ice_isA(T::ice_staticId()) : b->ice_isA(T::ice_staticId(), *context))) { @@ -1151,7 +1190,7 @@ checkedCastImpl(const ::Ice::ObjectPrx& b, const ::Ice::Context* context) return d; } -template<typename P> P +template<typename P> P uncheckedCastImpl(const ::Ice::ObjectPrx& b) { P d = 0; @@ -1169,7 +1208,7 @@ uncheckedCastImpl(const ::Ice::ObjectPrx& b) return d; } -// +// // checkedCast and uncheckedCast with facet: // @@ -1184,13 +1223,13 @@ ICE_API ::Ice::ObjectPrx checkedCastImpl(const ::Ice::ObjectPrx&, const std::str // We have to use inline functions for broken compilers such as VC7. // -template<> inline ::Ice::ObjectPrx +template<> inline ::Ice::ObjectPrx checkedCastImpl< ::Ice::ObjectPrx>(const ::Ice::ObjectPrx& b, const std::string& f, const ::Ice::Context* context) { return checkedCastImpl(b, f, "::Ice::Object", context); } -template<> inline ::Ice::ObjectPrx +template<> inline ::Ice::ObjectPrx uncheckedCastImpl< ::Ice::ObjectPrx>(const ::Ice::ObjectPrx& b, const std::string& f) { ::Ice::ObjectPrx d = 0; @@ -1201,7 +1240,7 @@ uncheckedCastImpl< ::Ice::ObjectPrx>(const ::Ice::ObjectPrx& b, const std::strin return d; } -template<typename P> P +template<typename P> P checkedCastImpl(const ::Ice::ObjectPrx& b, const std::string& f, const ::Ice::Context* context) { P d = 0; @@ -1217,7 +1256,7 @@ checkedCastImpl(const ::Ice::ObjectPrx& b, const std::string& f, const ::Ice::Co return d; } -template<typename P> P +template<typename P> P uncheckedCastImpl(const ::Ice::ObjectPrx& b, const std::string& f) { P d = 0; @@ -1235,9 +1274,9 @@ uncheckedCastImpl(const ::Ice::ObjectPrx& b, const std::string& f) // // checkedCast and uncheckedCast functions provided in the global namespace -// +// -template<typename P, typename Y> inline P +template<typename P, typename Y> inline P checkedCast(const ::IceInternal::ProxyHandle<Y>& b) { Y* tag = 0; @@ -1245,7 +1284,7 @@ checkedCast(const ::IceInternal::ProxyHandle<Y>& b) return ::IceInternal::checkedCastHelper<typename P::element_type>(b, tag, ctx); } -template<typename P, typename Y> inline P +template<typename P, typename Y> inline P checkedCast(const ::IceInternal::ProxyHandle<Y>& b, const ::Ice::Context& context) { Y* tag = 0; @@ -1259,20 +1298,20 @@ uncheckedCast(const ::IceInternal::ProxyHandle<Y>& b) return ::IceInternal::uncheckedCastHelper<typename P::element_type>(b, tag); } -template<typename P> inline P +template<typename P> inline P checkedCast(const ::Ice::ObjectPrx& b, const std::string& f) { Ice::Context* ctx = 0; return ::IceInternal::checkedCastImpl<P>(b, f, ctx); } -template<typename P> inline P +template<typename P> inline P checkedCast(const ::Ice::ObjectPrx& b, const std::string& f, const ::Ice::Context& context) { return ::IceInternal::checkedCastImpl<P>(b, f, &context); } -template<typename P> inline P +template<typename P> inline P uncheckedCast(const ::Ice::ObjectPrx& b, const std::string& f) { return ::IceInternal::uncheckedCastImpl<P>(b, f); @@ -1448,7 +1487,7 @@ public: typedef void (T::*Sent)(bool); typedef void (T::*Response)(); - OnewayCallbackNC(const TPtr& instance, Response cb, Exception excb, Sent sentcb) : + OnewayCallbackNC(const TPtr& instance, Response cb, Exception excb, Sent sentcb) : CallbackNC<T>(instance, excb, sentcb), _response(cb) { CallbackBase::checkCallback(instance, cb != 0 || excb != 0); @@ -1487,7 +1526,7 @@ public: typedef void (T::*Sent)(bool, const CT&); typedef void (T::*Response)(const CT&); - OnewayCallback(const TPtr& instance, Response cb, Exception excb, Sent sentcb) : + OnewayCallback(const TPtr& instance, Response cb, Exception excb, Sent sentcb) : Callback<T, CT>(instance, excb, sentcb), _response(cb) { CallbackBase::checkCallback(instance, cb != 0 || excb != 0); @@ -1511,13 +1550,13 @@ public: } private: - + Response _response; }; } -namespace Ice +namespace Ice { template<class T> @@ -1652,8 +1691,8 @@ public: virtual void completed(const ::Ice::AsyncResultPtr& __result) const { ::std::vector< ::std::string> __ret; - try - { + try + { __ret = __result->getProxy()->end_ice_ids(__result); } catch(const ::Ice::Exception& ex) @@ -1691,8 +1730,8 @@ public: virtual void completed(const ::Ice::AsyncResultPtr& __result) const { ::std::vector< ::std::string> __ret; - try - { + try + { __ret = __result->getProxy()->end_ice_ids(__result); } catch(const ::Ice::Exception& ex) @@ -1702,7 +1741,7 @@ public: } if(_response) { - (::IceInternal::Callback<T, CT>::_callback.get()->*_response)(__ret, + (::IceInternal::Callback<T, CT>::_callback.get()->*_response)(__ret, CT::dynamicCast(__result->getCookie())); } } @@ -1893,8 +1932,8 @@ public: ::IceInternal::Callback<T, CT>::exception(__result, ex); return; } - (::IceInternal::Callback<T, CT>::_callback.get()->*_response)(__ok, - outParams, + (::IceInternal::Callback<T, CT>::_callback.get()->*_response)(__ok, + outParams, CT::dynamicCast(__result->getCookie())); } else @@ -1913,7 +1952,7 @@ public: if(_responseArray) { (::IceInternal::Callback<T, CT>::_callback.get()->*_responseArray)(__ok, - outParams, + outParams, CT::dynamicCast( __result->getCookie())); } @@ -1927,7 +1966,90 @@ private: }; template<class T> -class CallbackNC_Object_ice_flushBatchRequests : public Callback_Object_ice_flushBatchRequests_Base, public ::IceInternal::OnewayCallbackNC<T> +class CallbackNC_Object_ice_getConnection : public Callback_Object_ice_getConnection_Base, + public ::IceInternal::CallbackNC<T> +{ +public: + + typedef IceUtil::Handle<T> TPtr; + + typedef void (T::*Response)(const ::Ice::ConnectionPtr&); + typedef void (T::*Exception)(const ::Ice::Exception&); + typedef void (T::*Sent)(bool); + + CallbackNC_Object_ice_getConnection(const TPtr& instance, Response cb, Exception excb, Sent sentcb) : + ::IceInternal::CallbackNC<T>(instance, excb, sentcb), _response(cb) + { + } + + + virtual void completed(const ::Ice::AsyncResultPtr& __result) const + { + ::Ice::ConnectionPtr __ret; + try + { + __ret = __result->getProxy()->end_ice_getConnection(__result); + } + catch(const ::Ice::Exception& ex) + { + ::IceInternal::CallbackNC<T>::exception(__result, ex); + return; + } + if(_response) + { + (::IceInternal::CallbackNC<T>::_callback.get()->*_response)(__ret); + } + } + +private: + + Response _response; +}; + +template<class T, typename CT> +class Callback_Object_ice_getConnection : public Callback_Object_ice_getConnection_Base, + public ::IceInternal::Callback<T, CT> +{ +public: + + typedef IceUtil::Handle<T> TPtr; + + typedef void (T::*Response)(const ::Ice::ConnectionPtr&, const CT&); + typedef void (T::*Exception)(const ::Ice::Exception&, const CT&); + typedef void (T::*Sent)(bool, const CT&); + + Callback_Object_ice_getConnection(const TPtr& instance, Response cb, Exception excb, Sent sentcb) : + ::IceInternal::Callback<T, CT>(instance, excb, sentcb), _response(cb) + { + } + + virtual void completed(const ::Ice::AsyncResultPtr& __result) const + { + ::Ice::ConnectionPtr __ret; + try + { + __ret = __result->getProxy()->end_ice_getConnection(__result); + } + catch(const ::Ice::Exception& ex) + { + ::IceInternal::Callback<T, CT>::exception(__result, ex); + return; + } + if(_response) + { + (::IceInternal::Callback<T, CT>::_callback.get()->*_response)(__ret, + CT::dynamicCast(__result->getCookie())); + } + } + +private: + + Response _response; +}; + +template<class T> +class CallbackNC_Object_ice_flushBatchRequests : public Callback_Object_ice_flushBatchRequests_Base, + public ::IceInternal::OnewayCallbackNC<T> { public: @@ -1943,7 +2065,8 @@ public: }; template<class T, typename CT> -class Callback_Object_ice_flushBatchRequests : public Callback_Object_ice_flushBatchRequests_Base, public ::IceInternal::OnewayCallback<T, CT> +class Callback_Object_ice_flushBatchRequests : public Callback_Object_ice_flushBatchRequests_Base, + public ::IceInternal::OnewayCallback<T, CT> { public: @@ -2259,7 +2382,7 @@ newCallback_Object_ice_invoke(const IceUtil::Handle<T>& instance, template<class T, typename CT> Callback_Object_ice_invokePtr newCallback_Object_ice_invoke(const IceUtil::Handle<T>& instance, - void (T::*cb)(bool, const std::pair<const Byte*, const Byte*>&, + void (T::*cb)(bool, const std::pair<const Byte*, const Byte*>&, const CT&), void (T::*excb)(const ::Ice::Exception&, const CT&), void (T::*sentcb)(bool, const CT&) = 0) @@ -2337,6 +2460,38 @@ newCallback_Object_ice_invoke(T* instance, instance, static_cast<void (T::*)(bool, const std::vector<Ice::Byte>&, const CT&)>(0), excb, sentcb); } +template<class T> Callback_Object_ice_getConnectionPtr +newCallback_Object_ice_getConnection(const IceUtil::Handle<T>& instance, + void (T::*cb)(const ::Ice::ConnectionPtr&), + void (T::*excb)(const ::Ice::Exception&)) +{ + return new CallbackNC_Object_ice_getConnection<T>(instance, cb, excb, 0); +} + +template<class T, typename CT> Callback_Object_ice_getConnectionPtr +newCallback_Object_ice_getConnection(const IceUtil::Handle<T>& instance, + void (T::*cb)(const ::Ice::ConnectionPtr&, const CT&), + void (T::*excb)(const ::Ice::Exception&, const CT&)) +{ + return new Callback_Object_ice_getConnection<T, CT>(instance, cb, excb, 0); +} + +template<class T> Callback_Object_ice_getConnectionPtr +newCallback_Object_ice_getConnection(T* instance, + void (T::*cb)(const ::Ice::ConnectionPtr&), + void (T::*excb)(const ::Ice::Exception&)) +{ + return new CallbackNC_Object_ice_getConnection<T>(instance, cb, excb, 0); +} + +template<class T, typename CT> Callback_Object_ice_getConnectionPtr +newCallback_Object_ice_getConnection(T* instance, + void (T::*cb)(const ::Ice::ConnectionPtr&, const CT&), + void (T::*excb)(const ::Ice::Exception&, const CT&)) +{ + return new Callback_Object_ice_getConnection<T, CT>(instance, cb, excb, 0); +} + template<class T> Callback_Object_ice_flushBatchRequestsPtr newCallback_Object_ice_flushBatchRequests(const IceUtil::Handle<T>& instance, void (T::*excb)(const ::Ice::Exception&), diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h index 7b80a2a036a..36462f80feb 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.h +++ b/cpp/src/Ice/CollocatedRequestHandler.h @@ -63,7 +63,7 @@ public: virtual Ice::ConnectionIPtr getConnection(); virtual Ice::ConnectionIPtr waitForConnection(); - + void invokeRequest(Outgoing*); AsyncStatus invokeAsyncRequest(OutgoingAsync*); void invokeBatchRequests(BatchOutgoing*); @@ -77,7 +77,7 @@ public: private: void handleException(Ice::Int, const Ice::Exception&); - + const Ice::ObjectAdapterIPtr _adapter; const bool _dispatcher; const Ice::LoggerPtr _logger; diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 10f756be30c..874858e1c7e 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -28,27 +28,27 @@ namespace class FlushRequestsWithException : public DispatchWorkItem { public: - + FlushRequestsWithException(const Ice::ConnectionPtr& connection, const ConnectRequestHandlerPtr& handler) : DispatchWorkItem(connection), _handler(handler) { } - + virtual void run() { _handler->flushRequestsWithException(); } - + private: - + const ConnectRequestHandlerPtr _handler; }; class FlushSentRequests : public DispatchWorkItem { public: - + FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : DispatchWorkItem(connection), _callbacks(callbacks) { @@ -115,7 +115,7 @@ ConnectRequestHandler::prepareBatchRequest(BasicStream* os) { wait(); } - + try { if(!initialized()) @@ -146,7 +146,7 @@ ConnectRequestHandler::finishBatchRequest(BasicStream* os) _batchStream.swap(*os); - if(!_batchAutoFlush && + if(!_batchAutoFlush && _batchStream.b.size() + _batchRequestsSize > _reference->getInstance()->messageSizeMax()) { Ex::throwMemoryLimitException(__FILE__, __LINE__, _batchStream.b.size() + _batchRequestsSize, @@ -259,7 +259,7 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) _connection->requestTimedOut(out); } -void +void ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { { @@ -298,7 +298,7 @@ ConnectRequestHandler::getConnection() else { return _connection; - } + } } Ice::ConnectionIPtr @@ -309,7 +309,7 @@ ConnectRequestHandler::waitForConnection() { throw RetryException(*_exception.get()); } - + // // Wait for the connection establishment to complete or fail. // @@ -326,7 +326,7 @@ ConnectRequestHandler::waitForConnection() else { return _connection; - } + } } void @@ -340,7 +340,7 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool _connection = connection; _compress = compress; } - + // // If this proxy is for a non-local object, and we are using a router, then // add this proxy to the router info object. @@ -369,7 +369,7 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) // // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be + // from the client thread pool since this will result in ice_exception callbacks to be // called. // if(!_requests.empty()) @@ -384,7 +384,7 @@ void ConnectRequestHandler::addedProxy() { // - // The proxy was added to the router info, we're now ready to send the + // The proxy was added to the router info, we're now ready to send the // queued requests. // flushRequests(); @@ -406,7 +406,7 @@ ConnectRequestHandler::initialized() { wait(); } - + if(_exception.get()) { _exception->ice_throw(); @@ -425,17 +425,17 @@ ConnectRequestHandler::flushRequests() { Lock sync(*this); assert(_connection && !_initialized); - + while(_batchRequestInProgress) { wait(); } - + // // We set the _flushing flag to true to prevent any additional queuing. Callers // might block for a little while as the queued requests are being sent but this // shouldn't be an issue as the request sends are non-blocking. - // + // _flushing = true; } @@ -486,7 +486,7 @@ ConnectRequestHandler::flushRequests() // RetryException. We handle the exception like it // was an exception that occured while sending the // request. - // + // Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(ex.get()->ice_clone()); @@ -504,11 +504,11 @@ ConnectRequestHandler::flushRequests() { _reference->getInstance()->clientThreadPool()->dispatch(new FlushSentRequests(_connection, sentCallbacks)); } - + // // We've finished sending the queued requests and the request handler now sends - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request + // the requests over the connection directly. It's time to substitute the + // request handler of the proxy with the more efficient connection request // handler which does not have any synchronization. This also breaks the cyclic // reference count with the proxy. // @@ -538,11 +538,11 @@ ConnectRequestHandler::flushRequestsWithException() for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { if(p->out) - { + { p->out->finished(*_exception.get()); } else if(p->outAsync) - { + { p->outAsync->__finished(*_exception.get()); } else diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h index a95f62f8717..ea86da211ff 100644 --- a/cpp/src/Ice/ConnectRequestHandler.h +++ b/cpp/src/Ice/ConnectRequestHandler.h @@ -25,7 +25,7 @@ namespace IceInternal { -class ConnectRequestHandler : public RequestHandler, +class ConnectRequestHandler : public RequestHandler, public Reference::GetConnectionCallback, public RouterInfo::AddProxyCallback, public IceUtil::Monitor<IceUtil::Mutex> diff --git a/cpp/src/Ice/ConnectionRequestHandler.cpp b/cpp/src/Ice/ConnectionRequestHandler.cpp index fba1ee9dc34..4d9d746675d 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.cpp +++ b/cpp/src/Ice/ConnectionRequestHandler.cpp @@ -29,8 +29,8 @@ ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference } } -ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference, - const Ice::ConnectionIPtr& connection, +ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference, + const Ice::ConnectionIPtr& connection, bool compress) : RequestHandler(reference), _connection(connection), @@ -68,13 +68,13 @@ ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr return out->__send(_connection, _compress, _response); } -void +void ConnectionRequestHandler::requestTimedOut(OutgoingMessageCallback* out) { _connection->requestTimedOut(out); } -void +void ConnectionRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { _connection->asyncRequestTimedOut(outAsync); diff --git a/cpp/src/Ice/LoggerAdminI.cpp b/cpp/src/Ice/LoggerAdminI.cpp index a673a695f6c..4ce2cd073e2 100644 --- a/cpp/src/Ice/LoggerAdminI.cpp +++ b/cpp/src/Ice/LoggerAdminI.cpp @@ -35,15 +35,15 @@ public: virtual void attachRemoteLogger(const RemoteLoggerPrx&, const LogMessageTypeSeq&, const StringSeq&, Int, const Current&); - + virtual void detachRemoteLogger(const RemoteLoggerPrx&, const Current&); - + virtual LogMessageSeq getLog(const LogMessageTypeSeq&, const StringSeq&, Int, string&, const Current&); - + void destroy(); vector<RemoteLoggerPrx> log(const LogMessage&); - + void deadRemoteLogger(const RemoteLoggerPrx&, const LoggerPtr&, const LocalException&, const string&); const int getTraceLevel() const @@ -57,9 +57,9 @@ public: } private: - + bool removeRemoteLogger(const RemoteLoggerPrx&); - + void remoteCallCompleted(const AsyncResultPtr&); IceUtil::Mutex _mutex; @@ -69,19 +69,19 @@ private: int _traceCount; const int _maxTraceCount; const int _traceLevel; - + list<LogMessage>::iterator _oldestTrace; list<LogMessage>::iterator _oldestLog; struct ObjectIdentityCompare { - bool operator()(const RemoteLoggerPrx& lhs, const RemoteLoggerPrx& rhs) + bool operator()(const RemoteLoggerPrx& lhs, const RemoteLoggerPrx& rhs) const { // // Caller should make sure that proxies are never null // assert(lhs != 0 && rhs != 0); - + return lhs->ice_getIdentity() < rhs->ice_getIdentity(); } }; @@ -93,22 +93,22 @@ private: traceCategories(c.begin(), c.end()) { } - + const set<LogMessageType> messageTypes; const set<string> traceCategories; }; - typedef map<RemoteLoggerPrx, Filters, ObjectIdentityCompare> RemoteLoggerMap; + typedef map<RemoteLoggerPrx, Filters, ObjectIdentityCompare> RemoteLoggerMap; struct GetRemoteLoggerMapKey { - RemoteLoggerMap::key_type + RemoteLoggerMap::key_type operator()(const RemoteLoggerMap::value_type& val) { return val.first; } }; - + RemoteLoggerMap _remoteLoggerMap; const CallbackPtr _remoteCallCompleted; @@ -121,15 +121,15 @@ typedef IceUtil::Handle<LoggerAdminI> LoggerAdminIPtr; class Job : public IceUtil::Shared { public: - + Job(const vector<RemoteLoggerPrx>& r, const LogMessage& l) : remoteLoggers(r), logMessage(l) { } - + const vector<RemoteLoggerPrx> remoteLoggers; - const LogMessage logMessage; + const LogMessage logMessage; }; typedef IceUtil::Handle<Job> JobPtr; @@ -150,26 +150,26 @@ public: virtual ObjectPtr getFacet() const; virtual void destroy(); - + const LoggerPtr& getLocalLogger() const { return _localLogger; } void run(); - + private: void log(const LogMessage&); LoggerPtr _localLogger; const LoggerAdminIPtr _loggerAdmin; - + IceUtil::Monitor<IceUtil::Mutex> _monitor; bool _destroyed; IceUtil::ThreadPtr _sendLogThread; - std::deque<JobPtr> _jobQueue; + std::deque<JobPtr> _jobQueue; }; typedef IceUtil::Handle<LoggerAdminLoggerI> LoggerAdminLoggerIPtr; @@ -179,9 +179,9 @@ class SendLogThread : public IceUtil::Thread public: SendLogThread(const LoggerAdminLoggerIPtr&); - + virtual void run(); - + private: LoggerAdminLoggerIPtr _logger; @@ -195,7 +195,7 @@ private: // // Filter out messages from in/out logMessages list // -void +void filterLogMessages(LogMessageSeq& logMessages, const set<LogMessageType>& messageTypes, const set<string>& traceCategories, Int messageMax) { @@ -217,7 +217,7 @@ filterLogMessages(LogMessageSeq& logMessages, const set<LogMessageType>& message bool keepIt = false; if(messageTypes.empty() || messageTypes.count(p->type) != 0) { - if(p->type != TraceMessage || traceCategories.empty() || + if(p->type != TraceMessage || traceCategories.empty() || traceCategories.count(p->traceCategory) != 0) { keepIt = true; @@ -268,7 +268,7 @@ changeCommunicator(const RemoteLoggerPrx& prx, const CommunicatorPtr& communicat } // -// Copies a set of properties +// Copies a set of properties // void copyProperties(const string& prefix, const PropertiesPtr& from, const PropertiesPtr& to) @@ -297,7 +297,7 @@ createSendLogCommunicator(const CommunicatorPtr& communicator, const LoggerPtr& copyProperties("IceSSL.", mainProps, initData.properties); StringSeq extraProps = mainProps->getPropertyAsList("Ice.Admin.Logger.Properties"); - + if(!extraProps.empty()) { for(vector<string>::iterator p = extraProps.begin(); p != extraProps.end(); ++p) @@ -329,17 +329,17 @@ LoggerAdminI::LoggerAdminI(const PropertiesPtr& props) : } void -LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, - const LogMessageTypeSeq& messageTypes, +LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, + const LogMessageTypeSeq& messageTypes, const StringSeq& categories, - Int messageMax, + Int messageMax, const Current& current) { if(!prx) { return; // can't send this null RemoteLogger anything! } - + LoggerAdminLoggerIPtr logger = LoggerAdminLoggerIPtr::dynamicCast(current.adapter->getCommunicator()->getLogger()); if(!logger) { @@ -358,10 +358,10 @@ LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, if(!_sendLogCommunicator) { - _sendLogCommunicator = + _sendLogCommunicator = createSendLogCommunicator(current.adapter->getCommunicator(), logger->getLocalLogger()); } - + if(!_remoteLoggerMap.insert(make_pair(changeCommunicator(remoteLogger, _sendLogCommunicator), filters)).second) { if(_traceLevel > 0) @@ -378,7 +378,7 @@ LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, initLogMessages = _queue; // copy } } - + if(_traceLevel > 0) { Trace trace(logger, traceCategory); @@ -397,7 +397,7 @@ LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, throw; } } - + void LoggerAdminI::detachRemoteLogger(const RemoteLoggerPrx& remoteLogger, const Current& current) { @@ -423,24 +423,24 @@ LoggerAdminI::detachRemoteLogger(const RemoteLoggerPrx& remoteLogger, const Curr } } -LogMessageSeq -LoggerAdminI::getLog(const LogMessageTypeSeq& messageTypes, - const StringSeq& categories, +LogMessageSeq +LoggerAdminI::getLog(const LogMessageTypeSeq& messageTypes, + const StringSeq& categories, Int messageMax, string& prefix, const Current& current) { LogMessageSeq logMessages; { IceUtil::Mutex::Lock lock(_mutex); - + if(messageMax != 0) { logMessages = _queue; } } - + LoggerPtr logger = current.adapter->getCommunicator()->getLogger(); prefix = logger->getPrefix(); - + Filters filters(messageTypes, categories); filterLogMessages(logMessages, filters.messageTypes, filters.traceCategories, messageMax); return logMessages; @@ -467,11 +467,11 @@ LoggerAdminI::log(const LogMessage& logMessage) // // Put message in _queue // - if((logMessage.type != TraceMessage && _maxLogCount > 0) || - (logMessage.type == TraceMessage && _maxTraceCount > 0)) + if((logMessage.type != TraceMessage && _maxLogCount > 0) || + (logMessage.type == TraceMessage && _maxTraceCount > 0)) { list<LogMessage>::iterator p = _queue.insert(_queue.end(), logMessage); - + if(logMessage.type != TraceMessage) { assert(_maxLogCount > 0); @@ -524,18 +524,18 @@ LoggerAdminI::log(const LogMessage& logMessage) } } } - + // // Queue updated, now find which remote loggers want this message - // + // for(RemoteLoggerMap::const_iterator p = _remoteLoggerMap.begin(); p != _remoteLoggerMap.end(); ++p) { const Filters& filters = p->second; - + if(filters.messageTypes.empty() || filters.messageTypes.count(logMessage.type) != 0) { if(logMessage.type != TraceMessage || filters.traceCategories.empty() || - filters.traceCategories.count(logMessage.traceCategory) != 0) + filters.traceCategories.count(logMessage.traceCategory) != 0) { remoteLoggers.push_back(p->first); } @@ -574,7 +574,7 @@ LoggerAdminI::removeRemoteLogger(const RemoteLoggerPrx& remoteLogger) void LoggerAdminI::remoteCallCompleted(const AsyncResultPtr& r) { - try + try { r->throwLocalException(); @@ -624,7 +624,7 @@ void LoggerAdminLoggerI::print(const string& message) { LogMessage logMessage = { PrintMessage, IceUtil::Time::now().toMicroSeconds(), "", message }; - + _localLogger->print(message); log(logMessage); } @@ -633,7 +633,7 @@ void LoggerAdminLoggerI::trace(const string& category, const string& message) { LogMessage logMessage = { TraceMessage, IceUtil::Time::now().toMicroSeconds(), category, message }; - + _localLogger->trace(category, message); log(logMessage); } @@ -642,7 +642,7 @@ void LoggerAdminLoggerI::warning(const string& message) { LogMessage logMessage = { WarningMessage, IceUtil::Time::now().toMicroSeconds(), "", message }; - + _localLogger->warning(message); log(logMessage); } @@ -651,7 +651,7 @@ void LoggerAdminLoggerI::error(const string& message) { LogMessage logMessage = { ErrorMessage, IceUtil::Time::now().toMicroSeconds(), "", message }; - + _localLogger->error(message); log(logMessage); } @@ -674,15 +674,15 @@ LoggerAdminLoggerI::getFacet() const return _loggerAdmin; } -void +void LoggerAdminLoggerI::log(const LogMessage& logMessage) { - const vector<RemoteLoggerPrx> remoteLoggers = _loggerAdmin->log(logMessage); - + const vector<RemoteLoggerPrx> remoteLoggers = _loggerAdmin->log(logMessage); + if(!remoteLoggers.empty()) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); - + if(!_sendLogThread) { _sendLogThread = new SendLogThread(this); @@ -701,7 +701,7 @@ LoggerAdminLoggerI::destroy() bool joinThread = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); - + if(_sendLogThread) { joinThread = true; @@ -717,7 +717,7 @@ LoggerAdminLoggerI::destroy() sendLogThreadControl.join(); } - // destroy sendLogCommunicator + // destroy sendLogCommunicator _loggerAdmin->destroy(); } @@ -746,7 +746,7 @@ LoggerAdminLoggerI::run() JobPtr job = _jobQueue.front(); _jobQueue.pop_front(); lock.release(); - + for(vector<RemoteLoggerPrx>::const_iterator p = job->remoteLoggers.begin(); p != job->remoteLoggers.end(); ++p) { if(_loggerAdmin->getTraceLevel() > 1) @@ -754,7 +754,7 @@ LoggerAdminLoggerI::run() Trace trace(_localLogger, traceCategory); trace << "sending log message to `" << *p << "'"; } - + try { // @@ -766,7 +766,7 @@ LoggerAdminLoggerI::run() { _loggerAdmin->deadRemoteLogger(*p, _localLogger, ex, "log"); } - } + } } if(_loggerAdmin->getTraceLevel() > 1) @@ -786,8 +786,8 @@ SendLogThread::SendLogThread(const LoggerAdminLoggerIPtr& logger) : _logger(logger) { } - -void + +void SendLogThread::run() { _logger->run(); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 509ba18ad06..cd3442113f8 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -38,6 +38,7 @@ IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; } +IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; } const unsigned char Ice::AsyncResult::OK = 0x1; const unsigned char Ice::AsyncResult::Done = 0x2; @@ -414,7 +415,7 @@ IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const Thr { public: - InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : + InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : DispatchWorkItem(connection), _outAsync(outAsync) { } @@ -901,7 +902,7 @@ IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) { int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); _observer.retried(); // Invocation is being retried. - + // // Schedule the retry. Note that we always schedule the retry // on the retry queue even if the invocation can be retried @@ -928,7 +929,7 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu { } -AsyncStatus +AsyncStatus IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) { _cachedConnection = connection; @@ -1094,7 +1095,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons // Assume all connections are flushed synchronously. // _sentSynchronously = true; - + // // Attach observer // @@ -1109,7 +1110,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt public: BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync, - const InstancePtr& instance, + const InstancePtr& instance, InvocationObserver& observer) : BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), _outAsync(outAsync), _observer(observer) @@ -1141,7 +1142,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt } private: - + const CommunicatorBatchOutgoingAsyncPtr _outAsync; InvocationObserver& _observer; }; @@ -1181,7 +1182,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) if(--_useCount > 0) { return; - } + } _state |= Done | OK | Sent; _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation _monitor.notifyAll(); @@ -1207,6 +1208,96 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) } } +IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& proxy, + const std::string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + OutgoingAsync(proxy, operation, delegate, cookie) +{ + _observer.attach(proxy.get(), operation, 0); +} + +void +IceInternal::GetConnectionOutgoingAsync::__invoke() +{ + while(true) + { + try + { + _handler = _proxy->__getRequestHandler(); + _handler->sendAsyncRequest(this); + } + catch(const RetryException&) + { + _proxy->__setRequestHandler(_handler, 0); + } + catch(const Ice::Exception& ex) + { + handleException(ex); + } + break; + } +} + +AsyncStatus +IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool) +{ + __sent(); + return AsyncStatusSent; +} + +AsyncStatus +IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*) +{ + __sent(); + return AsyncStatusSent; +} + +bool +IceInternal::GetConnectionOutgoingAsync::__sent() +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + _monitor.notifyAll(); + } + __invokeCompleted(); + return false; +} + +void +IceInternal::GetConnectionOutgoingAsync::__invokeSent() +{ + // No sent callback +} + +void +IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc) +{ + try + { + handleException(exc); + } + catch(const Ice::Exception& ex) + { + __invokeException(ex); + } +} + +void +IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc) +{ + try + { + _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt)); + _observer.retried(); // Invocation is being retried. + } + catch(const Ice::Exception& ex) + { + _observer.failed(ex.ice_name()); + throw; + } +} namespace { @@ -1227,13 +1318,13 @@ public: { } - virtual void + virtual void completed(const Ice::AsyncResultPtr&) const { assert(false); } - virtual CallbackBasePtr + virtual CallbackBasePtr verify(const Ice::LocalObjectPtr&) { // @@ -1245,13 +1336,13 @@ public: return 0; } - virtual void + virtual void sent(const AsyncResultPtr&) const { assert(false); } - virtual bool + virtual bool hasSentCallback() const { assert(false); @@ -1281,25 +1372,25 @@ Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& co Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed, const ::std::function<void (const AsyncResultPtr&)>& sent) : - _completed(completed), + _completed(completed), _sent(sent) { checkCallback(true, completed != nullptr); } - - virtual void + + virtual void completed(const AsyncResultPtr& result) const { _completed(result); } - - virtual CallbackBasePtr + + virtual CallbackBasePtr verify(const LocalObjectPtr&) { return this; // Nothing to do, the cookie is not type-safe. } - - virtual void + + virtual void sent(const AsyncResultPtr& result) const { if(_sent != nullptr) @@ -1307,19 +1398,19 @@ Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& co _sent(result); } } - - virtual bool + + virtual bool hasSentCallback() const { return _sent != nullptr; } - + private: ::std::function< void (const AsyncResultPtr&)> _completed; ::std::function< void (const AsyncResultPtr&)> _sent; }; - + return new Cpp11CB(completed, sent); } #endif diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 720099b6927..9878f540035 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -40,6 +40,7 @@ const string ice_ids_name = "ice_ids"; const string ice_id_name = "ice_id"; const string ice_isA_name = "ice_isA"; const string ice_invoke_name = "ice_invoke"; +const string ice_getConnection_name = "ice_getConnection"; const string ice_flushBatchRequests_name = "ice_flushBatchRequests"; } @@ -472,6 +473,49 @@ IceProxy::Ice::Object::__begin_ice_invoke( return begin_ice_invoke(operation, mode, inParams, ctx, new Cpp11CB(response, exception, sent), 0); } +Ice::AsyncResultPtr +IceProxy::Ice::Object::begin_ice_getConnection( + const ::IceInternal::Function<void (const ::Ice::ConnectionPtr&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception) +{ + class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC + { + public: + + Cpp11CB(const ::IceInternal::Function<void (const ::Ice::ConnectionPtr&)>& responseFunc, + const ::std::function<void (const ::Ice::Exception&)>& exceptionFunc) : + ::IceInternal::Cpp11FnCallbackNC(exceptionFunc, nullptr), + _response(responseFunc) + { + CallbackBase::checkCallback(true, responseFunc || exceptionFunc != nullptr); + } + + virtual void completed(const ::Ice::AsyncResultPtr& __result) const + { + ::Ice::ObjectPrx __proxy = ::Ice::ObjectPrx::uncheckedCast(__result->getProxy()); + ::Ice::ConnectionPtr __ret; + try + { + __ret = __proxy->end_ice_getConnection(__result); + } + catch(const ::Ice::Exception& ex) + { + Cpp11FnCallbackNC::exception(__result, ex); + return; + } + if(_response != nullptr) + { + _response(__ret); + } + } + + private: + + ::std::function<void (const ::Ice::ConnectionPtr&)> _response; + }; + return begin_ice_getConnectionInternal(new Cpp11CB(response, exception), 0); +} + #endif @@ -1440,6 +1484,31 @@ IceProxy::Ice::Object::ice_getConnection() } } +AsyncResultPtr +IceProxy::Ice::Object::begin_ice_getConnectionInternal(const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie) +{ + ::IceInternal::GetConnectionOutgoingAsyncPtr __result = + new ::IceInternal::GetConnectionOutgoingAsync(this, ice_getConnection_name, del, cookie); + try + { + __result->__invoke(); + } + catch(const Exception& __ex) + { + __result->__invokeExceptionAsync(__ex); + } + return __result; +} + +ConnectionPtr +IceProxy::Ice::Object::end_ice_getConnection(const AsyncResultPtr& __result) +{ + AsyncResult::__check(__result, this, ice_getConnection_name); + __result->__wait(); + return ice_getCachedConnection(); +} + ConnectionPtr IceProxy::Ice::Object::ice_getCachedConnection() const { diff --git a/cpp/src/Ice/RequestHandler.cpp b/cpp/src/Ice/RequestHandler.cpp index e5c7b86565c..2cbf7826213 100644 --- a/cpp/src/Ice/RequestHandler.cpp +++ b/cpp/src/Ice/RequestHandler.cpp @@ -13,8 +13,7 @@ using namespace std; using namespace IceInternal; -IceUtil::Shared* IceInternal::upCast(RequestHandler* obj) { return obj; } - +IceUtil::Shared* IceInternal::upCast(RequestHandler* p) { return p; } RetryException::RetryException(const Ice::LocalException& ex) { @@ -37,7 +36,7 @@ RequestHandler::~RequestHandler() { } -RequestHandler::RequestHandler(const ReferencePtr& reference) : +RequestHandler::RequestHandler(const ReferencePtr& reference) : _reference(reference), _response(reference->getMode() == Reference::ModeTwoway) { diff --git a/cpp/src/Ice/RequestHandler.h b/cpp/src/Ice/RequestHandler.h index e2d83c63e20..45cf917dd0a 100644 --- a/cpp/src/Ice/RequestHandler.h +++ b/cpp/src/Ice/RequestHandler.h @@ -48,7 +48,6 @@ public: private: - IceUtil::UniquePtr<Ice::LocalException> _ex; }; @@ -72,7 +71,7 @@ public: virtual Ice::ConnectionIPtr getConnection() = 0; virtual Ice::ConnectionIPtr waitForConnection() = 0; - + protected: RequestHandler(const ReferencePtr&); diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index 5cedc15abd1..c164220c2ae 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -106,6 +106,13 @@ public: called(); } + void connection(const Ice::AsyncResultPtr& result) + { + test(result->getCookie() == _cookie); + test(result->getProxy()->end_ice_getConnection(result)); + called(); + } + void op(const Ice::AsyncResultPtr& result) { test(result->getCookie() == _cookie); @@ -210,6 +217,24 @@ public: } } + void connectionEx(const Ice::AsyncResultPtr& result) + { + test(result->getCookie() == _cookie); + try + { + result->getProxy()->end_ice_getConnection(result); + test(false); + } + catch(const Ice::NoEndpointException&) + { + called(); + } + catch(const Ice::Exception&) + { + test(false); + } + } + void opEx(const Ice::AsyncResultPtr& result) { test(result->getCookie() == _cookie); @@ -265,6 +290,12 @@ public: called(); } + void connection(const Ice::ConnectionPtr& conn) + { + test(conn); + called(); + } + void op() { called(); @@ -330,6 +361,13 @@ public: called(); } + void connection(const Ice::ConnectionPtr& conn, const CookiePtr& cookie) + { + test(conn); + test(cookie == _cookie); + called(); + } + void op(const CookiePtr& cookie) { test(cookie == _cookie); @@ -395,6 +433,11 @@ public: test(false); } + void connection(const Ice::ConnectionPtr&) + { + test(false); + } + void op() { test(false); @@ -804,7 +847,7 @@ typedef IceUtil::Handle<Thrower> ThrowerPtr; } void -allTests(const Ice::CommunicatorPtr& communicator) +allTests(const Ice::CommunicatorPtr& communicator, bool collocated) { string sref = "test:default -p 12010"; Ice::ObjectPrx obj = communicator->stringToProxy(sref); @@ -843,6 +886,12 @@ allTests(const Ice::CommunicatorPtr& communicator) result = p->begin_ice_ids(ctx); test(p->end_ice_ids(result).size() == 2); + if(!collocated) + { + result = p->begin_ice_getConnection(); + test(p->end_ice_getConnection(result)); + } + result = p->begin_op(); p->end_op(result); result = p->begin_op(ctx); @@ -917,6 +966,14 @@ allTests(const Ice::CommunicatorPtr& communicator) p->begin_ice_ids(ctx, Ice::newCallback(cbWC, &AsyncCallback::ids), cookie); cbWC->check(); + if(!collocated) + { + p->begin_ice_getConnection(Ice::newCallback(cb, &AsyncCallback::connection)); + cb->check(); + p->begin_ice_getConnection(Ice::newCallback(cbWC, &AsyncCallback::connection), cookie); + cbWC->check(); + } + p->begin_op(Ice::newCallback(cb, &AsyncCallback::op)); cb->check(); p->begin_op(Ice::newCallback(cbWC, &AsyncCallback::op), cookie); @@ -945,7 +1002,7 @@ allTests(const Ice::CommunicatorPtr& communicator) cbWC->check(); } cout << "ok" << endl; - + #ifdef ICE_CPP11 cout << "testing C++11 async callback... " << flush; { @@ -972,6 +1029,12 @@ allTests(const Ice::CommunicatorPtr& communicator) p->begin_ice_ids(ctx, [=](const ::Ice::AsyncResultPtr& r){ cb->ids(r); }); cb->check(); + if(!collocated) + { + p->begin_ice_getConnection([=](const ::Ice::AsyncResultPtr& r){ cb->connection(r); }); + cb->check(); + } + p->begin_op([=](const ::Ice::AsyncResultPtr& r){ cb->op(r); }); cb->check(); p->begin_op(ctx, [=](const ::Ice::AsyncResultPtr& r){ cb->op(r); }); @@ -1040,6 +1103,16 @@ allTests(const Ice::CommunicatorPtr& communicator) p->begin_ice_ids(ctx, Ice::newCallback_Object_ice_ids(cbWC, &ResponseCallbackWC::ids, nullExWC), cookie); cbWC->check(); + if(!collocated) + { + p->begin_ice_getConnection(Ice::newCallback_Object_ice_getConnection(cb, &ResponseCallback::connection, + nullEx)); + cb->check(); + p->begin_ice_getConnection(Ice::newCallback_Object_ice_getConnection(cbWC, &ResponseCallbackWC::connection, + nullExWC), cookie); + cbWC->check(); + } + p->begin_op(Test::newCallback_TestIntf_op(cb, &ResponseCallback::op, nullEx)); cb->check(); p->begin_op(Test::newCallback_TestIntf_op(cbWC, &ResponseCallbackWC::op, nullExWC), cookie); @@ -1093,7 +1166,6 @@ allTests(const Ice::CommunicatorPtr& communicator) p->begin_ice_ping(ctx, [=](){ cb->ping(); }); cb->check(); - p->begin_ice_id([=](const string& id){ cb->id(id); }); cb->check(); @@ -1106,6 +1178,12 @@ allTests(const Ice::CommunicatorPtr& communicator) p->begin_ice_ids(ctx, [=](const Ice::StringSeq& ids){ cb->ids(ids); }); cb->check(); + if(!collocated) + { + p->begin_ice_getConnection([=](const Ice::ConnectionPtr& conn){ cb->connection(conn); }); + cb->check(); + } + p->begin_op([=](){ cb->op(); }); cb->check(); @@ -1126,7 +1204,7 @@ allTests(const Ice::CommunicatorPtr& communicator) } cout << "ok" << endl; #endif - + cout << "testing local exceptions... " << flush; { Test::TestIntfPrx indirect = Test::TestIntfPrx::uncheckedCast(p->ice_adapterId("dummy")); @@ -1201,13 +1279,19 @@ allTests(const Ice::CommunicatorPtr& communicator) i->begin_ice_ids(Ice::newCallback(cbWC, &AsyncCallback::idsEx), cookie); cbWC->check(); + if(!collocated) + { + i->begin_ice_getConnection(Ice::newCallback(cb, &AsyncCallback::connectionEx)); + cb->check(); + } + i->begin_op(Ice::newCallback(cb, &AsyncCallback::opEx)); cb->check(); i->begin_op(Ice::newCallback(cbWC, &AsyncCallback::opEx), cookie); cbWC->check(); } cout << "ok" << endl; - + cout << "testing local exceptions with response callback... " << flush; { Test::TestIntfPrx i = Test::TestIntfPrx::uncheckedCast(p->ice_adapterId("dummy")); @@ -1241,14 +1325,21 @@ allTests(const Ice::CommunicatorPtr& communicator) &ExceptionCallbackWC::ex), cookie); cbWC->check(); + if(!collocated) + { + i->begin_ice_getConnection( + Ice::newCallback_Object_ice_getConnection(cb, &ExceptionCallback::connection, &ExceptionCallback::ex)); + cb->check(); + } + i->begin_op(Test::newCallback_TestIntf_op(cb, &ExceptionCallback::op, &ExceptionCallback::ex)); cb->check(); i->begin_op(Test::newCallback_TestIntf_op(cbWC, &ExceptionCallbackWC::op, &ExceptionCallbackWC::ex), cookie); cbWC->check(); } cout << "ok" << endl; - -#ifdef ICE_CPP11 + +#ifdef ICE_CPP11 cout << "testing local exceptions with C++11 response callback... " << flush; { Test::TestIntfPrx i = Test::TestIntfPrx::uncheckedCast(p->ice_adapterId("dummy")); @@ -1266,6 +1357,13 @@ allTests(const Ice::CommunicatorPtr& communicator) i->begin_ice_ids([](const Ice::StringSeq&){ test(false); }, [=](const Ice::Exception& ex){ cb->ex(ex); }); cb->check(); + if(!collocated) + { + i->begin_ice_getConnection([](const Ice::ConnectionPtr&){ test(false); }, + [=](const Ice::Exception& ex){ cb->ex(ex); }); + cb->check(); + } + i->begin_op([](){ test(false); }, [=](const Ice::Exception& ex){ cb->ex(ex); }); cb->check(); } @@ -1320,7 +1418,7 @@ allTests(const Ice::CommunicatorPtr& communicator) cbWC->check(); } cout << "ok" << endl; - + #ifdef ICE_CPP11 cout << "testing C++11 exception callback... " << flush; { @@ -1330,6 +1428,12 @@ allTests(const Ice::CommunicatorPtr& communicator) i->begin_ice_isA(Test::TestIntf::ice_staticId(), nullptr, [=](const Ice::Exception& ex){cb->ex(ex); }); cb->check(); + if(!collocated) + { + i->begin_ice_getConnection(nullptr, [=](const Ice::Exception& ex){ cb->ex(ex); }); + cb->check(); + } + i->begin_op(nullptr, [=](const Ice::Exception& ex){ cb->ex(ex); }); cb->check(); @@ -1426,7 +1530,7 @@ allTests(const Ice::CommunicatorPtr& communicator) } } cout << "ok" << endl; - + #ifdef ICE_CPP11 cout << "testing C++11 sent callback... " << flush; { @@ -1451,7 +1555,7 @@ allTests(const Ice::CommunicatorPtr& communicator) [=](const Ice::Exception& ex){ cb->ex(ex); }, [=](bool sent){ cb->sent(sent); }); cb->check(); - + p->begin_op([=](){ cb->op(); }, [=](const Ice::Exception& ex){ cb->ex(ex); }, @@ -1646,7 +1750,7 @@ allTests(const Ice::CommunicatorPtr& communicator) } } cout << "ok" << endl; - + #ifdef ICE_CPP11 cout << "testing unexpected exceptions from C++11 callback... " << flush; { @@ -1662,7 +1766,7 @@ allTests(const Ice::CommunicatorPtr& communicator) p->begin_op([=](){ cb->op(); }, [=](const Ice::Exception& ex){ cb->ex(ex); }); cb->check(); - + p->begin_op([=](){ cb->noOp(); }, [=](const Ice::Exception& ex){ cb->noEx(ex); }, [=](bool sent){ cb->sent(sent); }); cb->check(); @@ -1799,7 +1903,7 @@ allTests(const Ice::CommunicatorPtr& communicator) test(r->isCompleted()); test(p->opBatchCount() == 0); } - + { // // Exception with cookie. @@ -1996,7 +2100,7 @@ allTests(const Ice::CommunicatorPtr& communicator) } } cout << "ok" << endl; - + #ifdef ICE_CPP11 cout << "testing C++11 batch requests with connection... " << flush; { @@ -2464,7 +2568,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { test((r1->sentSynchronously() && r1->isSent() && !r1->isCompleted()) || (!r1->sentSynchronously() && !r1->isCompleted())); - + test(!r2->sentSynchronously() && !r2->isCompleted()); } } @@ -2569,9 +2673,9 @@ allTests(const Ice::CommunicatorPtr& communicator) // // Send multiple opWithPayload, followed by a close and followed by multiple opWithPaylod. - // The goal is to make sure that none of the opWithPayload fail even if the server closes + // The goal is to make sure that none of the opWithPayload fail even if the server closes // the connection gracefully in between. - // + // int maxQueue = 2; bool done = false; while(!done && maxQueue < 50) @@ -2597,7 +2701,7 @@ allTests(const Ice::CommunicatorPtr& communicator) } } } - else + else { maxQueue *= 2; done = false; diff --git a/cpp/test/Ice/ami/Client.cpp b/cpp/test/Ice/ami/Client.cpp index 9559b4d5c45..fa982d492ec 100644 --- a/cpp/test/Ice/ami/Client.cpp +++ b/cpp/test/Ice/ami/Client.cpp @@ -18,8 +18,8 @@ using namespace std; int run(int, char**, const Ice::CommunicatorPtr& communicator) { - void allTests(const Ice::CommunicatorPtr&); - allTests(communicator); + void allTests(const Ice::CommunicatorPtr&, bool); + allTests(communicator, false); return EXIT_SUCCESS; } diff --git a/cpp/test/Ice/ami/Collocated.cpp b/cpp/test/Ice/ami/Collocated.cpp index 45236ebef71..e687181b158 100644 --- a/cpp/test/Ice/ami/Collocated.cpp +++ b/cpp/test/Ice/ami/Collocated.cpp @@ -34,8 +34,8 @@ run(int, char**, const Ice::CommunicatorPtr& communicator, adapter2->add(testController, communicator->stringToIdentity("testController")); adapter2->activate(); - void allTests(const Ice::CommunicatorPtr&); - allTests(communicator); + void allTests(const Ice::CommunicatorPtr&, bool); + allTests(communicator, true); return EXIT_SUCCESS; } diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs index 0e6b3f84b16..8f9ac2fde55 100644 --- a/cs/src/Ice/CollocatedRequestHandler.cs +++ b/cs/src/Ice/CollocatedRequestHandler.cs @@ -238,7 +238,7 @@ namespace IceInternal _adapter.decDirectCount(); return true; } - + public void invokeException(int requestId, Ice.LocalException ex, int invokeNum) { @@ -516,4 +516,4 @@ namespace IceInternal private int _batchRequestNum; private int _batchMarker; } -}
\ No newline at end of file +} diff --git a/cs/src/Ice/ConnectRequestHandler.cs b/cs/src/Ice/ConnectRequestHandler.cs index a30f9858c34..f188e722c47 100644 --- a/cs/src/Ice/ConnectRequestHandler.cs +++ b/cs/src/Ice/ConnectRequestHandler.cs @@ -86,7 +86,7 @@ namespace IceInternal { lock(this) { - if(!initialized()) // This can't throw until _batchRequestInProgress = false + if(!initialized()) // This can't throw until _batchRequestInProgress = false { Debug.Assert(_batchRequestInProgress); _batchRequestInProgress = false; @@ -111,13 +111,13 @@ namespace IceInternal { lock(this) { - if(!initialized()) // This can't throw until _batchRequestInProgress = false + if(!initialized()) // This can't throw until _batchRequestInProgress = false { Debug.Assert(_batchRequestInProgress); _batchRequestInProgress = false; System.Threading.Monitor.PulseAll(this); - BasicStream dummy = new BasicStream(_reference.getInstance(), Ice.Util.currentProtocolEncoding, + BasicStream dummy = new BasicStream(_reference.getInstance(), Ice.Util.currentProtocolEncoding, _batchAutoFlush); _batchStream.swap(dummy); _batchRequestsSize = Protocol.requestBatchHdr.Length; @@ -392,7 +392,7 @@ namespace IceInternal // RetryException. We handle the exception like it // was an exception that occured while sending the // request. - // + // lock(this) { Debug.Assert(_exception == null && _requests.Count > 0); diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index 5a7e9a11f8b..0f84936de22 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -104,7 +104,7 @@ namespace IceInternal bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCallback); bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback); - + // // Called by the connection when the message is confirmed sent. The connection is locked // when this is called so this method can call the sent callback. Instead, this method @@ -500,8 +500,8 @@ namespace IceInternal { if(observer_ != null) { - childObserver_ = observer_.getCollocatedObserver(adapter, - requestId, + childObserver_ = observer_.getCollocatedObserver(adapter, + requestId, os_.size() - IceInternal.Protocol.headerSize - 4); if(childObserver_ != null) { @@ -734,10 +734,10 @@ namespace IceInternal lock(monitor_) { - handler = timeoutRequestHandler_; + handler = timeoutRequestHandler_; timeoutRequestHandler_ = null; } - + if(handler != null) { handler.asyncRequestCanceled((OutgoingAsyncMessageCallback)this, new Ice.InvocationTimeoutException()); @@ -807,9 +807,9 @@ namespace IceInternal public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context, bool explicitContext, bool synchronous) { - _handler = null; + handler_ = null; _sent = false; - _cnt = 0; + cnt_ = 0; _mode = mode; sentSynchronously_ = false; synchronous_ = synchronous; @@ -840,14 +840,14 @@ namespace IceInternal { try { - _handler = proxy_.getRequestHandler__(); - _handler.prepareBatchRequest(os_); + handler_ = proxy_.getRequestHandler__(); + handler_.prepareBatchRequest(os_); break; } catch(RetryException) { // Clear request handler and retry. - proxy_.setRequestHandler__(_handler, null); + proxy_.setRequestHandler__(handler_, null); } catch(Ice.LocalException ex) { @@ -856,8 +856,8 @@ namespace IceInternal observer_.failed(ex.ice_name()); } // Clear request handler - proxy_.setRequestHandler__(_handler, null); - _handler = null; + proxy_.setRequestHandler__(handler_, null); + handler_ = null; throw ex; } } @@ -918,14 +918,14 @@ namespace IceInternal return proxy_; } - public bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB) + public virtual bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB) { // Store away the connection for passing to the dispatcher. cachedConnection_ = connection; return connection.sendAsyncRequest(this, compress, response, out sentCB); } - public bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) + public virtual bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) { // The BasicStream cannot be cached if the proxy is // not a twoway or there is an invocation timeout set. @@ -937,15 +937,15 @@ namespace IceInternal handler.invokeAsyncRequest(this, synchronous_, out sentCallback); return false; } - - public Ice.AsyncCallback sent() + + public virtual Ice.AsyncCallback sent() { lock(monitor_) { bool alreadySent = (state_ & StateSent) != 0; state_ |= StateSent; _sent = true; - + Debug.Assert((state_ & StateDone) == 0); if(!proxy_.ice_isTwoway()) { @@ -977,17 +977,17 @@ namespace IceInternal } } - public new void invokeSent(Ice.AsyncCallback cb) + public virtual new void invokeSent(Ice.AsyncCallback cb) { base.invokeSent(cb); } - public new void invokeCompleted(Ice.AsyncCallback cb) + public virtual new void invokeCompleted(Ice.AsyncCallback cb) { base.invokeCompleted(cb); } - public void finished(Ice.Exception exc) + public virtual void finished(Ice.Exception exc) { lock(monitor_) { @@ -1019,11 +1019,11 @@ namespace IceInternal } } - public void + public void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { OutgoingAsync self = this; - threadPool.dispatch(() => + threadPool.dispatch(() => { self.finished(ex); }, connection); @@ -1053,7 +1053,7 @@ namespace IceInternal instance_.timer().cancel(this); timeoutRequestHandler_ = null; } - + replyStatus = is_.readByte(); switch(replyStatus) @@ -1221,7 +1221,7 @@ namespace IceInternal waitHandle_.Set(); } System.Threading.Monitor.PulseAll(monitor_); - + if(completedCallback_ == null) { if(observer_ != null) @@ -1243,7 +1243,7 @@ namespace IceInternal if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) { state_ |= StateDone | StateOK; - _handler.finishBatchRequest(os_); + handler_.finishBatchRequest(os_); if(observer_ != null) { observer_.detach(); @@ -1257,9 +1257,9 @@ namespace IceInternal try { _sent = false; - _handler = proxy_.getRequestHandler__(); + handler_ = proxy_.getRequestHandler__(); Ice.AsyncCallback sentCallback; - bool sent = _handler.sendAsyncRequest(this, out sentCallback); + bool sent = handler_.sendAsyncRequest(this, out sentCallback); if(sent) { if(synchronous) // Only set sentSynchronously_ If called synchronously by the user thread. @@ -1279,11 +1279,11 @@ namespace IceInternal { if((state_ & StateDone) == 0) { - int invocationTimeout = _handler.getReference().getInvocationTimeout(); + int invocationTimeout = handler_.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { instance_.timer().schedule(this, invocationTimeout); - timeoutRequestHandler_ = _handler; + timeoutRequestHandler_ = handler_; } } } @@ -1292,7 +1292,7 @@ namespace IceInternal catch(RetryException) { - proxy_.setRequestHandler__(_handler, null); // Clear request handler and retry. + proxy_.setRequestHandler__(handler_, null); // Clear request handler and retry. continue; } catch(Ice.Exception ex) @@ -1365,7 +1365,7 @@ namespace IceInternal } } - public void + public void runTimerTask() { runTimerTask__(); @@ -1389,14 +1389,14 @@ namespace IceInternal is_.reset(); } os_.reset(); - + proxy_.cacheMessageBuffers(is_, os_); } } override public void invokeExceptionAsync(Ice.Exception ex) { - if((state_ & StateDone) == 0 && _handler != null) + if((state_ & StateDone) == 0 && handler_ != null) { // // If we didn't finish a batch oneway or datagram request, we @@ -1406,7 +1406,7 @@ namespace IceInternal Reference.Mode mode = proxy_.reference__().getMode(); if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) { - _handler.abortBatchRequest(); + handler_.abortBatchRequest(); } } base.invokeExceptionAsync(ex); @@ -1416,7 +1416,7 @@ namespace IceInternal { try { - int interval = proxy_.handleException__(exc, _handler, _mode, _sent, ref _cnt); + int interval = proxy_.handleException__(exc, handler_, _mode, _sent, ref cnt_); if(observer_ != null) { observer_.retried(); // Invocation is being retried. @@ -1458,10 +1458,10 @@ namespace IceInternal } protected Ice.ObjectPrxHelperBase proxy_; + protected RequestHandler handler_; + protected int cnt_; - private RequestHandler _handler; private Ice.EncodingVersion _encoding; - private int _cnt; private Ice.OperationMode _mode; private bool _sent; @@ -1475,7 +1475,7 @@ namespace IceInternal { } - public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, BasicStream iss, + public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, BasicStream iss, BasicStream os) : base(prx, operation, cookie, iss, os) { @@ -1562,7 +1562,7 @@ namespace IceInternal public class TwowayOutgoingAsync<T> : OutgoingAsync<T> { - public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, + public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, object cookie) : base(prx, operation, cookie) { @@ -1570,7 +1570,7 @@ namespace IceInternal _completed = cb; } - public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, + public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, object cookie, BasicStream iss, BasicStream os) : base(prx, operation, cookie, iss, os) { @@ -1634,6 +1634,103 @@ namespace IceInternal private ProxyOnewayCallback<T> _completed; } + public class GetConnectionOutgoingAsync : TwowayOutgoingAsync<Ice.Callback_Object_ice_getConnection> + { + public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase proxy, string operation, + ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> cb, + object cookie) : + base(proxy, operation, cb, cookie) + { + observer_ = ObserverHelper.get(proxy, operation); + } + + public void invoke() + { + while(true) + { + try + { + handler_ = proxy_.getRequestHandler__(); + Ice.AsyncCallback sentCallback; + handler_.sendAsyncRequest(this, out sentCallback); + } + catch(RetryException) + { + proxy_.setRequestHandler__(handler_, null); + } + catch(Ice.Exception ex) + { + handleException(ex); + } + break; + } + } + + public override bool send(Ice.ConnectionI connection, bool compress, bool response, + out Ice.AsyncCallback sentCallback) + { + sent(); + sentCallback = null; + return false; + } + + public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) + { + sent(); + sentCallback = null; + return false; + } + + public override Ice.AsyncCallback sent() + { + lock(monitor_) + { + state_ |= StateDone; + System.Threading.Monitor.PulseAll(monitor_); + } + invokeCompleted(completedCallback_); + return null; + } + + public override void invokeSent(Ice.AsyncCallback cb) + { + // No sent callback + } + + public override void finished(Ice.Exception exc) + { + try + { + handleException(exc); + } + catch(Ice.Exception ex) + { + invokeExceptionAsync(ex); + } + } + + private void handleException(Ice.Exception exc) + { + try + { + instance_.retryQueue().add(this, proxy_.handleException__(exc, handler_, Ice.OperationMode.Idempotent, + false, ref cnt_)); + if(observer_ != null) + { + observer_.retried(); // Invocation is being retried + } + } + catch(Ice.Exception ex) + { + if(observer_ != null) + { + observer_.failed(ex.ice_name()); + } + throw ex; + } + } + } + public class BatchOutgoingAsync : OutgoingAsyncBase, OutgoingAsyncMessageCallback, TimerTask { public BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, string operation, object cookie) : @@ -1647,12 +1744,12 @@ namespace IceInternal cachedConnection_ = connection; return connection.flushAsyncBatchRequests(this, out sentCallback); } - + public bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) { return handler.invokeAsyncBatchRequests(this, out sentCallback); } - + virtual public Ice.AsyncCallback sent() { lock(monitor_) @@ -1712,17 +1809,17 @@ namespace IceInternal invokeException(exc); } - public void + public void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { BatchOutgoingAsync self = this; - threadPool.dispatch(() => + threadPool.dispatch(() => { self.finished(ex); }, connection); } - public void + public void runTimerTask() { runTimerTask__(); @@ -1954,7 +2051,7 @@ namespace IceInternal _outAsync.check(false); } - override public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, + override public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int sz) { if(_outAsync.observer_ != null) diff --git a/cs/src/Ice/Proxy.cs b/cs/src/Ice/Proxy.cs index 5ca2782f120..f2f7f3ff3db 100644 --- a/cs/src/Ice/Proxy.cs +++ b/cs/src/Ice/Proxy.cs @@ -48,6 +48,12 @@ namespace Ice public delegate void Callback_Object_ice_invoke(bool ret__, byte[] outEncaps); /// <summary> + /// Delegate for a successful <code>ice_getConnection</code> invocation. + /// <param name="ret__">The connection used by the proxy.</param> + /// </summary> + public delegate void Callback_Object_ice_getConnection(Connection ret__); + + /// <summary> /// Callback object for Blobject AMI invocations. /// </summary> public abstract class AMI_Object_ice_invoke : AMICallbackBase @@ -714,6 +720,27 @@ namespace Ice Connection ice_getConnection(); /// <summary> + /// Asynchronously gets the connection for this proxy. + /// </summary> + /// <returns>An asynchronous result object.</returns> + AsyncResult<Callback_Object_ice_getConnection> begin_ice_getConnection(); + + /// <summary> + /// Asynchronously gets the connection for this proxy. + /// </summary> + /// <param name="cb__">A callback to be invoked when the invocation completes.</param> + /// <param name="cookie__">Application-specific data to be stored in the result.</param> + /// <returns>An asynchronous result object.</returns> + AsyncResult begin_ice_getConnection(AsyncCallback cb__, object cookie__); + + /// <summary> + /// Asynchronously gets the connection for this proxy. + /// </summary> + /// <param name="r__">The asynchronous result object returned by <code>begin_ice_getConnection</code>.</param> + /// <returns>The connection.</returns> + Connection end_ice_getConnection(AsyncResult r__); + + /// <summary> /// Returns the cached Connection for this proxy. If the proxy does not yet have an established /// connection, it does not attempt to create a connection. /// </summary> @@ -887,7 +914,7 @@ namespace Ice } protected IceInternal.TwowayOutgoingAsync<T> - getTwowayOutgoingAsync<T>(string operation, IceInternal.ProxyTwowayCallback<T> cb, + getTwowayOutgoingAsync<T>(string operation, IceInternal.ProxyTwowayCallback<T> cb, object cookie) { bool haveEntry = false; IceInternal.BasicStream iss = null; @@ -918,7 +945,7 @@ namespace Ice } protected IceInternal.OnewayOutgoingAsync<T> - getOnewayOutgoingAsync<T>(string operation, IceInternal.ProxyOnewayCallback<T> cb, + getOnewayOutgoingAsync<T>(string operation, IceInternal.ProxyOnewayCallback<T> cb, object cookie) { bool haveEntry = false; IceInternal.BasicStream iss = null; @@ -1275,7 +1302,7 @@ namespace Ice outAsync__.cacheMessageBuffers(); } } - } + } private AsyncResult<Callback_Object_ice_id> begin_ice_id(Dictionary<string, string> context__, bool explicitCtx__, @@ -2199,6 +2226,70 @@ namespace Ice } } + public AsyncResult<Callback_Object_ice_getConnection> begin_ice_getConnection() + { + return begin_ice_getConnectionInternal(null, null); + } + + internal const string __ice_getConnection_name = "ice_getConnection"; + + public AsyncResult begin_ice_getConnection(Ice.AsyncCallback cb__, object cookie__) + { + return begin_ice_getConnectionInternal(cb__, cookie__); + } + + public Connection end_ice_getConnection(Ice.AsyncResult r__) + { + IceInternal.GetConnectionOutgoingAsync outAsync__ = (IceInternal.GetConnectionOutgoingAsync)r__; + IceInternal.GetConnectionOutgoingAsync.check(outAsync__, this, __ice_getConnection_name); + outAsync__.wait(); + return ice_getCachedConnection(); + } + + private AsyncResult<Callback_Object_ice_getConnection> begin_ice_getConnectionInternal(Ice.AsyncCallback cb__, + object cookie__) + { + IceInternal.GetConnectionOutgoingAsync result__ = + new IceInternal.GetConnectionOutgoingAsync(this, __ice_getConnection_name, + ice_getConnection_completed__, cookie__); + if(cb__ != null) + { + result__.whenCompletedWithAsyncCallback(cb__); + } + try + { + result__.invoke(); + } + catch(Ice.Exception ex__) + { + result__.invokeExceptionAsync(ex__); + } + return result__; + } + + private void ice_getConnection_completed__(AsyncResult r__, + Callback_Object_ice_getConnection cb__, + Ice.ExceptionCallback excb__) + { + Connection ret__; + try + { + ret__ = end_ice_getConnection(r__); + } + catch(Ice.Exception ex__) + { + if(excb__ != null) + { + excb__(ex__); + } + return; + } + if(cb__ != null) + { + cb__(ret__); + } + } + /// <summary> /// Returns the cached Connection for this proxy. If the proxy does not yet have an established /// connection, it does not attempt to create a connection. @@ -2516,7 +2607,7 @@ namespace Ice } } - + return (new IceInternal.ConnectRequestHandler(_reference, this)).connect(); } diff --git a/cs/test/Ice/ami/AllTests.cs b/cs/test/Ice/ami/AllTests.cs index 1394001697a..543d8c6108e 100644 --- a/cs/test/Ice/ami/AllTests.cs +++ b/cs/test/Ice/ami/AllTests.cs @@ -114,6 +114,14 @@ public class AllTests : TestCommon.TestApp } public void + connection(Ice.AsyncResult result) + { + test(result.AsyncState == _cookie); + test(result.getProxy().end_ice_getConnection(result) != null); + called(); + } + + public void op(Ice.AsyncResult result) { test(result.AsyncState == _cookie); @@ -225,6 +233,25 @@ public class AllTests : TestCommon.TestApp } public void + connectionEx(Ice.AsyncResult result) + { + test(result.AsyncState == _cookie); + try + { + result.getProxy().end_ice_getConnection(result); + test(false); + } + catch(Ice.NoEndpointException) + { + called(); + } + catch(Ice.Exception) + { + test(false); + } + } + + public void opEx(Ice.AsyncResult result) { test(result.AsyncState == _cookie); @@ -280,6 +307,13 @@ public class AllTests : TestCommon.TestApp } public void + connection(Ice.Connection conn) + { + test(conn != null); + called(); + } + + public void op() { called(); @@ -342,6 +376,12 @@ public class AllTests : TestCommon.TestApp } public void + connection(Ice.Connection conn) + { + test(false); + } + + public void op() { test(false); @@ -622,7 +662,7 @@ public class AllTests : TestCommon.TestApp override public void run(Ice.Communicator communicator) #else - public static void allTests(Ice.Communicator communicator) + public static void allTests(Ice.Communicator communicator, bool collocated) #endif { string sref = "test:default -p 12010"; @@ -663,6 +703,12 @@ public class AllTests : TestCommon.TestApp result = p.begin_ice_ids(ctx); test(p.end_ice_ids(result).Length == 2); + if(!collocated) + { + result = p.begin_ice_getConnection(); + test(p.end_ice_getConnection(result) != null); + } + result = p.begin_op(); p.end_op(result); result = p.begin_op(ctx); @@ -738,6 +784,14 @@ public class AllTests : TestCommon.TestApp p.begin_ice_ids(ctx, cbWC.ids, cookie); cbWC.check(); + if(!collocated) + { + p.begin_ice_getConnection(cb.connection, null); + cb.check(); + p.begin_ice_getConnection(cbWC.connection, cookie); + cbWC.check(); + } + p.begin_op(cb.op, null); cb.check(); p.begin_op(cbWC.op, cookie); @@ -793,6 +847,12 @@ public class AllTests : TestCommon.TestApp p.begin_ice_ids(ctx).whenCompleted(cb.ids, null); cb.check(); + if(!collocated) + { + p.begin_ice_getConnection().whenCompleted(cb.connection, null); + cb.check(); + } + p.begin_op().whenCompleted(cb.op, null); cb.check(); p.begin_op(ctx).whenCompleted(cb.op, null); @@ -868,6 +928,16 @@ public class AllTests : TestCommon.TestApp }, null); cb.check(); + if(!collocated) + { + p.begin_ice_getConnection().whenCompleted( + (Ice.Connection conn) => + { + cb.connection(conn); + }, null); + cb.check(); + } + p.begin_op().whenCompleted( () => { @@ -993,6 +1063,14 @@ public class AllTests : TestCommon.TestApp i.begin_ice_ids(cbWC.idsEx, cookie); cbWC.check(); + if(!collocated) + { + i.begin_ice_getConnection(cb.connectionEx, null); + cb.check(); + i.begin_ice_getConnection(cbWC.connectionEx, cookie); + cbWC.check(); + } + i.begin_op(cb.opEx, null); cb.check(); i.begin_op(cbWC.opEx, cookie); @@ -1018,6 +1096,12 @@ public class AllTests : TestCommon.TestApp i.begin_ice_ids().whenCompleted(cb.ids, cb.ex); cb.check(); + if(!collocated) + { + i.begin_ice_getConnection().whenCompleted(cb.connection, cb.ex); + cb.check(); + } + i.begin_op().whenCompleted(cb.op, cb.ex); cb.check(); } @@ -1030,7 +1114,7 @@ public class AllTests : TestCommon.TestApp ExceptionCallback cb = new ExceptionCallback(); i.begin_ice_isA("::Test::TestIntf").whenCompleted( - (bool r) => + (bool r) => { cb.isA(r); }, @@ -1041,7 +1125,7 @@ public class AllTests : TestCommon.TestApp cb.check(); i.begin_ice_ping().whenCompleted( - () => + () => { cb.ping(); }, @@ -1052,7 +1136,7 @@ public class AllTests : TestCommon.TestApp cb.check(); i.begin_ice_id().whenCompleted( - (string id) => + (string id) => { cb.id(id); }, @@ -1063,7 +1147,7 @@ public class AllTests : TestCommon.TestApp cb.check(); i.begin_ice_ids().whenCompleted( - (string[] ids) => + (string[] ids) => { cb.ids(ids); }, @@ -1073,8 +1157,22 @@ public class AllTests : TestCommon.TestApp }); cb.check(); + if(!collocated) + { + i.begin_ice_getConnection().whenCompleted( + (Ice.Connection conn) => + { + cb.connection(conn); + }, + (Ice.Exception ex) => + { + cb.ex(ex); + }); + cb.check(); + } + i.begin_op().whenCompleted( - () => + () => { cb.op(); }, @@ -1239,7 +1337,7 @@ public class AllTests : TestCommon.TestApp { cb.isA(r); }, - (Ice.Exception ex) => + (Ice.Exception ex) => { cb.ex(ex); } @@ -1255,7 +1353,7 @@ public class AllTests : TestCommon.TestApp { cb.ping(); }, - (Ice.Exception ex) => + (Ice.Exception ex) => { cb.ex(ex); } @@ -1271,7 +1369,7 @@ public class AllTests : TestCommon.TestApp { cb.id(id); }, - (Ice.Exception ex) => + (Ice.Exception ex) => { cb.ex(ex); } @@ -1287,7 +1385,7 @@ public class AllTests : TestCommon.TestApp { cb.ids(ids); }, - (Ice.Exception ex) => + (Ice.Exception ex) => { cb.ex(ex); } @@ -1303,7 +1401,7 @@ public class AllTests : TestCommon.TestApp { cb.op(); }, - (Ice.Exception ex) => + (Ice.Exception ex) => { cb.ex(ex); } @@ -1446,14 +1544,14 @@ public class AllTests : TestCommon.TestApp Thrower cb = new Thrower(throwEx[i]); p.begin_op().whenCompleted( - () => + () => { cb.op(); }, null); cb.check(); q.begin_op().whenCompleted( - () => + () => { cb.op(); }, @@ -1464,7 +1562,7 @@ public class AllTests : TestCommon.TestApp cb.check(); p.begin_op().whenCompleted( - () => + () => { cb.noOp(); }, @@ -1587,7 +1685,7 @@ public class AllTests : TestCommon.TestApp b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = b1.begin_ice_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -1613,7 +1711,7 @@ public class AllTests : TestCommon.TestApp b1.ice_getConnection().close(false); FlushExCallback cb = new FlushExCallback(cookie); Ice.AsyncResult r = b1.begin_ice_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -1780,7 +1878,7 @@ public class AllTests : TestCommon.TestApp b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -1805,7 +1903,7 @@ public class AllTests : TestCommon.TestApp b1.ice_getConnection().close(false); FlushExCallback cb = new FlushExCallback(cookie); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -2103,7 +2201,7 @@ public class AllTests : TestCommon.TestApp b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -2128,7 +2226,7 @@ public class AllTests : TestCommon.TestApp b1.ice_getConnection().close(false); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -2157,7 +2255,7 @@ public class AllTests : TestCommon.TestApp b2.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -2188,7 +2286,7 @@ public class AllTests : TestCommon.TestApp b1.ice_getConnection().close(false); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -2219,7 +2317,7 @@ public class AllTests : TestCommon.TestApp b2.ice_getConnection().close(false); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( - (Ice.AsyncResult result) => + (Ice.AsyncResult result) => { cb.completedAsync(result); }, cookie); @@ -2382,7 +2480,7 @@ public class AllTests : TestCommon.TestApp } WriteLine("ok"); } - + Write("testing AsyncResult operations... "); Flush(); { @@ -2401,7 +2499,7 @@ public class AllTests : TestCommon.TestApp { test(r1.sentSynchronously() && r1.isSent() && !r1.isCompleted_() || !r1.sentSynchronously() && !r1.isCompleted_()); - + test(!r2.sentSynchronously() && !r2.isCompleted_()); test(!r1.IsCompleted && !r1.CompletedSynchronously); @@ -2509,13 +2607,13 @@ public class AllTests : TestCommon.TestApp // // Send multiple opWithPayload, followed by a close and followed by multiple opWithPaylod. - // The goal is to make sure that none of the opWithPayload fail even if the server closes + // The goal is to make sure that none of the opWithPayload fail even if the server closes // the connection gracefully in between. - // + // int maxQueue = 2; bool done = false; while(!done && maxQueue < 50) - { + { done = true; p.ice_ping(); List<Ice.AsyncResult> results = new List<Ice.AsyncResult>(); diff --git a/cs/test/Ice/ami/Client.cs b/cs/test/Ice/ami/Client.cs index af0abc47d58..dc61a6b956b 100644 --- a/cs/test/Ice/ami/Client.cs +++ b/cs/test/Ice/ami/Client.cs @@ -21,7 +21,7 @@ public class Client { private static int run(string[] args, Ice.Communicator communicator) { - AllTests.allTests(communicator); + AllTests.allTests(communicator, false); return 0; } diff --git a/cs/test/Ice/ami/Collocated.cs b/cs/test/Ice/ami/Collocated.cs index 0a0190e70a2..199c0493170 100644 --- a/cs/test/Ice/ami/Collocated.cs +++ b/cs/test/Ice/ami/Collocated.cs @@ -33,7 +33,7 @@ public class Collocated adapter2.add(new TestControllerI(adapter), communicator.stringToIdentity("testController")); adapter2.activate(); - AllTests.allTests(communicator); + AllTests.allTests(communicator, true); return 0; } diff --git a/java/src/Ice/Callback_Object_ice_getConnection.java b/java/src/Ice/Callback_Object_ice_getConnection.java new file mode 100644 index 00000000000..14b5de10939 --- /dev/null +++ b/java/src/Ice/Callback_Object_ice_getConnection.java @@ -0,0 +1,31 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Ice; + +/** + * Callback object for {@link ObjectPrx#.begin_ice_getConnection}. + **/ +public abstract class Callback_Object_ice_getConnection extends IceInternal.TwowayCallback + implements Ice.TwowayCallbackArg1<Ice.Connection> +{ + /** + * Called when the invocation completes successfully. + * + * @param __ret The connection being used by the proxy. + **/ + @Override + public abstract void response(Ice.Connection __ret); + + @Override + public final void __completed(AsyncResult __result) + { + ObjectPrxHelperBase.__ice_getConnection_completed(this, __result); + } +} diff --git a/java/src/Ice/ObjectPrx.java b/java/src/Ice/ObjectPrx.java index eddfec7e8f7..b3af5ef6eb8 100644 --- a/java/src/Ice/ObjectPrx.java +++ b/java/src/Ice/ObjectPrx.java @@ -1136,6 +1136,46 @@ public interface ObjectPrx Connection ice_getConnection(); /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @return The asynchronous result object. + **/ + AsyncResult begin_ice_getConnection(); + + /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @return The asynchronous result object. + **/ + AsyncResult begin_ice_getConnection(Callback __cb); + + /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @param cb The callback object to notify the application when the operation is complete. + * @return The asynchronous result object. + **/ + AsyncResult begin_ice_getConnection(Callback_Object_ice_getConnection __cb); + + /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @param __responseCb The callback object to notify the application when the there is a response available. + * @param __exceptionCb The callback object to notify the application when the there is an exception getting + * connection. + * @return The asynchronous result object. + **/ + AsyncResult begin_ice_getConnection(IceInternal.Functional_GenericCallback1<Ice.Connection> __responseCb, + IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb); + + /** + * Completes the asynchronous get connection. + * + * @param __result The asynchronous result. + **/ + Ice.Connection end_ice_getConnection(AsyncResult __result); + + /** * Returns the cached {@link Connection} for this proxy. If the proxy does not yet have an established * connection, it does not attempt to create a connection. * diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index edda748ea7a..72d3d99f150 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -2392,6 +2392,129 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @return The asynchronous result object. + **/ + @Override + public AsyncResult + begin_ice_getConnection() + { + AsyncResult result = begin_ice_getConnectionInternal(null); + return result; + } + + /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @param __cb The callback object to notify the application when the flush is complete. + * @return The asynchronous result object. + **/ + @Override + public AsyncResult + begin_ice_getConnection(Callback __cb) + { + AsyncResult result = begin_ice_getConnectionInternal(__cb); + return result; + } + + /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @param __cb The callback object to notify the application when the flush is complete. + * @return The asynchronous result object. + **/ + @Override + public AsyncResult + begin_ice_getConnection(Callback_Object_ice_getConnection __cb) + { + AsyncResult result = begin_ice_getConnectionInternal(__cb); + return result; + } + + private class FunctionalCallback_Object_ice_getConnection + extends IceInternal.Functional_TwowayCallbackArg1<Ice.Connection> + { + FunctionalCallback_Object_ice_getConnection( + IceInternal.Functional_GenericCallback1<Ice.Connection> __responseCb, + IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) + { + super(__responseCb, __exceptionCb, null); + } + + @Override + public final void __completed(AsyncResult __result) + { + ObjectPrxHelperBase.__ice_getConnection_completed(this, __result); + } + } + + /** + * Asynchronously gets the connection for this proxy. The call does not block. + * + * @param __responseCb The callback object to notify the application when the there is a response available. + * @param __exceptionCb The callback object to notify the application when the there is an exception getting + * connection. + * @return The asynchronous result object. + **/ + @Override + public AsyncResult + begin_ice_getConnection(IceInternal.Functional_GenericCallback1<Ice.Connection> __responseCb, + IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) + { + return begin_ice_getConnectionInternal( + new FunctionalCallback_Object_ice_getConnection(__responseCb, __exceptionCb)); + } + + private static final String __ice_getConnection_name = "ice_getConnection"; + + private AsyncResult + begin_ice_getConnectionInternal(IceInternal.CallbackBase __cb) + { + IceInternal.GetConnectionOutgoingAsync __result = + new IceInternal.GetConnectionOutgoingAsync(this, __ice_getConnection_name, __cb); + try + { + __result.__invoke(); + } + catch(Exception __ex) + { + __result.invokeExceptionAsync(__ex); + } + return __result; + } + + @Override + public Ice.Connection + end_ice_getConnection(AsyncResult __iresult) + { + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult; + IceInternal.AsyncResultI.check(__result, this, __ice_getConnection_name); + __result.__wait(); + return ice_getCachedConnection(); + } + + static public final void __ice_getConnection_completed(TwowayCallbackArg1<Ice.Connection> __cb, AsyncResult __result) + { + Ice.Connection __ret = null; + try + { + __ret = __result.getProxy().end_ice_getConnection(__result); + } + catch(LocalException __ex) + { + __cb.exception(__ex); + return; + } + catch(SystemException __ex) + { + __cb.exception(__ex); + return; + } + __cb.response(__ret); + } + + /** * Returns the cached {@link Connection} for this proxy. If the proxy does not yet have an established * connection, it does not attempt to create a connection. * @@ -2531,7 +2654,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public void end_ice_flushBatchRequests(AsyncResult __iresult) { - IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult; + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult; IceInternal.AsyncResultI.check(__result, this, __ice_flushBatchRequests_name); __result.__wait(); } @@ -2767,7 +2890,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } } - + public void cacheMessageBuffers(IceInternal.BasicStream is, IceInternal.BasicStream os) { diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 25d9859b997..60881c35c6d 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -201,7 +201,7 @@ public class ConnectRequestHandler return _connection; } } - + @Override synchronized public ConnectionI waitForConnection() diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java new file mode 100644 index 00000000000..95878b90709 --- /dev/null +++ b/java/src/IceInternal/GetConnectionOutgoingAsync.java @@ -0,0 +1,108 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class GetConnectionOutgoingAsync extends IceInternal.OutgoingAsync +{ + public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback) + { + super(prx, operation, callback); + _observer = ObserverHelper.get(prx, operation); + } + + public void __invoke() + { + while(true) + { + try + { + _handler = _proxy.__getRequestHandler(); + _handler.sendAsyncRequest(this); + } + catch(RetryException ex) + { + _proxy.__setRequestHandler(_handler, null); + } + catch(Ice.Exception ex) + { + handleException(ex); + } + break; + } + } + + @Override + public int send(Ice.ConnectionI conection, boolean compress, boolean response) + throws RetryException + { + sent(); + return 0; + } + + @Override + public int invokeCollocated(CollocatedRequestHandler handler) + { + sent(); + return 0; + } + + @Override + public boolean sent() + { + synchronized(_monitor) + { + _state |= StateDone; + _monitor.notifyAll(); + } + invokeCompleted(); + return false; + } + + @Override + public void invokeSent() + { + // No sent callback + } + + @Override + public void finished(Ice.Exception exc) + { + try + { + handleException(exc); + } + catch(Ice.Exception ex) + { + invokeExceptionAsync(ex); + } + } + + private void handleException(Ice.Exception exc) + { + try + { + Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); + _cnt = _proxy.__handleException(exc, _handler, Ice.OperationMode.Idempotent, false, interval, _cnt); + if(_observer != null) + { + _observer.retried(); // Invocation is being retried + } + _instance.retryQueue().add(this, interval.value); + } + catch(Ice.Exception ex) + { + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + throw ex; + } + } +} diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 84bfc477768..992b9ca6265 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -17,7 +17,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _proxy = (Ice.ObjectPrxHelperBase) prx; _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); } - + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is, IceInternal.BasicStream os) { @@ -26,8 +26,8 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _proxy = (Ice.ObjectPrxHelperBase) prx; _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); } - - public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, + + public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, boolean explicitCtx, boolean synchronous) { _handler = null; @@ -200,7 +200,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _monitor.notifyAll(); // Don't call the sent call is already sent. - return !alreadySent && _callback != null && _callback.__hasSentCallback(); + return !alreadySent && _callback != null && _callback.__hasSentCallback(); } } @@ -264,7 +264,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC // with the connection locked. Therefore, it must not invoke // any user callbacks. // - + assert (_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; @@ -279,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _childObserver.detach(); _childObserver = null; } - + if(_timeoutRequestHandler != null) { _future.cancel(false); _future = null; _timeoutRequestHandler = null; } - + // _is can already be initialized if the invocation is retried if(_is == null) { @@ -294,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } _is.swap(is); replyStatus = _is.readByte(); - + switch(replyStatus) { case ReplyStatus.replyOK: @@ -422,7 +422,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } _state |= StateDone; _monitor.notifyAll(); - + if(_callback == null) { if(_observer != null) @@ -579,7 +579,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _os.writeEncaps(encaps); } } - + public void cacheMessageBuffers() { if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0) @@ -597,11 +597,11 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _is.reset(); } _os.reset(); - + _proxy.cacheMessageBuffers(_is, _os); } } - + @Override public void invokeExceptionAsync(final Ice.Exception ex) { @@ -621,7 +621,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC super.invokeExceptionAsync(ex); } - + private void handleException(Ice.Exception exc) { try @@ -632,7 +632,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC { _observer.retried(); // Invocation is being retried. } - + // // Schedule the retry. Note that we always schedule the retry // on the retry queue even if the invocation can be retried @@ -654,10 +654,10 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } protected Ice.ObjectPrxHelperBase _proxy; + protected RequestHandler _handler; + protected int _cnt; - private RequestHandler _handler; private Ice.EncodingVersion _encoding; - private int _cnt; private Ice.OperationMode _mode; private boolean _sent; // diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java index f9d90e3e72e..6709c2f7ab3 100644 --- a/java/src/IceInternal/QueueRequestHandler.java +++ b/java/src/IceInternal/QueueRequestHandler.java @@ -21,7 +21,7 @@ import Ice.ConnectionI; public class QueueRequestHandler implements RequestHandler { public - QueueRequestHandler(Instance instance, RequestHandler delegate) + QueueRequestHandler(Instance instance, RequestHandler delegate) { _executor = instance.getQueueExecutor(); assert(delegate != null); @@ -34,7 +34,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() + Future<Void> future = _executor.submit(new Callable<Void>() { @Override public Void call() throws RetryException @@ -81,7 +81,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() + Future<Void> future = _executor.submit(new Callable<Void>() { @Override public Void call() diff --git a/java/test/Ice/ami/AMI.java b/java/test/Ice/ami/AMI.java index 67c6920d142..aa3f9c9b328 100644 --- a/java/test/Ice/ami/AMI.java +++ b/java/test/Ice/ami/AMI.java @@ -100,6 +100,13 @@ public class AMI } public void + connection(Ice.AsyncResult result) + { + test(result.getProxy().end_ice_getConnection(result) != null); + called(); + } + + public void op(Ice.AsyncResult result) { TestIntfPrxHelper.uncheckedCast(result.getProxy()).end_op(result); @@ -204,6 +211,24 @@ public class AMI } public void + connectionEx(Ice.AsyncResult result) + { + try + { + result.getProxy().end_ice_getConnection(result); + test(false); + } + catch(Ice.NoEndpointException ex) + { + called(); + } + catch(Ice.LocalException ex) + { + test(false); + } + } + + public void opEx(Ice.AsyncResult result) { try @@ -256,6 +281,13 @@ public class AMI } public void + connection(Ice.Connection conn) + { + test(conn != null); + called(); + } + + public void op() { called(); @@ -317,6 +349,12 @@ public class AMI } public void + connection(Ice.Connection conn) + { + test(false); + } + + public void op() { test(false); @@ -548,11 +586,12 @@ public class AMI } public static void - run(Application app, Ice.Communicator communicator, TestIntfPrx p, TestIntfControllerPrx testController) + run(Application app, Ice.Communicator communicator, boolean collocated, TestIntfPrx p, + TestIntfControllerPrx testController) { - + PrintWriter out = app.getWriter(); - + out.print("testing begin/end invocation... "); out.flush(); { @@ -579,6 +618,12 @@ public class AMI result = p.begin_ice_ids(ctx); test(p.end_ice_ids(result).length == 2); + if(!collocated) + { + result = p.begin_ice_getConnection(); + test(p.end_ice_getConnection(result) != null); + } + result = p.begin_op(); p.end_op(result); result = p.begin_op(ctx); @@ -701,6 +746,20 @@ public class AMI }); cb.check(); + if(!collocated) + { + p.begin_ice_getConnection(new Ice.Callback() + { + @Override + public void + completed(Ice.AsyncResult r) + { + cb.connection(r); + } + }); + cb.check(); + } + p.begin_op(new Ice.Callback() { @Override @@ -912,6 +971,27 @@ public class AMI }); cb.check(); + if(!collocated) + { + p.begin_ice_getConnection(new Ice.Callback_Object_ice_getConnection() + { + @Override + public void + response(Ice.Connection conn) + { + cb.connection(conn); + } + + @Override + public void + exception(Ice.LocalException ex) + { + test(false); + } + }); + cb.check(); + } + p.begin_op(new Callback_TestIntf_op() { @Override @@ -1131,6 +1211,20 @@ public class AMI }); cb.check(); + if(!collocated) + { + i.begin_ice_getConnection(new Ice.Callback() + { + @Override + public void + completed(Ice.AsyncResult r) + { + cb.connectionEx(r); + } + }); + cb.check(); + } + i.begin_op(new Ice.Callback() { @Override @@ -1222,6 +1316,27 @@ public class AMI }); cb.check(); + if(!collocated) + { + i.begin_ice_getConnection(new Ice.Callback_Object_ice_getConnection() + { + @Override + public void + response(Ice.Connection conn) + { + test(false); + } + + @Override + public void + exception(Ice.LocalException ex) + { + cb.ex(ex); + } + }); + cb.check(); + } + i.begin_op(new Callback_TestIntf_op() { @Override @@ -2198,7 +2313,7 @@ public class AMI { test(r1.sentSynchronously() && r1.isSent() && !r1.isCompleted() || !r1.sentSynchronously() && !r1.isCompleted()); - + test(!r2.sentSynchronously() && !r2.isCompleted()); } } @@ -2298,9 +2413,9 @@ public class AMI // // Send multiple opWithPayload, followed by a close and followed by multiple opWithPaylod. - // The goal is to make sure that none of the opWithPayload fail even if the server closes + // The goal is to make sure that none of the opWithPayload fail even if the server closes // the connection gracefully in between. - // + // int maxQueue = 2; boolean done = false; while(!done && maxQueue < 50) diff --git a/java/test/Ice/ami/AllTests.java b/java/test/Ice/ami/AllTests.java index 8e98a2e7667..ffe95450b2c 100644 --- a/java/test/Ice/ami/AllTests.java +++ b/java/test/Ice/ami/AllTests.java @@ -27,9 +27,9 @@ public class AllTests throw new RuntimeException(); } } - + public static void - allTests(Application app) + allTests(Application app, boolean collocated) { Ice.Communicator communicator = app.communicator(); PrintWriter out = app.getWriter(); @@ -47,22 +47,22 @@ public class AllTests TestIntfControllerPrx testController = TestIntfControllerPrxHelper.uncheckedCast(obj); out.println("testing with new AMI mapping... "); - test.Ice.ami.AMI.run(app, communicator, p, testController); - + test.Ice.ami.AMI.run(app, communicator, collocated, p, testController); + // // Use reflection to load TwowaysLambdaAMI as that is only supported with Java >= 1.8 - // + // try { Class<?> cls = IceInternal.Util.findClass("test.Ice.ami.lambda.AMI", null); if(cls != null) { - java.lang.reflect.Method run = cls.getDeclaredMethod("run", - new Class<?>[]{test.Util.Application.class, Ice.Communicator.class, TestIntfPrx.class, + java.lang.reflect.Method run = cls.getDeclaredMethod("run", + new Class<?>[]{test.Util.Application.class, Ice.Communicator.class, boolean.class, TestIntfPrx.class, TestIntfControllerPrx.class}); out.println("testing with lambda AMI mapping... "); out.flush(); - run.invoke(null, app, communicator, p, testController); + run.invoke(null, app, communicator, collocated, p, testController); } } catch(java.lang.NoSuchMethodException ex) @@ -77,7 +77,7 @@ public class AllTests { throw new RuntimeException(ex); } - + p.shutdown(); } } diff --git a/java/test/Ice/ami/Client.java b/java/test/Ice/ami/Client.java index e95e0e9bfb6..3e547a17ec0 100644 --- a/java/test/Ice/ami/Client.java +++ b/java/test/Ice/ami/Client.java @@ -14,7 +14,7 @@ public class Client extends test.Util.Application @Override public int run(String[] args) { - AllTests.allTests(this); + AllTests.allTests(this, false); return 0; } diff --git a/java/test/Ice/ami/Collocated.java b/java/test/Ice/ami/Collocated.java index 765c8944413..3385facc305 100644 --- a/java/test/Ice/ami/Collocated.java +++ b/java/test/Ice/ami/Collocated.java @@ -22,7 +22,7 @@ public class Collocated extends test.Util.Application adapter2.add(new TestControllerI(adapter), communicator().stringToIdentity("testController")); adapter2.activate(); - AllTests.allTests(this); + AllTests.allTests(this, true); return 0; } diff --git a/java/test/Ice/ami/lambda/AMI.java b/java/test/Ice/ami/lambda/AMI.java index b89a51ae0f5..0e013b97435 100644 --- a/java/test/Ice/ami/lambda/AMI.java +++ b/java/test/Ice/ami/lambda/AMI.java @@ -101,6 +101,13 @@ public class AMI } public void + connection(Ice.AsyncResult result) + { + test(result.getProxy().end_ice_getConnection(result) != null); + called(); + } + + public void op(Ice.AsyncResult result) { TestIntfPrxHelper.uncheckedCast(result.getProxy()).end_op(result); @@ -205,6 +212,24 @@ public class AMI } public void + connectionEx(Ice.AsyncResult result) + { + try + { + result.getProxy().end_ice_getConnection(result); + test(false); + } + catch(Ice.NoEndpointException ex) + { + called(); + } + catch(Ice.Exception ex) + { + test(false); + } + } + + public void opEx(Ice.AsyncResult result) { try @@ -257,6 +282,13 @@ public class AMI } public void + connection(Ice.Connection conn) + { + test(conn != null); + called(); + } + + public void op() { called(); @@ -318,6 +350,12 @@ public class AMI } public void + connection(Ice.Connection conn) + { + test(false); + } + + public void op() { test(false); @@ -549,9 +587,10 @@ public class AMI } public static void - run(Application app, Ice.Communicator communicator, TestIntfPrx p, TestIntfControllerPrx testController) + run(Application app, Ice.Communicator communicator, boolean collocated, TestIntfPrx p, + TestIntfControllerPrx testController) { - + PrintWriter out = app.getWriter(); out.print("testing response callback... "); @@ -560,12 +599,12 @@ public class AMI final ResponseCallback cb = new ResponseCallback(); java.util.Map<String, String> ctx = new java.util.HashMap<String, String>(); - p.begin_ice_isA("::Test::TestIntf", + p.begin_ice_isA("::Test::TestIntf", (boolean r) -> cb.isA(r), (Ice.Exception ex) -> test(false)); cb.check(); - - p.begin_ice_isA("::Test::TestIntf", + + p.begin_ice_isA("::Test::TestIntf", (boolean r) -> cb.isA(r), (Ice.Exception ex) -> test(false)); cb.check(); @@ -574,8 +613,8 @@ public class AMI () -> cb.ping(), (Ice.Exception ex) -> test(false)); cb.check(); - - p.begin_ice_ping(ctx, + + p.begin_ice_ping(ctx, () -> cb.ping(), (Ice.Exception ex) -> test(false)); cb.check(); @@ -584,8 +623,8 @@ public class AMI (String id) -> cb.id(id), (Ice.Exception ex) -> test(false)); cb.check(); - - p.begin_ice_id(ctx, + + p.begin_ice_id(ctx, (String id) -> cb.id(id), (Ice.Exception ex) -> test(false)); cb.check(); @@ -594,18 +633,26 @@ public class AMI (String[] ids) -> cb.ids(ids), (Ice.Exception ex) -> test(false)); cb.check(); - + p.begin_ice_ids(ctx, (String[] ids) -> cb.ids(ids), (Ice.Exception ex) -> test(false)); cb.check(); + if(!collocated) + { + p.begin_ice_getConnection( + (Ice.Connection conn) -> cb.connection(conn), + (Ice.Exception ex) -> test(false)); + cb.check(); + } + p.begin_op( () -> cb.op(), (Ice.Exception ex) -> test(false)); cb.check(); - - p.begin_op(ctx, + + p.begin_op(ctx, () -> cb.op(), (Ice.Exception ex) -> test(false)); cb.check(); @@ -614,8 +661,8 @@ public class AMI (int r) -> cb.opWithResult(r), (Ice.Exception ex) -> test(false)); cb.check(); - - p.begin_opWithResult(ctx, + + p.begin_opWithResult(ctx, (int r) -> cb.opWithResult(r), (Ice.Exception ex) -> test(false)); cb.check(); @@ -625,8 +672,8 @@ public class AMI (Ice.UserException ex) -> cb.opWithUE(ex), (Ice.Exception ex) -> test(false)); cb.check(); - - p.begin_opWithUE(ctx, + + p.begin_opWithUE(ctx, () -> test(false), (Ice.UserException ex) -> cb.opWithUE(ex), (Ice.Exception ex) -> test(false)); @@ -640,7 +687,7 @@ public class AMI TestIntfPrx i = TestIntfPrxHelper.uncheckedCast(p.ice_adapterId("dummy")); final ExceptionCallback cb = new ExceptionCallback(); - i.begin_ice_isA("::Test::TestIntf", + i.begin_ice_isA("::Test::TestIntf", (boolean r) -> test(false), (Ice.Exception ex) -> cb.ex(ex)); cb.check(); @@ -660,6 +707,14 @@ public class AMI (Ice.Exception ex) -> cb.ex(ex)); cb.check(); + if(!collocated) + { + i.begin_ice_getConnection( + (Ice.Connection conn) -> test(false), + (Ice.Exception ex) -> cb.ex(ex)); + cb.check(); + } + i.begin_op( () -> test(false), (Ice.Exception ex) -> cb.ex(ex)); @@ -672,7 +727,7 @@ public class AMI { final SentCallback cb = new SentCallback(); - p.begin_ice_isA("", + p.begin_ice_isA("", (boolean r) -> cb.isA(r), (Ice.Exception ex) -> cb.ex(ex), (boolean ss) -> cb.sent(ss)); @@ -965,7 +1020,7 @@ public class AMI } out.println("ok"); } - + out.print("testing null callbacks..."); try { @@ -978,27 +1033,27 @@ public class AMI { // Excepted when response and exception callback are both null. } - + try { p.begin_ice_ping(() -> {}, null); - + } catch(IllegalArgumentException ex) { test(false); } - + try { p.begin_ice_ping(null, (Ice.Exception ex) -> {}); - + } catch(IllegalArgumentException ex) { test(false); } - + try { IceInternal.Functional_BoolCallback response = null; @@ -1010,27 +1065,27 @@ public class AMI { // Excepted when response and exception callback are both null. } - + try { p.begin_ice_isA("::Test::TestIntf", (boolean v) -> {}, null); - + } catch(IllegalArgumentException ex) { test(false); } - + try { p.begin_ice_isA("::Test::TestIntf", null, (Ice.Exception ex) -> {}); - + } catch(IllegalArgumentException ex) { test(false); } - + try { IceInternal.Functional_VoidCallback response = null; @@ -1043,7 +1098,7 @@ public class AMI // an operation that throws user exceptions both user exception callback // an local exception callback must be present. } - + try { IceInternal.Functional_VoidCallback response = null; @@ -1056,7 +1111,7 @@ public class AMI // an operation that throws user exceptions both user exception callback // an local exception callback must be present. } - + try { IceInternal.Functional_VoidCallback response = null; diff --git a/py/modules/IcePy/Operation.cpp b/py/modules/IcePy/Operation.cpp index 327be1c130c..ac5221029fd 100644 --- a/py/modules/IcePy/Operation.cpp +++ b/py/modules/IcePy/Operation.cpp @@ -16,6 +16,7 @@ #include <Proxy.h> #include <Thread.h> #include <Types.h> +#include <Connection.h> #include <Util.h> #include <Ice/Communicator.h> #include <Ice/IncomingAsync.h> @@ -2549,7 +2550,7 @@ IcePy::SyncBlobjectInvocation::invoke(PyObject* args, PyObject* /* kwds */) memcpy(buf, &out[0], sz); } #endif - + if(PyTuple_SET_ITEM(result.get(), 1, op.get()) < 0) { throwPythonException(); @@ -3618,7 +3619,7 @@ IcePy::BlobjectUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, { throwPythonException(); } - + PyObjectHandle ip; #if PY_VERSION_HEX >= 0x03000000 @@ -4010,6 +4011,45 @@ IcePy::FlushCallback::sent(bool sentSynchronously) } } +IcePy::GetConnectionCallback::GetConnectionCallback(const Ice::CommunicatorPtr& communicator, + PyObject* response, PyObject* ex, const string& op) : + _communicator(communicator), _response(response), _ex(ex), _op(op) +{ + assert(_response); + Py_INCREF(_response); + Py_XINCREF(_ex); +} + +IcePy::GetConnectionCallback::~GetConnectionCallback() +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_DECREF(_response); + Py_XDECREF(_ex); +} + +void +IcePy::GetConnectionCallback::response(const Ice::ConnectionPtr& conn) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + PyObjectHandle pyConn = createConnection(conn, _communicator); + PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), pyConn.get()); + PyObjectHandle tmp = PyObject_Call(_response, args.get(), 0); + if(PyErr_Occurred()) + { + handleException(); // Callback raised an exception. + } +} + +void +IcePy::GetConnectionCallback::exception(const Ice::Exception& ex) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + callException(_ex, ex); +} + // // ServantWrapper implementation. // diff --git a/py/modules/IcePy/Operation.h b/py/modules/IcePy/Operation.h index 98d20f23463..e9691c63f8f 100644 --- a/py/modules/IcePy/Operation.h +++ b/py/modules/IcePy/Operation.h @@ -14,6 +14,7 @@ #include <Ice/Current.h> #include <Ice/Object.h> #include <Ice/OutgoingAsyncF.h> +#include <Ice/CommunicatorF.h> namespace IcePy { @@ -40,6 +41,28 @@ PyObject* createAsyncResult(const Ice::AsyncResultPtr&, PyObject*, PyObject*, Py Ice::AsyncResultPtr getAsyncResult(PyObject*); // +// Used as the callback for getConnection operation. +// +class GetConnectionCallback : public IceUtil::Shared +{ +public: + + GetConnectionCallback(const Ice::CommunicatorPtr&, PyObject*, PyObject*, const std::string&); + ~GetConnectionCallback(); + + void response(const Ice::ConnectionPtr&); + void exception(const Ice::Exception&); + +protected: + + Ice::CommunicatorPtr _communicator; + PyObject* _response; + PyObject* _ex; + std::string _op; +}; +typedef IceUtil::Handle<GetConnectionCallback> GetConnectionCallbackPtr; + +// // Used as the callback for the various flushBatchRequest operations. // class FlushCallback : public IceUtil::Shared diff --git a/py/modules/IcePy/Proxy.cpp b/py/modules/IcePy/Proxy.cpp index 90cf02923b7..cbdbeb8297a 100644 --- a/py/modules/IcePy/Proxy.cpp +++ b/py/modules/IcePy/Proxy.cpp @@ -1735,6 +1735,114 @@ proxyIceGetConnection(ProxyObject* self) extern "C" #endif static PyObject* +proxyBeginIceGetConnection(ProxyObject* self, PyObject* args, PyObject* kwds) +{ + assert(self->proxy); + + static char* argNames[] = + { + const_cast<char*>("_response"), + const_cast<char*>("_ex"), + 0 + }; + + PyObject* response = Py_None; + PyObject* ex = Py_None; + if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("|OO"), argNames, &response, &ex)) + { + return 0; + } + + if(response == Py_None) + { + response = 0; + } + if(ex == Py_None) + { + ex = 0; + } + + if(!response && ex) + { + PyErr_Format(PyExc_RuntimeError, + STRCAST("response callback must also be provided when exception callback is used")); + return 0; + } + + Ice::Callback_Object_ice_getConnectionPtr cb; + if(response || ex) + { + GetConnectionCallbackPtr d = new GetConnectionCallback(*self->communicator, response, ex, "ice_getConnection"); + cb = Ice::newCallback_Object_ice_getConnection(d, &GetConnectionCallback::response, + &GetConnectionCallback::exception); + } + + Ice::AsyncResultPtr result; + try + { + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + + if(cb) + { + result = (*self->proxy)->begin_ice_getConnection(cb); + } + else + { + result = (*self->proxy)->begin_ice_getConnection(); + } + } + catch(const Ice::Exception& ex) + { + setPythonException(ex); + return 0; + } + + PyObjectHandle communicator = getCommunicatorWrapper(*self->communicator); + return createAsyncResult(result, reinterpret_cast<PyObject*>(self), 0, communicator.get()); +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +proxyEndIceGetConnection(ProxyObject* self, PyObject* args) +{ + assert(self->proxy); + + PyObject* result; + if(!PyArg_ParseTuple(args, STRCAST("O!"), &AsyncResultType, &result)) + { + return 0; + } + + Ice::AsyncResultPtr r = getAsyncResult(result); + Ice::ConnectionPtr con; + try + { + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking invocations. + con = (*self->proxy)->end_ice_getConnection(r); + } + catch(const Ice::Exception& ex) + { + setPythonException(ex); + return 0; + } + + if(con) + { + return createConnection(con, *self->communicator); + } + else + { + Py_INCREF(Py_None); + return Py_None; + } +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* proxyIceGetCachedConnection(ProxyObject* self) { assert(self->proxy); @@ -2438,6 +2546,10 @@ static PyMethodDef ProxyMethods[] = PyDoc_STR(STRCAST("ice_connectionId(string) -> Ice.ObjectPrx")) }, { STRCAST("ice_getConnection"), reinterpret_cast<PyCFunction>(proxyIceGetConnection), METH_NOARGS, PyDoc_STR(STRCAST("ice_getConnection() -> Ice.Connection")) }, + { STRCAST("begin_ice_getConnection"), reinterpret_cast<PyCFunction>(proxyBeginIceGetConnection), + METH_VARARGS | METH_KEYWORDS, PyDoc_STR(STRCAST("begin_ice_getConnection([_response][, _ex]) -> Ice.AsyncResult")) }, + { STRCAST("end_ice_getConnection"), reinterpret_cast<PyCFunction>(proxyEndIceGetConnection), METH_VARARGS, + PyDoc_STR(STRCAST("end_ice_getConnection(Ice.AsyncResult) -> Ice.Connection")) }, { STRCAST("ice_getCachedConnection"), reinterpret_cast<PyCFunction>(proxyIceGetCachedConnection), METH_NOARGS, PyDoc_STR(STRCAST("ice_getCachedConnection() -> Ice.Connection")) }, { STRCAST("ice_flushBatchRequests"), reinterpret_cast<PyCFunction>(proxyIceFlushBatchRequests), METH_NOARGS, diff --git a/py/test/Ice/ami/AllTests.py b/py/test/Ice/ami/AllTests.py index 677ebeed9a2..6442f0df450 100644 --- a/py/test/Ice/ami/AllTests.py +++ b/py/test/Ice/ami/AllTests.py @@ -52,6 +52,10 @@ class ResponseCallback(CallbackBase): test(len(ids) == 2) self.called() + def connection(self, conn): + test(conn != None) + self.called() + def op(self): self.called() @@ -94,6 +98,11 @@ class ResponseCallbackWC(CallbackBase): test(len(ids) == 2) self.called() + def connection(self, conn, cookie): + test(cookie == self._cookie) + test(conn != None) + self.called() + def op(self, cookie): test(cookie == self._cookie) self.called() @@ -324,6 +333,9 @@ def allTests(communicator): result = p.begin_ice_ids(_ctx=ctx) test(len(p.end_ice_ids(result)) == 2) + result = p.begin_ice_getConnection() + test(p.end_ice_getConnection(result) != None) + result = p.begin_op() p.end_op(result) result = p.begin_op(_ctx=ctx) @@ -394,6 +406,11 @@ def allTests(communicator): p.begin_ice_ids(lambda ids: cbWC.ids(ids, cookie), lambda ex: cbWC.ex(ex, cookie), _ctx=ctx) cbWC.check() + p.begin_ice_getConnection(cb.connection, cb.ex) + cb.check() + p.begin_ice_getConnection(lambda conn: cbWC.connection(conn, cookie), lambda ex: cbWC.ex(ex, cookie)) + cbWC.check() + p.begin_op(cb.op, cb.ex) cb.check() p.begin_op(lambda: cbWC.op(cookie), lambda ex: cbWC.ex(ex, cookie)) @@ -487,6 +504,11 @@ def allTests(communicator): i.begin_ice_ids(lambda ids: cbWC.response(ids, cookie), lambda ex: cbWC.ex(ex, cookie)) cbWC.check() + i.begin_ice_getConnection(cb.response, cb.ex) + cb.check() + i.begin_ice_getConnection(lambda conn: cbWC.response(conn, cookie), lambda ex: cbWC.ex(ex, cookie)) + cbWC.check() + i.begin_op(cb.response, cb.ex) cb.check() i.begin_op(lambda: cbWC.response(cookie), lambda ex: cbWC.ex(ex, cookie)) |