diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-10-09 15:00:57 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-10-09 15:00:57 +0200 |
commit | 20b6c0ccb95118ffc685826904a8edd06a38ac1b (patch) | |
tree | 1b389964fa35ca9de23c548120ecedcc9d82074c /cpp/src/Ice/Selector.h | |
parent | Merge branch '3.6' (diff) | |
download | ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.bz2 ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.xz ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.zip |
Added ready callback to allow transports to signal readiness to the thread pool
Diffstat (limited to 'cpp/src/Ice/Selector.h')
-rw-r--r-- | cpp/src/Ice/Selector.h | 234 |
1 files changed, 93 insertions, 141 deletions
diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h index e890db6e2bf..4098da30171 100644 --- a/cpp/src/Ice/Selector.h +++ b/cpp/src/Ice/Selector.h @@ -57,67 +57,59 @@ class SelectorTimeoutException { }; -#if defined(ICE_OS_WINRT) - -struct SelectEvent -{ - SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status) - { - } - EventHandlerPtr handler; - SocketOperation status; -}; +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) -class Selector : IceUtil::Monitor<IceUtil::Mutex> +class Selector { -public: - - Selector(const InstancePtr&); - - void destroy(); - - void initialize(EventHandler*); - void update(EventHandler*, SocketOperation, SocketOperation); - void finish(EventHandler*); - - EventHandlerPtr getNextHandler(SocketOperation&, int); - - void completed(const EventHandlerPtr&, SocketOperation); - -private: - - const InstancePtr _instance; - std::deque<SelectEvent> _events; -}; +#if defined(ICE_OS_WINRT) + struct SelectEvent + { + SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status) + { + } -#elif defined(ICE_USE_IOCP) + EventHandlerPtr handler; + SocketOperation status; + }; +#endif -class Selector -{ public: Selector(const InstancePtr&); ~Selector(); +#ifdef ICE_USE_IOCP void setup(int); - void destroy(); +#endif + void destroy(); void initialize(EventHandler*); void update(EventHandler*, SocketOperation, SocketOperation); void finish(EventHandler*); + void ready(EventHandler*, SocketOperation, bool); + +#ifdef ICE_USE_IOCP EventHandler* getNextHandler(SocketOperation&, DWORD&, int&, int); +#else + EventHandler* getNextHandler(SocketOperation&, int); +#endif + + void completed(EventHandler*, SocketOperation); - HANDLE getIOCPHandle() { return _handle; } - private: const InstancePtr _instance; +#ifdef ICE_USE_IOCP HANDLE _handle; +#else + IceUtil::Monitor<IceUtil::Mutex> _monitor; + std::deque<SelectEvent> _events; +#endif }; -#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) +#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) class Selector { @@ -126,7 +118,7 @@ public: Selector(const InstancePtr&); ~Selector(); - void destroy(); + void destroy(); void initialize(EventHandler*) { @@ -137,43 +129,62 @@ public: void disable(EventHandler*, SocketOperation); bool finish(EventHandler*, bool); -#if defined(ICE_USE_KQUEUE) - void updateSelector(); -#endif - - void - startSelect() - { -#ifdef ICE_USE_KQUEUE - _selecting = true; - if(!_changes.empty()) - { - updateSelector(); - } -#endif - } - - void - finishSelect() - { -#ifdef ICE_USE_KQUEUE - _selecting = false; -#endif - } + void ready(EventHandler*, SocketOperation, bool); - void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int); + void startSelect(); + void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&); + void select(int); private: + void wakeup(); + void checkReady(EventHandler*); + void updateSelector(); + const InstancePtr _instance; + + SOCKET _fdIntrRead; + SOCKET _fdIntrWrite; + bool _interrupted; + bool _selectNow; + int _count; + bool _selecting; + std::map<EventHandlerPtr, SocketOperation> _readyHandlers; + #if defined(ICE_USE_EPOLL) std::vector<struct epoll_event> _events; -#else + int _queueFd; +#elif defined(ICE_USE_KQUEUE) std::vector<struct kevent> _events; std::vector<struct kevent> _changes; - bool _selecting; -#endif int _queueFd; +#elif defined(ICE_USE_SELECT) + std::vector<std::pair<EventHandler*, SocketOperation> > _changes; + std::map<SOCKET, EventHandler*> _handlers; + + fd_set _readFdSet; + fd_set _writeFdSet; + fd_set _errorFdSet; + fd_set _selectedReadFdSet; + fd_set _selectedWriteFdSet; + fd_set _selectedErrorFdSet; + + fd_set* + fdSetCopy(fd_set& dest, fd_set& src) + { + if(src.fd_count > 0) + { + dest.fd_count = src.fd_count; + memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count); + return &dest; + } + return 0; + } +#elif defined(ICE_USE_POLL) + std::vector<std::pair<EventHandler*, SocketOperation> > _changes; + std::map<SOCKET, EventHandler*> _handlers; + std::vector<struct pollfd> _pollFdSet; +#endif }; #elif defined(ICE_USE_CFSTREAM) @@ -182,7 +193,7 @@ class Selector; class SelectorReadyCallback : public IceUtil::Shared { -public: +public: virtual ~SelectorReadyCallback() { } virtual void readyCallback(SocketOperation, int = 0) = 0; @@ -201,9 +212,9 @@ public: virtual SocketOperation unregisterFromRunLoop(SocketOperation, bool) = 0; virtual void closeStreams() = 0; - void setConnectError(int error) + void setConnectError(int error) { - _connectError = error; + _connectError = error; } private: @@ -222,14 +233,13 @@ public: void updateRunLoop(); virtual void readyCallback(SocketOperation, int = 0); - void ready(SocketOperation, int); SocketOperation readyOp(); - void checkReady(); + bool checkReady(); bool update(SocketOperation, SocketOperation); - void finish(); + bool finish(); bool operator<(const EventHandlerWrapper& o) { @@ -241,7 +251,7 @@ private: friend class Selector; EventHandlerPtr _handler; - StreamNativeInfoPtr _nativeInfo; + StreamNativeInfoPtr _streamNativeInfo; Selector& _selector; SocketOperation _ready; bool _finish; @@ -266,17 +276,22 @@ public: void disable(EventHandler*, SocketOperation); bool finish(EventHandler*, bool); - void startSelect() { } - void finishSelect() { } - void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int); + void ready(EventHandler*, SocketOperation, bool); + + void startSelect(); + void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&); + void select(int); void processInterrupt(); - void ready(EventHandlerWrapper*, SocketOperation, int = 0); - void addReadyHandler(EventHandlerWrapper*); void run(); private: + void ready(EventHandlerWrapper*, SocketOperation, int = 0); + void addReadyHandler(EventHandlerWrapper*); + + friend class EventHandlerWrapper; + InstancePtr _instance; IceUtil::ThreadPtr _thread; CFRunLoopRef _runLoop; @@ -285,74 +300,11 @@ private: std::set<EventHandlerWrapperPtr> _changes; - std::vector<EventHandlerWrapperPtr> _readyHandlers; + std::set<EventHandlerWrapperPtr> _readyHandlers; std::vector<std::pair<EventHandlerWrapperPtr, SocketOperation> > _selectedHandlers; std::map<EventHandler*, EventHandlerWrapperPtr> _wrappers; }; -#elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) - -class Selector -{ -public: - - Selector(const InstancePtr&); - ~Selector(); - - void destroy(); - - void initialize(EventHandler*) - { - // Nothing to do - } - void update(EventHandler*, SocketOperation, SocketOperation); - void enable(EventHandler*, SocketOperation); - void disable(EventHandler*, SocketOperation); - bool finish(EventHandler*, bool); - - void startSelect(); - void finishSelect(); - void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int); - -private: - - void updateSelector(); - void updateImpl(EventHandler*); - - const InstancePtr _instance; - - SOCKET _fdIntrRead; - SOCKET _fdIntrWrite; - bool _selecting; - bool _interrupted; - - std::vector<std::pair<EventHandler*, SocketOperation> > _changes; - std::map<SOCKET, EventHandler*> _handlers; - -#if defined(ICE_USE_SELECT) - fd_set _readFdSet; - fd_set _writeFdSet; - fd_set _errorFdSet; - fd_set _selectedReadFdSet; - fd_set _selectedWriteFdSet; - fd_set _selectedErrorFdSet; - - fd_set* - fdSetCopy(fd_set& dest, fd_set& src) - { - if(src.fd_count > 0) - { - dest.fd_count = src.fd_count; - memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count); - return &dest; - } - return 0; - } -#else - std::vector<struct pollfd> _pollFdSet; -#endif -}; - #endif } |