-
Notifications
You must be signed in to change notification settings - Fork 19
/
cache.go
133 lines (104 loc) · 2.67 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package wire
import (
"context"
"fmt"
"sync"
"github.com/jeroenrinzema/psql-wire/pkg/buffer"
"github.com/lib/pq/oid"
)
// Statement is the cached form of a prepared statement. Its fields are
// copied from a PreparedStatement when DefaultStatementCache.Set is called.
type Statement struct {
// fn is the handler invoked when a portal bound to this statement is
// executed.
fn PreparedStatementFn
// parameters holds the OIDs taken from the prepared statement
// (presumably the parameter types; confirm against PreparedStatement).
parameters []oid.Oid
// columns is handed to NewDataWriter when the statement is executed.
columns Columns
}
// DefaultStatementCacheFn returns a new, empty DefaultStatementCache as a
// StatementCache. The underlying map is allocated on the first call to Set.
func DefaultStatementCacheFn() StatementCache {
	return new(DefaultStatementCache)
}
// DefaultStatementCache stores statements by name in an in-memory map
// guarded by a read/write mutex. The zero value is ready to use; the map is
// created lazily by Set.
type DefaultStatementCache struct {
// statements maps a statement name to its cached definition. It stays
// nil until the first Set.
statements map[string]*Statement
// mu guards statements.
mu sync.RWMutex
}
// Set stores the given prepared statement under the given name, replacing
// any statement previously stored under that name. The handler, parameter
// OIDs and columns are copied into a fresh Statement. Set always returns a
// nil error.
func (cache *DefaultStatementCache) Set(ctx context.Context, name string, stmt *PreparedStatement) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	// The map is created on first use so the zero value of the cache works.
	if cache.statements == nil {
		cache.statements = make(map[string]*Statement)
	}

	entry := new(Statement)
	entry.fn = stmt.fn
	entry.parameters = stmt.parameters
	entry.columns = stmt.columns
	cache.statements[name] = entry

	return nil
}
// Get looks up the statement stored under the given name. When no statement
// has been stored under that name, a nil statement and a nil error are
// returned; callers must check the returned statement for nil.
func (cache *DefaultStatementCache) Get(ctx context.Context, name string) (*Statement, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	// Indexing a nil map or a missing key yields the zero value (a nil
	// *Statement), which is exactly the "not found" result.
	found := cache.statements[name]
	return found, nil
}
// Portal pairs a cached statement with the parameters and format codes
// supplied to DefaultPortalCache.Bind.
type Portal struct {
// statement is the statement whose handler runs when the portal is
// executed.
statement *Statement
// parameters are passed to the statement handler on execution.
parameters []Parameter
// formats are handed to NewDataWriter on execution.
formats []FormatCode
}
// DefaultPortalCacheFn returns a new, empty DefaultPortalCache as a
// PortalCache. The underlying map is allocated on the first call to Bind.
func DefaultPortalCacheFn() PortalCache {
	return new(DefaultPortalCache)
}
// DefaultPortalCache stores portals by name in an in-memory map guarded by a
// read/write mutex. The zero value is ready to use; the map is created
// lazily by Bind.
type DefaultPortalCache struct {
// portals maps a portal name to its bound portal. It stays nil until the
// first Bind.
portals map[string]*Portal
// mu guards portals.
mu sync.RWMutex
}
// Bind stores a portal under the given name, made up of the given
// statement, parameters and format codes. Any portal previously bound to
// that name is replaced. Bind always returns a nil error.
func (cache *DefaultPortalCache) Bind(ctx context.Context, name string, stmt *Statement, parameters []Parameter, formats []FormatCode) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	// The map is created on first use so the zero value of the cache works.
	if cache.portals == nil {
		cache.portals = make(map[string]*Portal)
	}

	bound := new(Portal)
	bound.statement = stmt
	bound.parameters = parameters
	bound.formats = formats
	cache.portals[name] = bound

	return nil
}
// Get returns the portal bound to the given name. When no portal has been
// bound under that name, a nil portal and a nil error are returned; callers
// must check the returned portal for nil.
func (cache *DefaultPortalCache) Get(ctx context.Context, name string) (*Portal, error) {
	// A lookup never mutates the map, so the shared read lock is enough.
	// This lets concurrent readers proceed and matches
	// DefaultStatementCache.Get.
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	// Indexing a nil map or a missing key yields the zero value (a nil
	// *Portal), which is exactly the "not found" result.
	return cache.portals[name], nil
}
// Execute runs the statement handler of the portal bound to the given name.
// Results are written through a data writer built from the statement's
// columns, the portal's format codes and the given reader and writer.
//
// When no portal is bound under the name, Execute does nothing and returns
// nil. A panic raised while executing is recovered and returned as an error.
func (cache *DefaultPortalCache) Execute(ctx context.Context, name string, reader *buffer.Reader, writer *buffer.Writer) (err error) {
	defer func() {
		// %v renders any panic value sensibly; %s would print non-string
		// values such as integers as "%!s(int=...)".
		if r := recover(); r != nil {
			err = fmt.Errorf("unexpected panic: %v", r)
		}
	}()

	// Hold the lock only for the map lookup. Running the handler under the
	// cache lock would block every other cache operation for the duration
	// of the query and deadlock if the handler calls back into this cache.
	// Bind stores a fresh Portal on every call, so the pointer obtained
	// here can be used safely after the lock is released.
	cache.mu.RLock()
	portal, has := cache.portals[name]
	cache.mu.RUnlock()
	if !has {
		return nil
	}

	return portal.statement.fn(ctx, NewDataWriter(ctx, portal.statement.columns, portal.formats, reader, writer), portal.parameters)
}