summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Selector.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-03-20 18:18:03 +0100
committerBenoit Foucher <benoit@zeroc.com>2015-03-20 18:18:03 +0100
commitc6133370d229578697c157f94b297dcc593b4a5f (patch)
tree9bbc2e3083338315393109a511e7a65d09dc9ac6 /cpp/src/Ice/Selector.cpp
parentRemove src directory. (diff)
downloadice-c6133370d229578697c157f94b297dcc593b4a5f.tar.bz2
ice-c6133370d229578697c157f94b297dcc593b4a5f.tar.xz
ice-c6133370d229578697c157f94b297dcc593b4a5f.zip
Added iOS support for IceTouch
Diffstat (limited to 'cpp/src/Ice/Selector.cpp')
-rw-r--r--cpp/src/Ice/Selector.cpp479
1 files changed, 479 insertions, 0 deletions
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp
index 07f755276cb..39f411fb115 100644
--- a/cpp/src/Ice/Selector.cpp
+++ b/cpp/src/Ice/Selector.cpp
@@ -14,6 +14,11 @@
#include <Ice/LocalException.h>
#include <IceUtil/Time.h>
+#ifdef ICE_USE_CFSTREAM
+# include <CoreFoundation/CoreFoundation.h>
+# include <CoreFoundation/CFStream.h>
+#endif
+
using namespace std;
using namespace IceInternal;
@@ -519,6 +524,480 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
}
}
+#elif defined(ICE_USE_CFSTREAM)
+
+namespace
+{
+
+void selectorInterrupt(void* info)
+{
+ reinterpret_cast<Selector*>(info)->processInterrupt();
+}
+
+void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, CFDataRef, const void* d, void* info)
+{
+ if(callbackType == kCFSocketReadCallBack)
+ {
+ reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationRead);
+ }
+ else if(callbackType == kCFSocketWriteCallBack)
+ {
+ reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationWrite);
+ }
+ else if(callbackType == kCFSocketConnectCallBack)
+ {
+ reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect,
+ d ? *reinterpret_cast<const SInt32*>(d) : 0);
+ }
+}
+
+class SelectorHelperThread : public IceUtil::Thread
+{
+public:
+
+ SelectorHelperThread(Selector& selector) : _selector(selector)
+ {
+ }
+
+ virtual void run()
+ {
+ _selector.run();
+ }
+
+private:
+
+ Selector& _selector;
+};
+
+CFOptionFlags
+toCFCallbacks(SocketOperation op)
+{
+ CFOptionFlags cbs = 0;
+ if(op & SocketOperationRead)
+ {
+ cbs |= kCFSocketReadCallBack;
+ }
+ if(op & SocketOperationWrite)
+ {
+ cbs |= kCFSocketWriteCallBack;
+ }
+ if(op & SocketOperationConnect)
+ {
+ cbs |= kCFSocketConnectCallBack;
+ }
+ return cbs;
+}
+
+}
+
+EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) :
+ _handler(handler),
+ _nativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())),
+ _selector(selector),
+ _ready(SocketOperationNone),
+ _finish(false)
+{
+ if(!StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo()))
+ {
+ SOCKET fd = handler->getNativeInfo()->fd();
+ CFSocketContext ctx = { 0, this, 0, 0, 0 };
+ _socket = CFSocketCreateWithNative(kCFAllocatorDefault,
+ fd,
+ kCFSocketReadCallBack |
+ kCFSocketWriteCallBack |
+ kCFSocketConnectCallBack,
+ eventHandlerSocketCallback,
+ &ctx);
+
+ // Disable automatic re-enabling of callbacks and closing of the native socket.
+ CFSocketSetSocketFlags(_socket, 0);
+ CFSocketDisableCallBacks(_socket, kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack);
+ _source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
+ }
+ else
+ {
+ _socket = 0;
+ _source = 0;
+ _nativeInfo->initStreams(this);
+ }
+}
+
+EventHandlerWrapper::~EventHandlerWrapper()
+{
+ if(_socket)
+ {
+ CFRelease(_socket);
+ CFRelease(_source);
+ }
+}
+
+void
+EventHandlerWrapper::updateRunLoop()
+{
+ SocketOperation op = _handler->_registered;
+ assert(!op || !_finish);
+
+ if(_socket)
+ {
+ CFSocketDisableCallBacks(_socket, kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack);
+ if(op)
+ {
+ CFSocketEnableCallBacks(_socket, toCFCallbacks(op));
+ }
+
+ if(op && !CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode))
+ {
+ CFRunLoopAddSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode);
+ }
+ else if(!op && CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode))
+ {
+ CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode);
+ }
+
+ if(_finish)
+ {
+ CFSocketInvalidate(_socket);
+ }
+ }
+ else
+ {
+ SocketOperation readyOp = _nativeInfo->registerWithRunLoop(op);
+ if(!(op & (SocketOperationWrite | SocketOperationConnect)) || _ready & SocketOperationWrite)
+ {
+ _nativeInfo->unregisterFromRunLoop(SocketOperationWrite, false);
+ }
+
+ if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead)
+ {
+ _nativeInfo->unregisterFromRunLoop(SocketOperationRead, false);
+ }
+
+ if(readyOp)
+ {
+ ready(readyOp, 0);
+ }
+
+ if(_finish)
+ {
+ _nativeInfo->closeStreams();
+ }
+ }
+}
+
+void
+EventHandlerWrapper::readyCallback(SocketOperation op, int error)
+{
+ _selector.ready(this, op, error);
+}
+
+void
+EventHandlerWrapper::ready(SocketOperation op, int error)
+{
+ if(!_socket)
+ {
+ //
+ // Unregister the stream from the runloop as soon as we got the callback. This is
+ // required to allow thread pool thread to perform read/write operations on the
+ // stream (which can't be used from another thread than the run loop thread if
+ // it's registered with a run loop).
+ //
+ op = _nativeInfo->unregisterFromRunLoop(op, error != 0);
+ }
+
+ op = static_cast<SocketOperation>(_handler->_registered & op);
+ if(!op || _ready & op)
+ {
+ return;
+ }
+
+ if(_socket)
+ {
+ if(op & SocketOperationConnect)
+ {
+ _nativeInfo->setConnectError(error);
+ }
+ }
+
+ _ready = static_cast<SocketOperation>(_ready | op);
+ if(!(_handler->_disabled & op))
+ {
+ _selector.addReadyHandler(this);
+ }
+}
+
+void
+EventHandlerWrapper::checkReady()
+{
+ if(_ready & _handler->_registered)
+ {
+ _selector.addReadyHandler(this);
+ }
+}
+
+SocketOperation
+EventHandlerWrapper::readyOp()
+{
+ assert(!(~_handler->_registered & _ready));
+ SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & _ready);
+ _ready = static_cast<SocketOperation>(~op & _ready);
+ return op;
+}
+
+bool
+EventHandlerWrapper::update(SocketOperation remove, SocketOperation add)
+{
+ SocketOperation previous = _handler->_registered;
+ _handler->_registered = static_cast<SocketOperation>(_handler->_registered & ~remove);
+ _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add);
+ if(previous == _handler->_registered)
+ {
+ return false;
+ }
+
+ // Clear ready flags which might not be valid anymore.
+ _ready = static_cast<SocketOperation>(_ready & _handler->_registered);
+ return true;
+}
+
+void
+EventHandlerWrapper::finish()
+{
+ _finish = true;
+ _ready = SocketOperationNone;
+ _handler->_registered = SocketOperationNone;
+}
+
+Selector::Selector(const InstancePtr& instance) : _instance(instance), _destroyed(false)
+{
+ CFRunLoopSourceContext ctx;
+ memset(&ctx, 0, sizeof(CFRunLoopSourceContext));
+ ctx.info = this;
+ ctx.perform = selectorInterrupt;
+ _source = CFRunLoopSourceCreate(0, 0, &ctx);
+ _runLoop = 0;
+
+ _thread = new SelectorHelperThread(*this);
+ _thread->start();
+
+ Lock sync(*this);
+ while(!_runLoop)
+ {
+ wait();
+ }
+}
+
+Selector::~Selector()
+{
+}
+
+void
+Selector::destroy()
+{
+ Lock sync(*this);
+
+ //
+ // Make sure any pending changes are processed to ensure remaining
+ // streams/sockets are closed.
+ //
+ _destroyed = true;
+ while(!_changes.empty())
+ {
+ CFRunLoopSourceSignal(_source);
+ CFRunLoopWakeUp(_runLoop);
+
+ wait();
+ }
+
+ _thread->getThreadControl().join();
+ _thread = 0;
+
+ CFRelease(_source);
+
+ assert(_wrappers.empty());
+ _readyHandlers.clear();
+ _selectedHandlers.clear();
+}
+
+void
+Selector::initialize(EventHandler* handler)
+{
+ Lock sync(*this);
+ _wrappers[handler] = new EventHandlerWrapper(handler, *this);
+}
+
+void
+Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
+{
+ Lock sync(*this);
+ const EventHandlerWrapperPtr& wrapper = _wrappers[handler];
+ assert(wrapper);
+ if(wrapper->update(remove, add))
+ {
+ _changes.insert(wrapper);
+ notify();
+ }
+}
+
+void
+Selector::enable(EventHandler* handler, SocketOperation op)
+{
+ Lock sync(*this);
+ if(!(handler->_disabled & op))
+ {
+ return;
+ }
+ handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~op);
+
+ if(handler->_registered & op)
+ {
+ _wrappers[handler]->checkReady();
+ }
+}
+
+void
+Selector::disable(EventHandler* handler, SocketOperation op)
+{
+ Lock sync(*this);
+ if(handler->_disabled & op)
+ {
+ return;
+ }
+ handler->_disabled = static_cast<SocketOperation>(handler->_disabled | op);
+}
+
+bool
+Selector::finish(EventHandler* handler, bool closeNow)
+{
+ Lock sync(*this);
+ std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler);
+ assert(p != _wrappers.end());
+ EventHandlerWrapperPtr wrapper = p->second;
+ wrapper->finish();
+ _wrappers.erase(p);
+ _changes.insert(wrapper);
+ notify();
+ return closeNow;
+}
+
+void
+Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers, int timeout)
+{
+ Lock sync(*this);
+
+ //
+ // Re-enable callbacks for previously selected handlers.
+ //
+ if(!_selectedHandlers.empty())
+ {
+ vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p;
+ for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p)
+ {
+ if(!p->first->_finish)
+ {
+ _changes.insert(p->first);
+ }
+ }
+ _selectedHandlers.clear();
+ }
+
+ //
+ // Wait for handlers to be ready.
+ //
+ handlers.clear();
+ while(_selectedHandlers.empty())
+ {
+ while(!_changes.empty())
+ {
+ CFRunLoopSourceSignal(_source);
+ CFRunLoopWakeUp(_runLoop);
+
+ wait();
+ }
+
+ if(_readyHandlers.empty())
+ {
+ if(timeout > 0)
+ {
+ if(!timedWait(IceUtil::Time::seconds(timeout)))
+ {
+ break;
+ }
+ }
+ else
+ {
+ wait();
+ }
+ }
+
+ if(!_changes.empty())
+ {
+ continue; // Make sure to process the changes first.
+ }
+
+ for(vector<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p)
+ {
+ SocketOperation op = (*p)->readyOp();
+ if(op)
+ {
+ _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op));
+ handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op));
+ }
+ }
+ _readyHandlers.clear();
+ }
+}
+
+void
+Selector::processInterrupt()
+{
+ Lock sync(*this);
+ if(!_changes.empty())
+ {
+ for(set<EventHandlerWrapperPtr>::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
+ {
+ (*p)->updateRunLoop();
+ }
+ _changes.clear();
+ notify();
+ }
+ if(_destroyed)
+ {
+ CFRunLoopStop(_runLoop);
+ }
+}
+
+void
+Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error)
+{
+ Lock sync(*this);
+ wrapper->ready(op, error);
+}
+
+void
+Selector::addReadyHandler(EventHandlerWrapper* wrapper)
+{
+ // Called from ready()
+ _readyHandlers.push_back(wrapper);
+ if(_readyHandlers.size() == 1)
+ {
+ notify();
+ }
+}
+
+void
+Selector::run()
+{
+ {
+ Lock sync(*this);
+ _runLoop = CFRunLoopGetCurrent();
+ notify();
+ }
+
+ CFRunLoopAddSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode);
+ CFRunLoopRun();
+ CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode);
+}
+
#elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
Selector::Selector(const InstancePtr& instance) : _instance(instance), _selecting(false), _interrupted(false)