summaryrefslogtreecommitdiff
path: root/js/src/Ice/BatchRequestQueue.js
blob: d763977352491307b29a639a95d0be80c3ff19d6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

const Ice = require("../Ice/ModuleRegistry").Ice;
Ice._ModuleRegistry.require(module,
    [
        "../Ice/Stream",
        "../Ice/Debug",
        "../Ice/Protocol"
    ]);

const OutputStream = Ice.OutputStream;
const Debug = Ice.Debug;
const Protocol = Ice.Protocol;

const udpOverhead = 20 + 8;

class BatchRequestQueue
{
    constructor(instance, datagram)
    {
        this._batchStreamInUse = false;
        this._batchRequestNum = 0;
        this._batchStream = new OutputStream(instance, Protocol.currentProtocolEncoding);
        this._batchStream.writeBlob(Protocol.requestBatchHdr);
        this._batchMarker = this._batchStream.size;
        this._exception = null;

        this._maxSize = instance.batchAutoFlushSize();
        if(this._maxSize > 0 && datagram)
        {
            const udpSndSize = instance.initializationData().properties.getPropertyAsIntWithDefault(
                "Ice.UDP.SndSize", 65535 - udpOverhead);
            if(udpSndSize < this._maxSize)
            {
                this._maxSize = udpSndSize;
            }
        }
    }

    prepareBatchRequest(os)
    {
        if(this._exception)
        {
            throw this._exception;
        }
        this._batchStream.swap(os);
    }

    finishBatchRequest(os, proxy, operation)
    {
        //
        // No need for synchronization, no other threads are supposed
        // to modify the queue since we set this._batchStreamInUse to true.
        //
        this._batchStream.swap(os);

        try
        {
            if(this._maxSize > 0 && this._batchStream.size >= this._maxSize)
            {
                proxy.ice_flushBatchRequests(); // Auto flush
            }

            Debug.assert(this._batchMarker < this._batchStream.size);
            this._batchMarker = this._batchStream.size;
            ++this._batchRequestNum;
        }
        finally
        {
            this._batchStream.resize(this._batchMarker);
        }
    }

    abortBatchRequest(os)
    {
        this._batchStream.swap(os);
        this._batchStream.resize(this._batchMarker);
    }

    swap(os)
    {
        if(this._batchRequestNum === 0)
        {
            return 0;
        }

        let lastRequest = null;
        if(this._batchMarker < this._batchStream.size)
        {
            const length = this._batchStream.size - this._batchMarker;
            this._batchStream.pos = this._batchMarker;
            lastRequest = this._batchStream.buffer.getArray(length);
            this._batchStream.resize(this._batchMarker);
        }

        const requestNum = this._batchRequestNum;
        this._batchStream.swap(os);

        //
        // Reset the batch.
        //
        this._batchRequestNum = 0;
        this._batchStream.writeBlob(Protocol.requestBatchHdr);
        this._batchMarker = this._batchStream.size;
        if(lastRequest !== null)
        {
            this._batchStream.writeBlob(lastRequest);
        }
        return requestNum;
    }

    destroy(ex)
    {
        this._exception = ex;
    }

    isEmpty()
    {
        return this._batchStream.size === Protocol.requestBatchHdr.length;
    }
}

Ice.BatchRequestQueue = BatchRequestQueue;
module.exports.Ice = Ice;