// ********************************************************************** // // Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #ifndef ICE_SELECTOR_H #define ICE_SELECTOR_H #include #include #include #include #include #include #if defined(ICE_USE_EPOLL) # include #elif defined(ICE_USE_KQUEUE) # include #elif defined(ICE_USE_IOCP) // Nothing to include #elif defined(ICE_USE_POLL) # include #endif #if defined(ICE_USE_CFSTREAM) # include # include # include struct __CFRunLoop; typedef struct __CFRunLoop * CFRunLoopRef; struct __CFRunLoopSource; typedef struct __CFRunLoopSource * CFRunLoopSourceRef; struct __CFSocket; typedef struct __CFSocket * CFSocketRef; #endif #if defined(ICE_OS_WINRT) # include #endif namespace IceInternal { // // Exception raised if select times out. // class SelectorTimeoutException { }; #if defined(ICE_OS_WINRT) struct SelectEvent { SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status) { } EventHandlerPtr handler; SocketOperation status; }; class Selector : IceUtil::Monitor { public: Selector(const InstancePtr&); void destroy(); void initialize(EventHandler*); void update(EventHandler*, SocketOperation, SocketOperation); void finish(EventHandler*); EventHandlerPtr getNextHandler(SocketOperation&, int); void completed(const EventHandlerPtr&, SocketOperation); private: const InstancePtr _instance; std::deque _events; }; #elif defined(ICE_USE_IOCP) class Selector { public: Selector(const InstancePtr&); ~Selector(); void setup(int); void destroy(); void initialize(EventHandler*); void update(EventHandler*, SocketOperation, SocketOperation); void finish(EventHandler*); EventHandler* getNextHandler(SocketOperation&, DWORD&, int&, int); HANDLE getIOCPHandle() { return _handle; } private: const InstancePtr _instance; HANDLE _handle; }; #elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) class Selector { public: Selector(const InstancePtr&); ~Selector(); void destroy(); void initialize(EventHandler*) { // Nothing to do } void update(EventHandler*, SocketOperation, SocketOperation); void enable(EventHandler*, SocketOperation); void disable(EventHandler*, SocketOperation); bool finish(EventHandler*, bool); #if defined(ICE_USE_KQUEUE) void updateSelector(); #endif void startSelect() { #ifdef ICE_USE_KQUEUE _selecting = true; if(!_changes.empty()) { updateSelector(); } #endif } void finishSelect() { #ifdef ICE_USE_KQUEUE _selecting = false; #endif } void select(std::vector >&, int); private: const InstancePtr _instance; #if defined(ICE_USE_EPOLL) std::vector _events; #else std::vector _events; std::vector _changes; bool _selecting; #endif int _queueFd; }; #elif defined(ICE_USE_CFSTREAM) class Selector; class SelectorReadyCallback : public IceUtil::Shared { public: virtual ~SelectorReadyCallback() { } virtual void readyCallback(SocketOperation, int = 0) = 0; }; class StreamNativeInfo : public NativeInfo { public: StreamNativeInfo(SOCKET fd) : NativeInfo(fd), _connectError(0) { } virtual void initStreams(SelectorReadyCallback*) = 0; virtual SocketOperation registerWithRunLoop(SocketOperation) = 0; virtual SocketOperation unregisterFromRunLoop(SocketOperation, bool) = 0; virtual void closeStreams() = 0; void setConnectError(int error) { _connectError = error; } private: int _connectError; }; typedef IceUtil::Handle StreamNativeInfoPtr; class EventHandlerWrapper : public SelectorReadyCallback { public: EventHandlerWrapper(const EventHandlerPtr&, Selector&); ~EventHandlerWrapper(); void updateRunLoop(); virtual void readyCallback(SocketOperation, int = 0); void ready(SocketOperation, int); SocketOperation readyOp(); void checkReady(); bool update(SocketOperation, SocketOperation); void finish(); bool operator<(const EventHandlerWrapper& o) { return this < &o; } private: friend class Selector; EventHandlerPtr _handler; StreamNativeInfoPtr _nativeInfo; Selector& _selector; SocketOperation _ready; bool _finish; CFSocketRef _socket; CFRunLoopSourceRef _source; }; typedef IceUtil::Handle EventHandlerWrapperPtr; class Selector : IceUtil::Monitor { public: Selector(const InstancePtr&); virtual ~Selector(); void destroy(); void initialize(EventHandler*); void update(EventHandler*, SocketOperation, SocketOperation); void enable(EventHandler*, SocketOperation); void disable(EventHandler*, SocketOperation); bool finish(EventHandler*, bool); void startSelect() { } void finishSelect() { } void select(std::vector >&, int); void processInterrupt(); void ready(EventHandlerWrapper*, SocketOperation, int = 0); void addReadyHandler(EventHandlerWrapper*); void run(); private: InstancePtr _instance; IceUtil::ThreadPtr _thread; CFRunLoopRef _runLoop; CFRunLoopSourceRef _source; bool _destroyed; std::set _changes; std::vector _readyHandlers; std::vector > _selectedHandlers; std::map _wrappers; }; #elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) class Selector { public: Selector(const InstancePtr&); ~Selector(); void destroy(); void initialize(EventHandler*) { // Nothing to do } void update(EventHandler*, SocketOperation, SocketOperation); void enable(EventHandler*, SocketOperation); void disable(EventHandler*, SocketOperation); bool finish(EventHandler*, bool); void startSelect(); void finishSelect(); void select(std::vector >&, int); private: void updateSelector(); void updateImpl(EventHandler*); const InstancePtr _instance; SOCKET _fdIntrRead; SOCKET _fdIntrWrite; bool _selecting; bool _interrupted; std::vector > _changes; std::map _handlers; #if defined(ICE_USE_SELECT) fd_set _readFdSet; fd_set _writeFdSet; fd_set _errorFdSet; fd_set _selectedReadFdSet; fd_set _selectedWriteFdSet; fd_set _selectedErrorFdSet; fd_set* fdSetCopy(fd_set& dest, fd_set& src) { if(src.fd_count > 0) { dest.fd_count = src.fd_count; memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count); return &dest; } return 0; } #else std::vector _pollFdSet; #endif }; #endif } #endif