summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.h')
-rw-r--r--cpp/src/Ice/ThreadPool.h78
1 files changed, 37 insertions, 41 deletions
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index ba1626ea2e7..d1d6ba8e82c 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -23,7 +23,7 @@
#include <Ice/PropertiesF.h>
#include <Ice/EventHandler.h>
#include <Ice/Selector.h>
-#include <Ice/BasicStream.h>
+#include <Ice/InputStream.h>
#include <Ice/ObserverHelper.h>
#include <set>
@@ -35,12 +35,12 @@ namespace IceInternal
class ThreadPoolCurrent;
class ThreadPoolWorkQueue;
-typedef IceUtil::Handle<ThreadPoolWorkQueue> ThreadPoolWorkQueuePtr;
+ICE_DEFINE_PTR(ThreadPoolWorkQueuePtr, ThreadPoolWorkQueue);
-class ThreadPoolWorkItem : virtual public IceUtil::Shared
+class ThreadPoolWorkItem : public virtual IceUtil::Shared
{
public:
-
+
virtual void execute(ThreadPoolCurrent&) = 0;
};
typedef IceUtil::Handle<ThreadPoolWorkItem> ThreadPoolWorkItemPtr;
@@ -51,8 +51,8 @@ public:
DispatchWorkItem();
DispatchWorkItem(const Ice::ConnectionPtr& connection);
-
- const Ice::ConnectionPtr&
+
+ const Ice::ConnectionPtr&
getConnection()
{
return _connection;
@@ -66,18 +66,18 @@ private:
};
typedef IceUtil::Handle<DispatchWorkItem> DispatchWorkItemPtr;
-class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex>
+class ThreadPool : public IceUtil::Shared, private IceUtil::Monitor<IceUtil::Mutex>
{
class EventHandlerThread : public IceUtil::Thread
{
public:
-
+
EventHandlerThread(const ThreadPoolPtr&, const std::string&);
virtual void run();
void updateObserver();
void setState(Ice::Instrumentation::ThreadState);
-
+
private:
ThreadPoolPtr _pool;
@@ -106,6 +106,7 @@ public:
update(handler, status, SocketOperationNone);
}
bool finish(const EventHandlerPtr&, bool);
+ void ready(const EventHandlerPtr&, SocketOperation, bool);
void dispatchFromThisThread(const DispatchWorkItemPtr&);
void dispatch(const DispatchWorkItemPtr&);
@@ -120,7 +121,7 @@ private:
bool ioCompleted(ThreadPoolCurrent&);
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
bool startMessage(ThreadPoolCurrent&);
void finishMessage(ThreadPoolCurrent&);
#else
@@ -131,7 +132,11 @@ private:
std::string nextThreadId();
const InstancePtr _instance;
+#ifdef ICE_CPP11_MAPPING
+ std::function<void(std::function<void()>, const std::shared_ptr<Ice::Connection>&)> _dispatcher;
+#else
const Ice::DispatcherPtr _dispatcher;
+#endif
ThreadPoolWorkQueuePtr _workQueue;
bool _destroyed;
const std::string _prefix;
@@ -140,6 +145,7 @@ private:
friend class EventHandlerThread;
friend class ThreadPoolCurrent;
+ friend class ThreadPoolWorkQueue;
const int _size; // Number of threads that are pre-created.
const int _sizeIO; // Maximum number of threads that can concurrently perform IO.
@@ -154,11 +160,10 @@ private:
std::set<EventHandlerThreadPtr> _threads; // All threads, running or not.
int _inUse; // Number of threads that are currently in use.
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
int _inUseIO; // Number of threads that are currently performing IO.
std::vector<std::pair<EventHandler*, SocketOperation> > _handlers;
std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler;
- std::set<EventHandler*> _pendingHandlers;
#endif
bool _promote;
@@ -171,14 +176,14 @@ public:
ThreadPoolCurrent(const InstancePtr&, const ThreadPoolPtr&, const ThreadPool::EventHandlerThreadPtr&);
SocketOperation operation;
- BasicStream stream; // A per-thread stream to be used by event handlers for optimization.
+ Ice::InputStream stream; // A per-thread stream to be used by event handlers for optimization.
bool ioCompleted() const
{
return _threadPool->ioCompleted(const_cast<ThreadPoolCurrent&>(*this));
}
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
bool startMessage()
{
return _threadPool->startMessage(const_cast<ThreadPoolCurrent&>(*this));
@@ -206,7 +211,7 @@ private:
ThreadPool::EventHandlerThreadPtr _thread;
EventHandlerPtr _handler;
bool _ioCompleted;
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
bool _leader;
#else
DWORD _count;
@@ -215,17 +220,16 @@ private:
friend class ThreadPool;
};
-class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex
+class ThreadPoolWorkQueue : public EventHandler
{
public:
- ThreadPoolWorkQueue(const InstancePtr&, Selector&);
- ~ThreadPoolWorkQueue();
+ ThreadPoolWorkQueue(ThreadPool&);
void destroy();
void queue(const ThreadPoolWorkItemPtr&);
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
bool startAsync(SocketOperation);
bool finishAsync(SocketOperation);
#endif
@@ -234,19 +238,11 @@ public:
virtual void finished(ThreadPoolCurrent&, bool);
virtual std::string toString() const;
virtual NativeInfoPtr getNativeInfo();
- virtual void postMessage();
private:
- const InstancePtr _instance;
- Selector& _selector;
+ ThreadPool& _threadPool;
bool _destroyed;
-#ifdef ICE_USE_IOCP
- AsyncInfo _info;
-#elif !defined(ICE_OS_WINRT)
- SOCKET _fdIntrRead;
- SOCKET _fdIntrWrite;
-#endif
std::list<ThreadPoolWorkItemPtr> _workItems;
};
@@ -257,10 +253,10 @@ private:
//
// An instance of the IOScope subclass must be created within the synchronization
// of the event handler. It takes care of calling startMessage/finishMessage for
-// the IOCP implementation and ensures that finishMessage isn't called multiple
+// the IOCP implementation and ensures that finishMessage isn't called multiple
// times.
//
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
template<class T> class ThreadPoolMessage
{
public:
@@ -290,7 +286,7 @@ public:
}
private:
-
+
ThreadPoolMessage<T>& _message;
};
friend class IOScope;
@@ -309,7 +305,7 @@ private:
ThreadPoolCurrent& _current;
};
-#else
+#else
template<class T> class ThreadPoolMessage
{
@@ -321,7 +317,7 @@ public:
IOScope(ThreadPoolMessage& message) : _message(message)
{
- // This must be called with the handler locked.
+ // This must be called with the handler locked.
_finish = _message._current.startMessage();
}
@@ -329,7 +325,7 @@ public:
{
if(_finish)
{
- // This must be called with the handler locked.
+ // This must be called with the handler locked.
_message._current.finishMessage();
}
}
@@ -344,7 +340,7 @@ public:
{
//
// Call finishMessage once IO is completed only if serialization is not enabled.
- // Otherwise, finishMessage will be called when the event handler is done with
+ // Otherwise, finishMessage will be called when the event handler is done with
// the message (it will be called from ~ThreadPoolMessage below).
//
assert(_finish);
@@ -358,22 +354,22 @@ public:
private:
ThreadPoolMessage& _message;
- bool _finish;
+ bool _finish;
};
friend class IOScope;
-
- ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) :
+
+ ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) :
_current(current), _mutex(mutex), _finish(false)
{
}
-
+
~ThreadPoolMessage()
{
if(_finish)
{
//
// A ThreadPoolMessage instance must be created outside the synchronization
- // of the event handler. We need to lock the event handler here to call
+ // of the event handler. We need to lock the event handler here to call
// finishMessage.
//
#if defined(__MINGW32__)
@@ -386,7 +382,7 @@ public:
}
private:
-
+
ThreadPoolCurrent& _current;
const T& _mutex;
bool _finish;