summaryrefslogtreecommitdiff
path: root/cpp/src/Glacier2/RequestQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.h')
-rw-r--r--cpp/src/Glacier2/RequestQueue.h89
1 files changed, 44 insertions, 45 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.h b/cpp/src/Glacier2/RequestQueue.h
index 6ef8055cf61..631d90240c9 100644
--- a/cpp/src/Glacier2/RequestQueue.h
+++ b/cpp/src/Glacier2/RequestQueue.h
@@ -5,8 +5,6 @@
#ifndef REQUEST_H
#define REQUEST_H
-#include <IceUtil/Thread.h>
-#include <IceUtil/Monitor.h>
#include <Ice/Ice.h>
#include <Glacier2/Instrumentation.h>
@@ -17,99 +15,100 @@ namespace Glacier2
{
class Instance;
-typedef IceUtil::Handle<Instance> InstancePtr;
-
class Request;
-typedef IceUtil::Handle<Request> RequestPtr;
-
class RequestQueueThread;
-typedef IceUtil::Handle<RequestQueueThread> RequestQueueThreadPtr;
-class Request : public Ice::LocalObject
+class Request
{
public:
- Request(const Ice::ObjectPrx&, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&, bool,
- const Ice::Context&, const Ice::AMD_Object_ice_invokePtr&);
-
- Ice::AsyncResultPtr invoke(const Ice::Callback_Object_ice_invokePtr& callback);
- bool override(const RequestPtr&) const;
- void addBatchProxy(std::set<Ice::ObjectPrx>&);
+ Request(std::shared_ptr<Ice::ObjectPrx>,
+ const std::pair<const Ice::Byte*, const Ice::Byte*>&,
+ const Ice::Current&,
+ bool,
+ const Ice::Context&,
+ std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>,
+ std::function<void(std::exception_ptr)>);
+
+ void invoke(std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>&&,
+ std::function<void(std::exception_ptr)>&&,
+ std::function<void(bool)>&& = nullptr);
+ bool override(const std::shared_ptr<Request>&) const;
bool hasOverride() const { return !_override.empty(); }
private:
friend class RequestQueue;
void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&);
- void exception(const Ice::Exception&);
+ void exception(std::exception_ptr);
void queued();
- const Ice::ObjectPrx _proxy;
+ const std::shared_ptr<Ice::ObjectPrx> _proxy;
const Ice::ByteSeq _inParams;
const Ice::Current _current;
const bool _forwardContext;
const Ice::Context _sslContext;
const std::string _override;
- const Ice::AMD_Object_ice_invokePtr _amdCB;
+ std::function<void(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&)> _response;
+ std::function<void(std::exception_ptr)> _exception;
};
-class RequestQueue : public IceUtil::Mutex, public IceUtil::Shared
+class RequestQueue : public std::enable_shared_from_this<RequestQueue>
{
public:
- RequestQueue(const RequestQueueThreadPtr&, const InstancePtr&, const Ice::ConnectionPtr&);
+ RequestQueue(std::shared_ptr<RequestQueueThread>, std::shared_ptr<Instance>, std::shared_ptr<Ice::Connection>);
- bool addRequest(const RequestPtr&);
+ bool addRequest(std::shared_ptr<Request>);
void flushRequests();
void destroy();
- void updateObserver(const Glacier2::Instrumentation::SessionObserverPtr&);
+ void updateObserver(std::shared_ptr<Glacier2::Instrumentation::SessionObserver>);
private:
- void destroyInternal();
-
void flush();
- void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const RequestPtr&);
- void exception(const Ice::Exception&, const RequestPtr&);
- void sent(bool, const RequestPtr&);
+ void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const std::shared_ptr<Request>&);
+ void exception(std::exception_ptr, const std::shared_ptr<Request>&);
+ void sent(bool, const std::shared_ptr<Request>&);
- const RequestQueueThreadPtr _requestQueueThread;
- const InstancePtr _instance;
- const Ice::ConnectionPtr _connection;
- const Ice::Callback_Object_ice_invokePtr _callback;
- const Ice::Callback_Connection_flushBatchRequestsPtr _flushCallback;
+ const std::shared_ptr<RequestQueueThread> _requestQueueThread;
+ const std::shared_ptr<Instance> _instance;
+ const std::shared_ptr<Ice::Connection> _connection;
- std::deque<RequestPtr> _requests;
- std::set<Ice::ObjectPrx> _batchProxies;
+ std::deque<std::shared_ptr<Request>> _requests;
bool _pendingSend;
- RequestPtr _pendingSendRequest;
+ std::shared_ptr<Request> _pendingSendRequest;
bool _destroyed;
- Glacier2::Instrumentation::SessionObserverPtr _observer;
+ std::shared_ptr<Glacier2::Instrumentation::SessionObserver> _observer;
+
+ std::mutex _mutex;
};
-typedef IceUtil::Handle<RequestQueue> RequestQueuePtr;
-class RequestQueueThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
+class RequestQueueThread
{
public:
- RequestQueueThread(const IceUtil::Time&);
- virtual ~RequestQueueThread();
+ RequestQueueThread(std::chrono::milliseconds);
+ ~RequestQueueThread();
- void flushRequestQueue(const RequestQueuePtr&);
+ void flushRequestQueue(std::shared_ptr<RequestQueue>);
void destroy();
- virtual void run();
-
private:
+ void run();
- const IceUtil::Time _sleepTime;
+ const std::chrono::milliseconds _sleepTime;
bool _destroy;
bool _sleep;
- IceUtil::Time _sleepDuration;
- std::vector<RequestQueuePtr> _queues;
+
+ std::vector<std::shared_ptr<RequestQueue>> _queues;
+
+ std::mutex _mutex;
+ std::condition_variable _condVar;
+ std::thread _thread;
};
}