diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-03-20 18:18:03 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-03-20 18:18:03 +0100 |
commit | c6133370d229578697c157f94b297dcc593b4a5f (patch) | |
tree | 9bbc2e3083338315393109a511e7a65d09dc9ac6 /cpp/src/Ice/Selector.cpp | |
parent | Remove src directory. (diff) | |
download | ice-c6133370d229578697c157f94b297dcc593b4a5f.tar.bz2 ice-c6133370d229578697c157f94b297dcc593b4a5f.tar.xz ice-c6133370d229578697c157f94b297dcc593b4a5f.zip |
Added iOS support for IceTouch
Diffstat (limited to 'cpp/src/Ice/Selector.cpp')
-rw-r--r-- | cpp/src/Ice/Selector.cpp | 479 |
1 files changed, 479 insertions, 0 deletions
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index 07f755276cb..39f411fb115 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -14,6 +14,11 @@ #include <Ice/LocalException.h> #include <IceUtil/Time.h> +#ifdef ICE_USE_CFSTREAM +# include <CoreFoundation/CoreFoundation.h> +# include <CoreFoundation/CFStream.h> +#endif + using namespace std; using namespace IceInternal; @@ -519,6 +524,480 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } } +#elif defined(ICE_USE_CFSTREAM) + +namespace +{ + +void selectorInterrupt(void* info) +{ + reinterpret_cast<Selector*>(info)->processInterrupt(); +} + +void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, CFDataRef, const void* d, void* info) +{ + if(callbackType == kCFSocketReadCallBack) + { + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationRead); + } + else if(callbackType == kCFSocketWriteCallBack) + { + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationWrite); + } + else if(callbackType == kCFSocketConnectCallBack) + { + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect, + d ? *reinterpret_cast<const SInt32*>(d) : 0); + } +} + +class SelectorHelperThread : public IceUtil::Thread +{ +public: + + SelectorHelperThread(Selector& selector) : _selector(selector) + { + } + + virtual void run() + { + _selector.run(); + } + +private: + + Selector& _selector; +}; + +CFOptionFlags +toCFCallbacks(SocketOperation op) +{ + CFOptionFlags cbs = 0; + if(op & SocketOperationRead) + { + cbs |= kCFSocketReadCallBack; + } + if(op & SocketOperationWrite) + { + cbs |= kCFSocketWriteCallBack; + } + if(op & SocketOperationConnect) + { + cbs |= kCFSocketConnectCallBack; + } + return cbs; +} + +} + +EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) : + _handler(handler), + _nativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())), + _selector(selector), + _ready(SocketOperationNone), + _finish(false) +{ + if(!StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())) + { + SOCKET fd = handler->getNativeInfo()->fd(); + CFSocketContext ctx = { 0, this, 0, 0, 0 }; + _socket = CFSocketCreateWithNative(kCFAllocatorDefault, + fd, + kCFSocketReadCallBack | + kCFSocketWriteCallBack | + kCFSocketConnectCallBack, + eventHandlerSocketCallback, + &ctx); + + // Disable automatic re-enabling of callbacks and closing of the native socket. + CFSocketSetSocketFlags(_socket, 0); + CFSocketDisableCallBacks(_socket, kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack); + _source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0); + } + else + { + _socket = 0; + _source = 0; + _nativeInfo->initStreams(this); + } +} + +EventHandlerWrapper::~EventHandlerWrapper() +{ + if(_socket) + { + CFRelease(_socket); + CFRelease(_source); + } +} + +void +EventHandlerWrapper::updateRunLoop() +{ + SocketOperation op = _handler->_registered; + assert(!op || !_finish); + + if(_socket) + { + CFSocketDisableCallBacks(_socket, kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack); + if(op) + { + CFSocketEnableCallBacks(_socket, toCFCallbacks(op)); + } + + if(op && !CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode)) + { + CFRunLoopAddSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode); + } + else if(!op && CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode)) + { + CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode); + } + + if(_finish) + { + CFSocketInvalidate(_socket); + } + } + else + { + SocketOperation readyOp = _nativeInfo->registerWithRunLoop(op); + if(!(op & (SocketOperationWrite | SocketOperationConnect)) || _ready & SocketOperationWrite) + { + _nativeInfo->unregisterFromRunLoop(SocketOperationWrite, false); + } + + if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead) + { + _nativeInfo->unregisterFromRunLoop(SocketOperationRead, false); + } + + if(readyOp) + { + ready(readyOp, 0); + } + + if(_finish) + { + _nativeInfo->closeStreams(); + } + } +} + +void +EventHandlerWrapper::readyCallback(SocketOperation op, int error) +{ + _selector.ready(this, op, error); +} + +void +EventHandlerWrapper::ready(SocketOperation op, int error) +{ + if(!_socket) + { + // + // Unregister the stream from the runloop as soon as we got the callback. This is + // required to allow thread pool thread to perform read/write operations on the + // stream (which can't be used from another thread than the run loop thread if + // it's registered with a run loop). + // + op = _nativeInfo->unregisterFromRunLoop(op, error != 0); + } + + op = static_cast<SocketOperation>(_handler->_registered & op); + if(!op || _ready & op) + { + return; + } + + if(_socket) + { + if(op & SocketOperationConnect) + { + _nativeInfo->setConnectError(error); + } + } + + _ready = static_cast<SocketOperation>(_ready | op); + if(!(_handler->_disabled & op)) + { + _selector.addReadyHandler(this); + } +} + +void +EventHandlerWrapper::checkReady() +{ + if(_ready & _handler->_registered) + { + _selector.addReadyHandler(this); + } +} + +SocketOperation +EventHandlerWrapper::readyOp() +{ + assert(!(~_handler->_registered & _ready)); + SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & _ready); + _ready = static_cast<SocketOperation>(~op & _ready); + return op; +} + +bool +EventHandlerWrapper::update(SocketOperation remove, SocketOperation add) +{ + SocketOperation previous = _handler->_registered; + _handler->_registered = static_cast<SocketOperation>(_handler->_registered & ~remove); + _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add); + if(previous == _handler->_registered) + { + return false; + } + + // Clear ready flags which might not be valid anymore. + _ready = static_cast<SocketOperation>(_ready & _handler->_registered); + return true; +} + +void +EventHandlerWrapper::finish() +{ + _finish = true; + _ready = SocketOperationNone; + _handler->_registered = SocketOperationNone; +} + +Selector::Selector(const InstancePtr& instance) : _instance(instance), _destroyed(false) +{ + CFRunLoopSourceContext ctx; + memset(&ctx, 0, sizeof(CFRunLoopSourceContext)); + ctx.info = this; + ctx.perform = selectorInterrupt; + _source = CFRunLoopSourceCreate(0, 0, &ctx); + _runLoop = 0; + + _thread = new SelectorHelperThread(*this); + _thread->start(); + + Lock sync(*this); + while(!_runLoop) + { + wait(); + } +} + +Selector::~Selector() +{ +} + +void +Selector::destroy() +{ + Lock sync(*this); + + // + // Make sure any pending changes are processed to ensure remaining + // streams/sockets are closed. + // + _destroyed = true; + while(!_changes.empty()) + { + CFRunLoopSourceSignal(_source); + CFRunLoopWakeUp(_runLoop); + + wait(); + } + + _thread->getThreadControl().join(); + _thread = 0; + + CFRelease(_source); + + assert(_wrappers.empty()); + _readyHandlers.clear(); + _selectedHandlers.clear(); +} + +void +Selector::initialize(EventHandler* handler) +{ + Lock sync(*this); + _wrappers[handler] = new EventHandlerWrapper(handler, *this); +} + +void +Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add) +{ + Lock sync(*this); + const EventHandlerWrapperPtr& wrapper = _wrappers[handler]; + assert(wrapper); + if(wrapper->update(remove, add)) + { + _changes.insert(wrapper); + notify(); + } +} + +void +Selector::enable(EventHandler* handler, SocketOperation op) +{ + Lock sync(*this); + if(!(handler->_disabled & op)) + { + return; + } + handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~op); + + if(handler->_registered & op) + { + _wrappers[handler]->checkReady(); + } +} + +void +Selector::disable(EventHandler* handler, SocketOperation op) +{ + Lock sync(*this); + if(handler->_disabled & op) + { + return; + } + handler->_disabled = static_cast<SocketOperation>(handler->_disabled | op); +} + +bool +Selector::finish(EventHandler* handler, bool closeNow) +{ + Lock sync(*this); + std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler); + assert(p != _wrappers.end()); + EventHandlerWrapperPtr wrapper = p->second; + wrapper->finish(); + _wrappers.erase(p); + _changes.insert(wrapper); + notify(); + return closeNow; +} + +void +Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers, int timeout) +{ + Lock sync(*this); + + // + // Re-enable callbacks for previously selected handlers. + // + if(!_selectedHandlers.empty()) + { + vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p; + for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p) + { + if(!p->first->_finish) + { + _changes.insert(p->first); + } + } + _selectedHandlers.clear(); + } + + // + // Wait for handlers to be ready. + // + handlers.clear(); + while(_selectedHandlers.empty()) + { + while(!_changes.empty()) + { + CFRunLoopSourceSignal(_source); + CFRunLoopWakeUp(_runLoop); + + wait(); + } + + if(_readyHandlers.empty()) + { + if(timeout > 0) + { + if(!timedWait(IceUtil::Time::seconds(timeout))) + { + break; + } + } + else + { + wait(); + } + } + + if(!_changes.empty()) + { + continue; // Make sure to process the changes first. + } + + for(vector<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p) + { + SocketOperation op = (*p)->readyOp(); + if(op) + { + _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op)); + handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op)); + } + } + _readyHandlers.clear(); + } +} + +void +Selector::processInterrupt() +{ + Lock sync(*this); + if(!_changes.empty()) + { + for(set<EventHandlerWrapperPtr>::const_iterator p = _changes.begin(); p != _changes.end(); ++p) + { + (*p)->updateRunLoop(); + } + _changes.clear(); + notify(); + } + if(_destroyed) + { + CFRunLoopStop(_runLoop); + } +} + +void +Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error) +{ + Lock sync(*this); + wrapper->ready(op, error); +} + +void +Selector::addReadyHandler(EventHandlerWrapper* wrapper) +{ + // Called from ready() + _readyHandlers.push_back(wrapper); + if(_readyHandlers.size() == 1) + { + notify(); + } +} + +void +Selector::run() +{ + { + Lock sync(*this); + _runLoop = CFRunLoopGetCurrent(); + notify(); + } + + CFRunLoopAddSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode); + CFRunLoopRun(); + CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode); +} + #elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) Selector::Selector(const InstancePtr& instance) : _instance(instance), _selecting(false), _interrupted(false) |