summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/FileCache.cpp
blob: f14e1b303492715cfcfe0aa836db231a286c38c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// **********************************************************************
//
// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
//
// **********************************************************************

#include <Ice/Communicator.h>
#include <Ice/Properties.h>

#include <IceUtil/FileUtil.h>

#include <IceGrid/FileCache.h>
#include <IceGrid/Exception.h>

#include <deque>

using namespace std;
using namespace IceGrid;

//
// Build the cache and compute _messageSizeMax, the cap that read() applies
// to the size of each request: the value of the communicator's
// Ice.MessageSizeMax property (1024 when the property isn't set) multiplied
// by 1024, minus 256.
//
// NOTE(review): the property is presumably expressed in kilobytes and the
// 256 bytes presumably leave room for the rest of the reply message --
// confirm against the Ice property reference.
//
FileCache::FileCache(const Ice::CommunicatorPtr& com) : 
    _messageSizeMax(com->getProperties()->getPropertyAsIntWithDefault("Ice.MessageSizeMax", 1024) * 1024 - 256)
{
}

//
// Return the offset in `file' at which its last `originalCount' lines
// begin.
//
// - originalCount < 0 selects the whole file: 0 is returned.
// - originalCount == 0 selects nothing: the end-of-file offset is returned.
// - If the file holds fewer than originalCount lines, 0 is returned.
//
// The empty "line" found after a trailing newline at the very end of the
// file isn't counted.
//
// Throws FileNotAvailableException if the file can't be opened or if an
// unrecoverable stream error occurs while reading it.
//
Ice::Long
FileCache::getOffsetFromEnd(const string& file, int originalCount)
{
    IceUtilInternal::ifstream is(file); // file is a UTF-8 string
    if(is.fail())
    {
        throw FileNotAvailableException("failed to open file `" + file + "'");
    }

    if(originalCount < 0)
    {
        return 0;
    }

    is.seekg(0, ios::end);
    streampos endOfFile = is.tellg();
    if(originalCount == 0)
    {
        return endOfFile;
    }

    //
    // The file is scanned backwards, one block at a time. Each pass reads
    // the block located right before the one read by the previous pass
    // and the block size doubles from one pass to the next.
    //
    streamoff blockSize = 16 * 1024; // Start reading a block of 16K from the end of the file.
    streampos lastBlockOffset = endOfFile; // End of the block to read during the current pass.
    int totalCount = 0; // Number of lines found so far, over all the passes.
    string line;

    //
    // Start offsets of the lines kept for the current pass. Only the
    // offsets are needed to compute the result, the text of the lines
    // isn't stored.
    //
    deque<streampos> lineOffsets;
    do
    {
        lineOffsets.clear();

        //
        // Move the current position of the stream to the new block to
        // read.
        //
        is.clear(); // Reset the eof/fail bits left by the previous pass.
        if(lastBlockOffset - blockSize > streamoff(0))
        {
            is.seekg(lastBlockOffset - blockSize);
            getline(is, line); // Ignore the first line as it's most likely not complete.
        }
        else
        {
            is.seekg(0, ios::beg); // We've reached the beginning of the file.
        }

        //
        // Read the block and count the number of lines in the block.
        // Once the number of lines still needed has been found, each
        // new line read evicts the oldest one kept so that only the
        // lines closest to the end of the block remain.
        //
        // A line starting at or before lastBlockOffset belongs to this
        // block, this includes the line which was skipped as incomplete
        // by the previous pass.
        //
        int count = originalCount - totalCount; // Number of lines left to find.
        while(is.good() && is.tellg() <= streamoff(lastBlockOffset))
        {
            streampos beg = is.tellg();
            getline(is, line);
            if(is.eof() && line.empty()) // Don't count the last line if it's empty.
            {
                continue;
            }

            lineOffsets.push_back(beg);
            ++totalCount;
            if(lineOffsets.size() == static_cast<unsigned int>(count + 1))
            {
                --totalCount;
                lineOffsets.pop_front();
            }
        }

        if(lastBlockOffset - blockSize < streamoff(0))
        {
            break; // We're done if the block started at the beginning of the file.
        }
        else if(totalCount < originalCount)
        {
            //
            // Otherwise, if we still didn't find the required number of lines,
            // read another block of text before this block.
            //
            lastBlockOffset -= blockSize; // Position of the block we just read.
            blockSize *= 2; // Read a bigger block.
        }
    }
    while(totalCount < originalCount && !is.bad());

    if(is.bad())
    {
        throw FileNotAvailableException("unrecoverable error occured while reading file `" + file + "'");
    }

    //
    // The oldest line kept by the last pass is the first of the requested
    // lines. No line at all means that there's nothing before the end of
    // the file to skip, the result is the beginning of the file.
    //
    if(lineOffsets.empty())
    {
        return 0;
    }
    else
    {
        return lineOffsets.front();
    }
}

bool
FileCache::read(const string& file, Ice::Long offset, int size, Ice::Long& newOffset, Ice::StringSeq& lines)
{
    assert(size > 0);

    if(size > _messageSizeMax)
    {
        size = _messageSizeMax;
    }

    if(size <= 5)
    {
        throw FileNotAvailableException("maximum bytes per read request is too low");
    }

    IceUtilInternal::ifstream is(file); // file is a UTF-8 string
    if(is.fail())
    {
        throw FileNotAvailableException("failed to open file `" + file + "'");
    }

    //
    // Check if the requested offset is past the end of the file, if
    // that's the case return an empty sequence of lines and indicate
    // the EOF.
    //
    is.seekg(0, ios::end);
    if(offset >= is.tellg())
    {
        newOffset = is.tellg();
        lines = Ice::StringSeq();
        return true;
    }

    //
    // Read lines from the file until we read enough or reached EOF.
    // 
    newOffset = offset;
    lines = Ice::StringSeq();
    is.seekg(static_cast<streamoff>(offset), ios::beg);
    int totalSize = 0;
    string line;

    for(int i = 0; is.good(); ++i)
    {
        getline(is, line);

        int lineSize = static_cast<int>(line.size()) + 5; // 5 bytes for the encoding of the string size (worst case)
        if(lineSize + totalSize > size)
        {
            if(totalSize + 5 < size)
            {
                // There's some room left for a part of the string, return a partial string
                line = line.substr(0, size - totalSize - 5);
                lines.push_back(line);
                newOffset += line.size();
            }
            else
            {
                lines.push_back("");
            }
            return false; // We didn't reach the end of file, we've just reached the size limit!
        }

        totalSize += lineSize;
        lines.push_back(line);

        //
        // If there was a partial read update the offset using the current line size,
        // otherwise we have read a new complete line and we can use tellg to update
        // the offset.
        //
        if(!is.good())
        {
            newOffset += line.size();
        }
        else
        {
            newOffset = is.tellg();
        }
    }

    if(is.bad())
    {
        throw FileNotAvailableException("unrecoverable error occured while reading file `" + file + "'");
    }

    return is.eof();
}