// Source: reader.go (forked from maestre3d/dynamoql-go).

package dynamoql
import (
"context"
"errors"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// ErrReaderEOF is the sentinel error returned when a reader has no more items to read.
// Callers can test for it with errors.Is.
var ErrReaderEOF = errors.New("dynamoql: Reader has reached end of file")
// QueryReader iterates over the items stored in an Amazon DynamoDB table using the Query API.
//
// It uses a pre-fetching strategy: items are loaded in chunks into an internal buffer before
// they are handed out one by one. The chunk size is the value passed to NewQueryReader.
// NOTE(review): earlier documentation states a default of 10 items per chunk; no default is
// applied in this file, so confirm where (or whether) it is enforced.
//
// When the reader has handed out the last item of the buffered chunk, the next call to GetItem
// fetches another chunk, until no chunks are left or the caller stops iterating.
//
// Example:
//
//	bills := make([]Bill, 0, 10)
//	for r.Next() {
//		item, err := r.GetItem(ctx)
//		if err != nil {
//			break
//		}
//
//		bill := Bill{}
//		if err = bill.UnmarshalDynamoDB(item); err != nil {
//			break
//		}
//
//		bills = append(bills, bill)
//		if r.Count() >= 10 {
//			break
//		}
//	}
type QueryReader struct {
	// paginator is the page-fetching mechanism used by read; GetItem also inspects its
	// last evaluated key to decide whether more items may exist.
	paginator *QueryPaginator

	// buf is the internal buffer that fetched items are written to and read from.
	buf *ItemBuffer

	// hasNext is the value reported by Next. NewQueryReader sets it to true and GetItem
	// updates it.
	hasNext bool

	// readPivot is the index in buf of the item the next GetItem call returns.
	readPivot int

	// itemCount is the number of items GetItem has returned so far; reported by Count.
	itemCount int
}
// NewQueryReader builds a QueryReader together with the internal components it needs.
//
// chunkSize is forwarded to NewQueryPaginator and, converted to int, used as the argument of
// NewItemBuffer. c and q are handed to NewQueryPaginator unchanged.
//
// The returned reader is never nil. It starts at read position zero with no items counted, and
// Next reports true until GetItem records otherwise.
func NewQueryReader(chunkSize int32, c *dynamodb.Client, q dynamodb.QueryInput) *QueryReader {
	reader := new(QueryReader)
	reader.paginator = NewQueryPaginator(chunkSize, c, q)
	reader.buf = NewItemBuffer(int(chunkSize))
	// Nothing has been fetched yet, so optimistically advertise an item; the position and
	// counter fields keep their zero values.
	reader.hasNext = true
	return reader
}
// Next reports whether the reader expects another item to be available through GetItem.
//
// It returns true on a freshly created reader, before anything has been fetched, and afterwards
// returns whatever GetItem last recorded. It only reads a field and performs no fetching itself.
func (q *QueryReader) Next() bool {
	return q.hasNext
}
// read loads the next chunk of items into the buffer, using the QueryPaginator as the
// underlying fetching mechanism.
//
// If the buffer is full it is reset first and the read position goes back to zero. Pages are
// then fetched and written to the buffer until the paginator has no further page or the
// paginator's Count reaches the buffer capacity.
//
// It returns ErrReaderEOF when a fetched page contains no items or when the paginator had no
// page left to fetch, so a nil error always means at least one item was written. Any error
// from the paginator is returned unchanged.
func (q *QueryReader) read(ctx context.Context) error {
	if q.buf.IsFull() {
		q.buf.Reset()
		q.readPivot = 0
	}

	fetched := false
	for q.paginator.Next() {
		out, err := q.paginator.GetPage(ctx)
		if err != nil {
			return err
		}
		if len(out.Items) == 0 {
			return ErrReaderEOF
		}
		q.buf.WriteItems(out.Items)
		fetched = true
		if int(q.paginator.Count()) >= q.buf.Cap() {
			break
		}
	}

	// The loop body never ran: the paginator is exhausted and nothing new was buffered.
	// Reporting success here would let the caller index the buffer at a position with no item.
	if !fetched {
		return ErrReaderEOF
	}
	return nil
}
// Count returns the number of items this QueryReader has returned through GetItem so far.
func (q *QueryReader) Count() int {
	return q.itemCount
}
// GetItem returns the next item of the query, fetching a new chunk from Amazon DynamoDB when
// every buffered item has already been handed out.
//
// A successful call advances the read position, increments the value reported by Count and
// refreshes the value reported by Next: another item is considered available when the buffer
// holds an item at the new read position or the paginator still carries a last evaluated key.
//
// It returns ErrReaderEOF when no further item can be produced; from then on Next reports
// false. Any other fetching error is returned unchanged and leaves the value reported by Next
// untouched, so the caller may retry. The returned item is nil whenever the error is non-nil.
func (q *QueryReader) GetItem(ctx context.Context) (map[string]types.AttributeValue, error) {
	// readPivot is never negative, so this also covers the empty-buffer case.
	if q.readPivot >= q.buf.Len() {
		if err := q.read(ctx); err != nil {
			if errors.Is(err, ErrReaderEOF) {
				// End of data: make `for r.Next()` loops terminate even if the caller
				// does not break on the error.
				q.hasNext = false
			}
			return nil, err
		}
		// read may succeed without buffering anything (e.g. the paginator had no page
		// left); never index the buffer past its end.
		if q.readPivot >= q.buf.Len() {
			q.hasNext = false
			return nil, ErrReaderEOF
		}
	}

	item := q.buf.ItemAt(q.readPivot)
	q.readPivot++
	q.itemCount++
	q.hasNext = q.buf.PeekAt(q.readPivot) || q.paginator.lastEvalKey != nil
	return item, nil
}