summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-10-09 15:00:57 +0200
committerBenoit Foucher <benoit@zeroc.com>2015-10-09 15:00:57 +0200
commit20b6c0ccb95118ffc685826904a8edd06a38ac1b (patch)
tree1b389964fa35ca9de23c548120ecedcc9d82074c /cpp/src
parentMerge branch '3.6' (diff)
downloadice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.bz2
ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.xz
ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.zip
Added ready callback to allow transports to signal readiness to the thread pool
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp1
-rw-r--r--cpp/src/Ice/ConnectionI.cpp52
-rw-r--r--cpp/src/Ice/EventHandler.cpp4
-rw-r--r--cpp/src/Ice/EventHandler.h8
-rw-r--r--cpp/src/Ice/Network.cpp8
-rw-r--r--cpp/src/Ice/Network.h21
-rw-r--r--cpp/src/Ice/Selector.cpp1479
-rw-r--r--cpp/src/Ice/Selector.h234
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp6
-rw-r--r--cpp/src/Ice/TcpTransceiver.h6
-rw-r--r--cpp/src/Ice/ThreadPool.cpp322
-rw-r--r--cpp/src/Ice/ThreadPool.h54
-rw-r--r--cpp/src/Ice/Transceiver.h6
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp6
-rw-r--r--cpp/src/Ice/UdpTransceiver.h6
-rw-r--r--cpp/src/Ice/WSTransceiver.cpp45
-rw-r--r--cpp/src/Ice/WSTransceiver.h6
-rw-r--r--cpp/src/Ice/winrt/StreamTransceiver.cpp6
-rw-r--r--cpp/src/Ice/winrt/StreamTransceiver.h6
-rw-r--r--cpp/src/IceGrid/Makefile2
-rw-r--r--cpp/src/IceSSL/OpenSSLTransceiverI.cpp7
-rw-r--r--cpp/src/IceSSL/OpenSSLTransceiverI.h4
-rw-r--r--cpp/src/IceSSL/SChannelTransceiverI.cpp19
-rw-r--r--cpp/src/IceSSL/SChannelTransceiverI.h6
-rw-r--r--cpp/src/IceSSL/SecureTransportTransceiverI.cpp11
-rw-r--r--cpp/src/IceSSL/SecureTransportTransceiverI.h6
26 files changed, 1205 insertions, 1126 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index a0faafc6145..3dbef2a5a6d 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -1413,7 +1413,6 @@ string
IceInternal::IncomingConnectionFactory::toString() const
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
if(_transceiver)
{
return _transceiver->toString();
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 49707d9e124..4a078bcc1f8 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1372,19 +1372,12 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
- if(!_hasMoreData)
+ if(_observer && !_readHeader)
{
- if(_observer && !_readHeader)
- {
- _observer.startRead(_readStream);
- }
-
- _transceiver->startRead(_readStream);
- }
- else
- {
- _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead);
+ _observer.startRead(_readStream);
}
+
+ _transceiver->startRead(_readStream);
}
}
catch(const Ice::LocalException& ex)
@@ -1422,29 +1415,26 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
- if(!_hasMoreData)
+ Buffer::Container::iterator start = _readStream.i;
+ _transceiver->finishRead(_readStream);
+ if(_instance->traceLevels()->network >= 3 && _readStream.i != start)
{
- Buffer::Container::iterator start = _readStream.i;
- _transceiver->finishRead(_readStream, _hasMoreData);
- if(_instance->traceLevels()->network >= 3 && _readStream.i != start)
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "received ";
+ if(_endpoint->datagram())
{
- Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
- out << "received ";
- if(_endpoint->datagram())
- {
- out << _readStream.b.size();
- }
- else
- {
- out << (_readStream.i - start) << " of " << (_readStream.b.end() - start);
- }
- out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ out << _readStream.b.size();
}
-
- if(_observer && !_readHeader)
+ else
{
- _observer.finishRead(_readStream);
+ out << (_readStream.i - start) << " of " << (_readStream.b.end() - start);
}
+ out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ }
+
+ if(_observer && !_readHeader)
+ {
+ _observer.finishRead(_readStream);
}
}
}
@@ -2517,7 +2507,7 @@ Ice::ConnectionI::heartbeat()
bool
Ice::ConnectionI::initialize(SocketOperation operation)
{
- SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData);
+ SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
if(s != SocketOperationNone)
{
scheduleTimeout(s);
@@ -3548,7 +3538,7 @@ SocketOperation
ConnectionI::read(Buffer& buf)
{
Buffer::Container::iterator start = buf.i;
- SocketOperation op = _transceiver->read(buf, _hasMoreData);
+ SocketOperation op = _transceiver->read(buf);
if(_instance->traceLevels()->network >= 3 && buf.i != start)
{
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
diff --git a/cpp/src/Ice/EventHandler.cpp b/cpp/src/Ice/EventHandler.cpp
index 8ba48464184..6edea2d9307 100644
--- a/cpp/src/Ice/EventHandler.cpp
+++ b/cpp/src/Ice/EventHandler.cpp
@@ -18,14 +18,14 @@ IceUtil::Shared* IceInternal::upCast(EventHandler* p) { return p; }
IceInternal::EventHandler::EventHandler() :
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- _ready(SocketOperationNone),
_pending(SocketOperationNone),
_started(SocketOperationNone),
+ _completed(SocketOperationNone),
_finish(false),
#else
_disabled(SocketOperationNone),
#endif
- _hasMoreData(false),
+ _ready(SocketOperationNone),
_registered(SocketOperationNone)
{
}
diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h
index afbd608b961..a47a1899d10 100644
--- a/cpp/src/Ice/EventHandler.h
+++ b/cpp/src/Ice/EventHandler.h
@@ -20,7 +20,7 @@
namespace IceInternal
{
-class ICE_API EventHandler : virtual public ::IceUtil::Shared
+class ICE_API EventHandler : virtual public ::Ice::LocalObject
{
public:
@@ -53,19 +53,19 @@ public:
virtual NativeInfoPtr getNativeInfo() = 0;
protected:
-
+
EventHandler();
virtual ~EventHandler();
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- SocketOperation _ready;
SocketOperation _pending;
SocketOperation _started;
+ SocketOperation _completed;
bool _finish;
#else
SocketOperation _disabled;
#endif
- bool _hasMoreData;
+ SocketOperation _ready;
SocketOperation _registered;
friend class ThreadPool;
diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp
index c151c4b9af6..cc7cd41a812 100644
--- a/cpp/src/Ice/Network.cpp
+++ b/cpp/src/Ice/Network.cpp
@@ -762,7 +762,14 @@ getAddressStorageSize(const Address& addr)
}
+void
+NativeInfo::setReadyCallback(const ReadyCallbackPtr& callback)
+{
+ _readyCallback = callback;
+}
+
#ifdef ICE_USE_IOCP
+
IceInternal::AsyncInfo::AsyncInfo(SocketOperation s)
{
ZeroMemory(this, sizeof(AsyncInfo));
@@ -786,6 +793,7 @@ IceInternal::NativeInfo::completed(SocketOperation operation)
throw ex;
}
}
+
#elif defined(ICE_OS_WINRT)
void
diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h
index 32182784a6d..a398fa1209c 100644
--- a/cpp/src/Ice/Network.h
+++ b/cpp/src/Ice/Network.h
@@ -24,8 +24,8 @@
#include <Ice/ProtocolInstanceF.h>
#include <Ice/EndpointTypes.h>
-#ifdef ICE_OS_WINRT
-# include <Ice/EventHandlerF.h>
+#if defined(ICE_OS_WINRT)
+// Nothing to include
#elif defined(_WIN32)
# include <winsock2.h>
# include <ws2tcpip.h>
@@ -186,6 +186,14 @@ struct ICE_API AsyncInfo
delegate void SocketOperationCompletedHandler(int);
#endif
+class ICE_API ReadyCallback : virtual public ::IceUtil::Shared
+{
+public:
+
+ virtual void ready(SocketOperation, bool) = 0;
+};
+typedef IceUtil::Handle<ReadyCallback> ReadyCallbackPtr;
+
class ICE_API NativeInfo : virtual public IceUtil::Shared
{
public:
@@ -199,6 +207,14 @@ public:
return _fd;
}
+ void setReadyCallback(const ReadyCallbackPtr& callback);
+
+ void ready(SocketOperation operation, bool value)
+ {
+ assert(_readyCallback);
+ _readyCallback->ready(operation, value);
+ }
+
//
// This is implemented by transceiver and acceptor implementations.
//
@@ -214,6 +230,7 @@ public:
protected:
SOCKET _fd;
+ ReadyCallbackPtr _readyCallback;
#if defined(ICE_USE_IOCP)
HANDLE _handle;
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp
index af4a5b07c52..e01f3d7f65a 100644
--- a/cpp/src/Ice/Selector.cpp
+++ b/cpp/src/Ice/Selector.cpp
@@ -23,97 +23,14 @@ using namespace std;
using namespace IceInternal;
#ifdef ICE_OS_WINRT
-using namespace Windows::Foundation;
+//using namespace Windows::Foundation;
using namespace Windows::Storage::Streams;
using namespace Windows::Networking;
using namespace Windows::Networking::Sockets;
-Selector::Selector(const InstancePtr& instance) : _instance(instance)
-{
-}
-
-void
-Selector::destroy()
-{
-}
-
-void
-Selector::initialize(IceInternal::EventHandler* handler)
-{
- EventHandlerPtr h = handler;
- handler->__incRef();
- handler->getNativeInfo()->setCompletedHandler(
- ref new SocketOperationCompletedHandler([=](int operation)
- {
- //
- // Use the reference counted handler to ensure it's not
- // destroyed as long as the callback lambda exists.
- //
- completed(h, static_cast<SocketOperation>(operation));
- }));
-}
-
-void
-Selector::update(IceInternal::EventHandler* handler, SocketOperation remove, SocketOperation add)
-{
- handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove);
- handler->_registered = static_cast<SocketOperation>(handler->_registered | add);
- if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead))
- {
- handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead);
- completed(handler, SocketOperationRead); // Start an asynchrnous read
- }
- else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite))
- {
- handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite);
- completed(handler, SocketOperationWrite); // Start an asynchrnous write
- }
-}
-
-void
-Selector::finish(IceInternal::EventHandler* handler)
-{
- handler->_registered = SocketOperationNone;
- handler->_finish = false; // Ensures that finished() is only called once on the event handler.
- handler->__decRef();
-}
-
-IceInternal::EventHandlerPtr
-Selector::getNextHandler(SocketOperation& status, int timeout)
-{
- Lock lock(*this);
- while(_events.empty())
- {
- if(timeout > 0)
- {
- timedWait(IceUtil::Time::seconds(timeout));
- if(_events.empty())
- {
- throw SelectorTimeoutException();
- }
- }
- else
- {
- wait();
- }
- }
- assert(!_events.empty());
- IceInternal::EventHandlerPtr handler = _events.front().handler;
- const SelectEvent& event = _events.front();
- status = event.status;
- _events.pop_front();
- return handler;
-}
-
-void
-Selector::completed(const IceInternal::EventHandlerPtr& handler, SocketOperation op)
-{
- Lock lock(*this);
- _events.push_back(SelectEvent(handler, op));
- notify();
-}
+#endif
-#elif defined(ICE_USE_IOCP)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
Selector::Selector(const InstancePtr& instance) : _instance(instance)
{
@@ -123,6 +40,7 @@ Selector::~Selector()
{
}
+#ifdef ICE_USE_IOCP
void
Selector::setup(int sizeIO)
{
@@ -134,16 +52,24 @@ Selector::setup(int sizeIO)
throw ex;
}
}
+#endif
void
Selector::destroy()
{
+#ifdef ICE_USE_IOCP
CloseHandle(_handle);
+#endif
}
void
Selector::initialize(EventHandler* handler)
{
+ if(!handler->getNativeInfo())
+ {
+ return;
+ }
+#ifdef ICE_USE_IOCP
HANDLE socket = reinterpret_cast<HANDLE>(handler->getNativeInfo()->fd());
if(CreateIoCompletionPort(socket, _handle, reinterpret_cast<ULONG_PTR>(handler), 0) == NULL)
{
@@ -153,6 +79,20 @@ Selector::initialize(EventHandler* handler)
}
handler->__incRef();
handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler));
+#else
+ EventHandlerPtr h = handler;
+ handler->__incRef();
+ handler->getNativeInfo()->setCompletedHandler(
+ ref new SocketOperationCompletedHandler(
+ [=](int operation)
+ {
+ //
+ // Use the reference counted handler to ensure it's not
+ // destroyed as long as the callback lambda exists.
+ //
+ completed(h.get(), static_cast<SocketOperation>(operation));
+ }));
+#endif
}
void
@@ -160,26 +100,15 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
{
handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove);
handler->_registered = static_cast<SocketOperation>(handler->_registered | add);
- AsyncInfo* info = 0;
if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead))
{
handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead);
- info = handler->getNativeInfo()->getAsyncInfo(SocketOperationRead);
+ completed(handler, SocketOperationRead); // Start an asynchrnous read
}
else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite))
{
handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite);
- info = handler->getNativeInfo()->getAsyncInfo(SocketOperationWrite);
- }
-
- if(info)
- {
- if(!PostQueuedCompletionStatus(_handle, 0, reinterpret_cast<ULONG_PTR>(handler), info))
- {
- Ice::SocketException ex(__FILE__, __LINE__);
- ex.error = GetLastError();
- throw ex;
- }
+ completed(handler, SocketOperationWrite); // Start an asynchrnous write
}
}
@@ -187,12 +116,36 @@ void
Selector::finish(IceInternal::EventHandler* handler)
{
handler->_registered = SocketOperationNone;
+ handler->_finish = false; // Ensures that finished() is only called once on the event handler.
handler->__decRef();
}
+void
+Selector::ready(EventHandler* handler, SocketOperation status, bool value)
+{
+ if(((handler->_ready & status) != 0) == value)
+ {
+ return; // Nothing to do if ready state already correctly set.
+ }
+
+ if(value)
+ {
+ handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
+ }
+ else
+ {
+ handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
+ }
+}
+
EventHandler*
+#ifdef ICE_USE_IOCP
Selector::getNextHandler(SocketOperation& status, DWORD& count, int& error, int timeout)
+#else
+Selector::getNextHandler(SocketOperation& status, int timeout)
+#endif
{
+#ifdef ICE_USE_IOCP
ULONG_PTR key;
LPOVERLAPPED ol;
error = 0;
@@ -217,24 +170,86 @@ Selector::getNextHandler(SocketOperation& status, DWORD& count, int& error, int
}
}
AsyncInfo* info = static_cast<AsyncInfo*>(ol);
- status = info->status;
+ if(info)
+ {
+ status = info->status;
+ }
count = SOCKET_ERROR;
error = WSAGetLastError();
return reinterpret_cast<EventHandler*>(key);
}
- assert(ol);
AsyncInfo* info = static_cast<AsyncInfo*>(ol);
- status = info->status;
+ if(info)
+ {
+ status = info->status;
+ }
+ else
+ {
+ status = reinterpret_cast<EventHandler*>(key)->_ready;
+ }
return reinterpret_cast<EventHandler*>(key);
+#else
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
+ while(_events.empty())
+ {
+ if(timeout > 0)
+ {
+ _monitor.timedWait(IceUtil::Time::seconds(timeout));
+ if(_events.empty())
+ {
+ throw SelectorTimeoutException();
+ }
+ }
+ else
+ {
+ _monitor.wait();
+ }
+ }
+ assert(!_events.empty());
+ IceInternal::EventHandlerPtr handler = _events.front().handler;
+ const SelectEvent& event = _events.front();
+ status = event.status;
+ _events.pop_front();
+ return handler.get();
+#endif
}
-#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL)
+void
+Selector::completed(EventHandler* handler, SocketOperation op)
+{
+#ifdef ICE_USE_IOCP
+ AsyncInfo* info = 0;
+ NativeInfoPtr nativeInfo = handler->getNativeInfo();
+ if(nativeInfo)
+ {
+ info = nativeInfo->getAsyncInfo(op);
+ }
+ if(!PostQueuedCompletionStatus(_handle, 0, reinterpret_cast<ULONG_PTR>(handler), info))
+ {
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = GetLastError();
+ throw ex;
+ }
+#else
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
+ _events.push_back(SelectEvent(handler, op));
+ _monitor.notify();
+#endif
+}
-Selector::Selector(const InstancePtr& instance) : _instance(instance)
+#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
+
+Selector::Selector(const InstancePtr& instance) : _instance(instance), _interrupted(false)
{
- _events.resize(256);
+ SOCKET fds[2];
+ createPipe(fds);
+ _fdIntrRead = fds[0];
+ _fdIntrWrite = fds[1];
+ _selecting = false;
+
#if defined(ICE_USE_EPOLL)
+ _events.resize(256);
_queueFd = epoll_create(1);
if(_queueFd < 0)
{
@@ -242,7 +257,18 @@ Selector::Selector(const InstancePtr& instance) : _instance(instance)
ex.error = IceInternal::getSocketErrno();
throw ex;
}
-#else
+
+ epoll_event event;
+ memset(&event, 0, sizeof(epoll_event));
+ event.data.ptr = 0;
+ event.events = EPOLLIN;
+ if(epoll_ctl(_queueFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ }
+#elif defined(ICE_USE_KQUEUE)
+ _events.resize(256);
_queueFd = kqueue();
if(_queueFd < 0)
{
@@ -250,7 +276,25 @@ Selector::Selector(const InstancePtr& instance) : _instance(instance)
ex.error = getSocketErrno();
throw ex;
}
- _selecting = false;
+
+ struct kevent ev;
+ EV_SET(&ev, _fdIntrRead, EVFILT_READ, EV_ADD, 0, 0, 0);
+ int rs = kevent(_queueFd, &ev, 1, 0, 0, 0);
+ if(rs < 0)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ }
+#elif defined(ICE_USE_SELECT)
+ FD_ZERO(&_readFdSet);
+ FD_ZERO(&_writeFdSet);
+ FD_ZERO(&_errorFdSet);
+ FD_SET(_fdIntrRead, &_readFdSet);
+#else
+ struct pollfd pollFd;
+ pollFd.fd = _fdIntrRead;
+ pollFd.events = POLLIN;
+ _pollFdSet.push_back(pollFd);
#endif
}
@@ -261,6 +305,7 @@ Selector::~Selector()
void
Selector::destroy()
{
+#if defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL)
try
{
closeSocket(_queueFd);
@@ -270,6 +315,27 @@ Selector::destroy()
Ice::Error out(_instance->initializationData().logger);
out << "exception in selector while calling closeSocket():\n" << ex;
}
+#endif
+
+ try
+ {
+ closeSocket(_fdIntrWrite);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+
+ try
+ {
+ closeSocket(_fdIntrRead);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
}
void
@@ -282,9 +348,16 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
{
return;
}
+ checkReady(handler);
+
+ NativeInfoPtr nativeInfo = handler->getNativeInfo();
+ if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET)
+ {
+ return;
+ }
- SOCKET fd = handler->getNativeInfo()->fd();
#if defined(ICE_USE_EPOLL)
+ SOCKET fd = nativeInfo->fd();
epoll_event event;
memset(&event, 0, sizeof(epoll_event));
event.data.ptr = handler;
@@ -318,7 +391,8 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
Ice::Error out(_instance->initializationData().logger);
out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
}
-#else // ICE_USE_KQUEUE
+#elif defined(ICE_USE_KQUEUE)
+ SOCKET fd = nativeInfo->fd();
if(remove & SocketOperationRead)
{
struct kevent ev;
@@ -349,22 +423,34 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
{
updateSelector();
}
+#else
+ _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
+ wakeup();
#endif
+ checkReady(handler);
}
void
Selector::enable(EventHandler* handler, SocketOperation status)
{
- if(!(handler->_disabled & status))
+ NativeInfoPtr nativeInfo = handler->getNativeInfo();
+ if(!nativeInfo || !(handler->_disabled & status))
{
return;
}
handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~status);
+ checkReady(handler);
+
+ NativeInfoPtr nativeInfo = handler->getNativeInfo();
+ if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET)
+ {
+ return;
+ }
if(handler->_registered & status)
{
- SOCKET fd = handler->getNativeInfo()->fd();
#if defined(ICE_USE_EPOLL)
+ SOCKET fd = nativeInfo->fd();
SocketOperation previous = static_cast<SocketOperation>(handler->_registered & ~(handler->_disabled | status));
SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled);
epoll_event event;
@@ -377,31 +463,44 @@ Selector::enable(EventHandler* handler, SocketOperation status)
Ice::Error out(_instance->initializationData().logger);
out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
}
-#else // ICE_USE_KQUEUE
+#elif defined(ICE_USE_KQUEUE)
struct kevent ev;
+ SOCKET fd = handler->getNativeInfo()->fd();
EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_ENABLE, 0, 0, handler);
_changes.push_back(ev);
if(_selecting)
{
updateSelector();
}
+#else
+ _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
+ wakeup();
#endif
}
+ checkReady(handler);
}
void
Selector::disable(EventHandler* handler, SocketOperation status)
{
- if(handler->_disabled & status)
+ NativeInfoPtr nativeInfo = handler->getNativeInfo();
+ if(!nativeInfo || handler->_disabled & status)
{
return;
}
handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status);
+ checkReady(handler);
+
+ NativeInfoPtr nativeInfo = handler->getNativeInfo();
+ if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET)
+ {
+ return;
+ }
if(handler->_registered & status)
{
- SOCKET fd = handler->getNativeInfo()->fd();
#if defined(ICE_USE_EPOLL)
+ SOCKET fd = nativeInfo->fd();
SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled);
epoll_event event;
memset(&event, 0, sizeof(epoll_event));
@@ -413,7 +512,8 @@ Selector::disable(EventHandler* handler, SocketOperation status)
Ice::Error out(_instance->initializationData().logger);
out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
}
-#else // ICE_USE_KQUEUE
+#elif defined(ICE_USE_KQUEUE)
+ SOCKET fd = nativeInfo->fd();
struct kevent ev;
EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_DISABLE, 0, 0, handler);
_changes.push_back(ev);
@@ -421,8 +521,12 @@ Selector::disable(EventHandler* handler, SocketOperation status)
{
updateSelector();
}
+#else
+ _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
+ wakeup();
#endif
}
+ checkReady(handler);
}
bool
@@ -431,7 +535,11 @@ Selector::finish(EventHandler* handler, bool closeNow)
if(handler->_registered)
{
update(handler, handler->_registered, SocketOperationNone);
+#if !defined(ICE_USE_EPOLL) && !defined(ICE_USE_KQUEUE)
+ return false; // Don't close now if selecting
+#endif
}
+
#if defined(ICE_USE_KQUEUE)
if(closeNow && !_changes.empty())
{
@@ -446,43 +554,261 @@ Selector::finish(EventHandler* handler, bool closeNow)
return closeNow;
}
-#if defined(ICE_USE_KQUEUE)
void
-Selector::updateSelector()
+Selector::ready(EventHandler* handler, SocketOperation status, bool value)
{
- int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0);
- if(rs < 0)
+ if(((handler->_ready & status) != 0) == value)
{
- Ice::Error out(_instance->initializationData().logger);
- out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ return; // Nothing to do if ready state already correctly set.
}
- _changes.clear();
+
+ if(value)
+ {
+ handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
+ }
+ else
+ {
+ handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
+ }
+ checkReady(handler);
}
+
+void
+Selector::wakeup()
+{
+ if(_selecting && !_interrupted)
+ {
+ char c = 0;
+ while(true)
+ {
+ if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ break;
+ }
+ _interrupted = true;
+ }
+}
+
+void
+Selector::startSelect()
+{
+ if(_interrupted)
+ {
+ char c;
+ while(true)
+ {
+ ssize_t ret = ::read(_fdIntrRead, &c, 1);
+ if(ret == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ break;
+ }
+ _interrupted = false;
+ }
+}
+
+#if !defined(ICE_USE_EPOLL)
+ if(!_changes.empty())
+ {
+ updateSelector();
+ }
#endif
+ _selecting = true;
+
+ //
+ // If there are ready handlers, don't block in select, just do a non-blocking
+ // select to retrieve new ready handlers from the Java selector.
+ //
+ _selectNow = !_readyHandlers.empty();
+}
void
-Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout)
+Selector::finishSelect(vector<pair<EventHandler*, SocketOperation> >& handlers)
{
- int ret = 0;
- while(true)
+ _selecting = false;
+
+ assert(handlers.empty());
+
+#if defined(ICE_USE_POLL) || defined(ICE_USE_SELECT)
+ if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers
{
+ return;
+ }
+#endif
+
+#if defined(ICE_USE_POLL)
+ for(vector<struct pollfd>::const_iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
+#else
+ for(int i = 0; i < _count; ++i)
+#endif
+ {
+ pair<EventHandler*, SocketOperation> p;
+
#if defined(ICE_USE_EPOLL)
- ret = epoll_wait(_queueFd, &_events[0], _events.size(), timeout > 0 ? timeout * 1000 : -1);
+ struct epoll_event& ev = _events[i];
+ p.first = reinterpret_cast<EventHandler*>(ev.data.ptr);
+ p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ?
+ SocketOperationRead : SocketOperationNone) |
+ ((ev.events & (EPOLLOUT | EPOLLERR)) ?
+ SocketOperationWrite : SocketOperationNone));
+#elif defined(ICE_USE_KQUEUE)
+ struct kevent& ev = _events[i];
+ if(ev.flags & EV_ERROR)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "selector returned error:\n" << IceUtilInternal::errorToString(ev.data);
+ continue;
+ }
+ p.first = reinterpret_cast<EventHandler*>(ev.udata);
+ p.second = (ev.filter == EVFILT_READ) ? SocketOperationRead : SocketOperationWrite;
+#elif defined(ICE_USE_SELECT)
+ //
+ // Round robin for the filedescriptors.
+ //
+ SOCKET fd;
+ p.second = SocketOperationNone;
+ if(i < _selectedReadFdSet.fd_count)
+ {
+ fd = _selectedReadFdSet.fd_array[i];
+ p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
+ }
+ else if(i < _selectedWriteFdSet.fd_count + _selectedReadFdSet.fd_count)
+ {
+ fd = _selectedWriteFdSet.fd_array[i - _selectedReadFdSet.fd_count];
+ p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
+ }
+ else
+ {
+ fd = _selectedErrorFdSet.fd_array[i - _selectedReadFdSet.fd_count - _selectedWriteFdSet.fd_count];
+ p.second = static_cast<SocketOperation>(p.second | SocketOperationConnect);
+ }
+
+ assert(fd != _fdIntrRead);
+ p.first = _handlers[fd];
#else
+ if(r->revents == 0)
+ {
+ continue;
+ }
+
+ SOCKET fd = r->fd;
+ assert(_handlers.find(fd) != _handlers.end());
+ p.first = _handlers[fd];
+ p.second = SocketOperationNone;
+ if(r->revents & (POLLIN | POLLERR | POLLHUP))
+ {
+ p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
+ }
+ if(r->revents & (POLLOUT | POLLERR | POLLHUP))
+ {
+ p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
+ }
+ assert(p.second);
+#endif
+ if(!p.first)
+ {
+ continue; // Interrupted
+ }
+
+ map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.find(p.first);
+ if(q != _readyHandlers.end()) // Handler will be added by the loop below
+ {
+ q->second = p.second; // We just remember which operations are ready here.
+ }
+ else
+ {
+ handlers.push_back(p);
+ }
+ }
+
+ for(map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.begin(); q != _readyHandlers.end(); ++q)
+ {
+ pair<EventHandler*, SocketOperation> p;
+ p.first = q->first.get();
+ p.second = static_cast<SocketOperation>(p.first->_ready & ~p.first->_disabled & p.first->_registered);
+ p.second = static_cast<SocketOperation>(p.second | q->second);
+ if(p.second)
+ {
+ handlers.push_back(p);
+ }
+
+ //
+ // Reset the operation, it's only used by this method to temporarly store the socket status
+ // return by the select operation above.
+ //
+ q->second = SocketOperationNone;
+ }
+}
+
+void
+Selector::select(int timeout)
+{
+ if(_selectNow)
+ {
+ timeout = 0;
+ }
+ else if(timeout > 0)
+ {
+ timeout = timeout * 1000;
+ }
+ else
+ {
+ timeout = -1;
+ }
+
+ while(true)
+ {
+#if defined(ICE_USE_EPOLL)
+ _count = epoll_wait(_queueFd, &_events[0], _events.size(), timeout);
+#elif defined(ICE_USE_KQUEUE)
assert(!_events.empty());
- if(timeout > 0)
+ if(timeout >= 0)
{
struct timespec ts;
ts.tv_sec = timeout;
ts.tv_nsec = 0;
- ret = kevent(_queueFd, 0, 0, &_events[0], _events.size(), &ts);
+ _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), &ts);
}
else
{
- ret = kevent(_queueFd, 0, 0, &_events[0], _events.size(), 0);
+ _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), 0);
}
+#elif defined(ICE_USE_SELECT)
+ fd_set* rFdSet = fdSetCopy(_selectedReadFdSet, _readFdSet);
+ fd_set* wFdSet = fdSetCopy(_selectedWriteFdSet, _writeFdSet);
+ fd_set* eFdSet = fdSetCopy(_selectedErrorFdSet, _errorFdSet);
+ if(timeout >= 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+ _count = ::select(0, rFdSet, wFdSet, eFdSet, &tv); // The first parameter is ignored on Windows
+ }
+ else
+ {
+ _count = ::select(0, rFdSet, wFdSet, eFdSet, 0); // The first parameter is ignored on Windows
+ }
+#else
+ _count = poll(&_pollFdSet[0], _pollFdSet.size(), timeout);
#endif
- if(ret == SOCKET_ERROR)
+
+ if(_count == SOCKET_ERROR)
{
if(interrupted())
{
@@ -499,35 +825,326 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
break;
}
- if(ret == 0)
+ if(_count == 0 && !_selectNow)
{
throw SelectorTimeoutException();
}
+}
- assert(ret > 0);
- for(int i = 0; i < ret; ++i)
+void
+Selector::checkReady(EventHandler* handler)
+{
+ if(handler->_ready & ~handler->_disabled & handler->_registered)
{
- pair<EventHandler*, SocketOperation> p;
-#if defined(ICE_USE_EPOLL)
- struct epoll_event& ev = _events[i];
- p.first = reinterpret_cast<EventHandler*>(ev.data.ptr);
- p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ?
- SocketOperationRead : SocketOperationNone) |
- ((ev.events & (EPOLLOUT | EPOLLERR)) ?
- SocketOperationWrite : SocketOperationNone));
+ _readyHandlers.insert(make_pair(handler, SocketOperationNone));
+ wakeup();
+ }
+ else
+ {
+ map<EventHandlerPtr, SocketOperation>::iterator p = _readyHandlers.find(handler);
+ if(p != _readyHandlers.end())
+ {
+ _readyHandlers.erase(p);
+ }
+ }
+}
+
+void
+Selector::updateSelector()
+{
+#if defined(ICE_USE_KQUEUE)
+ int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0);
+ if(rs < 0)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ }
+ _changes.clear();
+#elif !defined(ICE_USE_EPOLL)
+ assert(!_selecting);
+
+ for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
+ {
+ EventHandler* handler = p->first;
+ SocketOperation status = p->second;
+
+ SOCKET fd = handler->getNativeInfo()->fd();
+ if(status)
+ {
+#if defined(ICE_USE_SELECT)
+ if(status & SocketOperationRead)
+ {
+ FD_SET(fd, &_readFdSet);
+ }
+ else
+ {
+ FD_CLR(fd, &_readFdSet);
+ }
+ if(status & SocketOperationWrite)
+ {
+ FD_SET(fd, &_writeFdSet);
+ }
+ else
+ {
+ FD_CLR(fd, &_writeFdSet);
+ }
+ if(status & SocketOperationConnect)
+ {
+ FD_SET(fd, &_writeFdSet);
+ FD_SET(fd, &_errorFdSet);
+ }
+ else
+ {
+ FD_CLR(fd, &_writeFdSet);
+ FD_CLR(fd, &_errorFdSet);
+ }
+ _handlers[fd] = handler;
#else
- struct kevent& ev = _events[i];
- if(ev.flags & EV_ERROR)
+ short events = 0;
+ if(status & SocketOperationRead)
+ {
+ events |= POLLIN;
+ }
+ if(status & SocketOperationWrite)
+ {
+ events |= POLLOUT;
+ }
+ map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd);
+ if(q == _handlers.end())
+ {
+ struct pollfd pollFd;
+ pollFd.fd = fd;
+ pollFd.events = events;
+ pollFd.revents = 0;
+ _pollFdSet.push_back(pollFd);
+ _handlers.insert(make_pair(fd, handler));
+ }
+ else
+ {
+ for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
+ {
+ if(r->fd == fd)
+ {
+ r->events = events;
+ break;
+ }
+ }
+ }
+#endif
+ }
+ else
{
- Ice::Error out(_instance->initializationData().logger);
- out << "selector returned error:\n" << IceUtilInternal::errorToString(ev.data);
- continue;
+#if defined(ICE_USE_SELECT)
+ FD_CLR(fd, &_readFdSet);
+ FD_CLR(fd, &_writeFdSet);
+ FD_CLR(fd, &_errorFdSet);
+#else
+ for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
+ {
+ if(r->fd == fd)
+ {
+ _pollFdSet.erase(r);
+ break;
+ }
+ }
+#endif
+ _handlers.erase(fd);
}
- p.first = reinterpret_cast<EventHandler*>(ev.udata);
- p.second = (ev.filter == EVFILT_READ) ? SocketOperationRead : SocketOperationWrite;
+ }
+ _changes.clear();
+#endif
+}
+
+#elif defined(ICE_USE_CFSTREAM)
+
+namespace
+{
+
+void selectorInterrupt(void* info)
+{
+ reinterpret_cast<Selector*>(info)->processInterrupt();
+}
+
+void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, CFDataRef, const void* d, void* info)
+{
+ if(callbackType == kCFSocketReadCallBack)
+ {
+ reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationRead);
+ }
+ else if(callbackType == kCFSocketWriteCallBack)
+ {
+ reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationWrite);
+ }
+ else if(callbackType == kCFSocketConnectCallBack)
+ {
+ reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect,
+ d ? *reinterpret_cast<const SInt32*>(d) : 0);
+ }
+}
+
+class SelectorHelperThread : public IceUtil::Thread
+{
+public:
+
+ SelectorHelperThread(Selector& selector) : _selector(selector)
+ {
+ }
+
+ virtual void run()
+ {
+ _selector.run();
+ }
+
+private:
+
+ Selector& _selector;
+};
+
+CFOptionFlags
+toCFCallbacks(SocketOperation op)
+{
+ CFOptionFlags cbs = 0;
+ if(op & SocketOperationRead)
+ {
+ cbs |= kCFSocketReadCallBack;
+ }
+ if(op & SocketOperationWrite)
+ {
+ cbs |= kCFSocketWriteCallBack;
+ }
+
+ if(_count == 0 && !_selectNow)
+ {
+ throw SelectorTimeoutException();
+ }
+}
+
+}
+
+EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) :
+ _handler(handler),
+ _streamNativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())),
+ _selector(selector),
+ _ready(SocketOperationNone),
+ _finish(false),
+ _socket(0),
+ _source(0)
+{
+ if(_streamNativeInfo)
+ {
+ _streamNativeInfo->initStreams(this);
+ }
+ else if(handler->getNativeInfo())
+ {
+ _readyHandlers.insert(make_pair(handler, SocketOperationNone));
+ wakeup();
+ }
+}
+
+void
+Selector::updateSelector()
+{
+#if defined(ICE_USE_KQUEUE)
+ int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0);
+ if(rs < 0)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ }
+ _changes.clear();
+#elif !defined(ICE_USE_EPOLL)
+ assert(!_selecting);
+
+ for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
+ {
+ EventHandler* handler = p->first;
+ SocketOperation status = p->second;
+
+ SOCKET fd = handler->getNativeInfo()->fd();
+ if(status)
+ {
+#if defined(ICE_USE_SELECT)
+ if(status & SocketOperationRead)
+ {
+ FD_SET(fd, &_readFdSet);
+ }
+ else
+ {
+ FD_CLR(fd, &_readFdSet);
+ }
+ if(status & SocketOperationWrite)
+ {
+ FD_SET(fd, &_writeFdSet);
+ }
+ else
+ {
+ FD_CLR(fd, &_writeFdSet);
+ }
+ if(status & SocketOperationConnect)
+ {
+ FD_SET(fd, &_writeFdSet);
+ FD_SET(fd, &_errorFdSet);
+ }
+ else
+ {
+ FD_CLR(fd, &_writeFdSet);
+ FD_CLR(fd, &_errorFdSet);
+ }
+ _handlers[fd] = handler;
+#else
+ short events = 0;
+ if(status & SocketOperationRead)
+ {
+ events |= POLLIN;
+ }
+ if(status & SocketOperationWrite)
+ {
+ events |= POLLOUT;
+ }
+ map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd);
+ if(q == _handlers.end())
+ {
+ struct pollfd pollFd;
+ pollFd.fd = fd;
+ pollFd.events = events;
+ pollFd.revents = 0;
+ _pollFdSet.push_back(pollFd);
+ _handlers.insert(make_pair(fd, handler));
+ }
+ else
+ {
+ for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
+ {
+ if(r->fd == fd)
+ {
+ r->events = events;
+ break;
+ }
+ }
+ }
#endif
- handlers.push_back(p);
+ }
+ else
+ {
+#if defined(ICE_USE_SELECT)
+ FD_CLR(fd, &_readFdSet);
+ FD_CLR(fd, &_writeFdSet);
+ FD_CLR(fd, &_errorFdSet);
+#else
+ for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
+ {
+ if(r->fd == fd)
+ {
+ _pollFdSet.erase(r);
+ break;
+ }
+ }
+#endif
+ _handlers.erase(fd);
+ }
}
+ _changes.clear();
+#endif
}
#elif defined(ICE_USE_CFSTREAM)
@@ -667,15 +1284,15 @@ EventHandlerWrapper::updateRunLoop()
}
else
{
- SocketOperation readyOp = _nativeInfo->registerWithRunLoop(op);
+ SocketOperation readyOp = _streamNativeInfo->registerWithRunLoop(op);
if(!(op & (SocketOperationWrite | SocketOperationConnect)) || _ready & SocketOperationWrite)
{
- _nativeInfo->unregisterFromRunLoop(SocketOperationWrite, false);
+ _streamNativeInfo->unregisterFromRunLoop(SocketOperationWrite, false);
}
if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead)
{
- _nativeInfo->unregisterFromRunLoop(SocketOperationRead, false);
+ _streamNativeInfo->unregisterFromRunLoop(SocketOperationRead, false);
}
if(readyOp)
@@ -685,7 +1302,7 @@ EventHandlerWrapper::updateRunLoop()
if(_finish)
{
- _nativeInfo->closeStreams();
+ _streamNativeInfo->closeStreams();
}
}
}
@@ -707,7 +1324,7 @@ EventHandlerWrapper::ready(SocketOperation op, int error)
// stream (which can't be used from another thread than the run loop thread if
// it's registered with a run loop).
//
- op = _nativeInfo->unregisterFromRunLoop(op, error != 0);
+ op = _streamNativeInfo->unregisterFromRunLoop(op, error != 0);
}
op = static_cast<SocketOperation>(_handler->_registered & op);
@@ -720,23 +1337,25 @@ EventHandlerWrapper::ready(SocketOperation op, int error)
{
if(op & SocketOperationConnect)
{
- _nativeInfo->setConnectError(error);
+ _streamNativeInfo->setConnectError(error);
}
}
_ready = static_cast<SocketOperation>(_ready | op);
- if(!(_handler->_disabled & op))
- {
- _selector.addReadyHandler(this);
- }
+ checkReady();
}
-void
+bool
EventHandlerWrapper::checkReady()
{
- if(_ready & _handler->_registered)
+ if((_ready | _handler->_ready) & ~_handler->_disabled & _handler->_registered)
{
_selector.addReadyHandler(this);
+ return false;
+ }
+ else
+ {
+ return _handler->getNativeInfo() && !_finish;
}
}
@@ -744,7 +1363,7 @@ SocketOperation
EventHandlerWrapper::readyOp()
{
assert(!(~_handler->_registered & _ready));
- SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & _ready);
+ SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & (_ready | _handler->_ready));
_ready = static_cast<SocketOperation>(~op & _ready);
return op;
}
@@ -762,15 +1381,16 @@ EventHandlerWrapper::update(SocketOperation remove, SocketOperation add)
// Clear ready flags which might not be valid anymore.
_ready = static_cast<SocketOperation>(_ready & _handler->_registered);
- return true;
+ return _handler->getNativeInfo();
}
-void
+bool
EventHandlerWrapper::finish()
{
_finish = true;
_ready = SocketOperationNone;
_handler->_registered = SocketOperationNone;
+ return _handler->getNativeInfo();
}
Selector::Selector(const InstancePtr& instance) : _instance(instance), _destroyed(false)
@@ -799,27 +1419,33 @@ Selector::~Selector()
void
Selector::destroy()
{
- Lock sync(*this);
-
- //
- // Make sure any pending changes are processed to ensure remaining
- // streams/sockets are closed.
- //
- _destroyed = true;
- while(!_changes.empty())
{
+ Lock sync(*this);
+
+ //
+ // Make sure any pending changes are processed to ensure remaining
+ // streams/sockets are closed.
+ //
+ _destroyed = true;
CFRunLoopSourceSignal(_source);
CFRunLoopWakeUp(_runLoop);
- wait();
+ while(!_changes.empty())
+ {
+ CFRunLoopSourceSignal(_source);
+ CFRunLoopWakeUp(_runLoop);
+
+ wait();
+ }
}
_thread->getThreadControl().join();
_thread = 0;
+ Lock sync(*this);
CFRelease(_source);
- assert(_wrappers.empty());
+ //assert(_wrappers.empty());
_readyHandlers.clear();
_selectedHandlers.clear();
}
@@ -836,7 +1462,6 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
{
Lock sync(*this);
const EventHandlerWrapperPtr& wrapper = _wrappers[handler];
- assert(wrapper);
if(wrapper->update(remove, add))
{
_changes.insert(wrapper);
@@ -878,39 +1503,82 @@ Selector::finish(EventHandler* handler, bool closeNow)
std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler);
assert(p != _wrappers.end());
EventHandlerWrapperPtr wrapper = p->second;
- wrapper->finish();
+ if(wrapper->finish())
+ {
+ _changes.insert(wrapper);
+ notify();
+ }
_wrappers.erase(p);
- _changes.insert(wrapper);
- notify();
return closeNow;
}
void
-Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers, int timeout)
+Selector::ready(EventHandler* handler, SocketOperation status, bool value)
+{
+ if(((handler->_ready & status) != 0) == value)
+ {
+ return; // Nothing to do if ready state already correctly set.
+ }
+
+ if(value)
+ {
+ handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
+ }
+ else
+ {
+ handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
+ }
+
+ Lock sync(*this);
+ std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler);
+ assert(p != _wrappers.end());
+ p->second->checkReady();
+}
+
+void
+Selector::startSelect()
{
Lock sync(*this);
//
// Re-enable callbacks for previously selected handlers.
//
- if(!_selectedHandlers.empty())
+ vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p;
+ for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p)
{
- vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p;
- for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p)
+ if(p->first->checkReady())
{
- if(!p->first->_finish)
- {
- _changes.insert(p->first);
- }
+ _changes.insert(p->first);
}
- _selectedHandlers.clear();
}
+ _selectedHandlers.clear();
+}
+void
+Selector::finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers)
+{
+ Lock sync(*this);
+ handlers.clear();
+ for(set<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p)
+ {
+ SocketOperation op = (*p)->readyOp();
+ if(op)
+ {
+ _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op));
+ handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op));
+ }
+ }
+ _readyHandlers.clear();
+}
+
+void
+Selector::select(int timeout)
+{
//
// Wait for handlers to be ready.
//
- handlers.clear();
- while(_selectedHandlers.empty())
+ Lock sync(*this);
+ while(!_destroyed)
{
while(!_changes.empty())
{
@@ -935,21 +1603,10 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle
}
}
- if(!_changes.empty())
- {
- continue; // Make sure to process the changes first.
- }
-
- for(vector<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p)
+ if(_changes.empty())
{
- SocketOperation op = (*p)->readyOp();
- if(op)
- {
- _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op));
- handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op));
- }
+ break;
}
- _readyHandlers.clear();
}
}
@@ -973,24 +1630,6 @@ Selector::processInterrupt()
}
void
-Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error)
-{
- Lock sync(*this);
- wrapper->ready(op, error);
-}
-
-void
-Selector::addReadyHandler(EventHandlerWrapper* wrapper)
-{
- // Called from ready()
- _readyHandlers.push_back(wrapper);
- if(_readyHandlers.size() == 1)
- {
- notify();
- }
-}
-
-void
Selector::run()
{
{
@@ -1004,408 +1643,24 @@ Selector::run()
CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode);
}
-#elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
-
-Selector::Selector(const InstancePtr& instance) : _instance(instance), _selecting(false), _interrupted(false)
-{
- SOCKET fds[2];
- createPipe(fds);
- _fdIntrRead = fds[0];
- _fdIntrWrite = fds[1];
-#if defined(ICE_USE_SELECT)
- FD_ZERO(&_readFdSet);
- FD_ZERO(&_writeFdSet);
- FD_ZERO(&_errorFdSet);
- FD_SET(_fdIntrRead, &_readFdSet);
-#else
- struct pollfd pollFd;
- pollFd.fd = _fdIntrRead;
- pollFd.events = POLLIN;
- _pollFdSet.push_back(pollFd);
-#endif
-}
-
-Selector::~Selector()
-{
- try
- {
- closeSocket(_fdIntrWrite);
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-
- try
- {
- closeSocket(_fdIntrRead);
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-}
-
void
-Selector::destroy()
-{
-#if !defined(ICE_USE_SELECT) && !defined(ICE_USE_POLL)
- assert(_events.empty());
-#endif
-}
-
-void
-Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
-{
- SocketOperation previous = handler->_registered;
- handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove);
- handler->_registered = static_cast<SocketOperation>(handler->_registered | add);
- if(previous == handler->_registered)
- {
- return;
- }
-
- updateImpl(handler);
-}
-
-void
-Selector::enable(EventHandler* handler, SocketOperation status)
-{
- if(!(handler->_disabled & status))
- {
- return;
- }
- handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~status);
-
- if(handler->_registered & status)
- {
- updateImpl(handler);
- }
-}
-
-void
-Selector::disable(EventHandler* handler, SocketOperation status)
-{
- if(handler->_disabled & status)
- {
- return;
- }
- handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status);
-
- if(handler->_registered & status)
- {
- updateImpl(handler);
- }
-}
-
-bool
-Selector::finish(EventHandler* handler, bool closeNow)
-{
- if(handler->_registered)
- {
- update(handler, handler->_registered, SocketOperationNone);
- return false; // Don't close now if selecting.
- }
- return closeNow;
-}
-
-void
-Selector::startSelect()
-{
- if(_interrupted)
- {
- char c;
- while(true)
- {
- ssize_t ret;
-#ifdef _WIN32
- ret = ::recv(_fdIntrRead, &c, 1, 0);
-#else
- ret = ::read(_fdIntrRead, &c, 1);
-#endif
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- Ice::SocketException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- break;
- }
- _interrupted = false;
-
- if(!_changes.empty())
- {
- updateSelector();
- }
- }
- _selecting = true;
-}
-
-void
-Selector::finishSelect()
+Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error)
{
- _selecting = false;
+ Lock sync(*this);
+ wrapper->ready(op, error);
}
void
-Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout)
+Selector::addReadyHandler(EventHandlerWrapper* wrapper)
{
- int ret = 0;
- while(true)
- {
-#if defined(ICE_USE_SELECT)
- fd_set* rFdSet = fdSetCopy(_selectedReadFdSet, _readFdSet);
- fd_set* wFdSet = fdSetCopy(_selectedWriteFdSet, _writeFdSet);
- fd_set* eFdSet = fdSetCopy(_selectedErrorFdSet, _errorFdSet);
- if(timeout > 0)
- {
- struct timeval tv;
- tv.tv_sec = timeout;
- tv.tv_usec = 0;
- ret = ::select(0, rFdSet, wFdSet, eFdSet, &tv); // The first parameter is ignored on Windows
- }
- else
- {
- ret = ::select(0, rFdSet, wFdSet, eFdSet, 0); // The first parameter is ignored on Windows
- }
-#else
- ret = poll(&_pollFdSet[0], _pollFdSet.size(), timeout > 0 ? timeout * 1000 : -1);
-#endif
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- {
- Ice::SocketException ex(__FILE__, __LINE__, IceInternal::getSocketErrno());
- Ice::Error out(_instance->initializationData().logger);
- out << "fatal error: selector failed:\n" << ex;
- }
- abort();
- }
- break;
- }
-
- if(ret == 0)
- {
- throw SelectorTimeoutException();
- }
-
- assert(ret > 0);
-
-#if defined(ICE_USE_SELECT)
- if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0)
- {
- Ice::Error out(_instance->initializationData().logger);
- out << "select() in selector returned " << ret << " but no filedescriptor is ready";
- return;
- }
-
- for(unsigned int i = 0; i < static_cast<unsigned int>(ret); ++i)
- {
- pair<EventHandler*, SocketOperation> p;
-
- //
- // Round robin for the filedescriptors.
- //
- SOCKET fd;
- p.second = SocketOperationNone;
- if(i < _selectedReadFdSet.fd_count)
- {
- fd = _selectedReadFdSet.fd_array[i];
- p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
- }
- else if(i < _selectedWriteFdSet.fd_count + _selectedReadFdSet.fd_count)
- {
- fd = _selectedWriteFdSet.fd_array[i - _selectedReadFdSet.fd_count];
- p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
- }
- else
- {
- fd = _selectedErrorFdSet.fd_array[i - _selectedReadFdSet.fd_count - _selectedWriteFdSet.fd_count];
- p.second = static_cast<SocketOperation>(p.second | SocketOperationConnect);
- }
-
- if(fd == _fdIntrRead) // Interrupted, we have to process the interrupt before returning any handlers
- {
- handlers.clear();
- return;
- }
-
- assert(_handlers.find(fd) != _handlers.end());
- p.first = _handlers[fd];
- handlers.push_back(p);
- }
-#else
- if(_pollFdSet[0].revents == POLLIN) // Interrupted, we have to process the interrupt before returning any handlers
- {
- return;
- }
-
- for(vector<struct pollfd>::const_iterator q = _pollFdSet.begin(); q != _pollFdSet.end(); ++q)
+ // Called from ready()
+ _readyHandlers.insert(wrapper);
+ if(_readyHandlers.size() == 1)
{
- pair<EventHandler*, SocketOperation> p;
- if(q->revents != 0)
- {
- SOCKET fd = q->fd;
- assert(fd != _fdIntrRead);
- assert(_handlers.find(fd) != _handlers.end());
- p.first = _handlers[fd];
- p.second = SocketOperationNone;
- if(q->revents & (POLLIN | POLLERR | POLLHUP))
- {
- p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
- }
- if(q->revents & POLLOUT)
- {
- p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
- }
- assert(p.second);
- handlers.push_back(p);
- }
+ notify();
}
-#endif
}
-void
-Selector::updateImpl(EventHandler* handler)
-{
- SocketOperation status = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled);
- _changes.push_back(make_pair(handler, status));
- if(_selecting)
- {
- if(!_interrupted)
- {
- char c = 0;
- while(true)
- {
-#ifdef _WIN32
- if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
-#else
- if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
#endif
- {
- if(interrupted())
- {
- continue;
- }
-
- Ice::SocketException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- break;
- }
- _interrupted = true;
- }
- }
- else
- {
- updateSelector();
- }
-}
-void
-Selector::updateSelector()
-{
- for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
- {
- EventHandler* handler = p->first;
- SocketOperation status = p->second;
-
- SOCKET fd = handler->getNativeInfo()->fd();
- if(status)
- {
-#if defined(ICE_USE_SELECT)
- if(status & SocketOperationRead)
- {
- FD_SET(fd, &_readFdSet);
- }
- else
- {
- FD_CLR(fd, &_readFdSet);
- }
- if(status & SocketOperationWrite)
- {
- FD_SET(fd, &_writeFdSet);
- }
- else
- {
- FD_CLR(fd, &_writeFdSet);
- }
- if(status & SocketOperationConnect)
- {
- FD_SET(fd, &_writeFdSet);
- FD_SET(fd, &_errorFdSet);
- }
- else
- {
- FD_CLR(fd, &_writeFdSet);
- FD_CLR(fd, &_errorFdSet);
- }
- _handlers[fd] = handler;
-#else
- short events = 0;
- if(status & SocketOperationRead)
- {
- events |= POLLIN;
- }
- if(status & SocketOperationWrite)
- {
- events |= POLLOUT;
- }
- map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd);
- if(q == _handlers.end())
- {
- struct pollfd pollFd;
- pollFd.fd = fd;
- pollFd.events = events;
- pollFd.revents = 0;
- _pollFdSet.push_back(pollFd);
- _handlers.insert(make_pair(fd, handler));
- }
- else
- {
- for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
- {
- if(r->fd == fd)
- {
- r->events = events;
- break;
- }
- }
- }
-#endif
- }
- else
- {
-#if defined(ICE_USE_SELECT)
- FD_CLR(fd, &_readFdSet);
- FD_CLR(fd, &_writeFdSet);
- FD_CLR(fd, &_errorFdSet);
-#else
- for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
- {
- if(r->fd == fd)
- {
- _pollFdSet.erase(r);
- break;
- }
- }
-#endif
- _handlers.erase(fd);
- }
- }
- _changes.clear();
-}
-#endif
diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h
index e890db6e2bf..4098da30171 100644
--- a/cpp/src/Ice/Selector.h
+++ b/cpp/src/Ice/Selector.h
@@ -57,67 +57,59 @@ class SelectorTimeoutException
{
};
-#if defined(ICE_OS_WINRT)
-
-struct SelectEvent
-{
- SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status)
- {
- }
- EventHandlerPtr handler;
- SocketOperation status;
-};
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
-class Selector : IceUtil::Monitor<IceUtil::Mutex>
+class Selector
{
-public:
-
- Selector(const InstancePtr&);
-
- void destroy();
-
- void initialize(EventHandler*);
- void update(EventHandler*, SocketOperation, SocketOperation);
- void finish(EventHandler*);
-
- EventHandlerPtr getNextHandler(SocketOperation&, int);
-
- void completed(const EventHandlerPtr&, SocketOperation);
-
-private:
-
- const InstancePtr _instance;
- std::deque<SelectEvent> _events;
-};
+#if defined(ICE_OS_WINRT)
+ struct SelectEvent
+ {
+ SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status)
+ {
+ }
-#elif defined(ICE_USE_IOCP)
+ EventHandlerPtr handler;
+ SocketOperation status;
+ };
+#endif
-class Selector
-{
public:
Selector(const InstancePtr&);
~Selector();
+#ifdef ICE_USE_IOCP
void setup(int);
- void destroy();
+#endif
+ void destroy();
void initialize(EventHandler*);
void update(EventHandler*, SocketOperation, SocketOperation);
void finish(EventHandler*);
+ void ready(EventHandler*, SocketOperation, bool);
+
+#ifdef ICE_USE_IOCP
EventHandler* getNextHandler(SocketOperation&, DWORD&, int&, int);
+#else
+ EventHandler* getNextHandler(SocketOperation&, int);
+#endif
+
+ void completed(EventHandler*, SocketOperation);
- HANDLE getIOCPHandle() { return _handle; }
-
private:
const InstancePtr _instance;
+#ifdef ICE_USE_IOCP
HANDLE _handle;
+#else
+ IceUtil::Monitor<IceUtil::Mutex> _monitor;
+ std::deque<SelectEvent> _events;
+#endif
};
-#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL)
+#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
class Selector
{
@@ -126,7 +118,7 @@ public:
Selector(const InstancePtr&);
~Selector();
- void destroy();
+ void destroy();
void initialize(EventHandler*)
{
@@ -137,43 +129,62 @@ public:
void disable(EventHandler*, SocketOperation);
bool finish(EventHandler*, bool);
-#if defined(ICE_USE_KQUEUE)
- void updateSelector();
-#endif
-
- void
- startSelect()
- {
-#ifdef ICE_USE_KQUEUE
- _selecting = true;
- if(!_changes.empty())
- {
- updateSelector();
- }
-#endif
- }
-
- void
- finishSelect()
- {
-#ifdef ICE_USE_KQUEUE
- _selecting = false;
-#endif
- }
+ void ready(EventHandler*, SocketOperation, bool);
- void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int);
+ void startSelect();
+ void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&);
+ void select(int);
private:
+ void wakeup();
+ void checkReady(EventHandler*);
+ void updateSelector();
+
const InstancePtr _instance;
+
+ SOCKET _fdIntrRead;
+ SOCKET _fdIntrWrite;
+ bool _interrupted;
+ bool _selectNow;
+ int _count;
+ bool _selecting;
+ std::map<EventHandlerPtr, SocketOperation> _readyHandlers;
+
#if defined(ICE_USE_EPOLL)
std::vector<struct epoll_event> _events;
-#else
+ int _queueFd;
+#elif defined(ICE_USE_KQUEUE)
std::vector<struct kevent> _events;
std::vector<struct kevent> _changes;
- bool _selecting;
-#endif
int _queueFd;
+#elif defined(ICE_USE_SELECT)
+ std::vector<std::pair<EventHandler*, SocketOperation> > _changes;
+ std::map<SOCKET, EventHandler*> _handlers;
+
+ fd_set _readFdSet;
+ fd_set _writeFdSet;
+ fd_set _errorFdSet;
+ fd_set _selectedReadFdSet;
+ fd_set _selectedWriteFdSet;
+ fd_set _selectedErrorFdSet;
+
+ fd_set*
+ fdSetCopy(fd_set& dest, fd_set& src)
+ {
+ if(src.fd_count > 0)
+ {
+ dest.fd_count = src.fd_count;
+ memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count);
+ return &dest;
+ }
+ return 0;
+ }
+#elif defined(ICE_USE_POLL)
+ std::vector<std::pair<EventHandler*, SocketOperation> > _changes;
+ std::map<SOCKET, EventHandler*> _handlers;
+ std::vector<struct pollfd> _pollFdSet;
+#endif
};
#elif defined(ICE_USE_CFSTREAM)
@@ -182,7 +193,7 @@ class Selector;
class SelectorReadyCallback : public IceUtil::Shared
{
-public:
+public:
virtual ~SelectorReadyCallback() { }
virtual void readyCallback(SocketOperation, int = 0) = 0;
@@ -201,9 +212,9 @@ public:
virtual SocketOperation unregisterFromRunLoop(SocketOperation, bool) = 0;
virtual void closeStreams() = 0;
- void setConnectError(int error)
+ void setConnectError(int error)
{
- _connectError = error;
+ _connectError = error;
}
private:
@@ -222,14 +233,13 @@ public:
void updateRunLoop();
virtual void readyCallback(SocketOperation, int = 0);
-
void ready(SocketOperation, int);
SocketOperation readyOp();
- void checkReady();
+ bool checkReady();
bool update(SocketOperation, SocketOperation);
- void finish();
+ bool finish();
bool operator<(const EventHandlerWrapper& o)
{
@@ -241,7 +251,7 @@ private:
friend class Selector;
EventHandlerPtr _handler;
- StreamNativeInfoPtr _nativeInfo;
+ StreamNativeInfoPtr _streamNativeInfo;
Selector& _selector;
SocketOperation _ready;
bool _finish;
@@ -266,17 +276,22 @@ public:
void disable(EventHandler*, SocketOperation);
bool finish(EventHandler*, bool);
- void startSelect() { }
- void finishSelect() { }
- void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int);
+ void ready(EventHandler*, SocketOperation, bool);
+
+ void startSelect();
+ void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&);
+ void select(int);
void processInterrupt();
- void ready(EventHandlerWrapper*, SocketOperation, int = 0);
- void addReadyHandler(EventHandlerWrapper*);
void run();
private:
+ void ready(EventHandlerWrapper*, SocketOperation, int = 0);
+ void addReadyHandler(EventHandlerWrapper*);
+
+ friend class EventHandlerWrapper;
+
InstancePtr _instance;
IceUtil::ThreadPtr _thread;
CFRunLoopRef _runLoop;
@@ -285,74 +300,11 @@ private:
std::set<EventHandlerWrapperPtr> _changes;
- std::vector<EventHandlerWrapperPtr> _readyHandlers;
+ std::set<EventHandlerWrapperPtr> _readyHandlers;
std::vector<std::pair<EventHandlerWrapperPtr, SocketOperation> > _selectedHandlers;
std::map<EventHandler*, EventHandlerWrapperPtr> _wrappers;
};
-#elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
-
-class Selector
-{
-public:
-
- Selector(const InstancePtr&);
- ~Selector();
-
- void destroy();
-
- void initialize(EventHandler*)
- {
- // Nothing to do
- }
- void update(EventHandler*, SocketOperation, SocketOperation);
- void enable(EventHandler*, SocketOperation);
- void disable(EventHandler*, SocketOperation);
- bool finish(EventHandler*, bool);
-
- void startSelect();
- void finishSelect();
- void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int);
-
-private:
-
- void updateSelector();
- void updateImpl(EventHandler*);
-
- const InstancePtr _instance;
-
- SOCKET _fdIntrRead;
- SOCKET _fdIntrWrite;
- bool _selecting;
- bool _interrupted;
-
- std::vector<std::pair<EventHandler*, SocketOperation> > _changes;
- std::map<SOCKET, EventHandler*> _handlers;
-
-#if defined(ICE_USE_SELECT)
- fd_set _readFdSet;
- fd_set _writeFdSet;
- fd_set _errorFdSet;
- fd_set _selectedReadFdSet;
- fd_set _selectedWriteFdSet;
- fd_set _selectedErrorFdSet;
-
- fd_set*
- fdSetCopy(fd_set& dest, fd_set& src)
- {
- if(src.fd_count > 0)
- {
- dest.fd_count = src.fd_count;
- memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count);
- return &dest;
- }
- return 0;
- }
-#else
- std::vector<struct pollfd> _pollFdSet;
-#endif
-};
-
#endif
}
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index 703679df189..7ca16363d83 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -25,7 +25,7 @@ IceInternal::TcpTransceiver::getNativeInfo()
}
SocketOperation
-IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool&)
+IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer)
{
return _stream->connect(readBuffer, writeBuffer);
}
@@ -51,7 +51,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf)
}
SocketOperation
-IceInternal::TcpTransceiver::read(Buffer& buf, bool&)
+IceInternal::TcpTransceiver::read(Buffer& buf)
{
return _stream->read(buf);
}
@@ -76,7 +76,7 @@ IceInternal::TcpTransceiver::startRead(Buffer& buf)
}
void
-IceInternal::TcpTransceiver::finishRead(Buffer& buf, bool&)
+IceInternal::TcpTransceiver::finishRead(Buffer& buf)
{
_stream->finishRead(buf);
}
diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h
index 16845238ae4..b192ab1bd44 100644
--- a/cpp/src/Ice/TcpTransceiver.h
+++ b/cpp/src/Ice/TcpTransceiver.h
@@ -28,16 +28,16 @@ public:
virtual NativeInfoPtr getNativeInfo();
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual SocketOperation closing(bool, const Ice::LocalException&);
virtual void close();
virtual SocketOperation write(Buffer&);
- virtual SocketOperation read(Buffer&, bool&);
+ virtual SocketOperation read(Buffer&);
#ifdef ICE_USE_IOCP
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
virtual void startRead(Buffer&);
- virtual void finishRead(Buffer&, bool&);
+ virtual void finishRead(Buffer&);
#endif
virtual std::string protocol() const;
virtual std::string toString() const;
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 7ff3f57db4b..0de8b971559 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -99,29 +99,6 @@ private:
IceUtil::ThreadPtr _thread;
};
-class InterruptWorkItem : public ThreadPoolWorkItem
-{
-public:
-
- virtual void
- execute(ThreadPoolCurrent& current)
- {
- // Nothing to do, this is just used to interrupt the thread pool selector.
- }
-};
-ThreadPoolWorkItemPtr interruptWorkItem;
-
-class InterruptWorkItemInit
-{
-public:
-
- InterruptWorkItemInit()
- {
- interruptWorkItem = new InterruptWorkItem;
- }
-};
-InterruptWorkItemInit init;
-
//
// Exception raised by the thread pool work queue when the thread pool
// is destroyed.
@@ -147,77 +124,38 @@ IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current)
current.dispatchFromThisThread(this);
}
-IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(const InstancePtr& instance, Selector& selector) :
- _instance(instance),
- _selector(selector),
+IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool& threadPool) :
+ _threadPool(threadPool),
_destroyed(false)
-#ifdef ICE_USE_IOCP
- , _info(SocketOperationRead)
-#endif
{
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- SOCKET fds[2];
- createPipe(fds);
- _fdIntrRead = fds[0];
- _fdIntrWrite = fds[1];
-
- _selector.initialize(this);
- _selector.update(this, SocketOperationNone, SocketOperationRead);
-#endif
-}
-
-IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue()
-{
- assert(_destroyed);
-
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- try
- {
- closeSocket(_fdIntrRead);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-
- try
- {
- closeSocket(_fdIntrWrite);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-#endif
+ _registered = SocketOperationRead;
}
void
IceInternal::ThreadPoolWorkQueue::destroy()
{
- Lock sync(*this);
+ //Lock sync(*this); Called with the thread pool locked
assert(!_destroyed);
_destroyed = true;
- postMessage();
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ _threadPool._selector.completed(this, SocketOperationRead);
+#else
+ _threadPool._selector.ready(this, SocketOperationRead, true);
+#endif
}
void
IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
{
- Lock sync(*this);
- if(_destroyed)
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
+ //Lock sync(*this); Called with the thread pool locked
_workItems.push_back(item);
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ _threadPool._selector.completed(this, SocketOperationRead);
+#else
if(_workItems.size() == 1)
{
- postMessage();
+ _threadPool._selector.ready(this, SocketOperationRead, true);
}
-#else
- postMessage();
#endif
}
@@ -242,47 +180,24 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
{
ThreadPoolWorkItemPtr workItem;
{
- Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_threadPool);
if(!_workItems.empty())
{
workItem = _workItems.front();
_workItems.pop_front();
-
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- if(_workItems.empty())
- {
- char c;
- while(true)
- {
- ssize_t ret;
-# ifdef _WIN32
- ret = ::recv(_fdIntrRead, &c, 1, 0);
-# else
- ret = ::read(_fdIntrRead, &c, 1);
-# endif
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- break;
- }
- }
-#endif
}
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
else
{
assert(_destroyed);
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- postMessage();
-#endif
+ _threadPool._selector.completed(this, SocketOperationRead);
}
+#else
+ if(_workItems.empty() && !_destroyed)
+ {
+ _threadPool._selector.ready(this, SocketOperationRead, false);
+ }
+#endif
}
if(workItem)
@@ -291,6 +206,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
}
else
{
+ assert(_destroyed);
current.ioCompleted();
throw ThreadPoolDestroyedException();
}
@@ -311,47 +227,7 @@ IceInternal::ThreadPoolWorkQueue::toString() const
NativeInfoPtr
IceInternal::ThreadPoolWorkQueue::getNativeInfo()
{
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- return new NativeInfo(_fdIntrRead);
-#else
return 0;
-#endif
-}
-
-void
-IceInternal::ThreadPoolWorkQueue::postMessage()
-{
-#if defined(ICE_USE_IOCP)
- if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), &_info))
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = GetLastError();
- throw ex;
- }
-#elif defined(ICE_OS_WINRT)
- _selector.completed(this, SocketOperationRead);
-#else
- char c = 0;
- while(true)
- {
-# ifdef _WIN32
- if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
-# else
- if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
-# endif
- {
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- break;
- }
-#endif
}
IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :
@@ -469,7 +345,8 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
}
- _workQueue = new ThreadPoolWorkQueue(_instance, _selector);
+ _workQueue = new ThreadPoolWorkQueue(*this);
+ _selector.initialize(_workQueue.get());
if(_instance->traceLevels()->threadPool >= 1)
{
@@ -528,7 +405,6 @@ IceInternal::ThreadPool::destroy()
{
return;
}
-
_destroyed = true;
_workQueue->destroy();
}
@@ -549,6 +425,28 @@ IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler)
Lock sync(*this);
assert(!_destroyed);
_selector.initialize(handler.get());
+
+ class ReadyCallbackI : public ReadyCallback
+ {
+ public:
+
+ ReadyCallbackI(const ThreadPoolPtr& threadPool, const EventHandlerPtr& handler) :
+ _threadPool(threadPool), _handler(handler)
+ {
+ }
+
+ virtual void
+ ready(SocketOperation op, bool value)
+ {
+ _threadPool->ready(_handler, op, value);
+ }
+
+ private:
+
+ const ThreadPoolPtr _threadPool;
+ const EventHandlerPtr _handler;
+ };
+ handler->getNativeInfo()->setReadyCallback(new ReadyCallbackI(this, handler));
}
void
@@ -569,20 +467,6 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation
}
_selector.update(handler.get(), remove, add);
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead))
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(handler.get());
- }
- else if(remove & SocketOperationRead)
- {
- _pendingHandlers.erase(handler.get());
- }
-#endif
}
bool
@@ -592,7 +476,6 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
assert(!_destroyed);
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
closeNow = _selector.finish(handler.get(), closeNow); // This must be called before!
- _pendingHandlers.erase(handler.get());
_workQueue->queue(new FinishedWorkItem(handler, !closeNow));
return closeNow;
#else
@@ -611,6 +494,17 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
}
void
+IceInternal::ThreadPool::ready(const EventHandlerPtr& handler, SocketOperation op, bool value)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+ _selector.ready(handler.get(), op, value);
+}
+
+void
IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workItem)
{
if(_dispatcher)
@@ -645,6 +539,11 @@ IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workI
void
IceInternal::ThreadPool::dispatch(const DispatchWorkItemPtr& workItem)
{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
_workQueue->queue(workItem);
}
@@ -663,10 +562,6 @@ IceInternal::ThreadPool::joinWithAllThreads()
{
(*p)->getThreadControl().join();
}
-
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- _selector.finish(_workQueue.get(), true);
-#endif
_selector.destroy();
}
@@ -682,7 +577,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
ThreadPoolCurrent current(_instance, this, thread);
bool select = false;
- vector<pair<EventHandler*, SocketOperation> > handlers;
while(true)
{
if(current._handler)
@@ -711,7 +605,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
try
{
- _selector.select(handlers, _serverIdleTime);
+ _selector.select(_serverIdleTime);
}
catch(const SelectorTimeoutException&)
{
@@ -730,22 +624,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
if(select)
{
- _handlers.swap(handlers);
- if(!_pendingHandlers.empty())
- {
- for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler)
- {
- _pendingHandlers.erase(_nextHandler->first);
- }
- set<EventHandler*>::const_iterator p;
- for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p)
- {
- _handlers.push_back(make_pair(*p, SocketOperationRead));
- }
- _pendingHandlers.clear();
- }
+ _selector.finishSelect(_handlers);
_nextHandler = _handlers.begin();
- _selector.finishSelect();
select = false;
}
else if(!current._leader && followerWait(current))
@@ -762,14 +642,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// the IO thread count now.
//
--_inUseIO;
- if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
}
else
{
@@ -780,14 +652,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(_serialize)
{
_selector.enable(current._handler.get(), current.operation);
- if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
}
assert(_inUse > 0);
--_inUse;
@@ -798,19 +662,12 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
return; // Wait timed-out.
}
}
- else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
//
// Get the next ready handler.
//
- while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered))
+ while(_nextHandler != _handlers.end() &&
+ !(_nextHandler->second & ~_nextHandler->first->_disabled & _nextHandler->first->_registered))
{
++_nextHandler;
}
@@ -1007,19 +864,6 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
if(_serialize)
{
_selector.disable(current._handler.get(), current.operation);
-
- // Make sure the handler isn't in the set of pending handlers (this can
- // for example occur if the handler is has more data and its added by
- // ThreadPool::update while we were processing IO).
- _pendingHandlers.erase(current._handler.get());
- }
- else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
}
}
@@ -1090,8 +934,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
if(current._handler->_started & current.operation)
{
- assert(!(current._handler->_ready & current.operation));
- current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation);
+ assert(!(current._handler->_completed & current.operation));
+ current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed | current.operation);
current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation);
#ifndef ICE_OS_WINRT
@@ -1105,20 +949,26 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
return false;
}
}
- else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation))
+ else if(!(current._handler->_completed & current.operation) && (current._handler->_registered & current.operation))
{
assert(!(current._handler->_started & current.operation));
- if(!current._handler->startAsync(current.operation))
+ if(current._handler->_ready & current.operation)
+ {
+ return true;
+ }
+ else if(!current._handler->startAsync(current.operation))
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
@@ -1133,8 +983,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
if(current._handler->_registered & current.operation)
{
- assert(current._handler->_ready & current.operation);
- current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation);
+ assert(current._handler->_completed & current.operation);
+ current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed & ~current.operation);
return true;
}
else
@@ -1142,6 +992,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
@@ -1152,10 +1003,14 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
void
IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
{
- if(current._handler->_registered & current.operation)
+ if(current._handler->_registered & current.operation && !current._handler->_finish)
{
- assert(!(current._handler->_ready & current.operation));
- if(!current._handler->startAsync(current.operation))
+ assert(!(current._handler->_completed & current.operation));
+ if(current._handler->_ready & current.operation)
+ {
+ _selector.completed(current._handler.get(), current.operation);
+ }
+ else if(!current._handler->startAsync(current.operation))
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
}
@@ -1173,6 +1028,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
// There are no more pending async operations, it's time to call finish.
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index d3447a1959c..f97ff363f1b 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -40,7 +40,7 @@ typedef IceUtil::Handle<ThreadPoolWorkQueue> ThreadPoolWorkQueuePtr;
class ThreadPoolWorkItem : virtual public IceUtil::Shared
{
public:
-
+
virtual void execute(ThreadPoolCurrent&) = 0;
};
typedef IceUtil::Handle<ThreadPoolWorkItem> ThreadPoolWorkItemPtr;
@@ -51,8 +51,8 @@ public:
DispatchWorkItem();
DispatchWorkItem(const Ice::ConnectionPtr& connection);
-
- const Ice::ConnectionPtr&
+
+ const Ice::ConnectionPtr&
getConnection()
{
return _connection;
@@ -66,18 +66,18 @@ private:
};
typedef IceUtil::Handle<DispatchWorkItem> DispatchWorkItemPtr;
-class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex>
+class ThreadPool : public IceUtil::Shared, private IceUtil::Monitor<IceUtil::Mutex>
{
class EventHandlerThread : public IceUtil::Thread
{
public:
-
+
EventHandlerThread(const ThreadPoolPtr&, const std::string&);
virtual void run();
void updateObserver();
void setState(Ice::Instrumentation::ThreadState);
-
+
private:
ThreadPoolPtr _pool;
@@ -106,6 +106,7 @@ public:
update(handler, status, SocketOperationNone);
}
bool finish(const EventHandlerPtr&, bool);
+ void ready(const EventHandlerPtr&, SocketOperation, bool);
void dispatchFromThisThread(const DispatchWorkItemPtr&);
void dispatch(const DispatchWorkItemPtr&);
@@ -140,6 +141,7 @@ private:
friend class EventHandlerThread;
friend class ThreadPoolCurrent;
+ friend class ThreadPoolWorkQueue;
const int _size; // Number of threads that are pre-created.
const int _sizeIO; // Maximum number of threads that can concurrently perform IO.
@@ -158,7 +160,6 @@ private:
int _inUseIO; // Number of threads that are currently performing IO.
std::vector<std::pair<EventHandler*, SocketOperation> > _handlers;
std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler;
- std::set<EventHandler*> _pendingHandlers;
#endif
bool _promote;
@@ -215,12 +216,11 @@ private:
friend class ThreadPool;
};
-class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex
+class ThreadPoolWorkQueue : public EventHandler
{
public:
- ThreadPoolWorkQueue(const InstancePtr&, Selector&);
- ~ThreadPoolWorkQueue();
+ ThreadPoolWorkQueue(ThreadPool&);
void destroy();
void queue(const ThreadPoolWorkItemPtr&);
@@ -234,19 +234,11 @@ public:
virtual void finished(ThreadPoolCurrent&, bool);
virtual std::string toString() const;
virtual NativeInfoPtr getNativeInfo();
- virtual void postMessage();
private:
- const InstancePtr _instance;
- Selector& _selector;
+ ThreadPool& _threadPool;
bool _destroyed;
-#ifdef ICE_USE_IOCP
- AsyncInfo _info;
-#elif !defined(ICE_OS_WINRT)
- SOCKET _fdIntrRead;
- SOCKET _fdIntrWrite;
-#endif
std::list<ThreadPoolWorkItemPtr> _workItems;
};
@@ -257,7 +249,7 @@ private:
//
// An instance of the IOScope subclass must be created within the synchronization
// of the event handler. It takes care of calling startMessage/finishMessage for
-// the IOCP implementation and ensures that finishMessage isn't called multiple
+// the IOCP implementation and ensures that finishMessage isn't called multiple
// times.
//
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
@@ -290,7 +282,7 @@ public:
}
private:
-
+
ThreadPoolMessage<T>& _message;
};
friend class IOScope;
@@ -309,7 +301,7 @@ private:
ThreadPoolCurrent& _current;
};
-#else
+#else
template<class T> class ThreadPoolMessage
{
@@ -321,7 +313,7 @@ public:
IOScope(ThreadPoolMessage& message) : _message(message)
{
- // This must be called with the handler locked.
+ // This must be called with the handler locked.
_finish = _message._current.startMessage();
}
@@ -329,7 +321,7 @@ public:
{
if(_finish)
{
- // This must be called with the handler locked.
+ // This must be called with the handler locked.
_message._current.finishMessage();
}
}
@@ -344,7 +336,7 @@ public:
{
//
// Call finishMessage once IO is completed only if serialization is not enabled.
- // Otherwise, finishMessage will be called when the event handler is done with
+ // Otherwise, finishMessage will be called when the event handler is done with
// the message (it will be called from ~ThreadPoolMessage below).
//
assert(_finish);
@@ -358,22 +350,22 @@ public:
private:
ThreadPoolMessage& _message;
- bool _finish;
+ bool _finish;
};
friend class IOScope;
-
- ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) :
+
+ ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) :
_current(current), _mutex(mutex), _finish(false)
{
}
-
+
~ThreadPoolMessage()
{
if(_finish)
{
//
// A ThreadPoolMessage instance must be created outside the synchronization
- // of the event handler. We need to lock the event handler here to call
+ // of the event handler. We need to lock the event handler here to call
// finishMessage.
//
#if defined(__MINGW32__)
@@ -386,7 +378,7 @@ public:
}
private:
-
+
ThreadPoolCurrent& _current;
const T& _mutex;
bool _finish;
diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h
index b330cab5f4b..d277e9bb746 100644
--- a/cpp/src/Ice/Transceiver.h
+++ b/cpp/src/Ice/Transceiver.h
@@ -27,17 +27,17 @@ public:
virtual NativeInfoPtr getNativeInfo() = 0;
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0;
+ virtual SocketOperation initialize(Buffer&, Buffer&) = 0;
virtual SocketOperation closing(bool, const Ice::LocalException&) = 0;
virtual void close() = 0;
virtual EndpointIPtr bind();
virtual SocketOperation write(Buffer&) = 0;
- virtual SocketOperation read(Buffer&, bool&) = 0;
+ virtual SocketOperation read(Buffer&) = 0;
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
virtual bool startWrite(Buffer&) = 0;
virtual void finishWrite(Buffer&) = 0;
virtual void startRead(Buffer&) = 0;
- virtual void finishRead(Buffer&, bool&) = 0;
+ virtual void finishRead(Buffer&) = 0;
#endif
virtual std::string protocol() const = 0;
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index c664eeb977e..b1b3c8562b6 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -79,7 +79,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler
#endif
SocketOperation
-IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/, bool& /*hasMoreData*/)
+IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/)
{
if(_state == StateNeedConnect)
{
@@ -264,7 +264,7 @@ repeat:
}
SocketOperation
-IceInternal::UdpTransceiver::read(Buffer& buf, bool&)
+IceInternal::UdpTransceiver::read(Buffer& buf)
{
if(buf.i == buf.b.end())
{
@@ -681,7 +681,7 @@ IceInternal::UdpTransceiver::startRead(Buffer& buf)
}
void
-IceInternal::UdpTransceiver::finishRead(Buffer& buf, bool&)
+IceInternal::UdpTransceiver::finishRead(Buffer& buf)
{
#ifdef ICE_OS_WINRT
IceUtil::Mutex::Lock lock(_mutex);
diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h
index 83921ad54a5..7c1edfc3c6f 100644
--- a/cpp/src/Ice/UdpTransceiver.h
+++ b/cpp/src/Ice/UdpTransceiver.h
@@ -44,17 +44,17 @@ public:
virtual void setCompletedHandler(SocketOperationCompletedHandler^);
#endif
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual SocketOperation closing(bool, const Ice::LocalException&);
virtual void close();
virtual EndpointIPtr bind();
virtual SocketOperation write(Buffer&);
- virtual SocketOperation read(Buffer&, bool&);
+ virtual SocketOperation read(Buffer&);
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
virtual void startRead(Buffer&);
- virtual void finishRead(Buffer&, bool&);
+ virtual void finishRead(Buffer&);
#endif
virtual std::string protocol() const;
virtual std::string toString() const;
diff --git a/cpp/src/Ice/WSTransceiver.cpp b/cpp/src/Ice/WSTransceiver.cpp
index e68bccc4668..bd9254a22bb 100644
--- a/cpp/src/Ice/WSTransceiver.cpp
+++ b/cpp/src/Ice/WSTransceiver.cpp
@@ -199,14 +199,14 @@ IceInternal::WSTransceiver::setCompletedHandler(IceInternal::SocketOperationComp
#endif
SocketOperation
-IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData)
+IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer)
{
//
// Delegate logs exceptions that occur during initialize(), so there's no need to trap them here.
//
if(_state == StateInitializeDelegate)
{
- SocketOperation op = _delegate->initialize(readBuffer, writeBuffer, hasMoreData);
+ SocketOperation op = _delegate->initialize(readBuffer, writeBuffer);
if(op != SocketOperationNone)
{
return op;
@@ -280,7 +280,7 @@ IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer,
{
if(_readBuffer.i < _readBuffer.b.end())
{
- SocketOperation s = _delegate->read(_readBuffer, hasMoreData);
+ SocketOperation s = _delegate->read(_readBuffer);
if(s == SocketOperationWrite || _readBuffer.i == _readBuffer.b.begin())
{
return s;
@@ -384,7 +384,10 @@ IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer,
_state = StateOpened;
_nextState = StateOpened;
- hasMoreData |= _readI < _readBuffer.i;
+ if(_readI < _readBuffer.i)
+ {
+ _delegate->getNativeInfo()->ready(SocketOperationRead, true);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -544,7 +547,7 @@ IceInternal::WSTransceiver::write(Buffer& buf)
}
SocketOperation
-IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData)
+IceInternal::WSTransceiver::read(Buffer& buf)
{
if(_readPending)
{
@@ -555,11 +558,11 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData)
{
if(_state < StateConnected)
{
- return _delegate->read(buf, hasMoreData);
+ return _delegate->read(buf);
}
else
{
- if(_delegate->read(_readBuffer, hasMoreData) == SocketOperationWrite)
+ if(_delegate->read(_readBuffer) == SocketOperationWrite)
{
return SocketOperationWrite;
}
@@ -576,7 +579,10 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData)
//
if(buf.i == buf.b.end())
{
- hasMoreData |= _readI < _readBuffer.i;
+ if(_readI < _readBuffer.i)
+ {
+ _delegate->getNativeInfo()->ready(SocketOperationRead, true);
+ }
return SocketOperationNone;
}
@@ -597,17 +603,17 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData)
{
size_t size = buf.b.size();
buf.b.resize(buf.i - buf.b.begin() + readSz);
- s = _delegate->read(buf, hasMoreData);
+ s = _delegate->read(buf);
buf.b.resize(size);
}
else
{
- s = _delegate->read(buf, hasMoreData);
+ s = _delegate->read(buf);
}
}
else
{
- s = _delegate->read(_readBuffer, hasMoreData);
+ s = _delegate->read(_readBuffer);
}
if(s == SocketOperationWrite)
@@ -621,12 +627,15 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData)
if(buf.i == buf.b.end())
{
- hasMoreData |= _readI < _readBuffer.i;
+ if(_readI < _readBuffer.i)
+ {
+ _delegate->getNativeInfo()->ready(SocketOperationRead, true);
+ }
s = SocketOperationNone;
}
else
{
- hasMoreData = false;
+ _delegate->getNativeInfo()->ready(SocketOperationRead, false);
s = SocketOperationRead;
}
@@ -767,18 +776,18 @@ IceInternal::WSTransceiver::startRead(Buffer& buf)
}
void
-IceInternal::WSTransceiver::finishRead(Buffer& buf, bool& hasMoreData)
+IceInternal::WSTransceiver::finishRead(Buffer& buf)
{
_readPending = false;
if(_state < StateOpened)
{
if(_state < StateConnected)
{
- _delegate->finishRead(buf, hasMoreData);
+ _delegate->finishRead(buf);
}
else
{
- _delegate->finishRead(_readBuffer, hasMoreData);
+ _delegate->finishRead(_readBuffer);
}
return;
}
@@ -789,11 +798,11 @@ IceInternal::WSTransceiver::finishRead(Buffer& buf, bool& hasMoreData)
}
else if(_readState == ReadStatePayload)
{
- _delegate->finishRead(buf, hasMoreData);
+ _delegate->finishRead(buf);
}
else
{
- _delegate->finishRead(_readBuffer, hasMoreData);
+ _delegate->finishRead(_readBuffer);
}
if(_state == StateClosed)
diff --git a/cpp/src/Ice/WSTransceiver.h b/cpp/src/Ice/WSTransceiver.h
index c3d8d760e22..b7771ecece9 100644
--- a/cpp/src/Ice/WSTransceiver.h
+++ b/cpp/src/Ice/WSTransceiver.h
@@ -46,16 +46,16 @@ public:
virtual void setCompletedHandler(SocketOperationCompletedHandler^);
#endif
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual SocketOperation closing(bool, const Ice::LocalException&);
virtual void close();
virtual SocketOperation write(Buffer&);
- virtual SocketOperation read(Buffer&, bool&);
+ virtual SocketOperation read(Buffer&);
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
virtual void startRead(Buffer&);
- virtual void finishRead(Buffer&, bool&);
+ virtual void finishRead(Buffer&);
#endif
virtual std::string protocol() const;
virtual std::string toString() const;
diff --git a/cpp/src/Ice/winrt/StreamTransceiver.cpp b/cpp/src/Ice/winrt/StreamTransceiver.cpp
index e0a7acf6b9a..039643dd3ae 100644
--- a/cpp/src/Ice/winrt/StreamTransceiver.cpp
+++ b/cpp/src/Ice/winrt/StreamTransceiver.cpp
@@ -67,7 +67,7 @@ IceInternal::StreamTransceiver::setCompletedHandler(SocketOperationCompletedHand
}
SocketOperation
-IceInternal::StreamTransceiver::initialize(Buffer&, Buffer&,bool&)
+IceInternal::StreamTransceiver::initialize(Buffer&, Buffer&)
{
if(_state == StateNeedConnect)
{
@@ -123,7 +123,7 @@ IceInternal::StreamTransceiver::write(Buffer& buf)
}
SocketOperation
-IceInternal::StreamTransceiver::read(Buffer& buf, bool&)
+IceInternal::StreamTransceiver::read(Buffer& buf)
{
return buf.i == buf.b.end() ? SocketOperationNone : SocketOperationRead;
}
@@ -254,7 +254,7 @@ IceInternal::StreamTransceiver::startRead(Buffer& buf)
}
void
-IceInternal::StreamTransceiver::finishRead(Buffer& buf, bool& hasMoreData)
+IceInternal::StreamTransceiver::finishRead(Buffer& buf)
{
if(_read.count == SOCKET_ERROR)
{
diff --git a/cpp/src/Ice/winrt/StreamTransceiver.h b/cpp/src/Ice/winrt/StreamTransceiver.h
index 2fc822868f0..307ac1e0a78 100644
--- a/cpp/src/Ice/winrt/StreamTransceiver.h
+++ b/cpp/src/Ice/winrt/StreamTransceiver.h
@@ -35,16 +35,16 @@ public:
virtual NativeInfoPtr getNativeInfo();
virtual void setCompletedHandler(SocketOperationCompletedHandler^);
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual SocketOperation closing(bool, const Ice::LocalException&);
virtual void close();
virtual SocketOperation write(Buffer&);
- virtual SocketOperation read(Buffer&, bool&);
+ virtual SocketOperation read(Buffer&);
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
virtual void startRead(Buffer&);
- virtual void finishRead(Buffer&, bool&);
+ virtual void finishRead(Buffer&);
virtual std::string protocol() const;
virtual std::string toString() const;
diff --git a/cpp/src/IceGrid/Makefile b/cpp/src/IceGrid/Makefile
index 14da8e051d9..ed6f7fcaa01 100644
--- a/cpp/src/IceGrid/Makefile
+++ b/cpp/src/IceGrid/Makefile
@@ -114,7 +114,7 @@ $(ADMIN): $(ADMIN_OBJS) $(LIBTARGETS)
$(REGISTRY_SERVER): $(REGISTRY_SVR_OBJS) $(LIBTARGETS)
rm -f $@
$(CXX) $(LDFLAGS) $(LDEXEFLAGS) -o $@ $(REGISTRY_SVR_OBJS) -lIceGrid -lIceStorm -lIceStormService -lGlacier2 -lIcePatch2 \
- $(DB_RPATH_LINK) -lFreeze -lIceBox $(EXPAT_RPATH_LINK) -lIceXML -lIceSSL $(OPENSSL_RPATH_LINK) $(LIBS)
+ $(RPATH_LINK) -lFreeze -lIceBox $(EXPAT_RPATH_LINK) -lIceXML -lIceSSL $(OPENSSL_RPATH_LINK) $(LIBS)
$(NODE_SERVER): $(NODE_SVR_OBJS) $(LIBTARGETS)
rm -f $@
diff --git a/cpp/src/IceSSL/OpenSSLTransceiverI.cpp b/cpp/src/IceSSL/OpenSSLTransceiverI.cpp
index 244a79d4c6e..ff9563db0de 100644
--- a/cpp/src/IceSSL/OpenSSLTransceiverI.cpp
+++ b/cpp/src/IceSSL/OpenSSLTransceiverI.cpp
@@ -83,7 +83,7 @@ IceSSL::TransceiverI::getNativeInfo()
}
IceInternal::SocketOperation
-IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&)
+IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
{
IceInternal::SocketOperation status = _stream->connect(readBuffer, writeBuffer);
if(status != IceInternal::SocketOperationNone)
@@ -440,7 +440,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
IceInternal::SocketOperation
-IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&)
+IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
{
if(!_stream->isConnected())
{
@@ -448,8 +448,7 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&)
}
//
- // Note: we don't set the hasMoreData flag in this implementation.
- // We assume that OpenSSL doesn't read more SSL records than
+ // Note: We assume that OpenSSL doesn't read more SSL records than
// necessary to fill the requested data and that the sender sends
// Ice messages in individual SSL records.
//
diff --git a/cpp/src/IceSSL/OpenSSLTransceiverI.h b/cpp/src/IceSSL/OpenSSLTransceiverI.h
index a64e932a0f5..eb81b12ec12 100644
--- a/cpp/src/IceSSL/OpenSSLTransceiverI.h
+++ b/cpp/src/IceSSL/OpenSSLTransceiverI.h
@@ -38,11 +38,11 @@ public:
virtual IceInternal::NativeInfoPtr getNativeInfo();
- virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&);
+ virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&);
virtual IceInternal::SocketOperation closing(bool, const Ice::LocalException&);
virtual void close();
virtual IceInternal::SocketOperation write(IceInternal::Buffer&);
- virtual IceInternal::SocketOperation read(IceInternal::Buffer&, bool&);
+ virtual IceInternal::SocketOperation read(IceInternal::Buffer&);
virtual std::string protocol() const;
virtual std::string toString() const;
virtual std::string toDetailedString() const;
diff --git a/cpp/src/IceSSL/SChannelTransceiverI.cpp b/cpp/src/IceSSL/SChannelTransceiverI.cpp
index 2b842769b09..238bf3981da 100644
--- a/cpp/src/IceSSL/SChannelTransceiverI.cpp
+++ b/cpp/src/IceSSL/SChannelTransceiverI.cpp
@@ -622,7 +622,7 @@ IceSSL::TransceiverI::encryptMessage(IceInternal::Buffer& buffer)
}
IceInternal::SocketOperation
-IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData)
+IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
{
IceInternal::SocketOperation op = _stream->connect(readBuffer, writeBuffer);
if(op != IceInternal::SocketOperationNone)
@@ -746,7 +746,8 @@ IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::B
}
out << toString();
}
- hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin();
+ _stream->ready(IceInternal::SocketOperationRead,
+ !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin());
return IceInternal::SocketOperationNone;
}
@@ -819,7 +820,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
IceInternal::SocketOperation
-IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool& hasMoreData)
+IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
{
if(!_stream->isConnected())
{
@@ -832,7 +833,7 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool& hasMoreData)
}
assert(_state == StateHandshakeComplete);
- hasMoreData = false;
+ _stream->ready(IceInternal::SocketOperationRead, false);
while(buf.i != buf.b.end())
{
if(_readUnprocessed.b.empty() && _readBuffer.i == _readBuffer.b.begin() && !readRaw(_readBuffer))
@@ -852,7 +853,8 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool& hasMoreData)
buf.i += decrypted;
}
- hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin();
+ _stream->ready(IceInternal::SocketOperationRead,
+ !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin());
return IceInternal::SocketOperationNone;
}
@@ -909,7 +911,7 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buffer)
}
void
-IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf, bool& hasMoreData)
+IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf)
{
if(!_stream->isConnected())
{
@@ -924,11 +926,12 @@ IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf, bool& hasMoreData)
if(decrypted > 0)
{
buf.i += decrypted;
- hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin();
+ _stream->ready(IceInternal::SocketOperationRead,
+ !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin());
}
else
{
- hasMoreData = false;
+ _stream->ready(IceInternal::SocketOperationRead, false);
}
}
}
diff --git a/cpp/src/IceSSL/SChannelTransceiverI.h b/cpp/src/IceSSL/SChannelTransceiverI.h
index f60d54ff6d6..a4eb31ab46f 100644
--- a/cpp/src/IceSSL/SChannelTransceiverI.h
+++ b/cpp/src/IceSSL/SChannelTransceiverI.h
@@ -49,16 +49,16 @@ public:
virtual IceInternal::NativeInfoPtr getNativeInfo();
- virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&);
+ virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&);
virtual IceInternal::SocketOperation closing(bool, const Ice::LocalException&);
virtual void close();
virtual IceInternal::SocketOperation write(IceInternal::Buffer&);
- virtual IceInternal::SocketOperation read(IceInternal::Buffer&, bool&);
+ virtual IceInternal::SocketOperation read(IceInternal::Buffer&);
#ifdef ICE_USE_IOCP
virtual bool startWrite(IceInternal::Buffer&);
virtual void finishWrite(IceInternal::Buffer&);
virtual void startRead(IceInternal::Buffer&);
- virtual void finishRead(IceInternal::Buffer&, bool&);
+ virtual void finishRead(IceInternal::Buffer&);
#endif
virtual std::string protocol() const;
virtual std::string toString() const;
diff --git a/cpp/src/IceSSL/SecureTransportTransceiverI.cpp b/cpp/src/IceSSL/SecureTransportTransceiverI.cpp
index 211a4ca3fe8..08bce7548a1 100644
--- a/cpp/src/IceSSL/SecureTransportTransceiverI.cpp
+++ b/cpp/src/IceSSL/SecureTransportTransceiverI.cpp
@@ -175,7 +175,7 @@ IceSSL::TransceiverI::getNativeInfo()
}
IceInternal::SocketOperation
-IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&)
+IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
{
IceInternal::SocketOperation status = _stream->connect(readBuffer, writeBuffer);
if(status != IceInternal::SocketOperationNone)
@@ -396,7 +396,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
IceInternal::SocketOperation
-IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&)
+IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
{
if(!_stream->isConnected())
{
@@ -404,10 +404,9 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&)
}
//
- // Note: we don't set the hasMoreData flag in this implementation.
- // We assume that SecureTransport doesn't read more SSL records
- // than necessary to fill the requested data and that the sender
- // sends Ice messages in individual SSL records.
+ // Note: we assume that SecureTransport doesn't read more SSL records
+ // than necessary to fill the requested data and that the sender sends
+ // Ice messages in individual SSL records.
//
if(buf.i == buf.b.end())
diff --git a/cpp/src/IceSSL/SecureTransportTransceiverI.h b/cpp/src/IceSSL/SecureTransportTransceiverI.h
index c81ee7aaef1..f50a17de13d 100644
--- a/cpp/src/IceSSL/SecureTransportTransceiverI.h
+++ b/cpp/src/IceSSL/SecureTransportTransceiverI.h
@@ -37,11 +37,11 @@ public:
virtual IceInternal::NativeInfoPtr getNativeInfo();
- virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&);
+ virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&);
virtual IceInternal::SocketOperation closing(bool, const Ice::LocalException&);
virtual void close();
virtual IceInternal::SocketOperation write(IceInternal::Buffer&);
- virtual IceInternal::SocketOperation read(IceInternal::Buffer&, bool&);
+ virtual IceInternal::SocketOperation read(IceInternal::Buffer&);
virtual std::string protocol() const;
virtual std::string toString() const;
@@ -74,7 +74,7 @@ private:
SSLContextRef _ssl;
SecTrustRef _trust;
bool _verified;
-
+
size_t _buffered;
enum SSLWantFlags
{