summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
commitc6dbd090d9691cc0116a2967b2827b858b184dfe (patch)
tree6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /cpp/src/Ice/ThreadPool.cpp
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp205
1 files changed, 111 insertions, 94 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index d0db3c20276..07d7b08492e 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -33,6 +33,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_size(0),
_sizeMax(0),
_sizeWarn(0),
+ _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0),
_stackSize(0),
_running(0),
_inUse(0),
@@ -125,7 +126,6 @@ IceInternal::ThreadPool::destroy()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
- assert(_handlerMap.empty());
_destroyed = true;
_selector.setInterrupt();
}
@@ -133,9 +133,7 @@ IceInternal::ThreadPool::destroy()
void
IceInternal::ThreadPool::incFdsInUse()
{
- // This is windows specific since every other platform uses an API
- // that doesn't have a specific FD limit.
-#ifdef _WIN32
+#ifdef ICE_USE_SELECT
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
_selector.incFdsInUse();
#endif
@@ -144,29 +142,56 @@ IceInternal::ThreadPool::incFdsInUse()
void
IceInternal::ThreadPool::decFdsInUse()
{
- // This is windows specific since every other platform uses an API
- // that doesn't have a specific FD limit.
-#ifdef _WIN32
+#ifdef ICE_USE_SELECT
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
_selector.decFdsInUse();
#endif
}
void
-IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
+IceInternal::ThreadPool::_register(const EventHandlerPtr& handler)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed);
- _changes.push_back(make_pair(fd, handler));
- _selector.setInterrupt();
+ assert(!_destroyed && handler->_fd != INVALID_SOCKET);
+ if(!handler->_registered)
+ {
+ if(!handler->_serializing)
+ {
+ _selector.add(handler.get(), NeedRead);
+ }
+ handler->_registered = true;
+ }
}
void
-IceInternal::ThreadPool::unregister(SOCKET fd)
+IceInternal::ThreadPool::unregister(const EventHandlerPtr& handler)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed);
- _changes.push_back(make_pair(fd, EventHandlerPtr(0)));
+ assert(!_destroyed && handler->_fd != INVALID_SOCKET);
+ if(handler->_registered)
+ {
+ if(!handler->_serializing)
+ {
+ _selector.remove(handler.get(), NeedRead);
+ }
+ handler->_registered = false;
+ }
+}
+
+void
+IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(!_destroyed && handler->_fd != INVALID_SOCKET);
+ if(handler->_registered)
+ {
+ if(!handler->_serializing)
+ {
+ _selector.remove(handler.get(), NeedRead);
+ }
+ handler->_registered = false;
+ }
+ _finished.push_back(handler);
_selector.setInterrupt();
}
@@ -183,12 +208,21 @@ IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem)
}
void
-IceInternal::ThreadPool::promoteFollower()
+IceInternal::ThreadPool::promoteFollower(EventHandler* handler)
{
if(_sizeMax > 1)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_serialize && handler)
+ {
+ handler->_serializing = true;
+ if(handler->_registered)
+ {
+ _selector.remove(handler, NeedRead, true); // No interrupt, no thread is blocked on select().
+ }
+ }
+
assert(!_promote);
_promote = true;
notify();
@@ -283,85 +317,63 @@ IceInternal::ThreadPool::run()
bool finished = false;
bool shutdown = false;
+ if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
+ {
+ shutdown = true;
+ }
+ else
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
+ if(_selector.isInterrupted())
{
- shutdown = true;
- }
- else
- {
- if(_selector.isInterrupted())
+ if(_selector.processInterrupt())
+ {
+ continue;
+ }
+
+ //
+ // There are three possiblities for an interrupt:
+ //
+ // 1. An event handler is being finished (closed).
+ //
+ // 2. The thread pool has been destroyed.
+ //
+ // 3. A work item has been schedulded.
+ //
+ if(!_finished.empty())
+ {
+ _selector.clearInterrupt();
+ handler = _finished.front();
+ _finished.pop_front();
+ finished = true;
+ }
+ else if(!_workItems.empty())
{
//
- // There are two possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
+ // Work items must be executed first even if the thread pool is destroyed.
//
- // 2. An event handler was registered or unregistered.
+ _selector.clearInterrupt();
+ workItem = _workItems.front();
+ _workItems.pop_front();
+ }
+ else if(_destroyed)
+ {
//
- // 3. A work item has been schedulded.
+ // Don't clear the interrupt if destroyed, so that the other threads exit as well.
//
-
- if(!_workItems.empty())
- {
- //
- // Work items must be executed first even if the thread pool is destroyed.
- //
- _selector.clearInterrupt();
- workItem = _workItems.front();
- _workItems.pop_front();
- }
- else if(_destroyed)
- {
- //
- // Don't clear the interrupt if destroyed, so that the other threads exit as well.
- //
- return true;
- }
- else
- {
- //
- // An event handler must have been registered or unregistered.
- //
- _selector.clearInterrupt();
-
- assert(!_changes.empty());
- pair<SOCKET, EventHandlerPtr> change = _changes.front();
- _changes.pop_front();
-
- if(change.second) // Addition if handler is set.
- {
- _handlerMap.insert(change);
- _selector.add(change.first, NeedRead);
- continue;
- }
- else // Removal if handler is not set.
- {
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
- assert(p != _handlerMap.end());
- handler = p->second;
- finished = true;
- _handlerMap.erase(p);
- _selector.remove(change.first, NeedRead);
- // Don't continue; we have to call finished() on the event handler below,
- // outside the thread synchronization.
- }
- }
+ return true;
}
else
{
- SOCKET fd = _selector.getNextSelected();
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(fd);
- if(p == _handlerMap.end())
- {
- Error out(_instance->initializationData().logger);
- out << "filedescriptor " << fd << " not registered with `" << _prefix << "'";
- continue;
- }
-
- handler = p->second;
+ assert(false);
+ }
+ }
+ else
+ {
+ handler = _selector.getNextSelected();
+ if(!handler)
+ {
+ continue;
}
}
}
@@ -423,14 +435,12 @@ IceInternal::ThreadPool::run()
if(finished)
{
//
- // Notify a handler about it's removal from the thread
- // pool.
+ // Notify a handler about it's removal from the thread pool.
//
try
{
//
- // "self" is faster than "this", as the reference
- // count is not modified.
+ // "self" is faster than "this", as the reference count is not modified.
//
handler->finished(self);
}
@@ -451,8 +461,7 @@ IceInternal::ThreadPool::run()
else
{
//
- // If the handler is "readable", try to read a
- // message.
+ // If the handler is "readable", try to read a message.
//
BasicStream stream(_instance.get());
if(handler->readable())
@@ -505,8 +514,7 @@ IceInternal::ThreadPool::run()
try
{
//
- // "self" is faster than "this", as the reference
- // count is not modified.
+ // "self" is faster than "this", as the reference count is not modified.
//
handler->message(stream, self);
}
@@ -529,9 +537,19 @@ IceInternal::ThreadPool::run()
if(_sizeMax > 1)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
+
if(!_destroyed)
{
+ if(_serialize && handler && handler->_serializing)
+ {
+ if(handler->_registered)
+ {
+ // No interrupt if no thread is blocked on select (_promote == true)
+ _selector.add(handler.get(), NeedRead, _promote);
+ }
+ handler->_serializing = false;
+ }
+
//
// First we reap threads that have been destroyed before.
//
@@ -599,8 +617,7 @@ IceInternal::ThreadPool::run()
assert(_inUse > 0);
--_inUse;
}
-
-
+
while(!_promote)
{
wait();