summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.h')
-rw-r--r--cpp/src/Ice/ConnectionI.h116
1 files changed, 91 insertions, 25 deletions
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 8e712cb3b42..9d7b9cc15af 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -26,11 +26,16 @@
#include <Ice/TraceLevelsF.h>
#include <Ice/OutgoingAsyncF.h>
#include <Ice/EventHandler.h>
+#include <Ice/SelectorThread.h>
+
+#include <deque>
namespace IceInternal
{
class Outgoing;
+class BatchOutgoing;
+class OutgoingMessageCallback;
}
@@ -39,23 +44,35 @@ namespace Ice
class LocalException;
-class ICE_API ConnectionI : public Connection, public IceInternal::EventHandler,
+class ICE_API ConnectionI : public Connection,
+ public IceInternal::EventHandler,
+ public IceInternal::SocketReadyCallback,
public IceUtil::Monitor<IceUtil::Mutex>
{
public:
- void validate();
+ class StartCallback : virtual public IceUtil::Shared
+ {
+ public:
+
+ virtual void connectionStartCompleted(const ConnectionIPtr&) = 0;
+ virtual void connectionStartFailed(const ConnectionIPtr&, const Ice::LocalException&) = 0;
+ };
+ typedef IceUtil::Handle<StartCallback> StartCallbackPtr;
+
enum DestructionReason
{
ObjectAdapterDeactivated,
CommunicatorDestroyed
};
+
+ void start(const StartCallbackPtr&);
void activate();
void hold();
void destroy(DestructionReason);
virtual void close(bool); // From Connection.
- bool isDestroyed() const;
+ bool isActiveOrHolding() const;
bool isFinished() const;
void throwException() const; // Throws the connection exception if destroyed.
@@ -65,14 +82,18 @@ public:
void monitor();
- void sendRequest(IceInternal::BasicStream*, IceInternal::Outgoing*, bool);
- void sendAsyncRequest(IceInternal::BasicStream*, const IceInternal::OutgoingAsyncPtr&, bool);
+ bool sendRequest(IceInternal::Outgoing*, bool, bool);
+ void sendAsyncRequest(const IceInternal::OutgoingAsyncPtr&, bool, bool);
void prepareBatchRequest(IceInternal::BasicStream*);
void finishBatchRequest(IceInternal::BasicStream*, bool);
void abortBatchRequest();
+
virtual void flushBatchRequests(); // From Connection.
+ bool flushBatchRequests(IceInternal::BatchOutgoing*);
+ void flushAsyncBatchRequests(const IceInternal::BatchOutgoingAsyncPtr&);
+
void sendResponse(IceInternal::BasicStream*, Byte);
void sendNoResponse();
@@ -88,7 +109,7 @@ public:
//
virtual bool datagram() const;
virtual bool readable() const;
- virtual void read(IceInternal::BasicStream&);
+ virtual bool read(IceInternal::BasicStream&);
virtual void message(IceInternal::BasicStream&, const IceInternal::ThreadPoolPtr&);
virtual void finished(const IceInternal::ThreadPoolPtr&);
virtual void exception(const LocalException&);
@@ -97,6 +118,12 @@ public:
virtual Ice::Int timeout() const; // From Connection.
virtual std::string toString() const; // From Connection and EvantHandler.
+ //
+ // Operations from SocketReadyCallback
+ //
+ virtual IceInternal::SocketStatus socketReady(bool);
+ virtual void socketTimeout();
+
// SSL plug-in needs to be able to get the transceiver.
IceInternal::TransceiverPtr getTransceiver() const;
@@ -105,12 +132,13 @@ private:
ConnectionI(const IceInternal::InstancePtr&, const IceInternal::TransceiverPtr&,
const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&, bool, size_t);
virtual ~ConnectionI();
- void start();
+
friend class IceInternal::IncomingConnectionFactory;
friend class IceInternal::OutgoingConnectionFactory;
enum State
{
+ StateNotInitialized,
StateNotValidated,
StateActive,
StateHolding,
@@ -118,13 +146,53 @@ private:
StateClosed
};
- void resetBatch(bool);
- void flushBatchRequestsInternal(bool);
+ bool setState(State, const LocalException&);
+ bool setState(State);
+
+ bool initiateShutdown(bool);
+
+ struct OutgoingMessage
+ {
+ OutgoingMessage()
+ {
+ }
+
+ OutgoingMessage(IceInternal::BasicStream* str, bool comp) :
+ stream(str), out(0), compress(comp), response(false), adopted(false)
+ {
+ }
+
+ OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, bool resp) :
+ stream(str), out(o), compress(comp), response(resp), adopted(false)
+ {
+ }
+
+ OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str,
+ bool comp, bool resp) :
+ stream(str), out(0), outAsync(o), compress(comp), response(resp), adopted(false)
+ {
+ }
+
+ void adopt(IceInternal::BasicStream*);
+ void sent(ConnectionI*, bool);
+ void finished(const Ice::LocalException&);
+
+ IceInternal::BasicStream* stream;
+ IceInternal::OutgoingMessageCallback* out;
+ IceInternal::OutgoingAsyncMessageCallbackPtr outAsync;
+ bool compress;
+ bool response;
+ bool adopted;
+ };
- void setState(State, const LocalException&);
- void setState(State);
+ IceInternal::SocketStatus initialize();
+ IceInternal::SocketStatus validate();
+ bool send(int);
+ bool sendMessage(OutgoingMessage&, bool = false);
+ void finishSendMessage();
- void initiateShutdown() const;
+ void finishStart();
+ void finishStart(const Ice::LocalException&);
void registerWithPool();
void unregisterWithPool();
@@ -171,6 +239,10 @@ private:
int _finishedCount;
const IceInternal::ThreadPoolPtr _threadPool;
+ const IceInternal::SelectorThreadPtr _selectorThread;
+
+ StartCallbackPtr _startCallback;
+
const bool _warn;
const int _acmTimeout;
@@ -183,13 +255,8 @@ private:
std::map<Int, IceInternal::Outgoing*> _requests;
std::map<Int, IceInternal::Outgoing*>::iterator _requestsHint;
- struct AsyncRequest
- {
- IceInternal::OutgoingAsyncPtr p;
- IceUtil::Time t;
- };
- std::map<Int, AsyncRequest> _asyncRequests;
- std::map<Int, AsyncRequest>::iterator _asyncRequestsHint;
+ std::map<Int, IceInternal::OutgoingAsyncPtr> _asyncRequests;
+ std::map<Int, IceInternal::OutgoingAsyncPtr>::iterator _asyncRequestsHint;
std::auto_ptr<LocalException> _exception;
@@ -200,16 +267,15 @@ private:
bool _batchRequestCompress;
size_t _batchMarker;
+ std::deque<OutgoingMessage> _queuedStreams;
+ std::deque<OutgoingMessage> _sendStreams;
+ bool _sendInProgress;
+ int _waitingForSend;
+
int _dispatchCount;
State _state; // The current state.
IceUtil::Time _stateTime; // The last time when the state was changed.
-
- //
- // We have a separate mutex for sending, so that we don't block
- // the whole connection when we do a blocking send.
- //
- IceUtil::Mutex _sendMutex;
};
}