diff options
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.h')
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.h | 89 |
1 files changed, 44 insertions, 45 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.h b/cpp/src/Glacier2/RequestQueue.h index 6ef8055cf61..631d90240c9 100644 --- a/cpp/src/Glacier2/RequestQueue.h +++ b/cpp/src/Glacier2/RequestQueue.h @@ -5,8 +5,6 @@ #ifndef REQUEST_H #define REQUEST_H -#include <IceUtil/Thread.h> -#include <IceUtil/Monitor.h> #include <Ice/Ice.h> #include <Glacier2/Instrumentation.h> @@ -17,99 +15,100 @@ namespace Glacier2 { class Instance; -typedef IceUtil::Handle<Instance> InstancePtr; - class Request; -typedef IceUtil::Handle<Request> RequestPtr; - class RequestQueueThread; -typedef IceUtil::Handle<RequestQueueThread> RequestQueueThreadPtr; -class Request : public Ice::LocalObject +class Request { public: - Request(const Ice::ObjectPrx&, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&, bool, - const Ice::Context&, const Ice::AMD_Object_ice_invokePtr&); - - Ice::AsyncResultPtr invoke(const Ice::Callback_Object_ice_invokePtr& callback); - bool override(const RequestPtr&) const; - void addBatchProxy(std::set<Ice::ObjectPrx>&); + Request(std::shared_ptr<Ice::ObjectPrx>, + const std::pair<const Ice::Byte*, const Ice::Byte*>&, + const Ice::Current&, + bool, + const Ice::Context&, + std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>, + std::function<void(std::exception_ptr)>); + + void invoke(std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>&&, + std::function<void(std::exception_ptr)>&&, + std::function<void(bool)>&& = nullptr); + bool override(const std::shared_ptr<Request>&) const; bool hasOverride() const { return !_override.empty(); } private: friend class RequestQueue; void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&); - void exception(const Ice::Exception&); + void exception(std::exception_ptr); void queued(); - const Ice::ObjectPrx _proxy; + const std::shared_ptr<Ice::ObjectPrx> _proxy; const Ice::ByteSeq _inParams; const Ice::Current _current; const bool _forwardContext; const Ice::Context _sslContext; const std::string _override; - const Ice::AMD_Object_ice_invokePtr _amdCB; + std::function<void(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&)> _response; + std::function<void(std::exception_ptr)> _exception; }; -class RequestQueue : public IceUtil::Mutex, public IceUtil::Shared +class RequestQueue : public std::enable_shared_from_this<RequestQueue> { public: - RequestQueue(const RequestQueueThreadPtr&, const InstancePtr&, const Ice::ConnectionPtr&); + RequestQueue(std::shared_ptr<RequestQueueThread>, std::shared_ptr<Instance>, std::shared_ptr<Ice::Connection>); - bool addRequest(const RequestPtr&); + bool addRequest(std::shared_ptr<Request>); void flushRequests(); void destroy(); - void updateObserver(const Glacier2::Instrumentation::SessionObserverPtr&); + void updateObserver(std::shared_ptr<Glacier2::Instrumentation::SessionObserver>); private: - void destroyInternal(); - void flush(); - void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const RequestPtr&); - void exception(const Ice::Exception&, const RequestPtr&); - void sent(bool, const RequestPtr&); + void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const std::shared_ptr<Request>&); + void exception(std::exception_ptr, const std::shared_ptr<Request>&); + void sent(bool, const std::shared_ptr<Request>&); - const RequestQueueThreadPtr _requestQueueThread; - const InstancePtr _instance; - const Ice::ConnectionPtr _connection; - const Ice::Callback_Object_ice_invokePtr _callback; - const Ice::Callback_Connection_flushBatchRequestsPtr _flushCallback; + const std::shared_ptr<RequestQueueThread> _requestQueueThread; + const std::shared_ptr<Instance> _instance; + const std::shared_ptr<Ice::Connection> _connection; - std::deque<RequestPtr> _requests; - std::set<Ice::ObjectPrx> _batchProxies; + std::deque<std::shared_ptr<Request>> _requests; bool _pendingSend; - RequestPtr _pendingSendRequest; + std::shared_ptr<Request> _pendingSendRequest; bool _destroyed; - Glacier2::Instrumentation::SessionObserverPtr _observer; + std::shared_ptr<Glacier2::Instrumentation::SessionObserver> _observer; + + std::mutex _mutex; }; -typedef IceUtil::Handle<RequestQueue> RequestQueuePtr; -class RequestQueueThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> +class RequestQueueThread { public: - RequestQueueThread(const IceUtil::Time&); - virtual ~RequestQueueThread(); + RequestQueueThread(std::chrono::milliseconds); + ~RequestQueueThread(); - void flushRequestQueue(const RequestQueuePtr&); + void flushRequestQueue(std::shared_ptr<RequestQueue>); void destroy(); - virtual void run(); - private: + void run(); - const IceUtil::Time _sleepTime; + const std::chrono::milliseconds _sleepTime; bool _destroy; bool _sleep; - IceUtil::Time _sleepDuration; - std::vector<RequestQueuePtr> _queues; + + std::vector<std::shared_ptr<RequestQueue>> _queues; + + std::mutex _mutex; + std::condition_variable _condVar; + std::thread _thread; }; } |