1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//
#ifndef REQUEST_H
#define REQUEST_H
#include <Ice/Ice.h>
#include <Glacier2/Instrumentation.h>
#include <deque>
namespace Glacier2
{
// Forward declarations of the types that the class definitions below refer to
// by name. Instance is not defined in this header; it is only used here
// through std::shared_ptr<Instance>.
class Instance;
class Request;
class RequestQueueThread;
//
// A single invocation held together with the callbacks used to report its
// outcome. The class stores a target proxy, a copy of the encoded input
// parameters, the Ice::Current of the incoming call, a context and a pair of
// response/exception callbacks (see the data members below).
//
// The member functions are defined out of line; statements below about how
// constructor arguments map to members are assumptions to confirm against the
// implementation file.
//
class Request
{
public:
// Constructs a request. The parameters are unnamed in this declaration; by
// type they are, in order:
//   - the proxy (presumably stored in _proxy),
//   - a [begin, end) byte range (presumably copied into _inParams),
//   - the Ice::Current of the incoming call (presumably stored in _current),
//   - a bool (presumably _forwardContext),
//   - an Ice::Context (presumably _sslContext),
//   - the response callback, taking an "ok" flag and a byte range,
//   - the exception callback, taking a std::exception_ptr.
// NOTE(review): the response callback is taken here with a by-value pair
// argument, while _response is declared with a const-reference pair argument.
Request(std::shared_ptr<Ice::ObjectPrx>,
const std::pair<const Ice::Byte*, const Ice::Byte*>&,
const Ice::Current&,
bool,
const Ice::Context&,
std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>,
std::function<void(std::exception_ptr)>);
// Invokes the request with the supplied callbacks: a response callback, an
// exception callback and an optional third callback taking a bool, which
// defaults to nullptr. NOTE(review): the third callback is presumably a
// "sent" notification -- confirm against the definition.
void invoke(std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>&&,
std::function<void(std::exception_ptr)>&&,
std::function<void(bool)>&& = nullptr);
// Compares this request with another one and returns a bool.
// NOTE(review): presumably true when this request overrides the given one
// based on _override -- the exact rule lives in the definition; confirm there.
bool override(const std::shared_ptr<Request>&) const;
// Returns true when the _override string is non-empty.
bool hasOverride() const { return !_override.empty(); }
private:
// RequestQueue needs access to the private callbacks and data below.
friend class RequestQueue;
// Private completion hooks, reachable only from this class and RequestQueue.
// NOTE(review): presumably these forward to _response / _exception -- confirm.
void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&);
void exception(std::exception_ptr);
void queued();
// Immutable request state, fixed at construction (all members are const).
const std::shared_ptr<Ice::ObjectPrx> _proxy;
// Owning byte sequence, as opposed to the non-owning pointer pair taken by
// the constructor.
const Ice::ByteSeq _inParams;
const Ice::Current _current;
const bool _forwardContext;
const Ice::Context _sslContext;
// An empty string means "no override" (see hasOverride()).
const std::string _override;
// Completion callbacks; non-const, unlike the rest of the state.
std::function<void(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&)> _response;
std::function<void(std::exception_ptr)> _exception;
};
//
// A queue of Request objects associated with one Ice::Connection, a shared
// Instance and a RequestQueueThread.
//
// The class derives from std::enable_shared_from_this, so instances are
// expected to be owned by a std::shared_ptr whenever shared_from_this() is
// used. A std::mutex member is provided for guarding the mutable state; which
// members it actually protects is determined by the out-of-line definitions.
//
class RequestQueue : public std::enable_shared_from_this<RequestQueue>
{
public:
// Constructs a queue from the flushing thread, the shared Instance and the
// connection (presumably stored in _requestQueueThread, _instance and
// _connection respectively -- confirm against the definition).
RequestQueue(std::shared_ptr<RequestQueueThread>, std::shared_ptr<Instance>, std::shared_ptr<Ice::Connection>);
// Adds a request to the queue and returns a bool.
// NOTE(review): the meaning of the return value is not visible from this
// header; check the definition before relying on it.
bool addRequest(std::shared_ptr<Request>);
// Public entry point for flushing the queued requests.
// NOTE(review): presumably called by RequestQueueThread -- confirm.
void flushRequests();
void destroy();
// Replaces the session observer (presumably stored in _observer).
void updateObserver(std::shared_ptr<Glacier2::Instrumentation::SessionObserver>);
private:
// Internal flush helper, as opposed to the public flushRequests().
void flush();
// Per-request completion handlers; each receives the Request it relates to.
void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const std::shared_ptr<Request>&);
void exception(std::exception_ptr, const std::shared_ptr<Request>&);
void sent(bool, const std::shared_ptr<Request>&);
// Collaborators fixed at construction (all const).
const std::shared_ptr<RequestQueueThread> _requestQueueThread;
const std::shared_ptr<Instance> _instance;
const std::shared_ptr<Ice::Connection> _connection;
// Requests currently held by the queue.
std::deque<std::shared_ptr<Request>> _requests;
// NOTE(review): _pendingSend / _pendingSendRequest presumably track a request
// whose send has not yet completed -- confirm against the definitions.
bool _pendingSend;
std::shared_ptr<Request> _pendingSendRequest;
// NOTE(review): the bool members have no in-class initializers; they must be
// initialized by the constructor defined elsewhere.
bool _destroyed;
std::shared_ptr<Glacier2::Instrumentation::SessionObserver> _observer;
std::mutex _mutex;
};
//
// Owns a std::thread together with a mutex/condition-variable pair and a list
// of RequestQueue objects registered through flushRequestQueue().
//
// NOTE(review): presumably the thread runs run() and flushes the registered
// queues, waiting for _sleepTime between rounds -- the behavior is defined
// out of line; confirm against the implementation file.
//
class RequestQueueThread
{
public:
// Constructs the object from a duration (presumably stored in _sleepTime).
// NOTE(review): this single-argument constructor is not marked explicit, so
// a std::chrono::milliseconds value converts implicitly to RequestQueueThread.
RequestQueueThread(std::chrono::milliseconds);
~RequestQueueThread();
// Hands a queue over to this object (presumably appended to _queues so that
// it gets flushed by the thread -- confirm).
void flushRequestQueue(std::shared_ptr<RequestQueue>);
void destroy();
private:
// NOTE(review): presumably the entry point executed by _thread -- confirm.
void run();
// Fixed at construction (const).
const std::chrono::milliseconds _sleepTime;
// NOTE(review): the bool members have no in-class initializers; they must be
// initialized by the constructor defined elsewhere.
bool _destroy;
bool _sleep;
// Queues registered with this object.
std::vector<std::shared_ptr<RequestQueue>> _queues;
// Synchronization primitives and the thread owned by this object.
std::mutex _mutex;
std::condition_variable _condVar;
std::thread _thread;
};
}
#endif
|