-
Notifications
You must be signed in to change notification settings - Fork 18
/
each.go
138 lines (115 loc) · 2.65 KB
/
each.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package xlog
import (
"context"
"reflect"
"regexp"
"runtime"
"sync"
"golang.org/x/sync/errgroup"
)
// ignoredPaths holds the regular expressions consulted by IsIgnoredPath.
// Extensions can register additional patterns through IgnorePath; for example
// a versioning extension could register one that hides its `.versions`
// directory.
var ignoredPaths = []*regexp.Regexp{
	// Matches any path string beginning with a dot, i.e. hidden entries when
	// the caller passes a base name.
	regexp.MustCompile(`^\.`),
}
// IgnorePath registers r as an extra pattern; any path it matches will be
// reported as ignored by IsIgnoredPath. Patterns can only be added, never
// removed.
//
// NOTE(review): ignoredPaths is a plain package-level slice with no lock, so
// this is presumably meant to be called during start-up, before any
// concurrent directory walking begins — confirm.
func IgnorePath(r *regexp.Regexp) {
	registered := ignoredPaths
	registered = append(registered, r)
	ignoredPaths = registered
}
// IsIgnoredPath reports whether d matches any registered ignore pattern
// (the built-in hidden-entry pattern plus anything added via IgnorePath).
// Page source implementations can call it to skip files and directories
// while walking their sources.
func IsIgnoredPath(d string) bool {
	for i := range ignoredPaths {
		if ignoredPaths[i].MatchString(d) {
			return true
		}
	}
	return false
}
// pages caches the pages collected from every registered source. A nil value
// means the cache has not been built yet (or has been cleared) and must be
// repopulated before use. An empty but non-nil slice means "built, but no
// pages were found".
var pages []Page

// Pages returns every page from all registered sources. It builds the cache
// from the sources first if the cache is empty.
//
// The returned slice is a fresh copy, so callers may sort, filter or append
// to it without affecting the cache or other callers. Returning the cached
// slice itself would let one caller reorder or overwrite the shared list.
func Pages(ctx context.Context) []Page {
	if pages == nil {
		populatePagesCache(ctx)
	}
	snapshot := make([]Page, len(pages))
	copy(snapshot, pages)
	return snapshot
}
// EachPage calls f once for every cached page, in cache order, building the
// cache first if it is empty. Many extensions use it to visit all pages,
// for example to parse them and extract information.
//
// Cancellation is checked before each call. Once ctx is done, iteration
// stops silently and the remaining pages are skipped. f runs on the caller's
// goroutine. The page list is captured once up front, so clearing the cache
// from inside f does not affect the ongoing iteration.
func EachPage(ctx context.Context, f func(Page)) {
	if pages == nil {
		populatePagesCache(ctx)
	}

	snapshot := pages
	for i := range snapshot {
		if ctx.Err() != nil {
			return
		}
		f(snapshot[i])
	}
}
// concurrency caps how many calls to the mapping function MapPage runs at
// the same time: four workers per logical CPU reported by runtime.NumCPU.
// NOTE(review): the 4x factor presumably assumes the mapped work is partly
// I/O bound — confirm.
var concurrency = 4 * runtime.NumCPU()
// MapPage is the concurrent counterpart of EachPage. It calls f on every
// cached page, building the cache first if it is empty, and collects the
// results into a slice.
//
// Up to `concurrency` calls to f run at the same time, so f must be safe for
// concurrent use. Results for which isNil reports true (nil pointers, slices,
// maps, channels, funcs or interfaces) are dropped. Results are appended as
// workers finish, so the order of the returned slice is unspecified and
// unrelated to page order.
//
// Once ctx is cancelled, no further calls to f are started. Calls that are
// already running are still waited for before MapPage returns.
func MapPage[T any](ctx context.Context, f func(Page) T) []T {
	if pages == nil {
		populatePagesCache(ctx)
	}

	grp, ctx := errgroup.WithContext(ctx)
	grp.SetLimit(concurrency)

	currentPages := pages
	output := make([]T, 0, len(currentPages))
	var outputLck sync.Mutex

	for _, p := range currentPages {
		// Stop scheduling work as soon as the context is cancelled. A bare
		// `break` inside a select would only leave the select, and the
		// loop would keep iterating over the remaining pages.
		if ctx.Err() != nil {
			break
		}

		// A per-iteration copy for the closure. Before Go 1.22 every
		// goroutine would otherwise share the single loop variable.
		page := p
		grp.Go(func() error {
			val := f(page)
			if isNil(val) {
				return nil
			}
			outputLck.Lock()
			output = append(output, val)
			outputLck.Unlock()
			return nil
		})
	}

	// Workers never return an error, so Wait only acts as a barrier here.
	_ = grp.Wait()
	return output
}
// From https://stackoverflow.com/a/77341451/22401486
//
// isNil reports whether t holds no value. That is the case when t is a nil
// interface, or a nil pointer, interface, slice, map, channel or func.
// Values of any other kind (ints, strings, structs, ...) can never be nil
// and always yield false.
func isNil[T any](t T) bool {
	rv := reflect.ValueOf(t)
	if !rv.IsValid() {
		// reflect.ValueOf returns the zero Value for a nil interface.
		return true
	}
	switch rv.Kind() {
	case reflect.Ptr, reflect.Interface, reflect.Slice,
		reflect.Map, reflect.Chan, reflect.Func:
		return rv.IsNil()
	default:
		// IsNil would panic on these kinds; they cannot hold nil anyway.
		return false
	}
}
// clearPagesCache discards the cached page list, so the next call to Pages,
// EachPage or MapPage rebuilds it from the sources. It always returns nil.
// The Page argument is ignored. NOTE(review): the (Page) error signature
// suggests it is registered as a page-change listener elsewhere — confirm.
func clearPagesCache(p Page) (err error) {
	// Reset to nil rather than an empty slice. Readers repopulate only when
	// pages == nil.
	pages = nil
	return
}
func populatePagesCache(ctx context.Context) {
pages = []Page{}
for _, s := range sources {
select {
case <-ctx.Done():
return
default:
s.Each(ctx, func(p Page) {
pages = append(pages, p)
})
}
}
}