summaryrefslogtreecommitdiff
path: root/js/src/Ice/BatchRequestQueue.js
blob: 6b33a1d6b8115d3fd4808e7ee627acb8d8ad0081 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// **********************************************************************
//
// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
//
// **********************************************************************

var Ice = require("../Ice/ModuleRegistry").Ice;
Ice.__M.require(module,
    [
        "../Ice/Class",
        "../Ice/BasicStream",
        "../Ice/Debug",
        "../Ice/ExUtil",
        "../Ice/Protocol",
    ]);

// Local aliases for members of the shared Ice object used in this file.
var BasicStream = Ice.BasicStream;
var Debug = Ice.Debug;
var ExUtil = Ice.ExUtil; // NOTE(review): not referenced by the code visible in this file.
var Class = Ice.Class;
var Protocol = Ice.Protocol;

// Subtracted from 65535 to form the default value used for Ice.UDP.SndSize below.
// NOTE(review): presumably 20 bytes of IP header plus 8 bytes of UDP header -- confirm.
var udpOverhead = 20 + 8;

//
// Collects batched requests in one stream until they are handed off by swap().
//
// The stream always begins with Protocol.requestBatchHdr. this._batchMarker
// holds the stream size as of the last completed request; any bytes beyond
// the marker belong to a request that has not been completed yet.
//
var BatchRequestQueue = Class({
    //
    // instance: provides batchAutoFlushSize() and, for datagram queues,
    //           initializationData().properties.
    // datagram: when truthy (and auto flush is enabled), the auto flush
    //           threshold is capped by the Ice.UDP.SndSize property.
    //
    __init__: function(instance, datagram)
    {
        this._batchStreamInUse = false; // Assigned here only; no other code in this class touches it.
        this._batchRequestNum = 0;
        this._exception = null;

        var stream = new BasicStream(instance, Protocol.currentProtocolEncoding);
        stream.writeBlob(Protocol.requestBatchHdr);
        this._batchStream = stream;
        this._batchMarker = stream.size;

        //
        // finishBatchRequest only auto flushes when the threshold is
        // greater than zero, so the datagram cap is skipped otherwise.
        //
        var threshold = instance.batchAutoFlushSize();
        if(datagram && threshold > 0)
        {
            var sndSize = instance.initializationData().properties.getPropertyAsIntWithDefault(
                "Ice.UDP.SndSize", 65535 - udpOverhead);
            threshold = sndSize < threshold ? sndSize : threshold;
        }
        this._maxSize = threshold;
    },
    //
    // Exchanges the batch stream with os so the caller can write a request
    // into it. Throws the exception recorded by destroy(), if there is one.
    //
    prepareBatchRequest: function(os)
    {
        var failure = this._exception;
        if(failure)
        {
            throw failure;
        }
        this._batchStream.swap(os);
    },
    //
    // Exchanges the stream back from os and records the request that was
    // written to it.
    //
    // os:        the stream previously passed to prepareBatchRequest.
    // proxy:     its ice_flushBatchRequests() is called when the stream has
    //            reached the auto flush threshold.
    // operation: not used by this implementation.
    //
    finishBatchRequest: function(os, proxy, operation)
    {
        var stream = this._batchStream;
        stream.swap(os);

        try
        {
            var limit = this._maxSize;
            if(limit > 0 && stream.size >= limit)
            {
                //
                // Auto flush. NOTE(review): presumably this ends up calling
                // swap() on this queue, which keeps the bytes past the
                // marker -- confirm against the proxy implementation.
                //
                proxy.ice_flushBatchRequests();
            }

            // The request just written must have added bytes past the marker.
            var end = stream.size;
            Debug.assert(this._batchMarker < end);
            this._batchMarker = end;
            this._batchRequestNum += 1;
        }
        finally
        {
            //
            // On success the marker equals the stream size, so this changes
            // nothing; if the try block threw, the marker was not advanced
            // and the unfinished request is cut off.
            //
            stream.resize(this._batchMarker);
        }
    },
    //
    // Exchanges the stream back from os and drops everything written past
    // the marker.
    //
    abortBatchRequest: function(os)
    {
        var stream = this._batchStream;
        stream.swap(os);
        stream.resize(this._batchMarker);
    },
    //
    // Exchanges the completed requests into os and starts a new batch.
    // Returns the number of requests handed over; when there are none,
    // returns 0 and leaves os alone.
    //
    swap: function(os)
    {
        var flushed = this._batchRequestNum;
        if(flushed === 0)
        {
            return 0;
        }

        var stream = this._batchStream;
        var marker = this._batchMarker;

        //
        // Bytes past the marker are an unfinished request: copy them out and
        // trim the stream so that only completed requests are exchanged.
        //
        var pending = null;
        var tail = stream.size - marker;
        if(tail > 0)
        {
            stream.pos = marker;
            pending = stream.buffer.getArray(tail);
            stream.resize(marker);
        }

        stream.swap(os);

        //
        // Start the new batch: header first, then the unfinished request
        // (if any) placed after the new marker.
        //
        this._batchRequestNum = 0;
        stream.writeBlob(Protocol.requestBatchHdr);
        this._batchMarker = stream.size;
        if(pending !== null)
        {
            stream.writeBlob(pending);
        }
        return flushed;
    },
    //
    // Records ex; later calls to prepareBatchRequest throw it.
    //
    destroy: function(ex)
    {
        this._exception = ex;
    },
    //
    // True when the stream size equals the length of the batch header.
    //
    isEmpty: function()
    {
        var currentSize = this._batchStream.size;
        return currentSize === Protocol.requestBatchHdr.length;
    }
});

// Attach the class to the shared Ice object and export that object from this module.
Ice.BatchRequestQueue = BatchRequestQueue;
module.exports.Ice = Ice;