diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-10-09 15:00:57 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-10-09 15:00:57 +0200 |
commit | 20b6c0ccb95118ffc685826904a8edd06a38ac1b (patch) | |
tree | 1b389964fa35ca9de23c548120ecedcc9d82074c /cpp | |
parent | Merge branch '3.6' (diff) | |
download | ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.bz2 ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.xz ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.zip |
Added ready callback to allow transports to signal readiness to the thread pool
Diffstat (limited to 'cpp')
29 files changed, 1226 insertions, 1142 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index a0faafc6145..3dbef2a5a6d 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -1413,7 +1413,6 @@ string IceInternal::IncomingConnectionFactory::toString() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_transceiver) { return _transceiver->toString(); diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 49707d9e124..4a078bcc1f8 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1372,19 +1372,12 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { - if(!_hasMoreData) + if(_observer && !_readHeader) { - if(_observer && !_readHeader) - { - _observer.startRead(_readStream); - } - - _transceiver->startRead(_readStream); - } - else - { - _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead); + _observer.startRead(_readStream); } + + _transceiver->startRead(_readStream); } } catch(const Ice::LocalException& ex) @@ -1422,29 +1415,26 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { - if(!_hasMoreData) + Buffer::Container::iterator start = _readStream.i; + _transceiver->finishRead(_readStream); + if(_instance->traceLevels()->network >= 3 && _readStream.i != start) { - Buffer::Container::iterator start = _readStream.i; - _transceiver->finishRead(_readStream, _hasMoreData); - if(_instance->traceLevels()->network >= 3 && _readStream.i != start) + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "received "; + if(_endpoint->datagram()) { - Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); - out << "received "; - if(_endpoint->datagram()) - { - out << _readStream.b.size(); - } - else - { - out << (_readStream.i - start) << " of " << (_readStream.b.end() - start); - } - out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + out << _readStream.b.size(); } - - if(_observer && !_readHeader) + else { - _observer.finishRead(_readStream); + out << (_readStream.i - start) << " of " << (_readStream.b.end() - start); } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + + if(_observer && !_readHeader) + { + _observer.finishRead(_readStream); } } } @@ -2517,7 +2507,7 @@ Ice::ConnectionI::heartbeat() bool Ice::ConnectionI::initialize(SocketOperation operation) { - SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData); + SocketOperation s = _transceiver->initialize(_readStream, _writeStream); if(s != SocketOperationNone) { scheduleTimeout(s); @@ -3548,7 +3538,7 @@ SocketOperation ConnectionI::read(Buffer& buf) { Buffer::Container::iterator start = buf.i; - SocketOperation op = _transceiver->read(buf, _hasMoreData); + SocketOperation op = _transceiver->read(buf); if(_instance->traceLevels()->network >= 3 && buf.i != start) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); diff --git a/cpp/src/Ice/EventHandler.cpp b/cpp/src/Ice/EventHandler.cpp index 8ba48464184..6edea2d9307 100644 --- a/cpp/src/Ice/EventHandler.cpp +++ b/cpp/src/Ice/EventHandler.cpp @@ -18,14 +18,14 @@ IceUtil::Shared* IceInternal::upCast(EventHandler* p) { return p; } IceInternal::EventHandler::EventHandler() : #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - _ready(SocketOperationNone), _pending(SocketOperationNone), _started(SocketOperationNone), + _completed(SocketOperationNone), _finish(false), #else _disabled(SocketOperationNone), #endif - _hasMoreData(false), + _ready(SocketOperationNone), _registered(SocketOperationNone) { } diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h index afbd608b961..a47a1899d10 100644 --- a/cpp/src/Ice/EventHandler.h +++ b/cpp/src/Ice/EventHandler.h @@ -20,7 +20,7 @@ namespace IceInternal { -class ICE_API EventHandler : virtual public ::IceUtil::Shared +class ICE_API EventHandler : virtual public ::Ice::LocalObject { public: @@ -53,19 +53,19 @@ public: virtual NativeInfoPtr getNativeInfo() = 0; protected: - + EventHandler(); virtual ~EventHandler(); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - SocketOperation _ready; SocketOperation _pending; SocketOperation _started; + SocketOperation _completed; bool _finish; #else SocketOperation _disabled; #endif - bool _hasMoreData; + SocketOperation _ready; SocketOperation _registered; friend class ThreadPool; diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp index c151c4b9af6..cc7cd41a812 100644 --- a/cpp/src/Ice/Network.cpp +++ b/cpp/src/Ice/Network.cpp @@ -762,7 +762,14 @@ getAddressStorageSize(const Address& addr) } +void +NativeInfo::setReadyCallback(const ReadyCallbackPtr& callback) +{ + _readyCallback = callback; +} + #ifdef ICE_USE_IOCP + IceInternal::AsyncInfo::AsyncInfo(SocketOperation s) { ZeroMemory(this, sizeof(AsyncInfo)); @@ -786,6 +793,7 @@ IceInternal::NativeInfo::completed(SocketOperation operation) throw ex; } } + #elif defined(ICE_OS_WINRT) void diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h index 32182784a6d..a398fa1209c 100644 --- a/cpp/src/Ice/Network.h +++ b/cpp/src/Ice/Network.h @@ -24,8 +24,8 @@ #include <Ice/ProtocolInstanceF.h> #include <Ice/EndpointTypes.h> -#ifdef ICE_OS_WINRT -# include <Ice/EventHandlerF.h> +#if defined(ICE_OS_WINRT) +// Nothing to include #elif defined(_WIN32) # include <winsock2.h> # include <ws2tcpip.h> @@ -186,6 +186,14 @@ struct ICE_API AsyncInfo delegate void SocketOperationCompletedHandler(int); #endif +class ICE_API ReadyCallback : virtual public ::IceUtil::Shared +{ +public: + + virtual void ready(SocketOperation, bool) = 0; +}; +typedef IceUtil::Handle<ReadyCallback> ReadyCallbackPtr; + class ICE_API NativeInfo : virtual public IceUtil::Shared { public: @@ -199,6 +207,14 @@ public: return _fd; } + void setReadyCallback(const ReadyCallbackPtr& callback); + + void ready(SocketOperation operation, bool value) + { + assert(_readyCallback); + _readyCallback->ready(operation, value); + } + // // This is implemented by transceiver and acceptor implementations. // @@ -214,6 +230,7 @@ public: protected: SOCKET _fd; + ReadyCallbackPtr _readyCallback; #if defined(ICE_USE_IOCP) HANDLE _handle; diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index af4a5b07c52..e01f3d7f65a 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -23,97 +23,14 @@ using namespace std; using namespace IceInternal; #ifdef ICE_OS_WINRT -using namespace Windows::Foundation; +//using namespace Windows::Foundation; using namespace Windows::Storage::Streams; using namespace Windows::Networking; using namespace Windows::Networking::Sockets; -Selector::Selector(const InstancePtr& instance) : _instance(instance) -{ -} - -void -Selector::destroy() -{ -} - -void -Selector::initialize(IceInternal::EventHandler* handler) -{ - EventHandlerPtr h = handler; - handler->__incRef(); - handler->getNativeInfo()->setCompletedHandler( - ref new SocketOperationCompletedHandler([=](int operation) - { - // - // Use the reference counted handler to ensure it's not - // destroyed as long as the callback lambda exists. - // - completed(h, static_cast<SocketOperation>(operation)); - })); -} - -void -Selector::update(IceInternal::EventHandler* handler, SocketOperation remove, SocketOperation add) -{ - handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove); - handler->_registered = static_cast<SocketOperation>(handler->_registered | add); - if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead)) - { - handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead); - completed(handler, SocketOperationRead); // Start an asynchrnous read - } - else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite)) - { - handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite); - completed(handler, SocketOperationWrite); // Start an asynchrnous write - } -} - -void -Selector::finish(IceInternal::EventHandler* handler) -{ - handler->_registered = SocketOperationNone; - handler->_finish = false; // Ensures that finished() is only called once on the event handler. - handler->__decRef(); -} - -IceInternal::EventHandlerPtr -Selector::getNextHandler(SocketOperation& status, int timeout) -{ - Lock lock(*this); - while(_events.empty()) - { - if(timeout > 0) - { - timedWait(IceUtil::Time::seconds(timeout)); - if(_events.empty()) - { - throw SelectorTimeoutException(); - } - } - else - { - wait(); - } - } - assert(!_events.empty()); - IceInternal::EventHandlerPtr handler = _events.front().handler; - const SelectEvent& event = _events.front(); - status = event.status; - _events.pop_front(); - return handler; -} - -void -Selector::completed(const IceInternal::EventHandlerPtr& handler, SocketOperation op) -{ - Lock lock(*this); - _events.push_back(SelectEvent(handler, op)); - notify(); -} +#endif -#elif defined(ICE_USE_IOCP) +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) Selector::Selector(const InstancePtr& instance) : _instance(instance) { @@ -123,6 +40,7 @@ Selector::~Selector() { } +#ifdef ICE_USE_IOCP void Selector::setup(int sizeIO) { @@ -134,16 +52,24 @@ Selector::setup(int sizeIO) throw ex; } } +#endif void Selector::destroy() { +#ifdef ICE_USE_IOCP CloseHandle(_handle); +#endif } void Selector::initialize(EventHandler* handler) { + if(!handler->getNativeInfo()) + { + return; + } +#ifdef ICE_USE_IOCP HANDLE socket = reinterpret_cast<HANDLE>(handler->getNativeInfo()->fd()); if(CreateIoCompletionPort(socket, _handle, reinterpret_cast<ULONG_PTR>(handler), 0) == NULL) { @@ -153,6 +79,20 @@ Selector::initialize(EventHandler* handler) } handler->__incRef(); handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler)); +#else + EventHandlerPtr h = handler; + handler->__incRef(); + handler->getNativeInfo()->setCompletedHandler( + ref new SocketOperationCompletedHandler( + [=](int operation) + { + // + // Use the reference counted handler to ensure it's not + // destroyed as long as the callback lambda exists. + // + completed(h.get(), static_cast<SocketOperation>(operation)); + })); +#endif } void @@ -160,26 +100,15 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation { handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove); handler->_registered = static_cast<SocketOperation>(handler->_registered | add); - AsyncInfo* info = 0; if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead)) { handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead); - info = handler->getNativeInfo()->getAsyncInfo(SocketOperationRead); + completed(handler, SocketOperationRead); // Start an asynchrnous read } else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite)) { handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite); - info = handler->getNativeInfo()->getAsyncInfo(SocketOperationWrite); - } - - if(info) - { - if(!PostQueuedCompletionStatus(_handle, 0, reinterpret_cast<ULONG_PTR>(handler), info)) - { - Ice::SocketException ex(__FILE__, __LINE__); - ex.error = GetLastError(); - throw ex; - } + completed(handler, SocketOperationWrite); // Start an asynchrnous write } } @@ -187,12 +116,36 @@ void Selector::finish(IceInternal::EventHandler* handler) { handler->_registered = SocketOperationNone; + handler->_finish = false; // Ensures that finished() is only called once on the event handler. handler->__decRef(); } +void +Selector::ready(EventHandler* handler, SocketOperation status, bool value) +{ + if(((handler->_ready & status) != 0) == value) + { + return; // Nothing to do if ready state already correctly set. + } + + if(value) + { + handler->_ready = static_cast<SocketOperation>(handler->_ready | status); + } + else + { + handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status); + } +} + EventHandler* +#ifdef ICE_USE_IOCP Selector::getNextHandler(SocketOperation& status, DWORD& count, int& error, int timeout) +#else +Selector::getNextHandler(SocketOperation& status, int timeout) +#endif { +#ifdef ICE_USE_IOCP ULONG_PTR key; LPOVERLAPPED ol; error = 0; @@ -217,24 +170,86 @@ Selector::getNextHandler(SocketOperation& status, DWORD& count, int& error, int } } AsyncInfo* info = static_cast<AsyncInfo*>(ol); - status = info->status; + if(info) + { + status = info->status; + } count = SOCKET_ERROR; error = WSAGetLastError(); return reinterpret_cast<EventHandler*>(key); } - assert(ol); AsyncInfo* info = static_cast<AsyncInfo*>(ol); - status = info->status; + if(info) + { + status = info->status; + } + else + { + status = reinterpret_cast<EventHandler*>(key)->_ready; + } return reinterpret_cast<EventHandler*>(key); +#else + IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); + while(_events.empty()) + { + if(timeout > 0) + { + _monitor.timedWait(IceUtil::Time::seconds(timeout)); + if(_events.empty()) + { + throw SelectorTimeoutException(); + } + } + else + { + _monitor.wait(); + } + } + assert(!_events.empty()); + IceInternal::EventHandlerPtr handler = _events.front().handler; + const SelectEvent& event = _events.front(); + status = event.status; + _events.pop_front(); + return handler.get(); +#endif } -#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) +void +Selector::completed(EventHandler* handler, SocketOperation op) +{ +#ifdef ICE_USE_IOCP + AsyncInfo* info = 0; + NativeInfoPtr nativeInfo = handler->getNativeInfo(); + if(nativeInfo) + { + info = nativeInfo->getAsyncInfo(op); + } + if(!PostQueuedCompletionStatus(_handle, 0, reinterpret_cast<ULONG_PTR>(handler), info)) + { + Ice::SocketException ex(__FILE__, __LINE__); + ex.error = GetLastError(); + throw ex; + } +#else + IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); + _events.push_back(SelectEvent(handler, op)); + _monitor.notify(); +#endif +} -Selector::Selector(const InstancePtr& instance) : _instance(instance) +#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) + +Selector::Selector(const InstancePtr& instance) : _instance(instance), _interrupted(false) { - _events.resize(256); + SOCKET fds[2]; + createPipe(fds); + _fdIntrRead = fds[0]; + _fdIntrWrite = fds[1]; + _selecting = false; + #if defined(ICE_USE_EPOLL) + _events.resize(256); _queueFd = epoll_create(1); if(_queueFd < 0) { @@ -242,7 +257,18 @@ Selector::Selector(const InstancePtr& instance) : _instance(instance) ex.error = IceInternal::getSocketErrno(); throw ex; } -#else + + epoll_event event; + memset(&event, 0, sizeof(epoll_event)); + event.data.ptr = 0; + event.events = EPOLLIN; + if(epoll_ctl(_queueFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0) + { + Ice::Error out(_instance->initializationData().logger); + out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); + } +#elif defined(ICE_USE_KQUEUE) + _events.resize(256); _queueFd = kqueue(); if(_queueFd < 0) { @@ -250,7 +276,25 @@ Selector::Selector(const InstancePtr& instance) : _instance(instance) ex.error = getSocketErrno(); throw ex; } - _selecting = false; + + struct kevent ev; + EV_SET(&ev, _fdIntrRead, EVFILT_READ, EV_ADD, 0, 0, 0); + int rs = kevent(_queueFd, &ev, 1, 0, 0, 0); + if(rs < 0) + { + Ice::Error out(_instance->initializationData().logger); + out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); + } +#elif defined(ICE_USE_SELECT) + FD_ZERO(&_readFdSet); + FD_ZERO(&_writeFdSet); + FD_ZERO(&_errorFdSet); + FD_SET(_fdIntrRead, &_readFdSet); +#else + struct pollfd pollFd; + pollFd.fd = _fdIntrRead; + pollFd.events = POLLIN; + _pollFdSet.push_back(pollFd); #endif } @@ -261,6 +305,7 @@ Selector::~Selector() void Selector::destroy() { +#if defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) try { closeSocket(_queueFd); @@ -270,6 +315,27 @@ Selector::destroy() Ice::Error out(_instance->initializationData().logger); out << "exception in selector while calling closeSocket():\n" << ex; } +#endif + + try + { + closeSocket(_fdIntrWrite); + } + catch(const Ice::LocalException& ex) + { + Ice::Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } + + try + { + closeSocket(_fdIntrRead); + } + catch(const Ice::LocalException& ex) + { + Ice::Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } } void @@ -282,9 +348,16 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation { return; } + checkReady(handler); + + NativeInfoPtr nativeInfo = handler->getNativeInfo(); + if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET) + { + return; + } - SOCKET fd = handler->getNativeInfo()->fd(); #if defined(ICE_USE_EPOLL) + SOCKET fd = nativeInfo->fd(); epoll_event event; memset(&event, 0, sizeof(epoll_event)); event.data.ptr = handler; @@ -318,7 +391,8 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation Ice::Error out(_instance->initializationData().logger); out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); } -#else // ICE_USE_KQUEUE +#elif defined(ICE_USE_KQUEUE) + SOCKET fd = nativeInfo->fd(); if(remove & SocketOperationRead) { struct kevent ev; @@ -349,22 +423,34 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation { updateSelector(); } +#else + _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled))); + wakeup(); #endif + checkReady(handler); } void Selector::enable(EventHandler* handler, SocketOperation status) { - if(!(handler->_disabled & status)) + NativeInfoPtr nativeInfo = handler->getNativeInfo(); + if(!nativeInfo || !(handler->_disabled & status)) { return; } handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~status); + checkReady(handler); + + NativeInfoPtr nativeInfo = handler->getNativeInfo(); + if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET) + { + return; + } if(handler->_registered & status) { - SOCKET fd = handler->getNativeInfo()->fd(); #if defined(ICE_USE_EPOLL) + SOCKET fd = nativeInfo->fd(); SocketOperation previous = static_cast<SocketOperation>(handler->_registered & ~(handler->_disabled | status)); SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled); epoll_event event; @@ -377,31 +463,44 @@ Selector::enable(EventHandler* handler, SocketOperation status) Ice::Error out(_instance->initializationData().logger); out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); } -#else // ICE_USE_KQUEUE +#elif defined(ICE_USE_KQUEUE) struct kevent ev; + SOCKET fd = handler->getNativeInfo()->fd(); EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_ENABLE, 0, 0, handler); _changes.push_back(ev); if(_selecting) { updateSelector(); } +#else + _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled))); + wakeup(); #endif } + checkReady(handler); } void Selector::disable(EventHandler* handler, SocketOperation status) { - if(handler->_disabled & status) + NativeInfoPtr nativeInfo = handler->getNativeInfo(); + if(!nativeInfo || handler->_disabled & status) { return; } handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status); + checkReady(handler); + + NativeInfoPtr nativeInfo = handler->getNativeInfo(); + if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET) + { + return; + } if(handler->_registered & status) { - SOCKET fd = handler->getNativeInfo()->fd(); #if defined(ICE_USE_EPOLL) + SOCKET fd = nativeInfo->fd(); SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled); epoll_event event; memset(&event, 0, sizeof(epoll_event)); @@ -413,7 +512,8 @@ Selector::disable(EventHandler* handler, SocketOperation status) Ice::Error out(_instance->initializationData().logger); out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); } -#else // ICE_USE_KQUEUE +#elif defined(ICE_USE_KQUEUE) + SOCKET fd = nativeInfo->fd(); struct kevent ev; EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_DISABLE, 0, 0, handler); _changes.push_back(ev); @@ -421,8 +521,12 @@ Selector::disable(EventHandler* handler, SocketOperation status) { updateSelector(); } +#else + _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled))); + wakeup(); #endif } + checkReady(handler); } bool @@ -431,7 +535,11 @@ Selector::finish(EventHandler* handler, bool closeNow) if(handler->_registered) { update(handler, handler->_registered, SocketOperationNone); +#if !defined(ICE_USE_EPOLL) && !defined(ICE_USE_KQUEUE) + return false; // Don't close now if selecting +#endif } + #if defined(ICE_USE_KQUEUE) if(closeNow && !_changes.empty()) { @@ -446,43 +554,261 @@ Selector::finish(EventHandler* handler, bool closeNow) return closeNow; } -#if defined(ICE_USE_KQUEUE) void -Selector::updateSelector() +Selector::ready(EventHandler* handler, SocketOperation status, bool value) { - int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0); - if(rs < 0) + if(((handler->_ready & status) != 0) == value) { - Ice::Error out(_instance->initializationData().logger); - out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); + return; // Nothing to do if ready state already correctly set. } - _changes.clear(); + + if(value) + { + handler->_ready = static_cast<SocketOperation>(handler->_ready | status); + } + else + { + handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status); + } + checkReady(handler); } + +void +Selector::wakeup() +{ + if(_selecting && !_interrupted) + { + char c = 0; + while(true) + { + if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) + { + if(interrupted()) + { + continue; + } + + Ice::SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + break; + } + _interrupted = true; + } +} + +void +Selector::startSelect() +{ + if(_interrupted) + { + char c; + while(true) + { + ssize_t ret = ::read(_fdIntrRead, &c, 1); + if(ret == SOCKET_ERROR) + { + if(interrupted()) + { + continue; + } + Ice::SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + break; + } + _interrupted = false; + } +} + +#if !defined(ICE_USE_EPOLL) + if(!_changes.empty()) + { + updateSelector(); + } #endif + _selecting = true; + + // + // If there are ready handlers, don't block in select, just do a non-blocking + // select to retrieve new ready handlers from the Java selector. + // + _selectNow = !_readyHandlers.empty(); +} void -Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout) +Selector::finishSelect(vector<pair<EventHandler*, SocketOperation> >& handlers) { - int ret = 0; - while(true) + _selecting = false; + + assert(handlers.empty()); + +#if defined(ICE_USE_POLL) || defined(ICE_USE_SELECT) + if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers { + return; + } +#endif + +#if defined(ICE_USE_POLL) + for(vector<struct pollfd>::const_iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r) +#else + for(int i = 0; i < _count; ++i) +#endif + { + pair<EventHandler*, SocketOperation> p; + #if defined(ICE_USE_EPOLL) - ret = epoll_wait(_queueFd, &_events[0], _events.size(), timeout > 0 ? timeout * 1000 : -1); + struct epoll_event& ev = _events[i]; + p.first = reinterpret_cast<EventHandler*>(ev.data.ptr); + p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ? + SocketOperationRead : SocketOperationNone) | + ((ev.events & (EPOLLOUT | EPOLLERR)) ? + SocketOperationWrite : SocketOperationNone)); +#elif defined(ICE_USE_KQUEUE) + struct kevent& ev = _events[i]; + if(ev.flags & EV_ERROR) + { + Ice::Error out(_instance->initializationData().logger); + out << "selector returned error:\n" << IceUtilInternal::errorToString(ev.data); + continue; + } + p.first = reinterpret_cast<EventHandler*>(ev.udata); + p.second = (ev.filter == EVFILT_READ) ? SocketOperationRead : SocketOperationWrite; +#elif defined(ICE_USE_SELECT) + // + // Round robin for the filedescriptors. + // + SOCKET fd; + p.second = SocketOperationNone; + if(i < _selectedReadFdSet.fd_count) + { + fd = _selectedReadFdSet.fd_array[i]; + p.second = static_cast<SocketOperation>(p.second | SocketOperationRead); + } + else if(i < _selectedWriteFdSet.fd_count + _selectedReadFdSet.fd_count) + { + fd = _selectedWriteFdSet.fd_array[i - _selectedReadFdSet.fd_count]; + p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite); + } + else + { + fd = _selectedErrorFdSet.fd_array[i - _selectedReadFdSet.fd_count - _selectedWriteFdSet.fd_count]; + p.second = static_cast<SocketOperation>(p.second | SocketOperationConnect); + } + + assert(fd != _fdIntrRead); + p.first = _handlers[fd]; #else + if(r->revents == 0) + { + continue; + } + + SOCKET fd = r->fd; + assert(_handlers.find(fd) != _handlers.end()); + p.first = _handlers[fd]; + p.second = SocketOperationNone; + if(r->revents & (POLLIN | POLLERR | POLLHUP)) + { + p.second = static_cast<SocketOperation>(p.second | SocketOperationRead); + } + if(r->revents & (POLLOUT | POLLERR | POLLHUP)) + { + p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite); + } + assert(p.second); +#endif + if(!p.first) + { + continue; // Interrupted + } + + map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.find(p.first); + if(q != _readyHandlers.end()) // Handler will be added by the loop below + { + q->second = p.second; // We just remember which operations are ready here. + } + else + { + handlers.push_back(p); + } + } + + for(map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.begin(); q != _readyHandlers.end(); ++q) + { + pair<EventHandler*, SocketOperation> p; + p.first = q->first.get(); + p.second = static_cast<SocketOperation>(p.first->_ready & ~p.first->_disabled & p.first->_registered); + p.second = static_cast<SocketOperation>(p.second | q->second); + if(p.second) + { + handlers.push_back(p); + } + + // + // Reset the operation, it's only used by this method to temporarly store the socket status + // return by the select operation above. + // + q->second = SocketOperationNone; + } +} + +void +Selector::select(int timeout) +{ + if(_selectNow) + { + timeout = 0; + } + else if(timeout > 0) + { + timeout = timeout * 1000; + } + else + { + timeout = -1; + } + + while(true) + { +#if defined(ICE_USE_EPOLL) + _count = epoll_wait(_queueFd, &_events[0], _events.size(), timeout); +#elif defined(ICE_USE_KQUEUE) assert(!_events.empty()); - if(timeout > 0) + if(timeout >= 0) { struct timespec ts; ts.tv_sec = timeout; ts.tv_nsec = 0; - ret = kevent(_queueFd, 0, 0, &_events[0], _events.size(), &ts); + _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), &ts); } else { - ret = kevent(_queueFd, 0, 0, &_events[0], _events.size(), 0); + _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), 0); } +#elif defined(ICE_USE_SELECT) + fd_set* rFdSet = fdSetCopy(_selectedReadFdSet, _readFdSet); + fd_set* wFdSet = fdSetCopy(_selectedWriteFdSet, _writeFdSet); + fd_set* eFdSet = fdSetCopy(_selectedErrorFdSet, _errorFdSet); + if(timeout >= 0) + { + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + _count = ::select(0, rFdSet, wFdSet, eFdSet, &tv); // The first parameter is ignored on Windows + } + else + { + _count = ::select(0, rFdSet, wFdSet, eFdSet, 0); // The first parameter is ignored on Windows + } +#else + _count = poll(&_pollFdSet[0], _pollFdSet.size(), timeout); #endif - if(ret == SOCKET_ERROR) + + if(_count == SOCKET_ERROR) { if(interrupted()) { @@ -499,35 +825,326 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti break; } - if(ret == 0) + if(_count == 0 && !_selectNow) { throw SelectorTimeoutException(); } +} - assert(ret > 0); - for(int i = 0; i < ret; ++i) +void +Selector::checkReady(EventHandler* handler) +{ + if(handler->_ready & ~handler->_disabled & handler->_registered) { - pair<EventHandler*, SocketOperation> p; -#if defined(ICE_USE_EPOLL) - struct epoll_event& ev = _events[i]; - p.first = reinterpret_cast<EventHandler*>(ev.data.ptr); - p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ? - SocketOperationRead : SocketOperationNone) | - ((ev.events & (EPOLLOUT | EPOLLERR)) ? - SocketOperationWrite : SocketOperationNone)); + _readyHandlers.insert(make_pair(handler, SocketOperationNone)); + wakeup(); + } + else + { + map<EventHandlerPtr, SocketOperation>::iterator p = _readyHandlers.find(handler); + if(p != _readyHandlers.end()) + { + _readyHandlers.erase(p); + } + } +} + +void +Selector::updateSelector() +{ +#if defined(ICE_USE_KQUEUE) + int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0); + if(rs < 0) + { + Ice::Error out(_instance->initializationData().logger); + out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); + } + _changes.clear(); +#elif !defined(ICE_USE_EPOLL) + assert(!_selecting); + + for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p) + { + EventHandler* handler = p->first; + SocketOperation status = p->second; + + SOCKET fd = handler->getNativeInfo()->fd(); + if(status) + { +#if defined(ICE_USE_SELECT) + if(status & SocketOperationRead) + { + FD_SET(fd, &_readFdSet); + } + else + { + FD_CLR(fd, &_readFdSet); + } + if(status & SocketOperationWrite) + { + FD_SET(fd, &_writeFdSet); + } + else + { + FD_CLR(fd, &_writeFdSet); + } + if(status & SocketOperationConnect) + { + FD_SET(fd, &_writeFdSet); + FD_SET(fd, &_errorFdSet); + } + else + { + FD_CLR(fd, &_writeFdSet); + FD_CLR(fd, &_errorFdSet); + } + _handlers[fd] = handler; #else - struct kevent& ev = _events[i]; - if(ev.flags & EV_ERROR) + short events = 0; + if(status & SocketOperationRead) + { + events |= POLLIN; + } + if(status & SocketOperationWrite) + { + events |= POLLOUT; + } + map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd); + if(q == _handlers.end()) + { + struct pollfd pollFd; + pollFd.fd = fd; + pollFd.events = events; + pollFd.revents = 0; + _pollFdSet.push_back(pollFd); + _handlers.insert(make_pair(fd, handler)); + } + else + { + for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r) + { + if(r->fd == fd) + { + r->events = events; + break; + } + } + } +#endif + } + else { - Ice::Error out(_instance->initializationData().logger); - out << "selector returned error:\n" << IceUtilInternal::errorToString(ev.data); - continue; +#if defined(ICE_USE_SELECT) + FD_CLR(fd, &_readFdSet); + FD_CLR(fd, &_writeFdSet); + FD_CLR(fd, &_errorFdSet); +#else + for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r) + { + if(r->fd == fd) + { + _pollFdSet.erase(r); + break; + } + } +#endif + _handlers.erase(fd); } - p.first = reinterpret_cast<EventHandler*>(ev.udata); - p.second = (ev.filter == EVFILT_READ) ? SocketOperationRead : SocketOperationWrite; + } + _changes.clear(); +#endif +} + +#elif defined(ICE_USE_CFSTREAM) + +namespace +{ + +void selectorInterrupt(void* info) +{ + reinterpret_cast<Selector*>(info)->processInterrupt(); +} + +void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, CFDataRef, const void* d, void* info) +{ + if(callbackType == kCFSocketReadCallBack) + { + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationRead); + } + else if(callbackType == kCFSocketWriteCallBack) + { + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationWrite); + } + else if(callbackType == kCFSocketConnectCallBack) + { + reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect, + d ? *reinterpret_cast<const SInt32*>(d) : 0); + } +} + +class SelectorHelperThread : public IceUtil::Thread +{ +public: + + SelectorHelperThread(Selector& selector) : _selector(selector) + { + } + + virtual void run() + { + _selector.run(); + } + +private: + + Selector& _selector; +}; + +CFOptionFlags +toCFCallbacks(SocketOperation op) +{ + CFOptionFlags cbs = 0; + if(op & SocketOperationRead) + { + cbs |= kCFSocketReadCallBack; + } + if(op & SocketOperationWrite) + { + cbs |= kCFSocketWriteCallBack; + } + + if(_count == 0 && !_selectNow) + { + throw SelectorTimeoutException(); + } +} + +} + +EventHandlerWrapper::EventHandlerWrapper(const EventHandlerPtr& handler, Selector& selector) : + _handler(handler), + _streamNativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())), + _selector(selector), + _ready(SocketOperationNone), + _finish(false), + _socket(0), + _source(0) +{ + if(_streamNativeInfo) + { + _streamNativeInfo->initStreams(this); + } + else if(handler->getNativeInfo()) + { + _readyHandlers.insert(make_pair(handler, SocketOperationNone)); + wakeup(); + } +} + +void +Selector::updateSelector() +{ +#if defined(ICE_USE_KQUEUE) + int rs = kevent(_queueFd, &_changes[0], _changes.size(), 0, 0, 0); + if(rs < 0) + { + Ice::Error out(_instance->initializationData().logger); + out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno()); + } + _changes.clear(); +#elif !defined(ICE_USE_EPOLL) + assert(!_selecting); + + for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p) + { + EventHandler* handler = p->first; + SocketOperation status = p->second; + + SOCKET fd = handler->getNativeInfo()->fd(); + if(status) + { +#if defined(ICE_USE_SELECT) + if(status & SocketOperationRead) + { + FD_SET(fd, &_readFdSet); + } + else + { + FD_CLR(fd, &_readFdSet); + } + if(status & SocketOperationWrite) + { + FD_SET(fd, &_writeFdSet); + } + else + { + FD_CLR(fd, &_writeFdSet); + } + if(status & SocketOperationConnect) + { + FD_SET(fd, &_writeFdSet); + FD_SET(fd, &_errorFdSet); + } + else + { + FD_CLR(fd, &_writeFdSet); + FD_CLR(fd, &_errorFdSet); + } + _handlers[fd] = handler; +#else + short events = 0; + if(status & SocketOperationRead) + { + events |= POLLIN; + } + if(status & SocketOperationWrite) + { + events |= POLLOUT; + } + map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd); + if(q == _handlers.end()) + { + struct pollfd pollFd; + pollFd.fd = fd; + pollFd.events = events; + pollFd.revents = 0; + _pollFdSet.push_back(pollFd); + _handlers.insert(make_pair(fd, handler)); + } + else + { + for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r) + { + if(r->fd == fd) + { + r->events = events; + break; + } + } + } #endif - handlers.push_back(p); + } + else + { +#if defined(ICE_USE_SELECT) + FD_CLR(fd, &_readFdSet); + FD_CLR(fd, &_writeFdSet); + FD_CLR(fd, &_errorFdSet); +#else + for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r) + { + if(r->fd == fd) + { + _pollFdSet.erase(r); + break; + } + } +#endif + _handlers.erase(fd); + } } + _changes.clear(); +#endif } #elif defined(ICE_USE_CFSTREAM) @@ -667,15 +1284,15 @@ EventHandlerWrapper::updateRunLoop() } else { - SocketOperation readyOp = _nativeInfo->registerWithRunLoop(op); + SocketOperation readyOp = _streamNativeInfo->registerWithRunLoop(op); if(!(op & (SocketOperationWrite | SocketOperationConnect)) || _ready & SocketOperationWrite) { - _nativeInfo->unregisterFromRunLoop(SocketOperationWrite, false); + _streamNativeInfo->unregisterFromRunLoop(SocketOperationWrite, false); } if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead) { - _nativeInfo->unregisterFromRunLoop(SocketOperationRead, false); + _streamNativeInfo->unregisterFromRunLoop(SocketOperationRead, false); } if(readyOp) @@ -685,7 +1302,7 @@ EventHandlerWrapper::updateRunLoop() if(_finish) { - _nativeInfo->closeStreams(); + _streamNativeInfo->closeStreams(); } } } @@ -707,7 +1324,7 @@ EventHandlerWrapper::ready(SocketOperation op, int error) // stream (which can't be used from another thread than the run loop thread if // it's registered with a run loop). // - op = _nativeInfo->unregisterFromRunLoop(op, error != 0); + op = _streamNativeInfo->unregisterFromRunLoop(op, error != 0); } op = static_cast<SocketOperation>(_handler->_registered & op); @@ -720,23 +1337,25 @@ EventHandlerWrapper::ready(SocketOperation op, int error) { if(op & SocketOperationConnect) { - _nativeInfo->setConnectError(error); + _streamNativeInfo->setConnectError(error); } } _ready = static_cast<SocketOperation>(_ready | op); - if(!(_handler->_disabled & op)) - { - _selector.addReadyHandler(this); - } + checkReady(); } -void +bool EventHandlerWrapper::checkReady() { - if(_ready & _handler->_registered) + if((_ready | _handler->_ready) & ~_handler->_disabled & _handler->_registered) { _selector.addReadyHandler(this); + return false; + } + else + { + return _handler->getNativeInfo() && !_finish; } } @@ -744,7 +1363,7 @@ SocketOperation EventHandlerWrapper::readyOp() { assert(!(~_handler->_registered & _ready)); - SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & _ready); + SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & (_ready | _handler->_ready)); _ready = static_cast<SocketOperation>(~op & _ready); return op; } @@ -762,15 +1381,16 @@ EventHandlerWrapper::update(SocketOperation remove, SocketOperation add) // Clear ready flags which might not be valid anymore. _ready = static_cast<SocketOperation>(_ready & _handler->_registered); - return true; + return _handler->getNativeInfo(); } -void +bool EventHandlerWrapper::finish() { _finish = true; _ready = SocketOperationNone; _handler->_registered = SocketOperationNone; + return _handler->getNativeInfo(); } Selector::Selector(const InstancePtr& instance) : _instance(instance), _destroyed(false) @@ -799,27 +1419,33 @@ Selector::~Selector() void Selector::destroy() { - Lock sync(*this); - - // - // Make sure any pending changes are processed to ensure remaining - // streams/sockets are closed. - // - _destroyed = true; - while(!_changes.empty()) { + Lock sync(*this); + + // + // Make sure any pending changes are processed to ensure remaining + // streams/sockets are closed. + // + _destroyed = true; CFRunLoopSourceSignal(_source); CFRunLoopWakeUp(_runLoop); - wait(); + while(!_changes.empty()) + { + CFRunLoopSourceSignal(_source); + CFRunLoopWakeUp(_runLoop); + + wait(); + } } _thread->getThreadControl().join(); _thread = 0; + Lock sync(*this); CFRelease(_source); - assert(_wrappers.empty()); + //assert(_wrappers.empty()); _readyHandlers.clear(); _selectedHandlers.clear(); } @@ -836,7 +1462,6 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation { Lock sync(*this); const EventHandlerWrapperPtr& wrapper = _wrappers[handler]; - assert(wrapper); if(wrapper->update(remove, add)) { _changes.insert(wrapper); @@ -878,39 +1503,82 @@ Selector::finish(EventHandler* handler, bool closeNow) std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler); assert(p != _wrappers.end()); EventHandlerWrapperPtr wrapper = p->second; - wrapper->finish(); + if(wrapper->finish()) + { + _changes.insert(wrapper); + notify(); + } _wrappers.erase(p); - _changes.insert(wrapper); - notify(); return closeNow; } void -Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers, int timeout) +Selector::ready(EventHandler* handler, SocketOperation status, bool value) +{ + if(((handler->_ready & status) != 0) == value) + { + return; // Nothing to do if ready state already correctly set. + } + + if(value) + { + handler->_ready = static_cast<SocketOperation>(handler->_ready | status); + } + else + { + handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status); + } + + Lock sync(*this); + std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler); + assert(p != _wrappers.end()); + p->second->checkReady(); +} + +void +Selector::startSelect() { Lock sync(*this); // // Re-enable callbacks for previously selected handlers. // - if(!_selectedHandlers.empty()) + vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p; + for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p) { - vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p; - for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p) + if(p->first->checkReady()) { - if(!p->first->_finish) - { - _changes.insert(p->first); - } + _changes.insert(p->first); } - _selectedHandlers.clear(); } + _selectedHandlers.clear(); +} +void +Selector::finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers) +{ + Lock sync(*this); + handlers.clear(); + for(set<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p) + { + SocketOperation op = (*p)->readyOp(); + if(op) + { + _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op)); + handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op)); + } + } + _readyHandlers.clear(); +} + +void +Selector::select(int timeout) +{ // // Wait for handlers to be ready. // - handlers.clear(); - while(_selectedHandlers.empty()) + Lock sync(*this); + while(!_destroyed) { while(!_changes.empty()) { @@ -935,21 +1603,10 @@ Selector::select(std::vector<std::pair<EventHandler*, SocketOperation> >& handle } } - if(!_changes.empty()) - { - continue; // Make sure to process the changes first. - } - - for(vector<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p) + if(_changes.empty()) { - SocketOperation op = (*p)->readyOp(); - if(op) - { - _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op)); - handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op)); - } + break; } - _readyHandlers.clear(); } } @@ -973,24 +1630,6 @@ Selector::processInterrupt() } void -Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error) -{ - Lock sync(*this); - wrapper->ready(op, error); -} - -void -Selector::addReadyHandler(EventHandlerWrapper* wrapper) -{ - // Called from ready() - _readyHandlers.push_back(wrapper); - if(_readyHandlers.size() == 1) - { - notify(); - } -} - -void Selector::run() { { @@ -1004,408 +1643,24 @@ Selector::run() CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source, kCFRunLoopDefaultMode); } -#elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) - -Selector::Selector(const InstancePtr& instance) : _instance(instance), _selecting(false), _interrupted(false) -{ - SOCKET fds[2]; - createPipe(fds); - _fdIntrRead = fds[0]; - _fdIntrWrite = fds[1]; -#if defined(ICE_USE_SELECT) - FD_ZERO(&_readFdSet); - FD_ZERO(&_writeFdSet); - FD_ZERO(&_errorFdSet); - FD_SET(_fdIntrRead, &_readFdSet); -#else - struct pollfd pollFd; - pollFd.fd = _fdIntrRead; - pollFd.events = POLLIN; - _pollFdSet.push_back(pollFd); -#endif -} - -Selector::~Selector() -{ - try - { - closeSocket(_fdIntrWrite); - } - catch(const Ice::LocalException& ex) - { - Ice::Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } - - try - { - closeSocket(_fdIntrRead); - } - catch(const Ice::LocalException& ex) - { - Ice::Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } -} - void -Selector::destroy() -{ -#if !defined(ICE_USE_SELECT) && !defined(ICE_USE_POLL) - assert(_events.empty()); -#endif -} - -void -Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add) -{ - SocketOperation previous = handler->_registered; - handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove); - handler->_registered = static_cast<SocketOperation>(handler->_registered | add); - if(previous == handler->_registered) - { - return; - } - - updateImpl(handler); -} - -void -Selector::enable(EventHandler* handler, SocketOperation status) -{ - if(!(handler->_disabled & status)) - { - return; - } - handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~status); - - if(handler->_registered & status) - { - updateImpl(handler); - } -} - -void -Selector::disable(EventHandler* handler, SocketOperation status) -{ - if(handler->_disabled & status) - { - return; - } - handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status); - - if(handler->_registered & status) - { - updateImpl(handler); - } -} - -bool -Selector::finish(EventHandler* handler, bool closeNow) -{ - if(handler->_registered) - { - update(handler, handler->_registered, SocketOperationNone); - return false; // Don't close now if selecting. - } - return closeNow; -} - -void -Selector::startSelect() -{ - if(_interrupted) - { - char c; - while(true) - { - ssize_t ret; -#ifdef _WIN32 - ret = ::recv(_fdIntrRead, &c, 1, 0); -#else - ret = ::read(_fdIntrRead, &c, 1); -#endif - if(ret == SOCKET_ERROR) - { - if(interrupted()) - { - continue; - } - - Ice::SocketException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - break; - } - _interrupted = false; - - if(!_changes.empty()) - { - updateSelector(); - } - } - _selecting = true; -} - -void -Selector::finishSelect() +Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error) { - _selecting = false; + Lock sync(*this); + wrapper->ready(op, error); } void -Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int timeout) +Selector::addReadyHandler(EventHandlerWrapper* wrapper) { - int ret = 0; - while(true) - { -#if defined(ICE_USE_SELECT) - fd_set* rFdSet = fdSetCopy(_selectedReadFdSet, _readFdSet); - fd_set* wFdSet = fdSetCopy(_selectedWriteFdSet, _writeFdSet); - fd_set* eFdSet = fdSetCopy(_selectedErrorFdSet, _errorFdSet); - if(timeout > 0) - { - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - ret = ::select(0, rFdSet, wFdSet, eFdSet, &tv); // The first parameter is ignored on Windows - } - else - { - ret = ::select(0, rFdSet, wFdSet, eFdSet, 0); // The first parameter is ignored on Windows - } -#else - ret = poll(&_pollFdSet[0], _pollFdSet.size(), timeout > 0 ? timeout * 1000 : -1); -#endif - if(ret == SOCKET_ERROR) - { - if(interrupted()) - { - continue; - } - - { - Ice::SocketException ex(__FILE__, __LINE__, IceInternal::getSocketErrno()); - Ice::Error out(_instance->initializationData().logger); - out << "fatal error: selector failed:\n" << ex; - } - abort(); - } - break; - } - - if(ret == 0) - { - throw SelectorTimeoutException(); - } - - assert(ret > 0); - -#if defined(ICE_USE_SELECT) - if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0) - { - Ice::Error out(_instance->initializationData().logger); - out << "select() in selector returned " << ret << " but no filedescriptor is ready"; - return; - } - - for(unsigned int i = 0; i < static_cast<unsigned int>(ret); ++i) - { - pair<EventHandler*, SocketOperation> p; - - // - // Round robin for the filedescriptors. - // - SOCKET fd; - p.second = SocketOperationNone; - if(i < _selectedReadFdSet.fd_count) - { - fd = _selectedReadFdSet.fd_array[i]; - p.second = static_cast<SocketOperation>(p.second | SocketOperationRead); - } - else if(i < _selectedWriteFdSet.fd_count + _selectedReadFdSet.fd_count) - { - fd = _selectedWriteFdSet.fd_array[i - _selectedReadFdSet.fd_count]; - p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite); - } - else - { - fd = _selectedErrorFdSet.fd_array[i - _selectedReadFdSet.fd_count - _selectedWriteFdSet.fd_count]; - p.second = static_cast<SocketOperation>(p.second | SocketOperationConnect); - } - - if(fd == _fdIntrRead) // Interrupted, we have to process the interrupt before returning any handlers - { - handlers.clear(); - return; - } - - assert(_handlers.find(fd) != _handlers.end()); - p.first = _handlers[fd]; - handlers.push_back(p); - } -#else - if(_pollFdSet[0].revents == POLLIN) // Interrupted, we have to process the interrupt before returning any handlers - { - return; - } - - for(vector<struct pollfd>::const_iterator q = _pollFdSet.begin(); q != _pollFdSet.end(); ++q) + // Called from ready() + _readyHandlers.insert(wrapper); + if(_readyHandlers.size() == 1) { - pair<EventHandler*, SocketOperation> p; - if(q->revents != 0) - { - SOCKET fd = q->fd; - assert(fd != _fdIntrRead); - assert(_handlers.find(fd) != _handlers.end()); - p.first = _handlers[fd]; - p.second = SocketOperationNone; - if(q->revents & (POLLIN | POLLERR | POLLHUP)) - { - p.second = static_cast<SocketOperation>(p.second | SocketOperationRead); - } - if(q->revents & POLLOUT) - { - p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite); - } - assert(p.second); - handlers.push_back(p); - } + notify(); } -#endif } -void -Selector::updateImpl(EventHandler* handler) -{ - SocketOperation status = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled); - _changes.push_back(make_pair(handler, status)); - if(_selecting) - { - if(!_interrupted) - { - char c = 0; - while(true) - { -#ifdef _WIN32 - if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) -#else - if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) #endif - { - if(interrupted()) - { - continue; - } - - Ice::SocketException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - break; - } - _interrupted = true; - } - } - else - { - updateSelector(); - } -} -void -Selector::updateSelector() -{ - for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p) - { - EventHandler* handler = p->first; - SocketOperation status = p->second; - - SOCKET fd = handler->getNativeInfo()->fd(); - if(status) - { -#if defined(ICE_USE_SELECT) - if(status & SocketOperationRead) - { - FD_SET(fd, &_readFdSet); - } - else - { - FD_CLR(fd, &_readFdSet); - } - if(status & SocketOperationWrite) - { - FD_SET(fd, &_writeFdSet); - } - else - { - FD_CLR(fd, &_writeFdSet); - } - if(status & SocketOperationConnect) - { - FD_SET(fd, &_writeFdSet); - FD_SET(fd, &_errorFdSet); - } - else - { - FD_CLR(fd, &_writeFdSet); - FD_CLR(fd, &_errorFdSet); - } - _handlers[fd] = handler; -#else - short events = 0; - if(status & SocketOperationRead) - { - events |= POLLIN; - } - if(status & SocketOperationWrite) - { - events |= POLLOUT; - } - map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd); - if(q == _handlers.end()) - { - struct pollfd pollFd; - pollFd.fd = fd; - pollFd.events = events; - pollFd.revents = 0; - _pollFdSet.push_back(pollFd); - _handlers.insert(make_pair(fd, handler)); - } - else - { - for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r) - { - if(r->fd == fd) - { - r->events = events; - break; - } - } - } -#endif - } - else - { -#if defined(ICE_USE_SELECT) - FD_CLR(fd, &_readFdSet); - FD_CLR(fd, &_writeFdSet); - FD_CLR(fd, &_errorFdSet); -#else - for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r) - { - if(r->fd == fd) - { - _pollFdSet.erase(r); - break; - } - } -#endif - _handlers.erase(fd); - } - } - _changes.clear(); -} -#endif diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h index e890db6e2bf..4098da30171 100644 --- a/cpp/src/Ice/Selector.h +++ b/cpp/src/Ice/Selector.h @@ -57,67 +57,59 @@ class SelectorTimeoutException { }; -#if defined(ICE_OS_WINRT) - -struct SelectEvent -{ - SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status) - { - } - EventHandlerPtr handler; - SocketOperation status; -}; +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) -class Selector : IceUtil::Monitor<IceUtil::Mutex> +class Selector { -public: - - Selector(const InstancePtr&); - - void destroy(); - - void initialize(EventHandler*); - void update(EventHandler*, SocketOperation, SocketOperation); - void finish(EventHandler*); - - EventHandlerPtr getNextHandler(SocketOperation&, int); - - void completed(const EventHandlerPtr&, SocketOperation); - -private: - - const InstancePtr _instance; - std::deque<SelectEvent> _events; -}; +#if defined(ICE_OS_WINRT) + struct SelectEvent + { + SelectEvent(const EventHandlerPtr& handler, SocketOperation status) : handler(handler), status(status) + { + } -#elif defined(ICE_USE_IOCP) + EventHandlerPtr handler; + SocketOperation status; + }; +#endif -class Selector -{ public: Selector(const InstancePtr&); ~Selector(); +#ifdef ICE_USE_IOCP void setup(int); - void destroy(); +#endif + void destroy(); void initialize(EventHandler*); void update(EventHandler*, SocketOperation, SocketOperation); void finish(EventHandler*); + void ready(EventHandler*, SocketOperation, bool); + +#ifdef ICE_USE_IOCP EventHandler* getNextHandler(SocketOperation&, DWORD&, int&, int); +#else + EventHandler* getNextHandler(SocketOperation&, int); +#endif + + void completed(EventHandler*, SocketOperation); - HANDLE getIOCPHandle() { return _handle; } - private: const InstancePtr _instance; +#ifdef ICE_USE_IOCP HANDLE _handle; +#else + IceUtil::Monitor<IceUtil::Mutex> _monitor; + std::deque<SelectEvent> _events; +#endif }; -#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) +#elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) class Selector { @@ -126,7 +118,7 @@ public: Selector(const InstancePtr&); ~Selector(); - void destroy(); + void destroy(); void initialize(EventHandler*) { @@ -137,43 +129,62 @@ public: void disable(EventHandler*, SocketOperation); bool finish(EventHandler*, bool); -#if defined(ICE_USE_KQUEUE) - void updateSelector(); -#endif - - void - startSelect() - { -#ifdef ICE_USE_KQUEUE - _selecting = true; - if(!_changes.empty()) - { - updateSelector(); - } -#endif - } - - void - finishSelect() - { -#ifdef ICE_USE_KQUEUE - _selecting = false; -#endif - } + void ready(EventHandler*, SocketOperation, bool); - void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int); + void startSelect(); + void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&); + void select(int); private: + void wakeup(); + void checkReady(EventHandler*); + void updateSelector(); + const InstancePtr _instance; + + SOCKET _fdIntrRead; + SOCKET _fdIntrWrite; + bool _interrupted; + bool _selectNow; + int _count; + bool _selecting; + std::map<EventHandlerPtr, SocketOperation> _readyHandlers; + #if defined(ICE_USE_EPOLL) std::vector<struct epoll_event> _events; -#else + int _queueFd; +#elif defined(ICE_USE_KQUEUE) std::vector<struct kevent> _events; std::vector<struct kevent> _changes; - bool _selecting; -#endif int _queueFd; +#elif defined(ICE_USE_SELECT) + std::vector<std::pair<EventHandler*, SocketOperation> > _changes; + std::map<SOCKET, EventHandler*> _handlers; + + fd_set _readFdSet; + fd_set _writeFdSet; + fd_set _errorFdSet; + fd_set _selectedReadFdSet; + fd_set _selectedWriteFdSet; + fd_set _selectedErrorFdSet; + + fd_set* + fdSetCopy(fd_set& dest, fd_set& src) + { + if(src.fd_count > 0) + { + dest.fd_count = src.fd_count; + memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count); + return &dest; + } + return 0; + } +#elif defined(ICE_USE_POLL) + std::vector<std::pair<EventHandler*, SocketOperation> > _changes; + std::map<SOCKET, EventHandler*> _handlers; + std::vector<struct pollfd> _pollFdSet; +#endif }; #elif defined(ICE_USE_CFSTREAM) @@ -182,7 +193,7 @@ class Selector; class SelectorReadyCallback : public IceUtil::Shared { -public: +public: virtual ~SelectorReadyCallback() { } virtual void readyCallback(SocketOperation, int = 0) = 0; @@ -201,9 +212,9 @@ public: virtual SocketOperation unregisterFromRunLoop(SocketOperation, bool) = 0; virtual void closeStreams() = 0; - void setConnectError(int error) + void setConnectError(int error) { - _connectError = error; + _connectError = error; } private: @@ -222,14 +233,13 @@ public: void updateRunLoop(); virtual void readyCallback(SocketOperation, int = 0); - void ready(SocketOperation, int); SocketOperation readyOp(); - void checkReady(); + bool checkReady(); bool update(SocketOperation, SocketOperation); - void finish(); + bool finish(); bool operator<(const EventHandlerWrapper& o) { @@ -241,7 +251,7 @@ private: friend class Selector; EventHandlerPtr _handler; - StreamNativeInfoPtr _nativeInfo; + StreamNativeInfoPtr _streamNativeInfo; Selector& _selector; SocketOperation _ready; bool _finish; @@ -266,17 +276,22 @@ public: void disable(EventHandler*, SocketOperation); bool finish(EventHandler*, bool); - void startSelect() { } - void finishSelect() { } - void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int); + void ready(EventHandler*, SocketOperation, bool); + + void startSelect(); + void finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >&); + void select(int); void processInterrupt(); - void ready(EventHandlerWrapper*, SocketOperation, int = 0); - void addReadyHandler(EventHandlerWrapper*); void run(); private: + void ready(EventHandlerWrapper*, SocketOperation, int = 0); + void addReadyHandler(EventHandlerWrapper*); + + friend class EventHandlerWrapper; + InstancePtr _instance; IceUtil::ThreadPtr _thread; CFRunLoopRef _runLoop; @@ -285,74 +300,11 @@ private: std::set<EventHandlerWrapperPtr> _changes; - std::vector<EventHandlerWrapperPtr> _readyHandlers; + std::set<EventHandlerWrapperPtr> _readyHandlers; std::vector<std::pair<EventHandlerWrapperPtr, SocketOperation> > _selectedHandlers; std::map<EventHandler*, EventHandlerWrapperPtr> _wrappers; }; -#elif defined(ICE_USE_SELECT) || defined(ICE_USE_POLL) - -class Selector -{ -public: - - Selector(const InstancePtr&); - ~Selector(); - - void destroy(); - - void initialize(EventHandler*) - { - // Nothing to do - } - void update(EventHandler*, SocketOperation, SocketOperation); - void enable(EventHandler*, SocketOperation); - void disable(EventHandler*, SocketOperation); - bool finish(EventHandler*, bool); - - void startSelect(); - void finishSelect(); - void select(std::vector<std::pair<EventHandler*, SocketOperation> >&, int); - -private: - - void updateSelector(); - void updateImpl(EventHandler*); - - const InstancePtr _instance; - - SOCKET _fdIntrRead; - SOCKET _fdIntrWrite; - bool _selecting; - bool _interrupted; - - std::vector<std::pair<EventHandler*, SocketOperation> > _changes; - std::map<SOCKET, EventHandler*> _handlers; - -#if defined(ICE_USE_SELECT) - fd_set _readFdSet; - fd_set _writeFdSet; - fd_set _errorFdSet; - fd_set _selectedReadFdSet; - fd_set _selectedWriteFdSet; - fd_set _selectedErrorFdSet; - - fd_set* - fdSetCopy(fd_set& dest, fd_set& src) - { - if(src.fd_count > 0) - { - dest.fd_count = src.fd_count; - memcpy(dest.fd_array, src.fd_array, sizeof(SOCKET) * src.fd_count); - return &dest; - } - return 0; - } -#else - std::vector<struct pollfd> _pollFdSet; -#endif -}; - #endif } diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index 703679df189..7ca16363d83 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -25,7 +25,7 @@ IceInternal::TcpTransceiver::getNativeInfo() } SocketOperation -IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool&) +IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) { return _stream->connect(readBuffer, writeBuffer); } @@ -51,7 +51,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf) } SocketOperation -IceInternal::TcpTransceiver::read(Buffer& buf, bool&) +IceInternal::TcpTransceiver::read(Buffer& buf) { return _stream->read(buf); } @@ -76,7 +76,7 @@ IceInternal::TcpTransceiver::startRead(Buffer& buf) } void -IceInternal::TcpTransceiver::finishRead(Buffer& buf, bool&) +IceInternal::TcpTransceiver::finishRead(Buffer& buf) { _stream->finishRead(buf); } diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h index 16845238ae4..b192ab1bd44 100644 --- a/cpp/src/Ice/TcpTransceiver.h +++ b/cpp/src/Ice/TcpTransceiver.h @@ -28,16 +28,16 @@ public: virtual NativeInfoPtr getNativeInfo(); - virtual SocketOperation initialize(Buffer&, Buffer&, bool&); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual SocketOperation write(Buffer&); - virtual SocketOperation read(Buffer&, bool&); + virtual SocketOperation read(Buffer&); #ifdef ICE_USE_IOCP virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); virtual void startRead(Buffer&); - virtual void finishRead(Buffer&, bool&); + virtual void finishRead(Buffer&); #endif virtual std::string protocol() const; virtual std::string toString() const; diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 7ff3f57db4b..0de8b971559 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -99,29 +99,6 @@ private: IceUtil::ThreadPtr _thread; }; -class InterruptWorkItem : public ThreadPoolWorkItem -{ -public: - - virtual void - execute(ThreadPoolCurrent& current) - { - // Nothing to do, this is just used to interrupt the thread pool selector. - } -}; -ThreadPoolWorkItemPtr interruptWorkItem; - -class InterruptWorkItemInit -{ -public: - - InterruptWorkItemInit() - { - interruptWorkItem = new InterruptWorkItem; - } -}; -InterruptWorkItemInit init; - // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -147,77 +124,38 @@ IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current) current.dispatchFromThisThread(this); } -IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(const InstancePtr& instance, Selector& selector) : - _instance(instance), - _selector(selector), +IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool& threadPool) : + _threadPool(threadPool), _destroyed(false) -#ifdef ICE_USE_IOCP - , _info(SocketOperationRead) -#endif { -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - SOCKET fds[2]; - createPipe(fds); - _fdIntrRead = fds[0]; - _fdIntrWrite = fds[1]; - - _selector.initialize(this); - _selector.update(this, SocketOperationNone, SocketOperationRead); -#endif -} - -IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue() -{ - assert(_destroyed); - -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - try - { - closeSocket(_fdIntrRead); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } - - try - { - closeSocket(_fdIntrWrite); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } -#endif + _registered = SocketOperationRead; } void IceInternal::ThreadPoolWorkQueue::destroy() { - Lock sync(*this); + //Lock sync(*this); Called with the thread pool locked assert(!_destroyed); _destroyed = true; - postMessage(); +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + _threadPool._selector.completed(this, SocketOperationRead); +#else + _threadPool._selector.ready(this, SocketOperationRead, true); +#endif } void IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item) { - Lock sync(*this); - if(_destroyed) - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } + //Lock sync(*this); Called with the thread pool locked _workItems.push_back(item); -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + _threadPool._selector.completed(this, SocketOperationRead); +#else if(_workItems.size() == 1) { - postMessage(); + _threadPool._selector.ready(this, SocketOperationRead, true); } -#else - postMessage(); #endif } @@ -242,47 +180,24 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) { ThreadPoolWorkItemPtr workItem; { - Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_threadPool); if(!_workItems.empty()) { workItem = _workItems.front(); _workItems.pop_front(); - -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - if(_workItems.empty()) - { - char c; - while(true) - { - ssize_t ret; -# ifdef _WIN32 - ret = ::recv(_fdIntrRead, &c, 1, 0); -# else - ret = ::read(_fdIntrRead, &c, 1); -# endif - if(ret == SOCKET_ERROR) - { - if(interrupted()) - { - continue; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - break; - } - } -#endif } +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) else { assert(_destroyed); -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - postMessage(); -#endif + _threadPool._selector.completed(this, SocketOperationRead); } +#else + if(_workItems.empty() && !_destroyed) + { + _threadPool._selector.ready(this, SocketOperationRead, false); + } +#endif } if(workItem) @@ -291,6 +206,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) } else { + assert(_destroyed); current.ioCompleted(); throw ThreadPoolDestroyedException(); } @@ -311,47 +227,7 @@ IceInternal::ThreadPoolWorkQueue::toString() const NativeInfoPtr IceInternal::ThreadPoolWorkQueue::getNativeInfo() { -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - return new NativeInfo(_fdIntrRead); -#else return 0; -#endif -} - -void -IceInternal::ThreadPoolWorkQueue::postMessage() -{ -#if defined(ICE_USE_IOCP) - if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), &_info)) - { - SocketException ex(__FILE__, __LINE__); - ex.error = GetLastError(); - throw ex; - } -#elif defined(ICE_OS_WINRT) - _selector.completed(this, SocketOperationRead); -#else - char c = 0; - while(true) - { -# ifdef _WIN32 - if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) -# else - if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) -# endif - { - if(interrupted()) - { - continue; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - break; - } -#endif } IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : @@ -469,7 +345,8 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority"); } - _workQueue = new ThreadPoolWorkQueue(_instance, _selector); + _workQueue = new ThreadPoolWorkQueue(*this); + _selector.initialize(_workQueue.get()); if(_instance->traceLevels()->threadPool >= 1) { @@ -528,7 +405,6 @@ IceInternal::ThreadPool::destroy() { return; } - _destroyed = true; _workQueue->destroy(); } @@ -549,6 +425,28 @@ IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler) Lock sync(*this); assert(!_destroyed); _selector.initialize(handler.get()); + + class ReadyCallbackI : public ReadyCallback + { + public: + + ReadyCallbackI(const ThreadPoolPtr& threadPool, const EventHandlerPtr& handler) : + _threadPool(threadPool), _handler(handler) + { + } + + virtual void + ready(SocketOperation op, bool value) + { + _threadPool->ready(_handler, op, value); + } + + private: + + const ThreadPoolPtr _threadPool; + const EventHandlerPtr _handler; + }; + handler->getNativeInfo()->setReadyCallback(new ReadyCallbackI(this, handler)); } void @@ -569,20 +467,6 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation } _selector.update(handler.get(), remove, add); -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead)) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(handler.get()); - } - else if(remove & SocketOperationRead) - { - _pendingHandlers.erase(handler.get()); - } -#endif } bool @@ -592,7 +476,6 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow) assert(!_destroyed); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) closeNow = _selector.finish(handler.get(), closeNow); // This must be called before! - _pendingHandlers.erase(handler.get()); _workQueue->queue(new FinishedWorkItem(handler, !closeNow)); return closeNow; #else @@ -611,6 +494,17 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow) } void +IceInternal::ThreadPool::ready(const EventHandlerPtr& handler, SocketOperation op, bool value) +{ + Lock sync(*this); + if(_destroyed) + { + return; + } + _selector.ready(handler.get(), op, value); +} + +void IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workItem) { if(_dispatcher) @@ -645,6 +539,11 @@ IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workI void IceInternal::ThreadPool::dispatch(const DispatchWorkItemPtr& workItem) { + Lock sync(*this); + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } _workQueue->queue(workItem); } @@ -663,10 +562,6 @@ IceInternal::ThreadPool::joinWithAllThreads() { (*p)->getThreadControl().join(); } - -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - _selector.finish(_workQueue.get(), true); -#endif _selector.destroy(); } @@ -682,7 +577,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) ThreadPoolCurrent current(_instance, this, thread); bool select = false; - vector<pair<EventHandler*, SocketOperation> > handlers; while(true) { if(current._handler) @@ -711,7 +605,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { try { - _selector.select(handlers, _serverIdleTime); + _selector.select(_serverIdleTime); } catch(const SelectorTimeoutException&) { @@ -730,22 +624,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { if(select) { - _handlers.swap(handlers); - if(!_pendingHandlers.empty()) - { - for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler) - { - _pendingHandlers.erase(_nextHandler->first); - } - set<EventHandler*>::const_iterator p; - for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p) - { - _handlers.push_back(make_pair(*p, SocketOperationRead)); - } - _pendingHandlers.clear(); - } + _selector.finishSelect(_handlers); _nextHandler = _handlers.begin(); - _selector.finishSelect(); select = false; } else if(!current._leader && followerWait(current)) @@ -762,14 +642,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // the IO thread count now. // --_inUseIO; - if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } } else { @@ -780,14 +652,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(_serialize) { _selector.enable(current._handler.get(), current.operation); - if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } } assert(_inUse > 0); --_inUse; @@ -798,19 +662,12 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) return; // Wait timed-out. } } - else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } // // Get the next ready handler. // - while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered)) + while(_nextHandler != _handlers.end() && + !(_nextHandler->second & ~_nextHandler->first->_disabled & _nextHandler->first->_registered)) { ++_nextHandler; } @@ -1007,19 +864,6 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_serialize) { _selector.disable(current._handler.get(), current.operation); - - // Make sure the handler isn't in the set of pending handlers (this can - // for example occur if the handler is has more data and its added by - // ThreadPool::update while we were processing IO). - _pendingHandlers.erase(current._handler.get()); - } - else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); } } @@ -1090,8 +934,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) if(current._handler->_started & current.operation) { - assert(!(current._handler->_ready & current.operation)); - current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation); + assert(!(current._handler->_completed & current.operation)); + current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed | current.operation); current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation); #ifndef ICE_OS_WINRT @@ -1105,20 +949,26 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } return false; } } - else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation)) + else if(!(current._handler->_completed & current.operation) && (current._handler->_registered & current.operation)) { assert(!(current._handler->_started & current.operation)); - if(!current._handler->startAsync(current.operation)) + if(current._handler->_ready & current.operation) + { + return true; + } + else if(!current._handler->startAsync(current.operation)) { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } @@ -1133,8 +983,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) if(current._handler->_registered & current.operation) { - assert(current._handler->_ready & current.operation); - current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation); + assert(current._handler->_completed & current.operation); + current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed & ~current.operation); return true; } else @@ -1142,6 +992,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } @@ -1152,10 +1003,14 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) void IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) { - if(current._handler->_registered & current.operation) + if(current._handler->_registered & current.operation && !current._handler->_finish) { - assert(!(current._handler->_ready & current.operation)); - if(!current._handler->startAsync(current.operation)) + assert(!(current._handler->_completed & current.operation)); + if(current._handler->_ready & current.operation) + { + _selector.completed(current._handler.get(), current.operation); + } + else if(!current._handler->startAsync(current.operation)) { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); } @@ -1173,6 +1028,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { // There are no more pending async operations, it's time to call finish. + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index d3447a1959c..f97ff363f1b 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -40,7 +40,7 @@ typedef IceUtil::Handle<ThreadPoolWorkQueue> ThreadPoolWorkQueuePtr; class ThreadPoolWorkItem : virtual public IceUtil::Shared { public: - + virtual void execute(ThreadPoolCurrent&) = 0; }; typedef IceUtil::Handle<ThreadPoolWorkItem> ThreadPoolWorkItemPtr; @@ -51,8 +51,8 @@ public: DispatchWorkItem(); DispatchWorkItem(const Ice::ConnectionPtr& connection); - - const Ice::ConnectionPtr& + + const Ice::ConnectionPtr& getConnection() { return _connection; @@ -66,18 +66,18 @@ private: }; typedef IceUtil::Handle<DispatchWorkItem> DispatchWorkItemPtr; -class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex> +class ThreadPool : public IceUtil::Shared, private IceUtil::Monitor<IceUtil::Mutex> { class EventHandlerThread : public IceUtil::Thread { public: - + EventHandlerThread(const ThreadPoolPtr&, const std::string&); virtual void run(); void updateObserver(); void setState(Ice::Instrumentation::ThreadState); - + private: ThreadPoolPtr _pool; @@ -106,6 +106,7 @@ public: update(handler, status, SocketOperationNone); } bool finish(const EventHandlerPtr&, bool); + void ready(const EventHandlerPtr&, SocketOperation, bool); void dispatchFromThisThread(const DispatchWorkItemPtr&); void dispatch(const DispatchWorkItemPtr&); @@ -140,6 +141,7 @@ private: friend class EventHandlerThread; friend class ThreadPoolCurrent; + friend class ThreadPoolWorkQueue; const int _size; // Number of threads that are pre-created. const int _sizeIO; // Maximum number of threads that can concurrently perform IO. @@ -158,7 +160,6 @@ private: int _inUseIO; // Number of threads that are currently performing IO. std::vector<std::pair<EventHandler*, SocketOperation> > _handlers; std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler; - std::set<EventHandler*> _pendingHandlers; #endif bool _promote; @@ -215,12 +216,11 @@ private: friend class ThreadPool; }; -class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex +class ThreadPoolWorkQueue : public EventHandler { public: - ThreadPoolWorkQueue(const InstancePtr&, Selector&); - ~ThreadPoolWorkQueue(); + ThreadPoolWorkQueue(ThreadPool&); void destroy(); void queue(const ThreadPoolWorkItemPtr&); @@ -234,19 +234,11 @@ public: virtual void finished(ThreadPoolCurrent&, bool); virtual std::string toString() const; virtual NativeInfoPtr getNativeInfo(); - virtual void postMessage(); private: - const InstancePtr _instance; - Selector& _selector; + ThreadPool& _threadPool; bool _destroyed; -#ifdef ICE_USE_IOCP - AsyncInfo _info; -#elif !defined(ICE_OS_WINRT) - SOCKET _fdIntrRead; - SOCKET _fdIntrWrite; -#endif std::list<ThreadPoolWorkItemPtr> _workItems; }; @@ -257,7 +249,7 @@ private: // // An instance of the IOScope subclass must be created within the synchronization // of the event handler. It takes care of calling startMessage/finishMessage for -// the IOCP implementation and ensures that finishMessage isn't called multiple +// the IOCP implementation and ensures that finishMessage isn't called multiple // times. // #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) @@ -290,7 +282,7 @@ public: } private: - + ThreadPoolMessage<T>& _message; }; friend class IOScope; @@ -309,7 +301,7 @@ private: ThreadPoolCurrent& _current; }; -#else +#else template<class T> class ThreadPoolMessage { @@ -321,7 +313,7 @@ public: IOScope(ThreadPoolMessage& message) : _message(message) { - // This must be called with the handler locked. + // This must be called with the handler locked. _finish = _message._current.startMessage(); } @@ -329,7 +321,7 @@ public: { if(_finish) { - // This must be called with the handler locked. + // This must be called with the handler locked. _message._current.finishMessage(); } } @@ -344,7 +336,7 @@ public: { // // Call finishMessage once IO is completed only if serialization is not enabled. - // Otherwise, finishMessage will be called when the event handler is done with + // Otherwise, finishMessage will be called when the event handler is done with // the message (it will be called from ~ThreadPoolMessage below). // assert(_finish); @@ -358,22 +350,22 @@ public: private: ThreadPoolMessage& _message; - bool _finish; + bool _finish; }; friend class IOScope; - - ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) : + + ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) : _current(current), _mutex(mutex), _finish(false) { } - + ~ThreadPoolMessage() { if(_finish) { // // A ThreadPoolMessage instance must be created outside the synchronization - // of the event handler. We need to lock the event handler here to call + // of the event handler. We need to lock the event handler here to call // finishMessage. // #if defined(__MINGW32__) @@ -386,7 +378,7 @@ public: } private: - + ThreadPoolCurrent& _current; const T& _mutex; bool _finish; diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h index b330cab5f4b..d277e9bb746 100644 --- a/cpp/src/Ice/Transceiver.h +++ b/cpp/src/Ice/Transceiver.h @@ -27,17 +27,17 @@ public: virtual NativeInfoPtr getNativeInfo() = 0; - virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0; + virtual SocketOperation initialize(Buffer&, Buffer&) = 0; virtual SocketOperation closing(bool, const Ice::LocalException&) = 0; virtual void close() = 0; virtual EndpointIPtr bind(); virtual SocketOperation write(Buffer&) = 0; - virtual SocketOperation read(Buffer&, bool&) = 0; + virtual SocketOperation read(Buffer&) = 0; #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&) = 0; virtual void finishWrite(Buffer&) = 0; virtual void startRead(Buffer&) = 0; - virtual void finishRead(Buffer&, bool&) = 0; + virtual void finishRead(Buffer&) = 0; #endif virtual std::string protocol() const = 0; diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index c664eeb977e..b1b3c8562b6 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -79,7 +79,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler #endif SocketOperation -IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/, bool& /*hasMoreData*/) +IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/) { if(_state == StateNeedConnect) { @@ -264,7 +264,7 @@ repeat: } SocketOperation -IceInternal::UdpTransceiver::read(Buffer& buf, bool&) +IceInternal::UdpTransceiver::read(Buffer& buf) { if(buf.i == buf.b.end()) { @@ -681,7 +681,7 @@ IceInternal::UdpTransceiver::startRead(Buffer& buf) } void -IceInternal::UdpTransceiver::finishRead(Buffer& buf, bool&) +IceInternal::UdpTransceiver::finishRead(Buffer& buf) { #ifdef ICE_OS_WINRT IceUtil::Mutex::Lock lock(_mutex); diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index 83921ad54a5..7c1edfc3c6f 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -44,17 +44,17 @@ public: virtual void setCompletedHandler(SocketOperationCompletedHandler^); #endif - virtual SocketOperation initialize(Buffer&, Buffer&, bool&); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual EndpointIPtr bind(); virtual SocketOperation write(Buffer&); - virtual SocketOperation read(Buffer&, bool&); + virtual SocketOperation read(Buffer&); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); virtual void startRead(Buffer&); - virtual void finishRead(Buffer&, bool&); + virtual void finishRead(Buffer&); #endif virtual std::string protocol() const; virtual std::string toString() const; diff --git a/cpp/src/Ice/WSTransceiver.cpp b/cpp/src/Ice/WSTransceiver.cpp index e68bccc4668..bd9254a22bb 100644 --- a/cpp/src/Ice/WSTransceiver.cpp +++ b/cpp/src/Ice/WSTransceiver.cpp @@ -199,14 +199,14 @@ IceInternal::WSTransceiver::setCompletedHandler(IceInternal::SocketOperationComp #endif SocketOperation -IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData) +IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) { // // Delegate logs exceptions that occur during initialize(), so there's no need to trap them here. // if(_state == StateInitializeDelegate) { - SocketOperation op = _delegate->initialize(readBuffer, writeBuffer, hasMoreData); + SocketOperation op = _delegate->initialize(readBuffer, writeBuffer); if(op != SocketOperationNone) { return op; @@ -280,7 +280,7 @@ IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, { if(_readBuffer.i < _readBuffer.b.end()) { - SocketOperation s = _delegate->read(_readBuffer, hasMoreData); + SocketOperation s = _delegate->read(_readBuffer); if(s == SocketOperationWrite || _readBuffer.i == _readBuffer.b.begin()) { return s; @@ -384,7 +384,10 @@ IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, _state = StateOpened; _nextState = StateOpened; - hasMoreData |= _readI < _readBuffer.i; + if(_readI < _readBuffer.i) + { + _delegate->getNativeInfo()->ready(SocketOperationRead, true); + } } catch(const Ice::LocalException& ex) { @@ -544,7 +547,7 @@ IceInternal::WSTransceiver::write(Buffer& buf) } SocketOperation -IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData) +IceInternal::WSTransceiver::read(Buffer& buf) { if(_readPending) { @@ -555,11 +558,11 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData) { if(_state < StateConnected) { - return _delegate->read(buf, hasMoreData); + return _delegate->read(buf); } else { - if(_delegate->read(_readBuffer, hasMoreData) == SocketOperationWrite) + if(_delegate->read(_readBuffer) == SocketOperationWrite) { return SocketOperationWrite; } @@ -576,7 +579,10 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData) // if(buf.i == buf.b.end()) { - hasMoreData |= _readI < _readBuffer.i; + if(_readI < _readBuffer.i) + { + _delegate->getNativeInfo()->ready(SocketOperationRead, true); + } return SocketOperationNone; } @@ -597,17 +603,17 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData) { size_t size = buf.b.size(); buf.b.resize(buf.i - buf.b.begin() + readSz); - s = _delegate->read(buf, hasMoreData); + s = _delegate->read(buf); buf.b.resize(size); } else { - s = _delegate->read(buf, hasMoreData); + s = _delegate->read(buf); } } else { - s = _delegate->read(_readBuffer, hasMoreData); + s = _delegate->read(_readBuffer); } if(s == SocketOperationWrite) @@ -621,12 +627,15 @@ IceInternal::WSTransceiver::read(Buffer& buf, bool& hasMoreData) if(buf.i == buf.b.end()) { - hasMoreData |= _readI < _readBuffer.i; + if(_readI < _readBuffer.i) + { + _delegate->getNativeInfo()->ready(SocketOperationRead, true); + } s = SocketOperationNone; } else { - hasMoreData = false; + _delegate->getNativeInfo()->ready(SocketOperationRead, false); s = SocketOperationRead; } @@ -767,18 +776,18 @@ IceInternal::WSTransceiver::startRead(Buffer& buf) } void -IceInternal::WSTransceiver::finishRead(Buffer& buf, bool& hasMoreData) +IceInternal::WSTransceiver::finishRead(Buffer& buf) { _readPending = false; if(_state < StateOpened) { if(_state < StateConnected) { - _delegate->finishRead(buf, hasMoreData); + _delegate->finishRead(buf); } else { - _delegate->finishRead(_readBuffer, hasMoreData); + _delegate->finishRead(_readBuffer); } return; } @@ -789,11 +798,11 @@ IceInternal::WSTransceiver::finishRead(Buffer& buf, bool& hasMoreData) } else if(_readState == ReadStatePayload) { - _delegate->finishRead(buf, hasMoreData); + _delegate->finishRead(buf); } else { - _delegate->finishRead(_readBuffer, hasMoreData); + _delegate->finishRead(_readBuffer); } if(_state == StateClosed) diff --git a/cpp/src/Ice/WSTransceiver.h b/cpp/src/Ice/WSTransceiver.h index c3d8d760e22..b7771ecece9 100644 --- a/cpp/src/Ice/WSTransceiver.h +++ b/cpp/src/Ice/WSTransceiver.h @@ -46,16 +46,16 @@ public: virtual void setCompletedHandler(SocketOperationCompletedHandler^); #endif - virtual SocketOperation initialize(Buffer&, Buffer&, bool&); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual SocketOperation write(Buffer&); - virtual SocketOperation read(Buffer&, bool&); + virtual SocketOperation read(Buffer&); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); virtual void startRead(Buffer&); - virtual void finishRead(Buffer&, bool&); + virtual void finishRead(Buffer&); #endif virtual std::string protocol() const; virtual std::string toString() const; diff --git a/cpp/src/Ice/winrt/StreamTransceiver.cpp b/cpp/src/Ice/winrt/StreamTransceiver.cpp index e0a7acf6b9a..039643dd3ae 100644 --- a/cpp/src/Ice/winrt/StreamTransceiver.cpp +++ b/cpp/src/Ice/winrt/StreamTransceiver.cpp @@ -67,7 +67,7 @@ IceInternal::StreamTransceiver::setCompletedHandler(SocketOperationCompletedHand } SocketOperation -IceInternal::StreamTransceiver::initialize(Buffer&, Buffer&,bool&) +IceInternal::StreamTransceiver::initialize(Buffer&, Buffer&) { if(_state == StateNeedConnect) { @@ -123,7 +123,7 @@ IceInternal::StreamTransceiver::write(Buffer& buf) } SocketOperation -IceInternal::StreamTransceiver::read(Buffer& buf, bool&) +IceInternal::StreamTransceiver::read(Buffer& buf) { return buf.i == buf.b.end() ? SocketOperationNone : SocketOperationRead; } @@ -254,7 +254,7 @@ IceInternal::StreamTransceiver::startRead(Buffer& buf) } void -IceInternal::StreamTransceiver::finishRead(Buffer& buf, bool& hasMoreData) +IceInternal::StreamTransceiver::finishRead(Buffer& buf) { if(_read.count == SOCKET_ERROR) { diff --git a/cpp/src/Ice/winrt/StreamTransceiver.h b/cpp/src/Ice/winrt/StreamTransceiver.h index 2fc822868f0..307ac1e0a78 100644 --- a/cpp/src/Ice/winrt/StreamTransceiver.h +++ b/cpp/src/Ice/winrt/StreamTransceiver.h @@ -35,16 +35,16 @@ public: virtual NativeInfoPtr getNativeInfo(); virtual void setCompletedHandler(SocketOperationCompletedHandler^); - virtual SocketOperation initialize(Buffer&, Buffer&, bool&); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual SocketOperation write(Buffer&); - virtual SocketOperation read(Buffer&, bool&); + virtual SocketOperation read(Buffer&); virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); virtual void startRead(Buffer&); - virtual void finishRead(Buffer&, bool&); + virtual void finishRead(Buffer&); virtual std::string protocol() const; virtual std::string toString() const; diff --git a/cpp/src/IceGrid/Makefile b/cpp/src/IceGrid/Makefile index 14da8e051d9..ed6f7fcaa01 100644 --- a/cpp/src/IceGrid/Makefile +++ b/cpp/src/IceGrid/Makefile @@ -114,7 +114,7 @@ $(ADMIN): $(ADMIN_OBJS) $(LIBTARGETS) $(REGISTRY_SERVER): $(REGISTRY_SVR_OBJS) $(LIBTARGETS) rm -f $@ $(CXX) $(LDFLAGS) $(LDEXEFLAGS) -o $@ $(REGISTRY_SVR_OBJS) -lIceGrid -lIceStorm -lIceStormService -lGlacier2 -lIcePatch2 \ - $(DB_RPATH_LINK) -lFreeze -lIceBox $(EXPAT_RPATH_LINK) -lIceXML -lIceSSL $(OPENSSL_RPATH_LINK) $(LIBS) + $(RPATH_LINK) -lFreeze -lIceBox $(EXPAT_RPATH_LINK) -lIceXML -lIceSSL $(OPENSSL_RPATH_LINK) $(LIBS) $(NODE_SERVER): $(NODE_SVR_OBJS) $(LIBTARGETS) rm -f $@ diff --git a/cpp/src/IceSSL/OpenSSLTransceiverI.cpp b/cpp/src/IceSSL/OpenSSLTransceiverI.cpp index 244a79d4c6e..ff9563db0de 100644 --- a/cpp/src/IceSSL/OpenSSLTransceiverI.cpp +++ b/cpp/src/IceSSL/OpenSSLTransceiverI.cpp @@ -83,7 +83,7 @@ IceSSL::TransceiverI::getNativeInfo() } IceInternal::SocketOperation -IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&) +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { IceInternal::SocketOperation status = _stream->connect(readBuffer, writeBuffer); if(status != IceInternal::SocketOperationNone) @@ -440,7 +440,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } IceInternal::SocketOperation -IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) +IceSSL::TransceiverI::read(IceInternal::Buffer& buf) { if(!_stream->isConnected()) { @@ -448,8 +448,7 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) } // - // Note: we don't set the hasMoreData flag in this implementation. - // We assume that OpenSSL doesn't read more SSL records than + // Note: We assume that OpenSSL doesn't read more SSL records than // necessary to fill the requested data and that the sender sends // Ice messages in individual SSL records. // diff --git a/cpp/src/IceSSL/OpenSSLTransceiverI.h b/cpp/src/IceSSL/OpenSSLTransceiverI.h index a64e932a0f5..eb81b12ec12 100644 --- a/cpp/src/IceSSL/OpenSSLTransceiverI.h +++ b/cpp/src/IceSSL/OpenSSLTransceiverI.h @@ -38,11 +38,11 @@ public: virtual IceInternal::NativeInfoPtr getNativeInfo(); - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); virtual IceInternal::SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual IceInternal::SocketOperation write(IceInternal::Buffer&); - virtual IceInternal::SocketOperation read(IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation read(IceInternal::Buffer&); virtual std::string protocol() const; virtual std::string toString() const; virtual std::string toDetailedString() const; diff --git a/cpp/src/IceSSL/SChannelTransceiverI.cpp b/cpp/src/IceSSL/SChannelTransceiverI.cpp index 2b842769b09..238bf3981da 100644 --- a/cpp/src/IceSSL/SChannelTransceiverI.cpp +++ b/cpp/src/IceSSL/SChannelTransceiverI.cpp @@ -622,7 +622,7 @@ IceSSL::TransceiverI::encryptMessage(IceInternal::Buffer& buffer) } IceInternal::SocketOperation -IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData) +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { IceInternal::SocketOperation op = _stream->connect(readBuffer, writeBuffer); if(op != IceInternal::SocketOperationNone) @@ -746,7 +746,8 @@ IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::B } out << toString(); } - hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin(); + _stream->ready(IceInternal::SocketOperationRead, + !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin()); return IceInternal::SocketOperationNone; } @@ -819,7 +820,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } IceInternal::SocketOperation -IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool& hasMoreData) +IceSSL::TransceiverI::read(IceInternal::Buffer& buf) { if(!_stream->isConnected()) { @@ -832,7 +833,7 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool& hasMoreData) } assert(_state == StateHandshakeComplete); - hasMoreData = false; + _stream->ready(IceInternal::SocketOperationRead, false); while(buf.i != buf.b.end()) { if(_readUnprocessed.b.empty() && _readBuffer.i == _readBuffer.b.begin() && !readRaw(_readBuffer)) @@ -852,7 +853,8 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool& hasMoreData) buf.i += decrypted; } - hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin(); + _stream->ready(IceInternal::SocketOperationRead, + !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin()); return IceInternal::SocketOperationNone; } @@ -909,7 +911,7 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buffer) } void -IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf, bool& hasMoreData) +IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf) { if(!_stream->isConnected()) { @@ -924,11 +926,12 @@ IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf, bool& hasMoreData) if(decrypted > 0) { buf.i += decrypted; - hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin(); + _stream->ready(IceInternal::SocketOperationRead, + !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin()); } else { - hasMoreData = false; + _stream->ready(IceInternal::SocketOperationRead, false); } } } diff --git a/cpp/src/IceSSL/SChannelTransceiverI.h b/cpp/src/IceSSL/SChannelTransceiverI.h index f60d54ff6d6..a4eb31ab46f 100644 --- a/cpp/src/IceSSL/SChannelTransceiverI.h +++ b/cpp/src/IceSSL/SChannelTransceiverI.h @@ -49,16 +49,16 @@ public: virtual IceInternal::NativeInfoPtr getNativeInfo(); - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); virtual IceInternal::SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual IceInternal::SocketOperation write(IceInternal::Buffer&); - virtual IceInternal::SocketOperation read(IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation read(IceInternal::Buffer&); #ifdef ICE_USE_IOCP virtual bool startWrite(IceInternal::Buffer&); virtual void finishWrite(IceInternal::Buffer&); virtual void startRead(IceInternal::Buffer&); - virtual void finishRead(IceInternal::Buffer&, bool&); + virtual void finishRead(IceInternal::Buffer&); #endif virtual std::string protocol() const; virtual std::string toString() const; diff --git a/cpp/src/IceSSL/SecureTransportTransceiverI.cpp b/cpp/src/IceSSL/SecureTransportTransceiverI.cpp index 211a4ca3fe8..08bce7548a1 100644 --- a/cpp/src/IceSSL/SecureTransportTransceiverI.cpp +++ b/cpp/src/IceSSL/SecureTransportTransceiverI.cpp @@ -175,7 +175,7 @@ IceSSL::TransceiverI::getNativeInfo() } IceInternal::SocketOperation -IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&) +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { IceInternal::SocketOperation status = _stream->connect(readBuffer, writeBuffer); if(status != IceInternal::SocketOperationNone) @@ -396,7 +396,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } IceInternal::SocketOperation -IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) +IceSSL::TransceiverI::read(IceInternal::Buffer& buf) { if(!_stream->isConnected()) { @@ -404,10 +404,9 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) } // - // Note: we don't set the hasMoreData flag in this implementation. - // We assume that SecureTransport doesn't read more SSL records - // than necessary to fill the requested data and that the sender - // sends Ice messages in individual SSL records. + // Note: we assume that SecureTransport doesn't read more SSL records + // than necessary to fill the requested data and that the sender sends + // Ice messages in individual SSL records. // if(buf.i == buf.b.end()) diff --git a/cpp/src/IceSSL/SecureTransportTransceiverI.h b/cpp/src/IceSSL/SecureTransportTransceiverI.h index c81ee7aaef1..f50a17de13d 100644 --- a/cpp/src/IceSSL/SecureTransportTransceiverI.h +++ b/cpp/src/IceSSL/SecureTransportTransceiverI.h @@ -37,11 +37,11 @@ public: virtual IceInternal::NativeInfoPtr getNativeInfo(); - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); virtual IceInternal::SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual IceInternal::SocketOperation write(IceInternal::Buffer&); - virtual IceInternal::SocketOperation read(IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation read(IceInternal::Buffer&); virtual std::string protocol() const; virtual std::string toString() const; @@ -74,7 +74,7 @@ private: SSLContextRef _ssl; SecTrustRef _trust; bool _verified; - + size_t _buffered; enum SSLWantFlags { diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp index 85599f4b8f4..70a44976b6d 100644 --- a/cpp/test/Ice/background/AllTests.cpp +++ b/cpp/test/Ice/background/AllTests.cpp @@ -537,8 +537,9 @@ initializeTests(const ConfigurationPtr& configuration, background->op(); configuration->initializeSocketOperation(IceInternal::SocketOperationNone); } - catch(const Ice::LocalException&) + catch(const Ice::LocalException& ex) { + cerr << ex << endl; test(false); } background->ice_getConnection()->close(false); @@ -549,8 +550,9 @@ initializeTests(const ConfigurationPtr& configuration, background->op(); configuration->initializeSocketOperation(IceInternal::SocketOperationNone); } - catch(const Ice::LocalException&) + catch(const Ice::LocalException& ex) { + cerr << ex << endl; test(false); } background->ice_getConnection()->close(false); diff --git a/cpp/test/Ice/background/Transceiver.cpp b/cpp/test/Ice/background/Transceiver.cpp index fc639d39e41..56187dc359c 100644 --- a/cpp/test/Ice/background/Transceiver.cpp +++ b/cpp/test/Ice/background/Transceiver.cpp @@ -18,7 +18,7 @@ Transceiver::getNativeInfo() } IceInternal::SocketOperation -Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData) +Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { #ifndef ICE_USE_IOCP IceInternal::SocketOperation status = _configuration->initializeSocketOperation(); @@ -30,7 +30,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr { if(!_initialized) { - status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData); + status = _transceiver->initialize(readBuffer, writeBuffer); if(status != IceInternal::SocketOperationNone) { return status; @@ -48,7 +48,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr _configuration->checkInitializeException(); if(!_initialized) { - IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData); + IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer); if(status != IceInternal::SocketOperationNone) { return status; @@ -83,7 +83,7 @@ Transceiver::write(IceInternal::Buffer& buf) } IceInternal::SocketOperation -Transceiver::read(IceInternal::Buffer& buf, bool& moreData) +Transceiver::read(IceInternal::Buffer& buf) { if(!_configuration->readReady() && (!buf.b.empty() && buf.i < buf.b.end())) { @@ -99,10 +99,10 @@ Transceiver::read(IceInternal::Buffer& buf, bool& moreData) if(_readBufferPos == _readBuffer.i) { _readBufferPos = _readBuffer.i = _readBuffer.b.begin(); - _transceiver->read(_readBuffer, moreData); + _transceiver->read(_readBuffer); if(_readBufferPos == _readBuffer.i) { - moreData = false; + _transceiver->getNativeInfo()->ready(IceInternal::SocketOperationRead, false); return IceInternal::SocketOperationRead; } } @@ -119,12 +119,15 @@ Transceiver::read(IceInternal::Buffer& buf, bool& moreData) _readBufferPos += available; buf.i += available; } - moreData |= _readBufferPos < _readBuffer.i; + if(_readBufferPos < _readBuffer.i) + { + _transceiver->getNativeInfo()->ready(IceInternal::SocketOperationRead, true); + } return IceInternal::SocketOperationNone; } else { - return _transceiver->read(buf, moreData); + return _transceiver->read(buf); } } @@ -182,14 +185,14 @@ Transceiver::startRead(IceInternal::Buffer& buf) } void -Transceiver::finishRead(IceInternal::Buffer& buf, bool& hasMoreData) +Transceiver::finishRead(IceInternal::Buffer& buf) { _configuration->checkReadException(); if(_buffered && _initialized) { if(buf.i != buf.b.end()) { - _transceiver->finishRead(_readBuffer, hasMoreData); + _transceiver->finishRead(_readBuffer); size_t requested = buf.b.end() - buf.i; size_t available = _readBuffer.i - _readBufferPos; @@ -208,7 +211,7 @@ Transceiver::finishRead(IceInternal::Buffer& buf, bool& hasMoreData) } else { - _transceiver->finishRead(buf, hasMoreData); + _transceiver->finishRead(buf); } } #endif diff --git a/cpp/test/Ice/background/Transceiver.h b/cpp/test/Ice/background/Transceiver.h index d6e203f087f..dac2112769b 100644 --- a/cpp/test/Ice/background/Transceiver.h +++ b/cpp/test/Ice/background/Transceiver.h @@ -22,18 +22,18 @@ public: virtual IceInternal::SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); virtual IceInternal::SocketOperation write(IceInternal::Buffer&); - virtual IceInternal::SocketOperation read(IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation read(IceInternal::Buffer&); #ifdef ICE_USE_IOCP virtual bool startWrite(IceInternal::Buffer&); virtual void finishWrite(IceInternal::Buffer&); virtual void startRead(IceInternal::Buffer&); - virtual void finishRead(IceInternal::Buffer&, bool&); + virtual void finishRead(IceInternal::Buffer&); #endif virtual std::string protocol() const; virtual std::string toString() const; virtual std::string toDetailedString() const; virtual Ice::ConnectionInfoPtr getInfo() const; - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); virtual void checkSendSize(const IceInternal::Buffer&); virtual void setBufferSize(int rcvSize, int sndSize); |