diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-08-11 10:42:04 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-08-11 10:42:04 +0200 |
commit | 12f8959ad8d0843a55dd309f087dd1f8a5dda88f (patch) | |
tree | 5186165ce0e0b4bb432a9e70314e597563bb8c56 /cpp/src/Ice | |
parent | Updates branch description (diff) | |
download | ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.tar.bz2 ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.tar.xz ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.zip |
Fixed ICE-6695, Ice for WinRT hang on Windows 10
Diffstat (limited to 'cpp/src/Ice')
-rw-r--r-- | cpp/src/Ice/Network.h | 10 | ||||
-rw-r--r-- | cpp/src/Ice/Selector.cpp | 96 | ||||
-rw-r--r-- | cpp/src/Ice/Selector.h | 8 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 10 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/winrt/StreamTransceiver.cpp | 14 |
6 files changed, 86 insertions, 58 deletions
diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h index f90e3d6da61..32182784a6d 100644 --- a/cpp/src/Ice/Network.h +++ b/cpp/src/Ice/Network.h @@ -152,6 +152,16 @@ enum SocketOperation }; // +// On WinRT, wait only for read to return, on IOCP/Win32 wait for +// both pending read and write operations to complete (#ICE-6695). +// +#if defined(ICE_OS_WINRT) +const int SocketOperationWaitForClose = 1; +#elif defined(ICE_USE_IOCP) +const int SocketOperationWaitForClose = 3; +#endif + +// // AsyncInfo struct for Windows IOCP or WinRT holds the result of // asynchronous operations after it completed. // diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index 39f411fb115..af4a5b07c52 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -40,11 +40,16 @@ Selector::destroy() void Selector::initialize(IceInternal::EventHandler* handler) { + EventHandlerPtr h = handler; handler->__incRef(); handler->getNativeInfo()->setCompletedHandler( - ref new SocketOperationCompletedHandler([=](int operation) + ref new SocketOperationCompletedHandler([=](int operation) { - completed(handler, static_cast<SocketOperation>(operation)); + // + // Use the reference counted handler to ensure it's not + // destroyed as long as the callback lambda exists. + // + completed(h, static_cast<SocketOperation>(operation)); })); } @@ -56,12 +61,12 @@ Selector::update(IceInternal::EventHandler* handler, SocketOperation remove, Soc if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead)) { handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead); - completed(handler, SocketOperationRead); // Start an asynchrnous read + completed(handler, SocketOperationRead); // Start an asynchrnous read } else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite)) { handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite); - completed(handler, SocketOperationWrite); // Start an asynchrnous write + completed(handler, SocketOperationWrite); // Start an asynchrnous write } } @@ -69,10 +74,11 @@ void Selector::finish(IceInternal::EventHandler* handler) { handler->_registered = SocketOperationNone; + handler->_finish = false; // Ensures that finished() is only called once on the event handler. handler->__decRef(); } -IceInternal::EventHandler* +IceInternal::EventHandlerPtr Selector::getNextHandler(SocketOperation& status, int timeout) { Lock lock(*this); @@ -83,7 +89,7 @@ Selector::getNextHandler(SocketOperation& status, int timeout) timedWait(IceUtil::Time::seconds(timeout)); if(_events.empty()) { - throw SelectorTimeoutException(); + throw SelectorTimeoutException(); } } else @@ -92,15 +98,15 @@ Selector::getNextHandler(SocketOperation& status, int timeout) } } assert(!_events.empty()); - IceInternal::EventHandler* handler = _events.front().handler; + IceInternal::EventHandlerPtr handler = _events.front().handler; const SelectEvent& event = _events.front(); status = event.status; _events.pop_front(); return handler; } -void -Selector::completed(IceInternal::EventHandler* handler, SocketOperation op) +void +Selector::completed(const IceInternal::EventHandlerPtr& handler, SocketOperation op) { Lock lock(*this); _events.push_back(SelectEvent(handler, op)); @@ -316,7 +322,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation if(remove & SocketOperationRead) { struct kevent ev; - EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler); + EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler); _changes.push_back(ev); } if(remove & SocketOperationWrite) @@ -328,14 +334,14 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation if(add & SocketOperationRead) { struct kevent ev; - EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0, + EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0, handler); _changes.push_back(ev); } if(add & SocketOperationWrite) { struct kevent ev; - EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0, + EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0, handler); _changes.push_back(ev); } @@ -391,7 +397,7 @@ Selector::disable(EventHandler* handler, SocketOperation status) return; } handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status); - + if(handler->_registered & status) { SOCKET fd = handler->getNativeInfo()->fd(); @@ -431,7 +437,7 @@ Selector::finish(EventHandler* handler, bool closeNow) { // // Update selector now to remove the FD from the kqueue if - // we're going to close it now. This isn't necessary for + // we're going to close it now. This isn't necessary for // epoll since we always update the epoll FD immediately. // updateSelector(); @@ -441,7 +447,7 @@ Selector::finish(EventHandler* handler, bool closeNow) } #if defined(ICE_USE_KQUEUE) -void +void Selector::updateSelector() { int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0); @@ -454,7 +460,7 @@ Selector::updateSelector() } #endif -void +void Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout) { int ret = 0; @@ -505,9 +511,9 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti #if defined(ICE_USE_EPOLL) struct epoll_event& ev = _events[i]; p.first = reinterpret_cast<EventHandler*>(ev.data.ptr); - p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ? - SocketOperationRead : SocketOperationNone) | - ((ev.events & (EPOLLOUT | EPOLLERR)) ? + p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ? + SocketOperationRead : SocketOperationNone) | + ((ev.events & (EPOLLOUT | EPOLLERR)) ? SocketOperationWrite : SocketOperationNone)); #else struct kevent& ev = _events[i]; @@ -546,7 +552,7 @@ void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, } else if(callbackType == kCFSocketConnectCallBack) { - reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect, + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect, d ? *reinterpret_cast<const SInt32*>(d) : 0); } } @@ -558,7 +564,7 @@ public: SelectorHelperThread(Selector& selector) : _selector(selector) { } - + virtual void run() { _selector.run(); @@ -590,8 +596,8 @@ toCFCallbacks(SocketOperation op) } -EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) : - _handler(handler), +EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) : + _handler(handler), _nativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())), _selector(selector), _ready(SocketOperationNone), @@ -601,14 +607,14 @@ EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selecto { SOCKET fd = handler->getNativeInfo()->fd(); CFSocketContext ctx = { 0, this, 0, 0, 0 }; - _socket = CFSocketCreateWithNative(kCFAllocatorDefault, + _socket = CFSocketCreateWithNative(kCFAllocatorDefault, fd, - kCFSocketReadCallBack | - kCFSocketWriteCallBack | + kCFSocketReadCallBack | + kCFSocketWriteCallBack | kCFSocketConnectCallBack, - eventHandlerSocketCallback, + eventHandlerSocketCallback, &ctx); - + // Disable automatic re-enabling of callbacks and closing of the native socket. CFSocketSetSocketFlags(_socket, 0); CFSocketDisableCallBacks(_socket, kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack); @@ -666,7 +672,7 @@ EventHandlerWrapper::updateRunLoop() { _nativeInfo->unregisterFromRunLoop(SocketOperationWrite, false); } - + if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead) { _nativeInfo->unregisterFromRunLoop(SocketOperationRead, false); @@ -698,7 +704,7 @@ EventHandlerWrapper::ready(SocketOperation op, int error) // // Unregister the stream from the runloop as soon as we got the callback. This is // required to allow thread pool thread to perform read/write operations on the - // stream (which can't be used from another thread than the run loop thread if + // stream (which can't be used from another thread than the run loop thread if // it's registered with a run loop). // op = _nativeInfo->unregisterFromRunLoop(op, error != 0); @@ -731,7 +737,7 @@ EventHandlerWrapper::checkReady() if(_ready & _handler->_registered) { _selector.addReadyHandler(this); - } + } } SocketOperation @@ -748,7 +754,7 @@ EventHandlerWrapper::update(SocketOperation remove, SocketOperation add) { SocketOperation previous = _handler->_registered; _handler->_registered = static_cast<SocketOperation>(_handler->_registered & ~remove); - _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add); + _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add); if(previous == _handler->_registered) { return false; @@ -804,7 +810,7 @@ Selector::destroy() { CFRunLoopSourceSignal(_source); CFRunLoopWakeUp(_runLoop); - + wait(); } @@ -825,7 +831,7 @@ Selector::initialize(EventHandler* handler) _wrappers[handler] = new EventHandlerWrapper(handler, *this); } -void +void Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add) { Lock sync(*this); @@ -838,7 +844,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation } } -void +void Selector::enable(EventHandler* handler, SocketOperation op) { Lock sync(*this); @@ -878,7 +884,7 @@ Selector::finish(EventHandler* handler, bool closeNow) notify(); return closeNow; } - + void Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers, int timeout) { @@ -902,7 +908,7 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle // // Wait for handlers to be ready. - // + // handlers.clear(); while(_selectedHandlers.empty()) { @@ -910,7 +916,7 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle { CFRunLoopSourceSignal(_source); CFRunLoopWakeUp(_runLoop); - + wait(); } @@ -966,7 +972,7 @@ Selector::processInterrupt() } } -void +void Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error) { Lock sync(*this); @@ -984,7 +990,7 @@ Selector::addReadyHandler(EventHandlerWrapper* wrapper) } } -void +void Selector::run() { { @@ -1087,7 +1093,7 @@ Selector::disable(EventHandler* handler, SocketOperation status) return; } handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status); - + if(handler->_registered & status) { updateImpl(handler); @@ -1125,7 +1131,7 @@ Selector::startSelect() { continue; } - + Ice::SocketException ex(__FILE__, __LINE__); ex.error = IceInternal::getSocketErrno(); throw ex; @@ -1148,7 +1154,7 @@ Selector::finishSelect() _selecting = false; } -void +void Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout) { int ret = 0; @@ -1230,7 +1236,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } if(fd == _fdIntrRead) // Interrupted, we have to process the interrupt before returning any handlers - { + { handlers.clear(); return; } @@ -1292,7 +1298,7 @@ Selector::updateImpl(EventHandler* handler) { continue; } - + Ice::SocketException ex(__FILE__, __LINE__); ex.error = IceInternal::getSocketErrno(); throw ex; diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h index 7ca36dee887..e890db6e2bf 100644 --- a/cpp/src/Ice/Selector.h +++ b/cpp/src/Ice/Selector.h @@ -61,11 +61,11 @@ class SelectorTimeoutException struct SelectEvent { - SelectEvent(EventHandler* handler, SocketOperation status) : handler(handler), status(status) + SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status) { } - EventHandler* handler; + EventHandlerPtr handler; SocketOperation status; }; @@ -81,9 +81,9 @@ public: void update(EventHandler*, SocketOperation, SocketOperation); void finish(EventHandler*); - EventHandler* getNextHandler(SocketOperation&, int); + EventHandlerPtr getNextHandler(SocketOperation&, int); - void completed(EventHandler*, SocketOperation); + void completed(const EventHandlerPtr&, SocketOperation); private: diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index f7c25cc20f4..7ff3f57db4b 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -597,7 +597,7 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow) return closeNow; #else // If there are no pending asynchronous operations, we can call finish on the handler now. - if(!handler->_pending) + if(!(handler->_pending & SocketOperationWaitForClose)) { _workQueue->queue(new FinishedWorkItem(handler, false)); _selector.finish(handler.get()); @@ -1103,7 +1103,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished. { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); - if(!current._handler->_pending && current._handler->_finish) + if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); @@ -1117,7 +1117,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) if(!current._handler->startAsync(current.operation)) { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); - if(!current._handler->_pending && current._handler->_finish) + if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); @@ -1140,7 +1140,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) else { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); - if(!current._handler->_pending && current._handler->_finish) + if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); @@ -1170,7 +1170,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); } - if(!current._handler->_pending && current._handler->_finish) + if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { // There are no more pending async operations, it's time to call finish. _workQueue->queue(new FinishedWorkItem(current._handler, false)); diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index b0392ad9cfd..cabe30f8cdf 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -952,10 +952,11 @@ IceInternal::UdpTransceiver::UdpTransceiver(const ProtocolInstancePtr& instance, } #else DatagramSocket^ socket = safe_cast<DatagramSocket^>(_fd); + IceUtil::Handle<UdpTransceiver> self(this); socket->MessageReceived += ref new TypedEventHandler<DatagramSocket^, DatagramSocketMessageReceivedEventArgs^>( [=](DatagramSocket^ fd, DatagramSocketMessageReceivedEventArgs^ args) { - this->appendMessage(args); + self->appendMessage(args); }); #endif @@ -999,10 +1000,11 @@ IceInternal::UdpTransceiver::UdpTransceiver(const UdpEndpointIPtr& endpoint, con _mcastAddr.saStorage.ss_family = AF_UNSPEC; #else DatagramSocket^ socket = safe_cast<DatagramSocket^>(_fd); + IceUtil::Handle<UdpTransceiver> self(this); socket->MessageReceived += ref new TypedEventHandler<DatagramSocket^, DatagramSocketMessageReceivedEventArgs^>( [=](DatagramSocket^ fd, DatagramSocketMessageReceivedEventArgs^ args) { - this->appendMessage(args); + self->appendMessage(args); }); #endif } diff --git a/cpp/src/Ice/winrt/StreamTransceiver.cpp b/cpp/src/Ice/winrt/StreamTransceiver.cpp index 54a29035feb..e0a7acf6b9a 100644 --- a/cpp/src/Ice/winrt/StreamTransceiver.cpp +++ b/cpp/src/Ice/winrt/StreamTransceiver.cpp @@ -99,6 +99,11 @@ void IceInternal::StreamTransceiver::close() { assert(_fd != INVALID_SOCKET); + + _completedHandler = nullptr; + _readOperationCompletedHandler = nullptr; + _writeOperationCompletedHandler = nullptr; + try { closeSocket(_fd); @@ -146,6 +151,7 @@ IceInternal::StreamTransceiver::startWrite(Buffer& buf) if(!checkIfErrorOrCompleted(SocketOperationConnect, action)) { + SocketOperationCompletedHandler^ completed = _completedHandler; action->Completed = ref new AsyncActionCompletedHandler( [=] (IAsyncAction^ info, Windows::Foundation::AsyncStatus status) { @@ -157,9 +163,8 @@ IceInternal::StreamTransceiver::startWrite(Buffer& buf) else { _write.count = 0; - _verified = true; } - _completedHandler(SocketOperationConnect); + completed(SocketOperationConnect); }); } } @@ -204,6 +209,11 @@ IceInternal::StreamTransceiver::finishWrite(Buffer& buf) { if(_state < StateConnected) { + if(_write.count == SOCKET_ERROR) + { + checkConnectErrorCode(__FILE__, __LINE__, _write.error, _connectAddr.host); + } + _verified = true; return; } |