reader.go

package warc

import (
	"bufio"
	"bytes"
	"compress/bzip2"
	"compress/gzip"
	"io"
	"io/ioutil"
	"strconv"

	"github.com/pkg/errors"
)

// Reader parses WARC records from an underlying scanner.
// Create a new Reader with NewReader.
type Reader struct {
	rc      io.ReadCloser  // raw io.ReadCloser
	scanner *bufio.Scanner // scanner to pull tokens from
	phase   scanPhase      // current position in the scan state machine
	bodyLen int64          // remaining content-block bytes, or -1 when unknown
}

// NewReader creates a new WARC reader from an io.Reader.
// Always use NewReader instead of allocating a Reader manually.
func NewReader(r io.Reader) (*Reader, error) {
	rc, err := decompress(r)
	if err != nil {
		return nil, err
	}
	rdr := &Reader{
		rc:      rc,
		scanner: bufio.NewScanner(rc),
	}
	rdr.scanner.Split(rdr.split)
	return rdr, nil
}

// Read reads a single record, returning io.EOF to signal
// that no more records remain.
func (r *Reader) Read() (Record, error) {
	return r.readRecord()
}

// ReadAll consumes the entire reader, returning a slice of records.
func (r *Reader) ReadAll() (records Records, err error) {
	for {
		record, err := r.Read()
		if err == io.EOF {
			return records, nil
		}
		if err != nil {
			return nil, err
		}
		records = append(records, &record)
	}
}
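
// A minimal usage sketch (illustrative only; assumes the caller already has an
// io.Reader named f over a WARC file, plain or gzip/bzip2 compressed):
//
//	rdr, err := warc.NewReader(f)
//	if err != nil {
//		// handle error
//	}
//	for {
//		rec, err := rdr.Read()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			// handle error
//		}
//		// inspect rec.Type, rec.Headers, and rec.Content here
//	}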

// scanPhase denotes the different "modes" of the scanner's state machine.
type scanPhase int

const (
	scanPhaseVersion scanPhase = iota
	scanPhaseHeaderKey
	scanPhaseHeaderValue
	scanPhaseContent
)
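
// The reader walks each record through these phases in order: version, then
// alternating headerKey/headerValue pairs until a bare CRLF ends the headers,
// then content, then back to version for the next record.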

func (r *Reader) readRecord() (rec Record, err error) {
	var key string
	rec = Record{
		Headers: map[string]string{},
	}
	for r.scanner.Scan() {
		token := r.scanner.Bytes()
		switch r.phase {
		case scanPhaseVersion:
			// expect the WARC version line (e.g. "WARC/1.0")
			rec.Format = recordFormat(string(bytes.TrimSpace(token)))
			if rec.Format == RecordFormatUnknown {
				return rec, errors.Errorf("unknown record format: '%s'", string(bytes.TrimSpace(token)))
			}
			r.phase = scanPhaseHeaderKey
		case scanPhaseHeaderKey:
			if bytes.Equal(token, crlf) {
				// a bare CRLF ends the header block; switch to reading content
				r.phase = scanPhaseContent
				r.bodyLen = -1
				r.checkContentLength(&rec)
				rec.Content = bytes.NewBuffer(nil)
				if r.bodyLen != -1 {
					rec.Content.Grow(int(r.bodyLen))
				}
			} else {
				key = CanonicalKey(string(token))
				r.phase = scanPhaseHeaderValue
			}
		case scanPhaseHeaderValue:
			rec.Headers[key] = string(bytes.TrimSpace(token))
			if key == FieldNameWARCType {
				rec.Type = ParseRecordType(rec.Headers[key])
			}
			r.phase = scanPhaseHeaderKey
		case scanPhaseContent:
			by := r.scanner.Bytes()
			rec.Content.Write(by)
			if len(by) == 0 {
				// an empty content token marks the end of the record
				r.phase = scanPhaseVersion
				return
			} else if r.bodyLen != -1 {
				r.bodyLen -= int64(len(by))
			}
		}
	}
	if r.scanner.Err() != nil {
		return rec, r.scanner.Err()
	}
	if r.phase != scanPhaseVersion && r.phase != scanPhaseContent {
		return rec, io.ErrUnexpectedEOF
	}
	return rec, io.EOF
}

func (r *Reader) checkContentLength(rec *Record) error {
	if rec.Headers[FieldNameWARCSegmentNumber] != "" {
		// segmented content; leave bodyLen unknown
		return nil
	}
	if rec.Headers[FieldNameContentLength] != "" {
		// non-segmented with a Content-Length header: read the whole body in one block
		length, err := strconv.ParseInt(rec.Headers[FieldNameContentLength], 10, 64)
		if err != nil {
			return errors.Wrap(err, "warc: invalid Content-Length")
		}
		r.bodyLen = length
	}
	return nil
}

// split is the bufio.SplitFunc used by the scanner; it dispatches to the
// splitter for the current phase.
func (r *Reader) split(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}
	switch r.phase {
	case scanPhaseVersion:
		return splitLine(data, atEOF)
	case scanPhaseHeaderKey:
		return splitKey(data, atEOF)
	case scanPhaseHeaderValue:
		return splitValue(data, atEOF)
	case scanPhaseContent:
		if r.bodyLen != -1 {
			// content length is known; read exactly that many bytes
			return splitFull(data, atEOF, r.bodyLen)
		}
		fallthrough
	default: // default to scanPhaseContent
		return splitBlock(data, atEOF)
	}
}

var crlf = []byte("\r\n")
var doubleCrlf = []byte("\r\n\r\n")

// splitLine returns a single newline-terminated line, skipping any leading
// record-separator CRLFs left over from the previous record.
func splitLine(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if bytes.HasPrefix(data, crlf) {
		// Found block-end from previous record. Skip.
		if bytes.HasPrefix(data, doubleCrlf) {
			return len(doubleCrlf), nil, nil
		}
		return len(crlf), nil, nil
	}
	if i := bytes.IndexByte(data, '\n'); i >= 0 {
		// We have a full newline-terminated line.
		return i + 1, dropCR(data[0:i]), nil
	}
	// If we're at EOF, we have a final, non-terminated line. Return it.
	if atEOF {
		return len(data), dropCR(data), nil
	}
	// Request more data.
	return 0, nil, nil
}

// splitKey returns a header field name (everything up to the ':' separator),
// or a bare crlf token marking the end of the header block.
func splitKey(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if bytes.Index(data, crlf) == 0 {
		return len(crlf), crlf, nil
	}
	if i := bytes.IndexByte(data, ':'); i >= 0 {
		return i + 1, data[0:i], nil
	}
	if atEOF {
		return len(data), data, nil
	}
	return 0, nil, nil
}

// splitValue returns a header field value: everything up to the next crlf.
func splitValue(data []byte, atEOF bool) (advance int, token []byte, err error) {
	// TODO - MULTILINE VALUES
	if i := bytes.Index(data, crlf); i == 0 {
		// crlf at the very start: no value to return
		return len(crlf), nil, nil
	} else if i > 0 {
		return i + len(crlf), data[0:i], nil
	}
	if atEOF {
		return len(data), data, nil
	}
	return 0, nil, nil
}

// splitBlock returns a content block: everything up to the next double crlf.
func splitBlock(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if i := bytes.Index(data, doubleCrlf); i >= 0 {
		return i + len(doubleCrlf), data[0:i], nil
	}
	// If we're at EOF, we have a final, non-terminated block. Return it.
	if atEOF {
		return len(data), data, nil
	}
	return 0, nil, nil
}

// splitFull returns up to bytesLeft bytes of record content, erroring if the
// input ends before the expected number of bytes has been read.
func splitFull(data []byte, atEOF bool, bytesLeft int64) (advance int, token []byte, err error) {
	if bytesLeft <= int64(len(data)) {
		// the rest of the record body is already buffered
		length := int(bytesLeft)
		return length, data[:length], nil
	}
	if atEOF {
		return len(data), data, errors.Errorf("warc: unexpected EOF in record content, got %v bytes (expected %v more)", len(data), bytesLeft)
	}
	if len(data) > 0 {
		return len(data), data, nil
	}
	return 0, nil, nil
}

// dropCR drops a terminal \r from the data.
func dropCR(data []byte) []byte {
	if len(data) > 0 && data[len(data)-1] == '\r' {
		return data[0 : len(data)-1]
	}
	return data
}

// readBlockBody reads the block body that follows the final crlf in data,
// transparently decompressing it if it is gzip- or bzip2-compressed.
func readBlockBody(data []byte) ([]byte, error) {
	start := bytes.LastIndex(data, crlf)
	if start == -1 {
		return data, nil
	}
	r := bytes.NewReader(data[start+len(crlf):])
	res, err := decompress(r)
	if err != nil {
		return nil, err
	}
	defer res.Close()
	return ioutil.ReadAll(res)
}

// supported compression formats
const (
	compressionNone = iota
	compressionBZIP
	compressionGZIP
)

// guessCompression returns the compression type of a data stream by matching
// the first two bytes with the magic numbers of known compression formats.
func guessCompression(b *bufio.Reader) (int, error) {
	magic, err := b.Peek(2)
	if err != nil {
		if err == io.EOF {
			err = nil
		}
		return compressionNone, err
	}
	switch {
	case magic[0] == 0x42 && magic[1] == 0x5a: // "BZ": bzip2
		return compressionBZIP, nil
	case magic[0] == 0x1f && magic[1] == 0x8b: // gzip
		return compressionGZIP, nil
	}
	return compressionNone, nil
}

// decompress automatically decompresses data streams and makes sure the result
// obeys the io.ReadCloser interface. This way callers don't need to check
// whether the underlying reader has a Close() function or not, they just call
// defer Close() on the result.
func decompress(r io.Reader) (res io.ReadCloser, err error) {
	// Create a buffered reader to peek at the stream's magic number.
	dataReader := bufio.NewReader(r)
	compr, err := guessCompression(dataReader)
	if err != nil {
		return nil, err
	}
	switch compr {
	case compressionGZIP:
		gzipReader, err := gzip.NewReader(dataReader)
		if err != nil {
			return nil, err
		}
		res = gzipReader
	case compressionBZIP:
		// bzip2 readers have no Close method, so wrap them in a NopCloser
		bzipReader := bzip2.NewReader(dataReader)
		res = ioutil.NopCloser(bzipReader)
	case compressionNone:
		res = ioutil.NopCloser(dataReader)
	}
	return res, err
}
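
// A minimal internal usage sketch (illustrative only; assumes some io.Reader
// named src over data that may or may not be gzip/bzip2 compressed):
//
//	rc, err := decompress(src)
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	raw, err := ioutil.ReadAll(rc)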