diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
commit | d81701ca8182942b7936f9fd84a019b695e9c890 (patch) | |
tree | dc036c9d701fbbe1afad67782bd78572c0f61974 /cpp/src/Ice/ConnectionI.h | |
parent | Fixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff) | |
download | ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2 ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip |
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'cpp/src/Ice/ConnectionI.h')
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 141 |
1 files changed, 66 insertions, 75 deletions
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 5004046cc33..4a29e8d62b0 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -33,6 +33,7 @@ #include <Ice/Dispatcher.h> #include <Ice/ObserverHelper.h> #include <Ice/ConnectionAsync.h> +#include <Ice/ACM.h> #include <deque> #include <IceUtil/UniquePtr.h> @@ -44,19 +45,6 @@ class Outgoing; class BatchOutgoing; class OutgoingMessageCallback; -class ConnectionReaper : public IceUtil::Mutex, public IceUtil::Shared -{ -public: - - void add(const Ice::ConnectionIPtr&); - void swapConnections(std::vector<Ice::ConnectionIPtr>&); - -private: - - std::vector<Ice::ConnectionIPtr> _connections; -}; -typedef IceUtil::Handle<ConnectionReaper> ConnectionReaperPtr; - } namespace Ice @@ -87,35 +75,60 @@ class ICE_API ConnectionI : public Connection, public IceInternal::EventHandler, public: - class StartCallback : virtual public IceUtil::Shared - { - public: - - virtual void connectionStartCompleted(const ConnectionIPtr&) = 0; - virtual void connectionStartFailed(const ConnectionIPtr&, const Ice::LocalException&) = 0; - }; - typedef IceUtil::Handle<StartCallback> StartCallbackPtr; - - struct SentCallback + struct OutgoingMessage { + OutgoingMessage(IceInternal::BasicStream* str, bool comp) : + stream(str), out(0), compress(comp), requestId(0), adopted(false), isSent(false) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync, - const IceInternal::OutgoingAsyncPtr& replyOutAsync) : - outAsync(outAsync), replyOutAsync(replyOutAsync) + , invokeSentCallback(false), receivedReply(false) +#endif { } -#else - SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync) : outAsync(outAsync) + + OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, int rid) : + stream(str), out(o), compress(comp), requestId(rid), adopted(false), isSent(false) +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + , invokeSentCallback(false), receivedReply(false) +#endif { } + + OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str, + bool comp, int rid) : + stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false), isSent(false) +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + , invokeSentCallback(false), receivedReply(false) #endif + { + } + + void adopt(IceInternal::BasicStream*); + void timedOut(); + bool sent(); + void finished(const Ice::LocalException&); + IceInternal::BasicStream* stream; + IceInternal::OutgoingMessageCallback* out; IceInternal::OutgoingAsyncMessageCallbackPtr outAsync; + bool compress; + int requestId; + bool adopted; + bool isSent; #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - IceInternal::OutgoingAsyncPtr replyOutAsync; + bool invokeSentCallback; + bool receivedReply; #endif }; + class StartCallback : virtual public IceUtil::Shared + { + public: + + virtual void connectionStartCompleted(const ConnectionIPtr&) = 0; + virtual void connectionStartFailed(const ConnectionIPtr&, const Ice::LocalException&) = 0; + }; + typedef IceUtil::Handle<StartCallback> StartCallbackPtr; + enum DestructionReason { ObjectAdapterDeactivated, @@ -138,7 +151,7 @@ public: void updateObserver(); - void monitor(const IceUtil::Time&); + void monitor(const IceUtil::Time&, const IceInternal::ACMConfig&); bool sendRequest(IceInternal::Outgoing*, bool, bool); IceInternal::AsyncStatus sendAsyncRequest(const IceInternal::OutgoingAsyncPtr&, bool, bool); @@ -167,6 +180,15 @@ public: bool flushBatchRequests(IceInternal::BatchOutgoing*); IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::BatchOutgoingAsyncPtr&); + virtual void setCallback(const ConnectionCallbackPtr&); + virtual void setACM(const IceUtil::Optional<int>&, + const IceUtil::Optional<ACMClose>&, + const IceUtil::Optional<ACMHeartbeat>&); + virtual ACM getACM(); + + void requestTimedOut(IceInternal::OutgoingMessageCallback*); + void asyncRequestTimedOut(const IceInternal::OutgoingAsyncMessageCallbackPtr&); + void sendResponse(IceInternal::BasicStream*, Byte); void sendNoResponse(); @@ -200,15 +222,13 @@ public: void exception(const LocalException&); void invokeException(const LocalException&, int); - void dispatch(const StartCallbackPtr&, const std::vector<SentCallback>&, Byte, Int, Int, + void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&, - IceInternal::BasicStream&); + const ConnectionCallbackPtr&, IceInternal::BasicStream&); void finish(); private: - friend class IceInternal::ConnectionReaper; - enum State { StateNotInitialized, @@ -221,41 +241,7 @@ private: StateFinished }; - struct OutgoingMessage - { - OutgoingMessage(IceInternal::BasicStream* str, bool comp) : - stream(str), out(0), compress(comp), requestId(0), adopted(false), isSent(false) - { - } - - OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, int rid) : - stream(str), out(o), compress(comp), requestId(rid), adopted(false), isSent(false) - { - } - - OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str, - bool comp, int rid) : - stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false), isSent(false) - { - } - - void adopt(IceInternal::BasicStream*); - bool sent(ConnectionI*, bool); - void finished(const Ice::LocalException&); - - IceInternal::BasicStream* stream; - IceInternal::OutgoingMessageCallback* out; - IceInternal::OutgoingAsyncMessageCallbackPtr outAsync; -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - IceInternal::OutgoingAsyncPtr replyOutAsync; -#endif - bool compress; - int requestId; - bool adopted; - bool isSent; - }; - - ConnectionI(const Ice::CommunicatorPtr&, const IceInternal::InstancePtr&, const IceInternal::ConnectionReaperPtr&, + ConnectionI(const Ice::CommunicatorPtr&, const IceInternal::InstancePtr&, const IceInternal::ACMMonitorPtr&, const IceInternal::TransceiverPtr&, const IceInternal::ConnectorPtr&, const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&); virtual ~ConnectionI(); @@ -267,10 +253,11 @@ private: void setState(State); void initiateShutdown(); + void heartbeat(); bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone); bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone); - IceInternal::SocketOperation sendNextMessage(std::vector<SentCallback>&); + IceInternal::SocketOperation sendNextMessage(std::vector<OutgoingMessage>&); IceInternal::AsyncStatus sendMessage(OutgoingMessage&); #ifndef ICE_OS_WINRT @@ -279,7 +266,7 @@ private: #endif IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&, IceInternal::ServantManagerPtr&, ObjectAdapterPtr&, - IceInternal::OutgoingAsyncPtr&); + IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&); void invokeAll(IceInternal::BasicStream&, Int, Int, Byte, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); @@ -288,12 +275,14 @@ private: Ice::ConnectionInfoPtr initConnectionInfo() const; Ice::Instrumentation::ConnectionState toConnectionState(State) const; - + + void reap(); + AsyncResultPtr __begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); Ice::CommunicatorPtr _communicator; const IceInternal::InstancePtr _instance; - const IceInternal::ConnectionReaperPtr _reaper; + IceInternal::ACMMonitorPtr _monitor; const IceInternal::TransceiverPtr _transceiver; const std::string _desc; const std::string _type; @@ -320,8 +309,8 @@ private: const bool _warn; const bool _warnUdp; - const int _acmTimeout; - IceUtil::Time _acmAbsoluteTimeout; + + IceUtil::Time _acmLastActivity; const int _compressionLevel; @@ -355,6 +344,8 @@ private: State _state; // The current state. bool _shutdownInitiated; bool _validated; + + Ice::ConnectionCallbackPtr _callback; }; } |