diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-08-11 10:42:04 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-08-11 10:42:04 +0200 |
commit | 12f8959ad8d0843a55dd309f087dd1f8a5dda88f (patch) | |
tree | 5186165ce0e0b4bb432a9e70314e597563bb8c56 /cpp/src/Ice/Selector.cpp | |
parent | Updates branch description (diff) | |
download | ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.tar.bz2 ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.tar.xz ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.zip |
Fixed ICE-6695, Ice for WinRT hang on Windows 10
Diffstat (limited to 'cpp/src/Ice/Selector.cpp')
-rw-r--r-- | cpp/src/Ice/Selector.cpp | 96 |
1 files changed, 51 insertions, 45 deletions
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index 39f411fb115..af4a5b07c52 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -40,11 +40,16 @@ Selector::destroy() void Selector::initialize(IceInternal::EventHandler* handler) { + EventHandlerPtr h = handler; handler->__incRef(); handler->getNativeInfo()->setCompletedHandler( - ref new SocketOperationCompletedHandler([=](int operation) + ref new SocketOperationCompletedHandler([=](int operation) { - completed(handler, static_cast<SocketOperation>(operation)); + // + // Use the reference counted handler to ensure it's not + // destroyed as long as the callback lambda exists. + // + completed(h, static_cast<SocketOperation>(operation)); })); } @@ -56,12 +61,12 @@ Selector::update(IceInternal::EventHandler* handler, SocketOperation remove, Soc if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead)) { handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead); - completed(handler, SocketOperationRead); // Start an asynchrnous read + completed(handler, SocketOperationRead); // Start an asynchrnous read } else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite)) { handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite); - completed(handler, SocketOperationWrite); // Start an asynchrnous write + completed(handler, SocketOperationWrite); // Start an asynchrnous write } } @@ -69,10 +74,11 @@ void Selector::finish(IceInternal::EventHandler* handler) { handler->_registered = SocketOperationNone; + handler->_finish = false; // Ensures that finished() is only called once on the event handler. handler->__decRef(); } -IceInternal::EventHandler* +IceInternal::EventHandlerPtr Selector::getNextHandler(SocketOperation& status, int timeout) { Lock lock(*this); @@ -83,7 +89,7 @@ Selector::getNextHandler(SocketOperation& status, int timeout) timedWait(IceUtil::Time::seconds(timeout)); if(_events.empty()) { - throw SelectorTimeoutException(); + throw SelectorTimeoutException(); } } else @@ -92,15 +98,15 @@ Selector::getNextHandler(SocketOperation& status, int timeout) } } assert(!_events.empty()); - IceInternal::EventHandler* handler = _events.front().handler; + IceInternal::EventHandlerPtr handler = _events.front().handler; const SelectEvent& event = _events.front(); status = event.status; _events.pop_front(); return handler; } -void -Selector::completed(IceInternal::EventHandler* handler, SocketOperation op) +void +Selector::completed(const IceInternal::EventHandlerPtr& handler, SocketOperation op) { Lock lock(*this); _events.push_back(SelectEvent(handler, op)); @@ -316,7 +322,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation if(remove & SocketOperationRead) { struct kevent ev; - EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler); + EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler); _changes.push_back(ev); } if(remove & SocketOperationWrite) @@ -328,14 +334,14 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation if(add & SocketOperationRead) { struct kevent ev; - EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0, + EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0, handler); _changes.push_back(ev); } if(add & SocketOperationWrite) { struct kevent ev; - EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0, + EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0, handler); _changes.push_back(ev); } @@ -391,7 +397,7 @@ Selector::disable(EventHandler* handler, SocketOperation status) return; } handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status); - + if(handler->_registered & status) { SOCKET fd = handler->getNativeInfo()->fd(); @@ -431,7 +437,7 @@ Selector::finish(EventHandler* handler, bool closeNow) { // // Update selector now to remove the FD from the kqueue if - // we're going to close it now. This isn't necessary for + // we're going to close it now. This isn't necessary for // epoll since we always update the epoll FD immediately. // updateSelector(); @@ -441,7 +447,7 @@ Selector::finish(EventHandler* handler, bool closeNow) } #if defined(ICE_USE_KQUEUE) -void +void Selector::updateSelector() { int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0); @@ -454,7 +460,7 @@ Selector::updateSelector() } #endif -void +void Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout) { int ret = 0; @@ -505,9 +511,9 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti #if defined(ICE_USE_EPOLL) struct epoll_event& ev = _events[i]; p.first = reinterpret_cast<EventHandler*>(ev.data.ptr); - p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ? - SocketOperationRead : SocketOperationNone) | - ((ev.events & (EPOLLOUT | EPOLLERR)) ? + p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ? + SocketOperationRead : SocketOperationNone) | + ((ev.events & (EPOLLOUT | EPOLLERR)) ? SocketOperationWrite : SocketOperationNone)); #else struct kevent& ev = _events[i]; @@ -546,7 +552,7 @@ void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, } else if(callbackType == kCFSocketConnectCallBack) { - reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect, + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect, d ? *reinterpret_cast<const SInt32*>(d) : 0); } } @@ -558,7 +564,7 @@ public: SelectorHelperThread(Selector& selector) : _selector(selector) { } - + virtual void run() { _selector.run(); @@ -590,8 +596,8 @@ toCFCallbacks(SocketOperation op) } -EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) : - _handler(handler), +EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) : + _handler(handler), _nativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())), _selector(selector), _ready(SocketOperationNone), @@ -601,14 +607,14 @@ EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selecto { SOCKET fd = handler->getNativeInfo()->fd(); CFSocketContext ctx = { 0, this, 0, 0, 0 }; - _socket = CFSocketCreateWithNative(kCFAllocatorDefault, + _socket = CFSocketCreateWithNative(kCFAllocatorDefault, fd, - kCFSocketReadCallBack | - kCFSocketWriteCallBack | + kCFSocketReadCallBack | + kCFSocketWriteCallBack | kCFSocketConnectCallBack, - eventHandlerSocketCallback, + eventHandlerSocketCallback, &ctx); - + // Disable automatic re-enabling of callbacks and closing of the native socket. CFSocketSetSocketFlags(_socket, 0); CFSocketDisableCallBacks(_socket, kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack); @@ -666,7 +672,7 @@ EventHandlerWrapper::updateRunLoop() { _nativeInfo->unregisterFromRunLoop(SocketOperationWrite, false); } - + if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead) { _nativeInfo->unregisterFromRunLoop(SocketOperationRead, false); @@ -698,7 +704,7 @@ EventHandlerWrapper::ready(SocketOperation op, int error) // // Unregister the stream from the runloop as soon as we got the callback. This is // required to allow thread pool thread to perform read/write operations on the - // stream (which can't be used from another thread than the run loop thread if + // stream (which can't be used from another thread than the run loop thread if // it's registered with a run loop). // op = _nativeInfo->unregisterFromRunLoop(op, error != 0); @@ -731,7 +737,7 @@ EventHandlerWrapper::checkReady() if(_ready & _handler->_registered) { _selector.addReadyHandler(this); - } + } } SocketOperation @@ -748,7 +754,7 @@ EventHandlerWrapper::update(SocketOperation remove, SocketOperation add) { SocketOperation previous = _handler->_registered; _handler->_registered = static_cast<SocketOperation>(_handler->_registered & ~remove); - _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add); + _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add); if(previous == _handler->_registered) { return false; @@ -804,7 +810,7 @@ Selector::destroy() { CFRunLoopSourceSignal(_source); CFRunLoopWakeUp(_runLoop); - + wait(); } @@ -825,7 +831,7 @@ Selector::initialize(EventHandler* handler) _wrappers[handler] = new EventHandlerWrapper(handler, *this); } -void +void Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add) { Lock sync(*this); @@ -838,7 +844,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation } } -void +void Selector::enable(EventHandler* handler, SocketOperation op) { Lock sync(*this); @@ -878,7 +884,7 @@ Selector::finish(EventHandler* handler, bool closeNow) notify(); return closeNow; } - + void Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers, int timeout) { @@ -902,7 +908,7 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle // // Wait for handlers to be ready. - // + // handlers.clear(); while(_selectedHandlers.empty()) { @@ -910,7 +916,7 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle { CFRunLoopSourceSignal(_source); CFRunLoopWakeUp(_runLoop); - + wait(); } @@ -966,7 +972,7 @@ Selector::processInterrupt() } } -void +void Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error) { Lock sync(*this); @@ -984,7 +990,7 @@ Selector::addReadyHandler(EventHandlerWrapper* wrapper) } } -void +void Selector::run() { { @@ -1087,7 +1093,7 @@ Selector::disable(EventHandler* handler, SocketOperation status) return; } handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status); - + if(handler->_registered & status) { updateImpl(handler); @@ -1125,7 +1131,7 @@ Selector::startSelect() { continue; } - + Ice::SocketException ex(__FILE__, __LINE__); ex.error = IceInternal::getSocketErrno(); throw ex; @@ -1148,7 +1154,7 @@ Selector::finishSelect() _selecting = false; } -void +void Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout) { int ret = 0; @@ -1230,7 +1236,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } if(fd == _fdIntrRead) // Interrupted, we have to process the interrupt before returning any handlers - { + { handlers.clear(); return; } @@ -1292,7 +1298,7 @@ Selector::updateImpl(EventHandler* handler) { continue; } - + Ice::SocketException ex(__FILE__, __LINE__); ex.error = IceInternal::getSocketErrno(); throw ex; |