summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-10-15 08:20:15 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-10-15 08:20:15 +0200
commit4c6a4a6e77eeb513e2031912698a1fa839d9065a (patch)
treefdfad1d1dceb1fcc614da334b7058efc99925703 /cpp
parentICE-5739 missing AMD test in c++ optional (diff)
downloadice-4c6a4a6e77eeb513e2031912698a1fa839d9065a.tar.bz2
ice-4c6a4a6e77eeb513e2031912698a1fa839d9065a.tar.xz
ice-4c6a4a6e77eeb513e2031912698a1fa839d9065a.zip
Fixed ICE-5454: close acceptor on adapter deactivation
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp40
-rw-r--r--cpp/src/Ice/ConnectionFactory.h2
-rw-r--r--cpp/src/Ice/ConnectionI.cpp63
-rw-r--r--cpp/src/Ice/ConnectionI.h4
-rw-r--r--cpp/src/Ice/EventHandler.h2
-rw-r--r--cpp/src/Ice/Selector.cpp33
-rw-r--r--cpp/src/Ice/Selector.h21
-rw-r--r--cpp/src/Ice/ThreadPool.cpp36
-rw-r--r--cpp/src/Ice/ThreadPool.h4
9 files changed, 113 insertions, 92 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index f1fc0380727..154e1ef193b 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -1526,11 +1526,18 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
}
void
-IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&)
+IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&, bool close)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state == StateClosed);
setState(StateFinished);
+
+ assert(_acceptor);
+
+ if(close)
+ {
+ closeAcceptor(true);
+ }
}
string
@@ -1752,26 +1759,23 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
if(_acceptor)
{
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->finish(this);
+ //
+ // If possible, close the acceptor now to prevent new connections from
+ // being accepted while we are deactivating. This is especially useful
+ // if there are no more threads in the thread pool available to dispatch
+ // the finish() call. Not all selector implementations do support this
+ // however.
+ //
+ if(dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->finish(this, true))
+ {
+ closeAcceptor(true);
+ }
}
else
{
state = StateFinished;
}
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- //
- // With IOCP and WinRT, we close the acceptor now to cancel all the pending
- // asynchronous operations. It's important to wait for the pending asynchronous
- // operations to return before ConnectionI::finished(). Otherwise, if there was
- // a pending message waiting to be sent, the connection wouldn't know whether
- // or not the send failed or succeeded, potentially breaking at-most-once semantics.
- //
- if(_acceptor)
- {
- closeAcceptor(true);
- }
-#endif
for_each(_connections.begin(), _connections.end(),
bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated));
break;
@@ -1780,12 +1784,6 @@ IceInternal::IncomingConnectionFactory::setState(State state)
case StateFinished:
{
assert(_state == StateClosed);
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- if(_acceptor)
- {
- closeAcceptor(true);
- }
-#endif
break;
}
}
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 92603a55b04..c6b688225b5 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -190,7 +190,7 @@ public:
#endif
virtual void message(ThreadPoolCurrent&);
- virtual void finished(ThreadPoolCurrent&);
+ virtual void finished(ThreadPoolCurrent&, bool);
virtual std::string toString() const;
virtual NativeInfoPtr getNativeInfo();
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 92dc4a1693d..01df99084be 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -97,16 +97,16 @@ public:
private:
- ConnectionIPtr _connection;
- ConnectionI::StartCallbackPtr _startCB;
- vector<ConnectionI::OutgoingMessage> _sentCBs;
- Byte _compress;
- Int _requestId;
- Int _invokeNum;
- ServantManagerPtr _servantManager;
- ObjectAdapterPtr _adapter;
- OutgoingAsyncPtr _outAsync;
- ConnectionCallbackPtr _heartbeatCallback;
+ const ConnectionIPtr _connection;
+ const ConnectionI::StartCallbackPtr _startCB;
+ const vector<ConnectionI::OutgoingMessage> _sentCBs;
+ const Byte _compress;
+ const Int _requestId;
+ const Int _invokeNum;
+ const ServantManagerPtr _servantManager;
+ const ObjectAdapterPtr _adapter;
+ const OutgoingAsyncPtr _outAsync;
+ const ConnectionCallbackPtr _heartbeatCallback;
BasicStream _stream;
};
@@ -114,19 +114,21 @@ class FinishCall : public DispatchWorkItem
{
public:
- FinishCall(const Ice::ConnectionIPtr& connection) : DispatchWorkItem(connection), _connection(connection)
+ FinishCall(const Ice::ConnectionIPtr& connection, bool close) :
+ DispatchWorkItem(connection), _connection(connection), _close(close)
{
}
virtual void
run()
{
- _connection->finish();
+ _connection->finish(_close);
}
private:
-
- ConnectionIPtr _connection;
+
+ const ConnectionIPtr _connection;
+ const bool _close;
};
ConnectionState connectionStateMap[] = {
@@ -2092,7 +2094,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
}
void
-Ice::ConnectionI::finished(ThreadPoolCurrent& current)
+Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -2107,23 +2109,23 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
//
if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback)
{
- finish();
+ finish(close);
return;
}
current.ioCompleted();
if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher.
{
- finish();
+ finish(close);
}
else
{
- _threadPool->dispatchFromThisThread(new FinishCall(this));
+ _threadPool->dispatchFromThisThread(new FinishCall(this, close));
}
}
void
-Ice::ConnectionI::finish()
+Ice::ConnectionI::finish(bool close)
{
if(!_initialized)
{
@@ -2131,7 +2133,8 @@ Ice::ConnectionI::finish()
{
string verb = _connector ? "establish" : "accept";
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
- out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() << "\n" << *_exception.get();
+ out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString()
+ << "\n" << *_exception.get();
}
}
else
@@ -2143,6 +2146,11 @@ Ice::ConnectionI::finish()
}
}
+ if(close)
+ {
+ _transceiver->close();
+ }
+
if(_startCallback)
{
_startCallback->connectionStartFailed(this, *_exception.get());
@@ -2574,19 +2582,20 @@ Ice::ConnectionI::setState(State state)
return;
}
- _threadPool->finish(this);
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- _transceiver->close();
-#endif
+ //
+ // Don't need to close now for connections so only close the transceiver
+ // if the selector request it.
+ //
+ if(_threadPool->finish(this, false))
+ {
+ _transceiver->close();
+ }
break;
}
case StateFinished:
{
assert(_state == StateClosed);
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- _transceiver->close();
-#endif
_communicator = 0;
break;
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 3ecec79a247..6ed4618eed7 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -212,7 +212,7 @@ public:
#endif
virtual void message(IceInternal::ThreadPoolCurrent&);
- virtual void finished(IceInternal::ThreadPoolCurrent&);
+ virtual void finished(IceInternal::ThreadPoolCurrent&, bool);
virtual std::string toString() const; // From Connection and EvantHandler.
virtual IceInternal::NativeInfoPtr getNativeInfo();
@@ -228,7 +228,7 @@ public:
void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&,
const ConnectionCallbackPtr&, IceInternal::BasicStream&);
- void finish();
+ void finish(bool);
void closeCallback(const ConnectionCallbackPtr&);
diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h
index 705efd208d9..2ec0a6841b1 100644
--- a/cpp/src/Ice/EventHandler.h
+++ b/cpp/src/Ice/EventHandler.h
@@ -40,7 +40,7 @@ public:
//
// Called when the event handler is unregistered.
//
- virtual void finished(ThreadPoolCurrent&) = 0;
+ virtual void finished(ThreadPoolCurrent&, bool) = 0;
//
// Get a textual representation of the event handler.
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp
index e7cc3e1bfc7..ef36a25c71f 100644
--- a/cpp/src/Ice/Selector.cpp
+++ b/cpp/src/Ice/Selector.cpp
@@ -173,7 +173,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation
}
void
-Selector::finish(EventHandler* handler)
+Selector::finish(IceInternal::EventHandler* handler)
{
handler->_registered = SocketOperationNone;
handler->__decRef();
@@ -414,13 +414,30 @@ Selector::disable(EventHandler* handler, SocketOperation status)
}
}
-void
-Selector::finish(EventHandler* handler)
+bool
+Selector::finish(EventHandler* handler, bool closeNow)
{
if(handler->_registered)
{
- update(handler, handler->_registered, SocketOperationNone);
+ if(closeNow)
+ {
+ //
+ // Don't bother to un-register if the call wants to close
+ // the FD now, kqueue/epoll will automatically unregister
+ // the FD when it's closed.
+ //
+ handler->_registered = SocketOperationNone;
+ }
+ else
+ {
+ //
+ // If close on finish is requested, we can safely
+ // unregister the FD from the selector.
+ //
+ update(handler, handler->_registered, SocketOperationNone);
+ }
}
+ return closeNow;
}
#if defined(ICE_USE_KQUEUE)
@@ -603,13 +620,15 @@ Selector::disable(EventHandler* handler, SocketOperation status)
}
}
-void
-Selector::finish(EventHandler* handler)
+bool
+Selector::finish(EventHandler* handler, bool closeNow)
{
if(handler->_registered)
{
update(handler, handler->_registered, SocketOperationNone);
+ return false; // Don't close now if selecting.
}
+ return closeNow;
}
void
@@ -798,7 +817,7 @@ Selector::updateImpl(EventHandler* handler)
if(interrupted())
{
continue;
- }
+ }
Ice::SocketException ex(__FILE__, __LINE__);
ex.error = IceInternal::getSocketErrno();
diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h
index 331814b16f7..196d96ccf54 100644
--- a/cpp/src/Ice/Selector.h
+++ b/cpp/src/Ice/Selector.h
@@ -46,12 +46,11 @@ class SelectorTimeoutException
struct SelectEvent
{
- SelectEvent(IceInternal::EventHandler* handler, SocketOperation status) :
- handler(handler), status(status)
+ SelectEvent(EventHandler* handler, SocketOperation status) : handler(handler), status(status)
{
}
- IceInternal::EventHandler* handler;
+ EventHandler* handler;
SocketOperation status;
};
@@ -63,13 +62,13 @@ public:
void destroy();
- void initialize(IceInternal::EventHandler*);
- void update(IceInternal::EventHandler*, SocketOperation, SocketOperation);
- void finish(IceInternal::EventHandler*);
+ void initialize(EventHandler*);
+ void update(EventHandler*, SocketOperation, SocketOperation);
+ void finish(EventHandler*);
- IceInternal::EventHandler* getNextHandler(SocketOperation&, int);
+ EventHandler* getNextHandler(SocketOperation&, int);
- void completed(IceInternal::EventHandler*, SocketOperation);
+ void completed(EventHandler*, SocketOperation);
private:
@@ -121,11 +120,11 @@ public:
void update(EventHandler*, SocketOperation, SocketOperation);
void enable(EventHandler*, SocketOperation);
void disable(EventHandler*, SocketOperation);
- void finish(EventHandler*);
+ bool finish(EventHandler*, bool);
#if defined(ICE_USE_KQUEUE)
void updateSelector();
-#endif
+#endif
void
startSelect()
@@ -180,7 +179,7 @@ public:
void update(EventHandler*, SocketOperation, SocketOperation);
void enable(EventHandler*, SocketOperation);
void disable(EventHandler*, SocketOperation);
- void finish(EventHandler*);
+ bool finish(EventHandler*, bool);
void startSelect();
void finishSelect();
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 1db49555d9e..d748ec70a7e 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -63,19 +63,20 @@ class FinishedWorkItem : public ThreadPoolWorkItem
{
public:
- FinishedWorkItem(const EventHandlerPtr& handler) : _handler(handler)
+ FinishedWorkItem(const EventHandlerPtr& handler, bool close) : _handler(handler), _close(close)
{
}
virtual void
execute(ThreadPoolCurrent& current)
{
- _handler->finished(current);
+ _handler->finished(current, _close);
}
private:
const EventHandlerPtr _handler;
+ const bool _close;
};
class JoinThreadWorkItem : public ThreadPoolWorkItem
@@ -297,7 +298,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
}
void
-IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&)
+IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&, bool)
{
assert(false);
}
@@ -585,32 +586,27 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation
#endif
}
-void
-IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
+bool
+IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
{
Lock sync(*this);
assert(!_destroyed);
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- _selector.finish(handler.get()); // This must be called before!
- _workQueue->queue(new FinishedWorkItem(handler));
-
- //
- // Clear the current ready handlers. The handlers from this vector can't be
- // reference counted and a handler might get destroyed once it's finished.
- //
- //_handlers.clear();
- //_nextHandler = _handlers.end();
+ closeNow = _selector.finish(handler.get(), closeNow); // This must be called before!
+ _workQueue->queue(new FinishedWorkItem(handler, !closeNow));
+ return closeNow;
#else
// If there are no pending asynchronous operations, we can call finish on the handler now.
if(!handler->_pending)
{
- _workQueue->queue(new FinishedWorkItem(handler));
+ _workQueue->queue(new FinishedWorkItem(handler, false));
_selector.finish(handler.get());
}
else
{
handler->_finish = true;
}
+ return true; // Always close now to interrupt the pending call.
#endif
}
@@ -669,7 +665,7 @@ IceInternal::ThreadPool::joinWithAllThreads()
}
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- _selector.finish(_workQueue.get());
+ _selector.finish(_workQueue.get(), true);
#endif
_selector.destroy();
}
@@ -1109,7 +1105,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!current._handler->_pending && current._handler->_finish)
{
- _workQueue->queue(new FinishedWorkItem(current._handler));
+ _workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
return false;
@@ -1123,7 +1119,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!current._handler->_pending && current._handler->_finish)
{
- _workQueue->queue(new FinishedWorkItem(current._handler));
+ _workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
return false;
@@ -1146,7 +1142,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!current._handler->_pending && current._handler->_finish)
{
- _workQueue->queue(new FinishedWorkItem(current._handler));
+ _workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
return false;
@@ -1177,7 +1173,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
if(!current._handler->_pending && current._handler->_finish)
{
// There are no more pending async operations, it's time to call finish.
- _workQueue->queue(new FinishedWorkItem(current._handler));
+ _workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
}
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 15c913c6c07..b59a2cab1c8 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -105,7 +105,7 @@ public:
{
update(handler, status, SocketOperationNone);
}
- void finish(const EventHandlerPtr&);
+ bool finish(const EventHandlerPtr&, bool);
void dispatchFromThisThread(const DispatchWorkItemPtr&);
void dispatch(const DispatchWorkItemPtr&);
@@ -231,7 +231,7 @@ public:
#endif
virtual void message(ThreadPoolCurrent&);
- virtual void finished(ThreadPoolCurrent&);
+ virtual void finished(ThreadPoolCurrent&, bool);
virtual std::string toString() const;
virtual NativeInfoPtr getNativeInfo();
virtual void postMessage();