summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2009-10-02 09:35:28 -0230
committerDwayne Boone <dwayne@zeroc.com>2009-10-02 09:35:28 -0230
commit6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0 (patch)
treeff37056717cff166bc705112f54e98074f8d1f40 /cpp/src/Ice/ThreadPool.cpp
parent3772. Recovering from Glacier2 / Ice router session failure. (diff)
downloadice-6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0.tar.bz2
ice-6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0.tar.xz
ice-6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0.zip
C++Builder 2010 support
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp315
1 files changed, 151 insertions, 164 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index d5ff021a660..3834d0898dc 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -112,230 +112,217 @@ class ThreadPoolDestroyedException
}
-namespace IceInternal
-{
-
-class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex
-{
-public:
-
- ThreadPoolWorkQueue(ThreadPool* threadPool, const InstancePtr& instance, Selector& selector) :
- _threadPool(threadPool),
- _instance(instance),
- _selector(selector),
- _destroyed(false)
+IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool* threadPool,
+ const InstancePtr& instance,
+ Selector& selector) :
+ _threadPool(threadPool),
+ _instance(instance),
+ _selector(selector),
+ _destroyed(false)
#ifdef ICE_USE_IOCP
- , _info(SocketOperationRead)
+ , _info(SocketOperationRead)
#endif
- {
+{
#ifndef ICE_USE_IOCP
- SOCKET fds[2];
- createPipe(fds);
- _fdIntrRead = fds[0];
- _fdIntrWrite = fds[1];
+ SOCKET fds[2];
+ createPipe(fds);
+ _fdIntrRead = fds[0];
+ _fdIntrWrite = fds[1];
- _selector.initialize(this);
- _selector.update(this, SocketOperationNone, SocketOperationRead);
+ _selector.initialize(this);
+ _selector.update(this, SocketOperationNone, SocketOperationRead);
#endif
- }
+}
- ~ThreadPoolWorkQueue()
- {
- assert(_destroyed);
+IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue()
+{
+ assert(_destroyed);
#ifndef ICE_USE_IOCP
- try
- {
- closeSocket(_fdIntrRead);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-
- try
- {
- closeSocket(_fdIntrWrite);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-#endif
+ try
+ {
+ closeSocket(_fdIntrRead);
}
-
- void destroy()
+ catch(const LocalException& ex)
{
- Lock sync(*this);
- assert(!_destroyed);
- _destroyed = true;
- postMessage();
+ Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+
+ try
+ {
+ closeSocket(_fdIntrWrite);
}
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+#endif
+}
+
+void
+IceInternal::ThreadPoolWorkQueue::destroy()
+{
+ Lock sync(*this);
+ assert(!_destroyed);
+ _destroyed = true;
+ postMessage();
+}
- void queue(const ThreadPoolWorkItemPtr& item)
+void
+IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
+{
+ Lock sync(*this);
+ if(_destroyed)
{
- Lock sync(*this);
- if(_destroyed)
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
- _workItems.push_back(item);
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+ _workItems.push_back(item);
#ifndef ICE_USE_IOCP
- if(_workItems.size() == 1)
- {
- postMessage();
- }
-#else
+ if(_workItems.size() == 1)
+ {
postMessage();
-#endif
}
+#else
+ postMessage();
+#endif
+}
#ifdef ICE_USE_IOCP
- bool startAsync(SocketOperation)
- {
- assert(false);
- return false;
- }
+bool
+IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation)
+{
+ assert(false);
+ return false;
+}
- bool finishAsync(SocketOperation)
- {
- assert(false);
- return false;
- }
+bool
+IceInternal::ThreadPoolWorkQueue::finishAsync(SocketOperation)
+{
+ assert(false);
+ return false;
+}
#endif
- virtual void message(ThreadPoolCurrent& current)
- {
- ThreadPoolWorkItemPtr workItem;
- {
- Lock sync(*this);
- if(!_workItems.empty())
- {
- workItem = _workItems.front();
- _workItems.pop_front();
+void
+IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
+{
+ ThreadPoolWorkItemPtr workItem;
+ {
+ Lock sync(*this);
+ if(!_workItems.empty())
+ {
+ workItem = _workItems.front();
+ _workItems.pop_front();
#ifndef ICE_USE_IOCP
- if(_workItems.empty())
+ if(_workItems.empty())
+ {
+ char c;
+ while(true)
{
- char c;
- while(true)
- {
- ssize_t ret;
+ ssize_t ret;
#ifdef _WIN32
- ret = ::recv(_fdIntrRead, &c, 1, 0);
+ ret = ::recv(_fdIntrRead, &c, 1, 0);
#else
- ret = ::read(_fdIntrRead, &c, 1);
+ ret = ::read(_fdIntrRead, &c, 1);
#endif
- if(ret == SOCKET_ERROR)
+ if(ret == SOCKET_ERROR)
+ {
+ if(interrupted())
{
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ continue;
}
- break;
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
+ break;
}
-#endif
}
- else
- {
- assert(_destroyed);
-#ifdef ICE_USE_IOCP
- postMessage();
#endif
- }
- }
-
- if(workItem)
- {
- workItem->execute(current);
}
else
{
- current.ioCompleted();
- throw ThreadPoolDestroyedException();
+ assert(_destroyed);
+#ifdef ICE_USE_IOCP
+ postMessage();
+#endif
}
}
- virtual void finished(ThreadPoolCurrent&)
+ if(workItem)
{
- assert(false);
+ workItem->execute(current);
}
-
- virtual string toString() const
+ else
{
- return "work queue";
+ current.ioCompleted();
+ throw ThreadPoolDestroyedException();
}
+}
- virtual NativeInfoPtr getNativeInfo()
- {
+void
+IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&)
+{
+ assert(false);
+}
+
+string
+IceInternal::ThreadPoolWorkQueue::toString() const
+{
+ return "work queue";
+}
+
+NativeInfoPtr
+IceInternal::ThreadPoolWorkQueue::getNativeInfo()
+{
#ifndef ICE_USE_IOCP
- return new NativeInfo(_fdIntrRead);
+ return new NativeInfo(_fdIntrRead);
#endif
- return 0;
- }
+ return 0;
+}
- virtual void postMessage()
- {
+void
+IceInternal::ThreadPoolWorkQueue::postMessage()
+{
#ifndef ICE_USE_IOCP
- char c = 0;
- while(true)
- {
+ char c = 0;
+ while(true)
+ {
#ifdef _WIN32
- if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
+ if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
#else
- if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
+ if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
#endif
+ {
+ if(interrupted())
{
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ continue;
}
- break;
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
+ break;
+ }
#else
- if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this),
+ if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this),
#if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60
- reinterpret_cast<LPOVERLAPPED>(&_info)
+ reinterpret_cast<LPOVERLAPPED>(&_info)
#else
- &_info
-#endif
- ))
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = GetLastError();
- throw ex;
- }
+ &_info
#endif
+ ))
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = GetLastError();
+ throw ex;
}
-
-private:
-
- const ThreadPool* _threadPool;
- const InstancePtr _instance;
- Selector& _selector;
- bool _destroyed;
-#ifdef ICE_USE_IOCP
- AsyncInfo _info;
-#else
- SOCKET _fdIntrRead;
- SOCKET _fdIntrWrite;
#endif
- list<ThreadPoolWorkItemPtr> _workItems;
-};
-
}
IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :