diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.h')
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 116 |
1 files changed, 91 insertions, 25 deletions
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 8e712cb3b42..9d7b9cc15af 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -26,11 +26,16 @@ #include <Ice/TraceLevelsF.h> #include <Ice/OutgoingAsyncF.h> #include <Ice/EventHandler.h> +#include <Ice/SelectorThread.h> + +#include <deque> namespace IceInternal { class Outgoing; +class BatchOutgoing; +class OutgoingMessageCallback; } @@ -39,23 +44,35 @@ namespace Ice class LocalException; -class ICE_API ConnectionI : public Connection, public IceInternal::EventHandler, +class ICE_API ConnectionI : public Connection, + public IceInternal::EventHandler, + public IceInternal::SocketReadyCallback, public IceUtil::Monitor<IceUtil::Mutex> { public: - void validate(); + class StartCallback : virtual public IceUtil::Shared + { + public: + + virtual void connectionStartCompleted(const ConnectionIPtr&) = 0; + virtual void connectionStartFailed(const ConnectionIPtr&, const Ice::LocalException&) = 0; + }; + typedef IceUtil::Handle<StartCallback> StartCallbackPtr; + enum DestructionReason { ObjectAdapterDeactivated, CommunicatorDestroyed }; + + void start(const StartCallbackPtr&); void activate(); void hold(); void destroy(DestructionReason); virtual void close(bool); // From Connection. - bool isDestroyed() const; + bool isActiveOrHolding() const; bool isFinished() const; void throwException() const; // Throws the connection exception if destroyed. @@ -65,14 +82,18 @@ public: void monitor(); - void sendRequest(IceInternal::BasicStream*, IceInternal::Outgoing*, bool); - void sendAsyncRequest(IceInternal::BasicStream*, const IceInternal::OutgoingAsyncPtr&, bool); + bool sendRequest(IceInternal::Outgoing*, bool, bool); + void sendAsyncRequest(const IceInternal::OutgoingAsyncPtr&, bool, bool); void prepareBatchRequest(IceInternal::BasicStream*); void finishBatchRequest(IceInternal::BasicStream*, bool); void abortBatchRequest(); + virtual void flushBatchRequests(); // From Connection. + bool flushBatchRequests(IceInternal::BatchOutgoing*); + void flushAsyncBatchRequests(const IceInternal::BatchOutgoingAsyncPtr&); + void sendResponse(IceInternal::BasicStream*, Byte); void sendNoResponse(); @@ -88,7 +109,7 @@ public: // virtual bool datagram() const; virtual bool readable() const; - virtual void read(IceInternal::BasicStream&); + virtual bool read(IceInternal::BasicStream&); virtual void message(IceInternal::BasicStream&, const IceInternal::ThreadPoolPtr&); virtual void finished(const IceInternal::ThreadPoolPtr&); virtual void exception(const LocalException&); @@ -97,6 +118,12 @@ public: virtual Ice::Int timeout() const; // From Connection. virtual std::string toString() const; // From Connection and EvantHandler. + // + // Operations from SocketReadyCallback + // + virtual IceInternal::SocketStatus socketReady(bool); + virtual void socketTimeout(); + // SSL plug-in needs to be able to get the transceiver. IceInternal::TransceiverPtr getTransceiver() const; @@ -105,12 +132,13 @@ private: ConnectionI(const IceInternal::InstancePtr&, const IceInternal::TransceiverPtr&, const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&, bool, size_t); virtual ~ConnectionI(); - void start(); + friend class IceInternal::IncomingConnectionFactory; friend class IceInternal::OutgoingConnectionFactory; enum State { + StateNotInitialized, StateNotValidated, StateActive, StateHolding, @@ -118,13 +146,53 @@ private: StateClosed }; - void resetBatch(bool); - void flushBatchRequestsInternal(bool); + bool setState(State, const LocalException&); + bool setState(State); + + bool initiateShutdown(bool); + + struct OutgoingMessage + { + OutgoingMessage() + { + } + + OutgoingMessage(IceInternal::BasicStream* str, bool comp) : + stream(str), out(0), compress(comp), response(false), adopted(false) + { + } + + OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, bool resp) : + stream(str), out(o), compress(comp), response(resp), adopted(false) + { + } + + OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str, + bool comp, bool resp) : + stream(str), out(0), outAsync(o), compress(comp), response(resp), adopted(false) + { + } + + void adopt(IceInternal::BasicStream*); + void sent(ConnectionI*, bool); + void finished(const Ice::LocalException&); + + IceInternal::BasicStream* stream; + IceInternal::OutgoingMessageCallback* out; + IceInternal::OutgoingAsyncMessageCallbackPtr outAsync; + bool compress; + bool response; + bool adopted; + }; - void setState(State, const LocalException&); - void setState(State); + IceInternal::SocketStatus initialize(); + IceInternal::SocketStatus validate(); + bool send(int); + bool sendMessage(OutgoingMessage&, bool = false); + void finishSendMessage(); - void initiateShutdown() const; + void finishStart(); + void finishStart(const Ice::LocalException&); void registerWithPool(); void unregisterWithPool(); @@ -171,6 +239,10 @@ private: int _finishedCount; const IceInternal::ThreadPoolPtr _threadPool; + const IceInternal::SelectorThreadPtr _selectorThread; + + StartCallbackPtr _startCallback; + const bool _warn; const int _acmTimeout; @@ -183,13 +255,8 @@ private: std::map<Int, IceInternal::Outgoing*> _requests; std::map<Int, IceInternal::Outgoing*>::iterator _requestsHint; - struct AsyncRequest - { - IceInternal::OutgoingAsyncPtr p; - IceUtil::Time t; - }; - std::map<Int, AsyncRequest> _asyncRequests; - std::map<Int, AsyncRequest>::iterator _asyncRequestsHint; + std::map<Int, IceInternal::OutgoingAsyncPtr> _asyncRequests; + std::map<Int, IceInternal::OutgoingAsyncPtr>::iterator _asyncRequestsHint; std::auto_ptr<LocalException> _exception; @@ -200,16 +267,15 @@ private: bool _batchRequestCompress; size_t _batchMarker; + std::deque<OutgoingMessage> _queuedStreams; + std::deque<OutgoingMessage> _sendStreams; + bool _sendInProgress; + int _waitingForSend; + int _dispatchCount; State _state; // The current state. IceUtil::Time _stateTime; // The last time when the state was changed. - - // - // We have a separate mutex for sending, so that we don't block - // the whole connection when we do a blocking send. - // - IceUtil::Mutex _sendMutex; }; } |