summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.h')
-rw-r--r--cpp/src/Ice/ConnectionI.h141
1 files changed, 66 insertions, 75 deletions
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 5004046cc33..4a29e8d62b0 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -33,6 +33,7 @@
#include <Ice/Dispatcher.h>
#include <Ice/ObserverHelper.h>
#include <Ice/ConnectionAsync.h>
+#include <Ice/ACM.h>
#include <deque>
#include <IceUtil/UniquePtr.h>
@@ -44,19 +45,6 @@ class Outgoing;
class BatchOutgoing;
class OutgoingMessageCallback;
-class ConnectionReaper : public IceUtil::Mutex, public IceUtil::Shared
-{
-public:
-
- void add(const Ice::ConnectionIPtr&);
- void swapConnections(std::vector<Ice::ConnectionIPtr>&);
-
-private:
-
- std::vector<Ice::ConnectionIPtr> _connections;
-};
-typedef IceUtil::Handle<ConnectionReaper> ConnectionReaperPtr;
-
}
namespace Ice
@@ -87,35 +75,60 @@ class ICE_API ConnectionI : public Connection, public IceInternal::EventHandler,
public:
- class StartCallback : virtual public IceUtil::Shared
- {
- public:
-
- virtual void connectionStartCompleted(const ConnectionIPtr&) = 0;
- virtual void connectionStartFailed(const ConnectionIPtr&, const Ice::LocalException&) = 0;
- };
- typedef IceUtil::Handle<StartCallback> StartCallbackPtr;
-
- struct SentCallback
+ struct OutgoingMessage
{
+ OutgoingMessage(IceInternal::BasicStream* str, bool comp) :
+ stream(str), out(0), compress(comp), requestId(0), adopted(false), isSent(false)
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync,
- const IceInternal::OutgoingAsyncPtr& replyOutAsync) :
- outAsync(outAsync), replyOutAsync(replyOutAsync)
+ , invokeSentCallback(false), receivedReply(false)
+#endif
{
}
-#else
- SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync) : outAsync(outAsync)
+
+ OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, int rid) :
+ stream(str), out(o), compress(comp), requestId(rid), adopted(false), isSent(false)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ , invokeSentCallback(false), receivedReply(false)
+#endif
{
}
+
+ OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str,
+ bool comp, int rid) :
+ stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false), isSent(false)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ , invokeSentCallback(false), receivedReply(false)
#endif
+ {
+ }
+
+ void adopt(IceInternal::BasicStream*);
+ void timedOut();
+ bool sent();
+ void finished(const Ice::LocalException&);
+ IceInternal::BasicStream* stream;
+ IceInternal::OutgoingMessageCallback* out;
IceInternal::OutgoingAsyncMessageCallbackPtr outAsync;
+ bool compress;
+ int requestId;
+ bool adopted;
+ bool isSent;
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- IceInternal::OutgoingAsyncPtr replyOutAsync;
+ bool invokeSentCallback;
+ bool receivedReply;
#endif
};
+ class StartCallback : virtual public IceUtil::Shared
+ {
+ public:
+
+ virtual void connectionStartCompleted(const ConnectionIPtr&) = 0;
+ virtual void connectionStartFailed(const ConnectionIPtr&, const Ice::LocalException&) = 0;
+ };
+ typedef IceUtil::Handle<StartCallback> StartCallbackPtr;
+
enum DestructionReason
{
ObjectAdapterDeactivated,
@@ -138,7 +151,7 @@ public:
void updateObserver();
- void monitor(const IceUtil::Time&);
+ void monitor(const IceUtil::Time&, const IceInternal::ACMConfig&);
bool sendRequest(IceInternal::Outgoing*, bool, bool);
IceInternal::AsyncStatus sendAsyncRequest(const IceInternal::OutgoingAsyncPtr&, bool, bool);
@@ -167,6 +180,15 @@ public:
bool flushBatchRequests(IceInternal::BatchOutgoing*);
IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::BatchOutgoingAsyncPtr&);
+ virtual void setCallback(const ConnectionCallbackPtr&);
+ virtual void setACM(const IceUtil::Optional<int>&,
+ const IceUtil::Optional<ACMClose>&,
+ const IceUtil::Optional<ACMHeartbeat>&);
+ virtual ACM getACM();
+
+ void requestTimedOut(IceInternal::OutgoingMessageCallback*);
+ void asyncRequestTimedOut(const IceInternal::OutgoingAsyncMessageCallbackPtr&);
+
void sendResponse(IceInternal::BasicStream*, Byte);
void sendNoResponse();
@@ -200,15 +222,13 @@ public:
void exception(const LocalException&);
void invokeException(const LocalException&, int);
- void dispatch(const StartCallbackPtr&, const std::vector<SentCallback>&, Byte, Int, Int,
+ void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&,
- IceInternal::BasicStream&);
+ const ConnectionCallbackPtr&, IceInternal::BasicStream&);
void finish();
private:
- friend class IceInternal::ConnectionReaper;
-
enum State
{
StateNotInitialized,
@@ -221,41 +241,7 @@ private:
StateFinished
};
- struct OutgoingMessage
- {
- OutgoingMessage(IceInternal::BasicStream* str, bool comp) :
- stream(str), out(0), compress(comp), requestId(0), adopted(false), isSent(false)
- {
- }
-
- OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, int rid) :
- stream(str), out(o), compress(comp), requestId(rid), adopted(false), isSent(false)
- {
- }
-
- OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str,
- bool comp, int rid) :
- stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false), isSent(false)
- {
- }
-
- void adopt(IceInternal::BasicStream*);
- bool sent(ConnectionI*, bool);
- void finished(const Ice::LocalException&);
-
- IceInternal::BasicStream* stream;
- IceInternal::OutgoingMessageCallback* out;
- IceInternal::OutgoingAsyncMessageCallbackPtr outAsync;
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- IceInternal::OutgoingAsyncPtr replyOutAsync;
-#endif
- bool compress;
- int requestId;
- bool adopted;
- bool isSent;
- };
-
- ConnectionI(const Ice::CommunicatorPtr&, const IceInternal::InstancePtr&, const IceInternal::ConnectionReaperPtr&,
+ ConnectionI(const Ice::CommunicatorPtr&, const IceInternal::InstancePtr&, const IceInternal::ACMMonitorPtr&,
const IceInternal::TransceiverPtr&, const IceInternal::ConnectorPtr&,
const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&);
virtual ~ConnectionI();
@@ -267,10 +253,11 @@ private:
void setState(State);
void initiateShutdown();
+ void heartbeat();
bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
- IceInternal::SocketOperation sendNextMessage(std::vector<SentCallback>&);
+ IceInternal::SocketOperation sendNextMessage(std::vector<OutgoingMessage>&);
IceInternal::AsyncStatus sendMessage(OutgoingMessage&);
#ifndef ICE_OS_WINRT
@@ -279,7 +266,7 @@ private:
#endif
IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&,
IceInternal::ServantManagerPtr&, ObjectAdapterPtr&,
- IceInternal::OutgoingAsyncPtr&);
+ IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&);
void invokeAll(IceInternal::BasicStream&, Int, Int, Byte,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&);
@@ -288,12 +275,14 @@ private:
Ice::ConnectionInfoPtr initConnectionInfo() const;
Ice::Instrumentation::ConnectionState toConnectionState(State) const;
-
+
+ void reap();
+
AsyncResultPtr __begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
Ice::CommunicatorPtr _communicator;
const IceInternal::InstancePtr _instance;
- const IceInternal::ConnectionReaperPtr _reaper;
+ IceInternal::ACMMonitorPtr _monitor;
const IceInternal::TransceiverPtr _transceiver;
const std::string _desc;
const std::string _type;
@@ -320,8 +309,8 @@ private:
const bool _warn;
const bool _warnUdp;
- const int _acmTimeout;
- IceUtil::Time _acmAbsoluteTimeout;
+
+ IceUtil::Time _acmLastActivity;
const int _compressionLevel;
@@ -355,6 +344,8 @@ private:
State _state; // The current state.
bool _shutdownInitiated;
bool _validated;
+
+ Ice::ConnectionCallbackPtr _callback;
};
}