summaryrefslogtreecommitdiff
path: root/cpp/src/Ice
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-08-11 10:42:04 +0200
committerBenoit Foucher <benoit@zeroc.com>2015-08-11 10:42:04 +0200
commit12f8959ad8d0843a55dd309f087dd1f8a5dda88f (patch)
tree5186165ce0e0b4bb432a9e70314e597563bb8c56 /cpp/src/Ice
parentUpdates branch description (diff)
downloadice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.tar.bz2
ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.tar.xz
ice-12f8959ad8d0843a55dd309f087dd1f8a5dda88f.zip
Fixed ICE-6695, Ice for WinRT hang on Windows 10
Diffstat (limited to 'cpp/src/Ice')
-rw-r--r--cpp/src/Ice/Network.h10
-rw-r--r--cpp/src/Ice/Selector.cpp96
-rw-r--r--cpp/src/Ice/Selector.h8
-rw-r--r--cpp/src/Ice/ThreadPool.cpp10
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp6
-rw-r--r--cpp/src/Ice/winrt/StreamTransceiver.cpp14
6 files changed, 86 insertions, 58 deletions
diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h
index f90e3d6da61..32182784a6d 100644
--- a/cpp/src/Ice/Network.h
+++ b/cpp/src/Ice/Network.h
@@ -152,6 +152,16 @@ enum SocketOperation
};
//
+// On WinRT, wait only for read to return, on IOCP/Win32 wait for
+// both pending read and write operations to complete (#ICE-6695).
+//
+#if defined(ICE_OS_WINRT)
+const int SocketOperationWaitForClose = 1;
+#elif defined(ICE_USE_IOCP)
+const int SocketOperationWaitForClose = 3;
+#endif
+
+//
// AsyncInfo struct for Windows IOCP or WinRT holds the result of
// asynchronous operations after it completed.
//
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp
index 39f411fb115..af4a5b07c52 100644
--- a/cpp/src/Ice/Selector.cpp
+++ b/cpp/src/Ice/Selector.cpp
@@ -40,11 +40,16 @@ Selector::destroy()
void
Selector::initialize(IceInternal::EventHandler* handler)
{
+ EventHandlerPtr h = handler;
handler->__incRef();
handler->getNativeInfo()->setCompletedHandler(
- ref new SocketOperationCompletedHandler([=](int operation)
+ ref new SocketOperationCompletedHandler([=](int operation)
{
- completed(handler, static_cast<SocketOperation>(operation));
+ //
+ // Use the reference counted handler to ensure it's not
+ // destroyed as long as the callback lambda exists.
+ //
+ completed(h, static_cast<SocketOperation>(operation));
}));
}
@@ -56,12 +61,12 @@ Selector::update(IceInternal::EventHandler* handler, SocketOperation remove, Soc
if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead))
{
handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead);
- completed(handler, SocketOperationRead); // Start an asynchrnous read
+ completed(handler, SocketOperationRead); // Start an asynchrnous read
}
else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite))
{
handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite);
- completed(handler, SocketOperationWrite); // Start an asynchrnous write
+ completed(handler, SocketOperationWrite); // Start an asynchrnous write
}
}
@@ -69,10 +74,11 @@ void
Selector::finish(IceInternal::EventHandler* handler)
{
handler->_registered = SocketOperationNone;
+ handler->_finish = false; // Ensures that finished() is only called once on the event handler.
handler->__decRef();
}
-IceInternal::EventHandler*
+IceInternal::EventHandlerPtr
Selector::getNextHandler(SocketOperation& status, int timeout)
{
Lock lock(*this);
@@ -83,7 +89,7 @@ Selector::getNextHandler(SocketOperation& status, int timeout)
timedWait(IceUtil::Time::seconds(timeout));
if(_events.empty())
{
- throw SelectorTimeoutException();
+ throw SelectorTimeoutException();
}
}
else
@@ -92,15 +98,15 @@ Selector::getNextHandler(SocketOperation& status, int timeout)
}
}
assert(!_events.empty());
- IceInternal::EventHandler* handler = _events.front().handler;
+ IceInternal::EventHandlerPtr handler = _events.front().handler;
const SelectEvent& event = _events.front();
status = event.status;
_events.pop_front();
return handler;
}
-void
-Selector::completed(IceInternal::EventHandler* handler, SocketOperation op)
+void
+Selector::completed(const IceInternal::EventHandlerPtr& handler, SocketOperation op)
{
Lock lock(*this);
_events.push_back(SelectEvent(handler, op));
@@ -316,7 +322,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
if(remove & SocketOperationRead)
{
struct kevent ev;
- EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler);
+ EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler);
_changes.push_back(ev);
}
if(remove & SocketOperationWrite)
@@ -328,14 +334,14 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
if(add & SocketOperationRead)
{
struct kevent ev;
- EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0,
+ EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0,
handler);
_changes.push_back(ev);
}
if(add & SocketOperationWrite)
{
struct kevent ev;
- EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0,
+ EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0,
handler);
_changes.push_back(ev);
}
@@ -391,7 +397,7 @@ Selector::disable(EventHandler* handler, SocketOperation status)
return;
}
handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status);
-
+
if(handler->_registered & status)
{
SOCKET fd = handler->getNativeInfo()->fd();
@@ -431,7 +437,7 @@ Selector::finish(EventHandler* handler, bool closeNow)
{
//
// Update selector now to remove the FD from the kqueue if
- // we're going to close it now. This isn't necessary for
+ // we're going to close it now. This isn't necessary for
// epoll since we always update the epoll FD immediately.
//
updateSelector();
@@ -441,7 +447,7 @@ Selector::finish(EventHandler* handler, bool closeNow)
}
#if defined(ICE_USE_KQUEUE)
-void
+void
Selector::updateSelector()
{
int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0);
@@ -454,7 +460,7 @@ Selector::updateSelector()
}
#endif
-void
+void
Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout)
{
int ret = 0;
@@ -505,9 +511,9 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
#if defined(ICE_USE_EPOLL)
struct epoll_event& ev = _events[i];
p.first = reinterpret_cast<EventHandler*>(ev.data.ptr);
- p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ?
- SocketOperationRead : SocketOperationNone) |
- ((ev.events & (EPOLLOUT | EPOLLERR)) ?
+ p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ?
+ SocketOperationRead : SocketOperationNone) |
+ ((ev.events & (EPOLLOUT | EPOLLERR)) ?
SocketOperationWrite : SocketOperationNone));
#else
struct kevent& ev = _events[i];
@@ -546,7 +552,7 @@ void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType,
}
else if(callbackType == kCFSocketConnectCallBack)
{
- reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect,
+ reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect,
d ? *reinterpret_cast<const SInt32*>(d) : 0);
}
}
@@ -558,7 +564,7 @@ public:
SelectorHelperThread(Selector& selector) : _selector(selector)
{
}
-
+
virtual void run()
{
_selector.run();
@@ -590,8 +596,8 @@ toCFCallbacks(SocketOperation op)
}
-EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) :
- _handler(handler),
+EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) :
+ _handler(handler),
_nativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())),
_selector(selector),
_ready(SocketOperationNone),
@@ -601,14 +607,14 @@ EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selecto
{
SOCKET fd = handler->getNativeInfo()->fd();
CFSocketContext ctx = { 0, this, 0, 0, 0 };
- _socket = CFSocketCreateWithNative(kCFAllocatorDefault,
+ _socket = CFSocketCreateWithNative(kCFAllocatorDefault,
fd,
- kCFSocketReadCallBack |
- kCFSocketWriteCallBack |
+ kCFSocketReadCallBack |
+ kCFSocketWriteCallBack |
kCFSocketConnectCallBack,
- eventHandlerSocketCallback,
+ eventHandlerSocketCallback,
&ctx);
-
+
// Disable automatic re-enabling of callbacks and closing of the native socket.
CFSocketSetSocketFlags(_socket, 0);
CFSocketDisableCallBacks(_socket, kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack);
@@ -666,7 +672,7 @@ EventHandlerWrapper::updateRunLoop()
{
_nativeInfo->unregisterFromRunLoop(SocketOperationWrite, false);
}
-
+
if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead)
{
_nativeInfo->unregisterFromRunLoop(SocketOperationRead, false);
@@ -698,7 +704,7 @@ EventHandlerWrapper::ready(SocketOperation op, int error)
//
// Unregister the stream from the runloop as soon as we got the callback. This is
// required to allow thread pool thread to perform read/write operations on the
- // stream (which can't be used from another thread than the run loop thread if
+ // stream (which can't be used from another thread than the run loop thread if
// it's registered with a run loop).
//
op = _nativeInfo->unregisterFromRunLoop(op, error != 0);
@@ -731,7 +737,7 @@ EventHandlerWrapper::checkReady()
if(_ready & _handler->_registered)
{
_selector.addReadyHandler(this);
- }
+ }
}
SocketOperation
@@ -748,7 +754,7 @@ EventHandlerWrapper::update(SocketOperation remove, SocketOperation add)
{
SocketOperation previous = _handler->_registered;
_handler->_registered = static_cast<SocketOperation>(_handler->_registered & ~remove);
- _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add);
+ _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add);
if(previous == _handler->_registered)
{
return false;
@@ -804,7 +810,7 @@ Selector::destroy()
{
CFRunLoopSourceSignal(_source);
CFRunLoopWakeUp(_runLoop);
-
+
wait();
}
@@ -825,7 +831,7 @@ Selector::initialize(EventHandler* handler)
_wrappers[handler] = new EventHandlerWrapper(handler, *this);
}
-void
+void
Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
{
Lock sync(*this);
@@ -838,7 +844,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
}
}
-void
+void
Selector::enable(EventHandler* handler, SocketOperation op)
{
Lock sync(*this);
@@ -878,7 +884,7 @@ Selector::finish(EventHandler* handler, bool closeNow)
notify();
return closeNow;
}
-
+
void
Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers, int timeout)
{
@@ -902,7 +908,7 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle
//
// Wait for handlers to be ready.
- //
+ //
handlers.clear();
while(_selectedHandlers.empty())
{
@@ -910,7 +916,7 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle
{
CFRunLoopSourceSignal(_source);
CFRunLoopWakeUp(_runLoop);
-
+
wait();
}
@@ -966,7 +972,7 @@ Selector::processInterrupt()
}
}
-void
+void
Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error)
{
Lock sync(*this);
@@ -984,7 +990,7 @@ Selector::addReadyHandler(EventHandlerWrapper* wrapper)
}
}
-void
+void
Selector::run()
{
{
@@ -1087,7 +1093,7 @@ Selector::disable(EventHandler* handler, SocketOperation status)
return;
}
handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status);
-
+
if(handler->_registered & status)
{
updateImpl(handler);
@@ -1125,7 +1131,7 @@ Selector::startSelect()
{
continue;
}
-
+
Ice::SocketException ex(__FILE__, __LINE__);
ex.error = IceInternal::getSocketErrno();
throw ex;
@@ -1148,7 +1154,7 @@ Selector::finishSelect()
_selecting = false;
}
-void
+void
Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout)
{
int ret = 0;
@@ -1230,7 +1236,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
}
if(fd == _fdIntrRead) // Interrupted, we have to process the interrupt before returning any handlers
- {
+ {
handlers.clear();
return;
}
@@ -1292,7 +1298,7 @@ Selector::updateImpl(EventHandler* handler)
{
continue;
}
-
+
Ice::SocketException ex(__FILE__, __LINE__);
ex.error = IceInternal::getSocketErrno();
throw ex;
diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h
index 7ca36dee887..e890db6e2bf 100644
--- a/cpp/src/Ice/Selector.h
+++ b/cpp/src/Ice/Selector.h
@@ -61,11 +61,11 @@ class SelectorTimeoutException
struct SelectEvent
{
- SelectEvent(EventHandler* handler, SocketOperation status) : handler(handler), status(status)
+ SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status)
{
}
- EventHandler* handler;
+ EventHandlerPtr handler;
SocketOperation status;
};
@@ -81,9 +81,9 @@ public:
void update(EventHandler*, SocketOperation, SocketOperation);
void finish(EventHandler*);
- EventHandler* getNextHandler(SocketOperation&, int);
+ EventHandlerPtr getNextHandler(SocketOperation&, int);
- void completed(EventHandler*, SocketOperation);
+ void completed(const EventHandlerPtr&, SocketOperation);
private:
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index f7c25cc20f4..7ff3f57db4b 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -597,7 +597,7 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
return closeNow;
#else
// If there are no pending asynchronous operations, we can call finish on the handler now.
- if(!handler->_pending)
+ if(!(handler->_pending & SocketOperationWaitForClose))
{
_workQueue->queue(new FinishedWorkItem(handler, false));
_selector.finish(handler.get());
@@ -1103,7 +1103,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished.
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
- if(!current._handler->_pending && current._handler->_finish)
+ if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
@@ -1117,7 +1117,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
if(!current._handler->startAsync(current.operation))
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
- if(!current._handler->_pending && current._handler->_finish)
+ if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
@@ -1140,7 +1140,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
else
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
- if(!current._handler->_pending && current._handler->_finish)
+ if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
@@ -1170,7 +1170,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
}
- if(!current._handler->_pending && current._handler->_finish)
+ if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
// There are no more pending async operations, it's time to call finish.
_workQueue->queue(new FinishedWorkItem(current._handler, false));
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index b0392ad9cfd..cabe30f8cdf 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -952,10 +952,11 @@ IceInternal::UdpTransceiver::UdpTransceiver(const ProtocolInstancePtr& instance,
}
#else
DatagramSocket^ socket = safe_cast<DatagramSocket^>(_fd);
+ IceUtil::Handle<UdpTransceiver> self(this);
socket->MessageReceived += ref new TypedEventHandler<DatagramSocket^, DatagramSocketMessageReceivedEventArgs^>(
[=](DatagramSocket^ fd, DatagramSocketMessageReceivedEventArgs^ args)
{
- this->appendMessage(args);
+ self->appendMessage(args);
});
#endif
@@ -999,10 +1000,11 @@ IceInternal::UdpTransceiver::UdpTransceiver(const UdpEndpointIPtr& endpoint, con
_mcastAddr.saStorage.ss_family = AF_UNSPEC;
#else
DatagramSocket^ socket = safe_cast<DatagramSocket^>(_fd);
+ IceUtil::Handle<UdpTransceiver> self(this);
socket->MessageReceived += ref new TypedEventHandler<DatagramSocket^, DatagramSocketMessageReceivedEventArgs^>(
[=](DatagramSocket^ fd, DatagramSocketMessageReceivedEventArgs^ args)
{
- this->appendMessage(args);
+ self->appendMessage(args);
});
#endif
}
diff --git a/cpp/src/Ice/winrt/StreamTransceiver.cpp b/cpp/src/Ice/winrt/StreamTransceiver.cpp
index 54a29035feb..e0a7acf6b9a 100644
--- a/cpp/src/Ice/winrt/StreamTransceiver.cpp
+++ b/cpp/src/Ice/winrt/StreamTransceiver.cpp
@@ -99,6 +99,11 @@ void
IceInternal::StreamTransceiver::close()
{
assert(_fd != INVALID_SOCKET);
+
+ _completedHandler = nullptr;
+ _readOperationCompletedHandler = nullptr;
+ _writeOperationCompletedHandler = nullptr;
+
try
{
closeSocket(_fd);
@@ -146,6 +151,7 @@ IceInternal::StreamTransceiver::startWrite(Buffer& buf)
if(!checkIfErrorOrCompleted(SocketOperationConnect, action))
{
+ SocketOperationCompletedHandler^ completed = _completedHandler;
action->Completed = ref new AsyncActionCompletedHandler(
[=] (IAsyncAction^ info, Windows::Foundation::AsyncStatus status)
{
@@ -157,9 +163,8 @@ IceInternal::StreamTransceiver::startWrite(Buffer& buf)
else
{
_write.count = 0;
- _verified = true;
}
- _completedHandler(SocketOperationConnect);
+ completed(SocketOperationConnect);
});
}
}
@@ -204,6 +209,11 @@ IceInternal::StreamTransceiver::finishWrite(Buffer& buf)
{
if(_state < StateConnected)
{
+ if(_write.count == SOCKET_ERROR)
+ {
+ checkConnectErrorCode(__FILE__, __LINE__, _write.error, _connectAddr.host);
+ }
+ _verified = true;
return;
}