diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 44 |
1 files changed, 35 insertions, 9 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 014311c8c2f..4484d8ea99d 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -28,7 +28,9 @@ IceInternal::ThreadPool::_register(int fd, const EventHandlerPtr& handler) { JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this); if (handler->server()) + { ++_servers; + } _adds.push_back(make_pair(fd, handler)); setInterrupt(); } @@ -58,7 +60,7 @@ IceInternal::ThreadPool::waitUntilServerFinished() { wait(); } - catch(const JTCInterruptedException&) + catch (const JTCInterruptedException&) { } } @@ -75,7 +77,7 @@ IceInternal::ThreadPool::waitUntilFinished() { wait(); } - catch(const JTCInterruptedException&) + catch (const JTCInterruptedException&) { } } @@ -114,12 +116,14 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : try { int threadNum = 10; - string value = _instance->properties()->getProperty("ice.thread_pool.size"); + string value = _instance->properties()->getProperty("Ice.ThreadPool.Size"); if (!value.empty()) { threadNum = atoi(value.c_str()); if (threadNum < 1) + { threadNum = 1; + } } for (int i = 0 ; i < threadNum ; ++i) @@ -129,7 +133,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : _threads.push_back(thread); } } - catch(const JTCException&) + catch (const JTCException&) { destroy(); throw; @@ -192,7 +196,9 @@ IceInternal::ThreadPool::run() if (::select(_maxFd + 1, &fdSet, 0, 0, 0) == SOCKET_ERROR) { if (interrupted()) + { goto repeatSelect; + } _threadMutex.unlock(); throw SocketException(__FILE__, __LINE__); @@ -259,20 +265,28 @@ IceInternal::ThreadPool::run() #endif q->second->finished(); if (q->second->server()) + { --_servers; + } _handlers.erase(q); } _removes.clear(); _maxFd = _fdIntrRead; if (!_handlers.empty()) + { _maxFd = max(_maxFd, (--_handlers.end())->first); + } again = true; if (_handlers.empty() || _servers == 0) + { notifyAll(); // For waitUntil...Finished() methods + } } if (again) + { goto repeatSelect; + } // // Round robin for the filedescriptors @@ -280,7 +294,9 @@ IceInternal::ThreadPool::run() do { if (++_lastFd > _maxFd) + { _lastFd = 0; + } } while (!FD_ISSET(_lastFd, &fdSet)); @@ -299,11 +315,11 @@ IceInternal::ThreadPool::run() { read(handler); } - catch(const TimeoutException&) // Expected + catch (const TimeoutException&) // Expected { goto repeatSelect; } - catch(const LocalException& ex) + catch (const LocalException& ex) { handler->exception(ex); goto repeatSelect; @@ -332,7 +348,9 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler) handler->read(stream); if (stream.i != stream.b.end()) + { return; + } } if (stream.b.size() >= 8) // Interpret header? @@ -345,23 +363,31 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler) Byte protVer; stream.read(protVer); if (protVer != 0) + { throw UnsupportedProtocolException(__FILE__, __LINE__); + } Byte encVer; stream.read(encVer); if (encVer != 0) + { throw UnsupportedEncodingException(__FILE__, __LINE__); + } Byte messageType; stream.read(messageType); Int size; stream.read(size); if (size > 1024 * 1024) // TODO: configurable + { throw ::Ice::MemoryLimitException(__FILE__, __LINE__); + } stream.b.resize(size); stream.i = stream.b.begin() + pos; } if (stream.b.size() > 8 && stream.i != stream.b.end()) + { handler->read(stream); + } } void @@ -371,15 +397,15 @@ IceInternal::ThreadPool::EventHandlerThread::run() { _pool->run(); } - catch(const LocalException& ex) + catch (const LocalException& ex) { cerr << ex << endl; } - catch(const JTCException& ex) + catch (const JTCException& ex) { cerr << ex << endl; } - catch(...) + catch (...) { cerr << "unknown exception" << endl; } |