summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-07-26 09:09:03 +0200
committerBenoit Foucher <benoit@zeroc.com>2013-07-26 09:09:03 +0200
commit1e6a694714985125f37bf67dc5d35fd76fc7a33f (patch)
tree92dbf46286fcd8d02010b00093907511be8ab08d /cpp/src
parentICE-5313 - more fixes for checksums (diff)
downloadice-1e6a694714985125f37bf67dc5d35fd76fc7a33f.tar.bz2
ice-1e6a694714985125f37bf67dc5d35fd76fc7a33f.tar.xz
ice-1e6a694714985125f37bf67dc5d35fd76fc7a33f.zip
Revert "Fix to allow transceivers to read more data than requested."
This reverts commit 9c4e79ce6760badf047568fd300fcbe3455f31b7.
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp11
-rw-r--r--cpp/src/Ice/EventHandler.cpp1
-rw-r--r--cpp/src/Ice/EventHandler.h1
-rw-r--r--cpp/src/Ice/Network.cpp18
-rw-r--r--cpp/src/Ice/Network.h7
-rw-r--r--cpp/src/Ice/Selector.cpp3
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp6
-rw-r--r--cpp/src/Ice/TcpTransceiver.h4
-rw-r--r--cpp/src/Ice/ThreadPool.cpp108
-rw-r--r--cpp/src/Ice/ThreadPool.h1
-rw-r--r--cpp/src/Ice/Transceiver.h4
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp6
-rw-r--r--cpp/src/Ice/UdpTransceiver.h4
-rw-r--r--cpp/src/IceSSL/TransceiverI.cpp4
-rw-r--r--cpp/src/IceSSL/TransceiverI.h4
15 files changed, 30 insertions, 152 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 9abb600eae2..8091d2f2233 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1366,7 +1366,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
if(_readHeader) // Read header if necessary.
{
- if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData))
+ if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
{
return;
}
@@ -1437,7 +1437,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
_observer.startRead(_readStream.i);
}
- if(!_transceiver->read(_readStream, _hasMoreData))
+ if(!_transceiver->read(_readStream))
{
assert(!_readStream.b.empty());
scheduleTimeout(SocketOperationRead, _endpoint->timeout());
@@ -2135,8 +2135,7 @@ Ice::ConnectionI::setState(State state)
}
if(_state == StateHolding)
{
- // We need to continue to read in closing state.
- _threadPool->_register(this, SocketOperationRead);
+ _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state.
}
break;
}
@@ -2281,7 +2280,7 @@ Ice::ConnectionI::initiateShutdown()
bool
Ice::ConnectionI::initialize(SocketOperation operation)
{
- SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData);
+ SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
if(s != SocketOperationNone)
{
scheduleTimeout(s, connectTimeout());
@@ -2350,7 +2349,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
_observer.startRead(_readStream.i);
}
- if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData))
+ if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
{
scheduleTimeout(SocketOperationRead, connectTimeout());
_threadPool->update(this, operation, SocketOperationRead);
diff --git a/cpp/src/Ice/EventHandler.cpp b/cpp/src/Ice/EventHandler.cpp
index 413d44c2e32..f0505a8b7db 100644
--- a/cpp/src/Ice/EventHandler.cpp
+++ b/cpp/src/Ice/EventHandler.cpp
@@ -25,7 +25,6 @@ IceInternal::EventHandler::EventHandler() :
#else
_disabled(SocketOperationNone),
#endif
- _hasMoreData(false),
_registered(SocketOperationNone)
{
}
diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h
index 565e963a11f..7e51f0e9159 100644
--- a/cpp/src/Ice/EventHandler.h
+++ b/cpp/src/Ice/EventHandler.h
@@ -65,7 +65,6 @@ protected:
#else
SocketOperation _disabled;
#endif
- bool _hasMoreData;
SocketOperation _registered;
friend class ThreadPool;
diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp
index 0b280166a37..987ba372326 100644
--- a/cpp/src/Ice/Network.cpp
+++ b/cpp/src/Ice/Network.cpp
@@ -543,24 +543,6 @@ IceInternal::AsyncInfo::AsyncInfo(SocketOperation s)
ZeroMemory(this, sizeof(AsyncInfo));
status = s;
}
-
-void
-IceInternal::NativeInfo::initialize(HANDLE handle, ULONG_PTR key)
-{
- _handle = handle;
- _key = key;
-}
-
-void
-IceInternal::NativeInfo::completed(SocketOperation operation)
-{
- if(!PostQueuedCompletionStatus(_handle, 0, _key, getAsyncInfo(operation)))
- {
- Ice::SocketException ex(__FILE__, __LINE__);
- ex.error = GetLastError();
- throw ex;
- }
-}
#endif
IceUtil::Shared* IceInternal::upCast(NetworkProxy* p) { return p; }
diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h
index c9416ee4ef6..4f045031dc5 100644
--- a/cpp/src/Ice/Network.h
+++ b/cpp/src/Ice/Network.h
@@ -178,8 +178,6 @@ public:
//
#if defined(ICE_USE_IOCP)
virtual AsyncInfo* getAsyncInfo(SocketOperation) = 0;
- void initialize(HANDLE, ULONG_PTR);
- void completed(SocketOperation operation);
#elif defined(ICE_OS_WINRT)
virtual void setCompletedHandler(SocketOperationCompletedHandler^) = 0;
#endif
@@ -187,11 +185,6 @@ public:
protected:
SOCKET _fd;
-
-#if defined(ICE_USE_IOCP)
- HANDLE _handle;
- ULONG_PTR _key;
-#endif
};
typedef IceUtil::Handle<NativeInfo> NativeInfoPtr;
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp
index 48de01df197..c9763d6e2a9 100644
--- a/cpp/src/Ice/Selector.cpp
+++ b/cpp/src/Ice/Selector.cpp
@@ -140,7 +140,6 @@ Selector::initialize(EventHandler* handler)
throw ex;
}
handler->__incRef();
- handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler));
}
void
@@ -483,6 +482,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
}
assert(ret > 0);
+ handlers.clear();
for(int i = 0; i < ret; ++i)
{
pair<EventHandler*, SocketOperation> p;
@@ -701,6 +701,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
}
assert(ret > 0);
+ handlers.clear();
#if defined(ICE_USE_SELECT)
if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0)
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index c29391fa9da..d82fafa17ce 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -46,7 +46,7 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status)
#endif
SocketOperation
-IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData)
+IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer)
{
try
{
@@ -93,7 +93,7 @@ IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer,
//
// Try to read the response.
//
- if(read(readBuffer, hasMoreData))
+ if(read(readBuffer))
{
//
// Read completed without blocking - fall through.
@@ -268,7 +268,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf)
}
bool
-IceInternal::TcpTransceiver::read(Buffer& buf, bool&)
+IceInternal::TcpTransceiver::read(Buffer& buf)
{
//
// It's impossible for packetSize to be more than an Int.
diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h
index 5faad7b9a46..a2afe4bb514 100644
--- a/cpp/src/Ice/TcpTransceiver.h
+++ b/cpp/src/Ice/TcpTransceiver.h
@@ -41,10 +41,10 @@ public:
virtual AsyncInfo* getAsyncInfo(SocketOperation);
#endif
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual void close();
virtual bool write(Buffer&);
- virtual bool read(Buffer&, bool&);
+ virtual bool read(Buffer&);
#ifdef ICE_USE_IOCP
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 32b7bbddde6..62652c786d2 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -108,29 +108,6 @@ private:
IceUtil::ThreadPtr _thread;
};
-class InterruptWorkItem : public ThreadPoolWorkItem
-{
-public:
-
- virtual void
- execute(ThreadPoolCurrent& current)
- {
- // Nothing to do, this is just used to interrupt the thread pool selector.
- }
-};
-ThreadPoolWorkItemPtr interruptWorkItem;
-
-class InterruptWorkItemInit
-{
-public:
-
- InterruptWorkItemInit()
- {
- interruptWorkItem = new InterruptWorkItem;
- }
-};
-InterruptWorkItemInit init;
-
//
// Exception raised by the thread pool work queue when the thread pool
// is destroyed.
@@ -585,20 +562,6 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation
Lock sync(*this);
assert(!_destroyed);
_selector.update(handler.get(), remove, add);
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- if(add & SocketOperationRead && handler->_hasMoreData)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(handler.get());
- }
- else if(remove & SocketOperationRead)
- {
- _pendingHandlers.erase(handler.get());
- }
-#endif
}
void
@@ -614,8 +577,8 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
// Clear the current ready handlers. The handlers from this vector can't be
// reference counted and a handler might get destroyed once it's finished.
//
- //_handlers.clear();
- //_nextHandler = _handlers.end();
+ _handlers.clear();
+ _nextHandler = _handlers.end();
#else
// If there are no pending asynchronous operations, we can call finish on the handler now.
if(!handler->_pending)
@@ -719,19 +682,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(select)
{
_handlers.swap(handlers);
- if(!_pendingHandlers.empty())
- {
- for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler)
- {
- _pendingHandlers.erase(_nextHandler->first);
- }
- set<EventHandler*>::const_iterator p;
- for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p)
- {
- _handlers.push_back(make_pair(*p, SocketOperationRead));
- }
- _pendingHandlers.clear();
- }
_nextHandler = _handlers.begin();
_selector.finishSelect();
select = false;
@@ -750,14 +700,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// the IO thread count now.
//
--_inUseIO;
- if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
}
else
{
@@ -765,18 +707,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// If the handler called ioCompleted(), we re-enable the handler in
// case it was disabled and we decrease the number of thread in use.
//
- if(_serialize)
- {
- _selector.enable(current._handler.get(), current.operation);
- if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
- }
+ _selector.enable(current._handler.get(), current.operation);
assert(_inUse > 0);
--_inUse;
}
@@ -786,22 +717,10 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
return; // Wait timed-out.
}
}
- else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
//
// Get the next ready handler.
//
- while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered))
- {
- ++_nextHandler;
- }
if(_nextHandler != _handlers.end())
{
current._ioCompleted = false;
@@ -829,7 +748,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
}
else
{
- _handlers.clear();
_selector.startSelect();
select = true;
thread->setState(ThreadStateIdle);
@@ -971,25 +889,13 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
if(_sizeMax > 1)
{
-
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
--_inUseIO;
-
- if(!_destroyed)
+
+ if(_serialize && !_destroyed)
{
- if(_serialize)
- {
- _selector.disable(current._handler.get(), current.operation);
- }
- else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
- }
+ _selector.disable(current._handler.get(), current.operation);
+ }
if(current._leader)
{
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index cf8a6d926b4..43a8f2fb361 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -127,7 +127,6 @@ private:
std::vector<std::pair<EventHandler*, SocketOperation> > _handlers;
std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler;
#endif
- std::set<EventHandler*> _pendingHandlers;
bool _promote;
};
diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h
index c1f6aa64b03..c2c00cf151d 100644
--- a/cpp/src/Ice/Transceiver.h
+++ b/cpp/src/Ice/Transceiver.h
@@ -25,10 +25,10 @@ class ICE_API Transceiver : virtual public ::IceUtil::Shared
public:
virtual NativeInfoPtr getNativeInfo() = 0;
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0;
+ virtual SocketOperation initialize(Buffer&, Buffer&) = 0;
virtual void close() = 0;
virtual bool write(Buffer&) = 0;
- virtual bool read(Buffer&, bool&) = 0;
+ virtual bool read(Buffer&) = 0;
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
virtual bool startWrite(Buffer&) = 0;
virtual void finishWrite(Buffer&) = 0;
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index 53d0901228c..ded61171571 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -80,7 +80,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler
#endif
SocketOperation
-IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/, bool& /*hasMoreData*/)
+IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/)
{
if(_state == StateNeedConnect)
{
@@ -243,13 +243,13 @@ repeat:
#ifdef ICE_OS_WINRT
bool
-IceInternal::UdpTransceiver::read(Buffer&, bool&)
+IceInternal::UdpTransceiver::read(Buffer&)
{
return false;
}
#else
bool
-IceInternal::UdpTransceiver::read(Buffer& buf, bool&)
+IceInternal::UdpTransceiver::read(Buffer& buf)
{
assert(buf.i == buf.b.begin());
assert(_fd != INVALID_SOCKET);
diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h
index 9768ff79a2d..ad6bf128619 100644
--- a/cpp/src/Ice/UdpTransceiver.h
+++ b/cpp/src/Ice/UdpTransceiver.h
@@ -47,10 +47,10 @@ public:
virtual void setCompletedHandler(SocketOperationCompletedHandler^);
#endif
- virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual void close();
virtual bool write(Buffer&);
- virtual bool read(Buffer&, bool&);
+ virtual bool read(Buffer&);
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp
index f48395deda9..fbf683d947d 100644
--- a/cpp/src/IceSSL/TransceiverI.cpp
+++ b/cpp/src/IceSSL/TransceiverI.cpp
@@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status)
#endif
IceInternal::SocketOperation
-IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&)
+IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
{
try
{
@@ -557,7 +557,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
bool
-IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&)
+IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
{
if(_state == StateProxyConnectRequestPending)
{
diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h
index 3b2b3a73937..2c475c8638e 100644
--- a/cpp/src/IceSSL/TransceiverI.h
+++ b/cpp/src/IceSSL/TransceiverI.h
@@ -46,10 +46,10 @@ public:
virtual IceInternal::AsyncInfo* getAsyncInfo(IceInternal::SocketOperation);
#endif
- virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&);
+ virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&);
virtual void close();
virtual bool write(IceInternal::Buffer&);
- virtual bool read(IceInternal::Buffer&, bool&);
+ virtual bool read(IceInternal::Buffer&);
#ifdef ICE_USE_IOCP
virtual bool startWrite(IceInternal::Buffer&);
virtual void finishWrite(IceInternal::Buffer&);