summary refs log tree commit diff
path: root/cpp/src/Glacier2/RequestQueue.h
blob: 631d90240c9240cf13f3ed197a2ff24cd8531502 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef REQUEST_H
#define REQUEST_H

#include <Ice/Ice.h>

#include <Glacier2/Instrumentation.h>

#include <deque>

namespace Glacier2
{

// Forward declarations. Instance is not defined in this header; Request and
// RequestQueueThread are both defined further down in this namespace.
class Instance;
class Request;
class RequestQueueThread;

class Request
{
public:

    Request(std::shared_ptr<Ice::ObjectPrx>,
            const std::pair<const Ice::Byte*, const Ice::Byte*>&,
            const Ice::Current&,
            bool,
            const Ice::Context&,
            std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>,
            std::function<void(std::exception_ptr)>);

    void invoke(std::function<void(bool, std::pair<const Ice::Byte*, const Ice::Byte*>)>&&,
                std::function<void(std::exception_ptr)>&&,
                std::function<void(bool)>&& = nullptr);
    bool override(const std::shared_ptr<Request>&) const;
    bool hasOverride() const { return !_override.empty(); }

private:

    friend class RequestQueue;
    void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&);
    void exception(std::exception_ptr);
    void queued();

    const std::shared_ptr<Ice::ObjectPrx> _proxy;
    const Ice::ByteSeq _inParams;
    const Ice::Current _current;
    const bool _forwardContext;
    const Ice::Context _sslContext;
    const std::string _override;
    std::function<void(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&)> _response;
    std::function<void(std::exception_ptr)> _exception;
};

class RequestQueue : public std::enable_shared_from_this<RequestQueue>
{
public:

    RequestQueue(std::shared_ptr<RequestQueueThread>, std::shared_ptr<Instance>, std::shared_ptr<Ice::Connection>);

    bool addRequest(std::shared_ptr<Request>);
    void flushRequests();

    void destroy();

    void updateObserver(std::shared_ptr<Glacier2::Instrumentation::SessionObserver>);

private:

    void flush();

    void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const std::shared_ptr<Request>&);
    void exception(std::exception_ptr, const std::shared_ptr<Request>&);
    void sent(bool, const std::shared_ptr<Request>&);

    const std::shared_ptr<RequestQueueThread> _requestQueueThread;
    const std::shared_ptr<Instance> _instance;
    const std::shared_ptr<Ice::Connection> _connection;

    std::deque<std::shared_ptr<Request>> _requests;
    bool _pendingSend;
    std::shared_ptr<Request> _pendingSendRequest;
    bool _destroyed;
    std::shared_ptr<Glacier2::Instrumentation::SessionObserver> _observer;

    std::mutex _mutex;
};

//
// Owns a std::thread plus the mutex / condition variable pair and the list
// of RequestQueue objects handed to flushRequestQueue(). The std::mutex
// member makes the class non-copyable.
//
// NOTE(review): all member functions are defined in the source file; what
// run() does with _queues, _sleep and _sleepTime must be checked there.
//
class RequestQueueThread
{
public:

    //
    // The constructor is explicit so that a std::chrono::milliseconds value
    // can never be converted to a RequestQueueThread implicitly; building a
    // thread object must always be spelled out at the call site.
    //
    explicit RequestQueueThread(std::chrono::milliseconds sleepTime);
    ~RequestQueueThread();

    //
    // Receives a queue by shared pointer.
    // NOTE(review): presumably the queue is appended to _queues and the
    // thread is woken up -- verify against the implementation.
    //
    void flushRequestQueue(std::shared_ptr<RequestQueue> queue);
    void destroy();

private:
    void run();

    const std::chrono::milliseconds _sleepTime;

    //
    // No in-class initializers: the constructor has to set both flags.
    //
    bool _destroy;
    bool _sleep;

    std::vector<std::shared_ptr<RequestQueue>> _queues;

    std::mutex _mutex;
    std::condition_variable _condVar;

    //
    // Keep _thread as the last data member. Members are initialized in
    // declaration order, so everything above is fully constructed before the
    // std::thread object is; this matters if the constructor starts the
    // thread right away (confirm in the source file).
    //
    std::thread _thread;
};

}

#endif