summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Selector.h
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-10-09 15:00:57 +0200
committerBenoit Foucher <benoit@zeroc.com>2015-10-09 15:00:57 +0200
commit20b6c0ccb95118ffc685826904a8edd06a38ac1b (patch)
tree1b389964fa35ca9de23c548120ecedcc9d82074c /cpp/src/Ice/Selector.h
parentMerge branch '3.6' (diff)
downloadice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.bz2
ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.xz
ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.zip
Added ready callback to allow transports to signal readiness to the thread pool
Diffstat (limited to 'cpp/src/Ice/Selector.h')
-rw-r--r--cpp/src/Ice/Selector.h234
1 files changed, 93 insertions, 141 deletions
diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h
index e890db6e2bf..4098da30171 100644
--- a/cpp/src/Ice/Selector.h
+++ b/cpp/src/Ice/Selector.h
@@ -57,67 +57,59 @@ class SelectorTimeoutException
{
};
-#if defined(ICE_OS_WINRT)
-
-struct SelectEvent
-{
- SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status)
- {
- }
- EventHandlerPtr handler;
- SocketOperation status;
-};
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
-class Selector : IceUtil::Monitor<IceUtil::Mutex>
+class Selector
{
-public:
-
- Selector(const InstancePtr&);
-
- void destroy();
-
- void initialize(EventHandler*);
- void update(EventHandler*, SocketOperation, SocketOperation);
- void finish(EventHandler*);
-
- EventHandlerPtr getNextHandler(SocketOperation&, int);
-
- void completed(const EventHandlerPtr&, SocketOperation);
-
-private:
-
- const InstancePtr _instance;
- std::deque<SelectEvent> _events;
-};
+#if defined(ICE_OS_WINRT)
+ struct SelectEvent
+ {
+ SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status)
+ {
+ }
-#elif defined(ICE_USE_IOCP)
+ EventHandlerPtr handler;
+ SocketOperation status;
+ };
+#endif
-class Selector
-{
public:
Selector(const InstancePtr&);
~Selector();
+#ifdef ICE_USE_IOCP
void setup(int);
- void destroy();
+#endif
+ void destroy();
void initialize(EventHandler*);
void update(EventHandler*, SocketOperation, SocketOperation);
void finish(EventHandler*);
+ void ready(EventHandler*, SocketOperation, bool);
+
+#ifdef ICE_USE_IOCP
EventHandler* getNextHandler(SocketOperation&, DWORD&, int&, int);
+#else
+ EventHandler* getNextHandler(SocketOperation&, int);
+#endif
+
+ void completed(EventHandler*, SocketOperation);
- HANDLE getIOCPHandle() { return _handle; }
-
private:
const InstancePtr _instance;
+#ifdef ICE_USE_IOCP
HANDLE _handle;
+#else
+ IceUtil::Monitor<IceUtil::Mutex> _monitor;
+ std::deque<SelectEvent> _events;
+#endif
};
-#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL)
+#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
class Selector
{
@@ -126,7 +118,7 @@ public:
Selector(const InstancePtr&);
~Selector();
- void destroy();
+ void destroy();
void initialize(EventHandler*)
{
@@ -137,43 +129,62 @@ public:
void disable(EventHandler*, SocketOperation);
bool finish(EventHandler*, bool);
-#if defined(ICE_USE_KQUEUE)
- void updateSelector();
-#endif
-
- void
- startSelect()
- {
-#ifdef ICE_USE_KQUEUE
- _selecting = true;
- if(!_changes.empty())
- {
- updateSelector();
- }
-#endif
- }
-
- void
- finishSelect()
- {
-#ifdef ICE_USE_KQUEUE
- _selecting = false;
-#endif
- }
+ void ready(EventHandler*, SocketOperation, bool);
- void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int);
+ void startSelect();
+ void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&);
+ void select(int);
private:
+ void wakeup();
+ void checkReady(EventHandler*);
+ void updateSelector();
+
const InstancePtr _instance;
+
+ SOCKET _fdIntrRead;
+ SOCKET _fdIntrWrite;
+ bool _interrupted;
+ bool _selectNow;
+ int _count;
+ bool _selecting;
+ std::map<EventHandlerPtr, SocketOperation> _readyHandlers;
+
#if defined(ICE_USE_EPOLL)
std::vector<struct epoll_event> _events;
-#else
+ int _queueFd;
+#elif defined(ICE_USE_KQUEUE)
std::vector<struct kevent> _events;
std::vector<struct kevent> _changes;
- bool _selecting;
-#endif
int _queueFd;
+#elif defined(ICE_USE_SELECT)
+ std::vector<std::pair<EventHandler*, SocketOperation> > _changes;
+ std::map<SOCKET, EventHandler*> _handlers;
+
+ fd_set _readFdSet;
+ fd_set _writeFdSet;
+ fd_set _errorFdSet;
+ fd_set _selectedReadFdSet;
+ fd_set _selectedWriteFdSet;
+ fd_set _selectedErrorFdSet;
+
+ fd_set*
+ fdSetCopy(fd_set& dest, fd_set& src)
+ {
+ if(src.fd_count > 0)
+ {
+ dest.fd_count = src.fd_count;
+ memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count);
+ return &dest;
+ }
+ return 0;
+ }
+#elif defined(ICE_USE_POLL)
+ std::vector<std::pair<EventHandler*, SocketOperation> > _changes;
+ std::map<SOCKET, EventHandler*> _handlers;
+ std::vector<struct pollfd> _pollFdSet;
+#endif
};
#elif defined(ICE_USE_CFSTREAM)
@@ -182,7 +193,7 @@ class Selector;
class SelectorReadyCallback : public IceUtil::Shared
{
-public:
+public:
virtual ~SelectorReadyCallback() { }
virtual void readyCallback(SocketOperation, int = 0) = 0;
@@ -201,9 +212,9 @@ public:
virtual SocketOperation unregisterFromRunLoop(SocketOperation, bool) = 0;
virtual void closeStreams() = 0;
- void setConnectError(int error)
+ void setConnectError(int error)
{
- _connectError = error;
+ _connectError = error;
}
private:
@@ -222,14 +233,13 @@ public:
void updateRunLoop();
virtual void readyCallback(SocketOperation, int = 0);
-
void ready(SocketOperation, int);
SocketOperation readyOp();
- void checkReady();
+ bool checkReady();
bool update(SocketOperation, SocketOperation);
- void finish();
+ bool finish();
bool operator<(const EventHandlerWrapper& o)
{
@@ -241,7 +251,7 @@ private:
friend class Selector;
EventHandlerPtr _handler;
- StreamNativeInfoPtr _nativeInfo;
+ StreamNativeInfoPtr _streamNativeInfo;
Selector& _selector;
SocketOperation _ready;
bool _finish;
@@ -266,17 +276,22 @@ public:
void disable(EventHandler*, SocketOperation);
bool finish(EventHandler*, bool);
- void startSelect() { }
- void finishSelect() { }
- void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int);
+ void ready(EventHandler*, SocketOperation, bool);
+
+ void startSelect();
+ void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&);
+ void select(int);
void processInterrupt();
- void ready(EventHandlerWrapper*, SocketOperation, int = 0);
- void addReadyHandler(EventHandlerWrapper*);
void run();
private:
+ void ready(EventHandlerWrapper*, SocketOperation, int = 0);
+ void addReadyHandler(EventHandlerWrapper*);
+
+ friend class EventHandlerWrapper;
+
InstancePtr _instance;
IceUtil::ThreadPtr _thread;
CFRunLoopRef _runLoop;
@@ -285,74 +300,11 @@ private:
std::set<EventHandlerWrapperPtr> _changes;
- std::vector<EventHandlerWrapperPtr> _readyHandlers;
+ std::set<EventHandlerWrapperPtr> _readyHandlers;
std::vector<std::pair<EventHandlerWrapperPtr, SocketOperation> > _selectedHandlers;
std::map<EventHandler*, EventHandlerWrapperPtr> _wrappers;
};
-#elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
-
-class Selector
-{
-public:
-
- Selector(const InstancePtr&);
- ~Selector();
-
- void destroy();
-
- void initialize(EventHandler*)
- {
- // Nothing to do
- }
- void update(EventHandler*, SocketOperation, SocketOperation);
- void enable(EventHandler*, SocketOperation);
- void disable(EventHandler*, SocketOperation);
- bool finish(EventHandler*, bool);
-
- void startSelect();
- void finishSelect();
- void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int);
-
-private:
-
- void updateSelector();
- void updateImpl(EventHandler*);
-
- const InstancePtr _instance;
-
- SOCKET _fdIntrRead;
- SOCKET _fdIntrWrite;
- bool _selecting;
- bool _interrupted;
-
- std::vector<std::pair<EventHandler*, SocketOperation> > _changes;
- std::map<SOCKET, EventHandler*> _handlers;
-
-#if defined(ICE_USE_SELECT)
- fd_set _readFdSet;
- fd_set _writeFdSet;
- fd_set _errorFdSet;
- fd_set _selectedReadFdSet;
- fd_set _selectedWriteFdSet;
- fd_set _selectedErrorFdSet;
-
- fd_set*
- fdSetCopy(fd_set& dest, fd_set& src)
- {
- if(src.fd_count > 0)
- {
- dest.fd_count = src.fd_count;
- memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count);
- return &dest;
- }
- return 0;
- }
-#else
- std::vector<struct pollfd> _pollFdSet;
-#endif
-};
-
#endif
}