Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ndjson semantics to delegated routing endpoint #200

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 104 additions & 55 deletions delegated_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const (
)

type findFunc func(ctx context.Context, method, source string, req *url.URL, encrypted bool) (int, []byte)
type findStreamFunc func(ctx context.Context, method string, req *url.URL, encrypted bool) (int, chan model.ProviderResult)

func NewDelegatedTranslator(backend findFunc) (http.Handler, error) {
finder := delegatedTranslator{backend}
func NewDelegatedTranslator(backend findFunc, streamingBackend findStreamFunc) (http.Handler, error) {
finder := delegatedTranslator{backend, streamingBackend}
m := http.NewServeMux()
m.HandleFunc("/providers", finder.provide)
m.HandleFunc("/encrypted/providers", finder.provide)
Expand All @@ -34,7 +35,8 @@ func NewDelegatedTranslator(backend findFunc) (http.Handler, error) {
}

type delegatedTranslator struct {
be findFunc
be findFunc
sbe findStreamFunc
}

func (dt *delegatedTranslator) provide(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -81,6 +83,48 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr

// Translate URL by mapping `/providers/{CID}` to `/cid/{CID}`.
uri := r.URL.JoinPath("../../cid", cidUrlParam)

acc, err := getAccepts(r)
if err != nil {
http.Error(w, "invalid Accept header", http.StatusBadRequest)
return
}

switch {
case acc.ndjson:
rcode, respChan := dt.sbe(r.Context(), findMethodDelegated, uri, encrypted)
if rcode != http.StatusOK {
http.Error(w, "", rcode)
return
}
out := &drResp{}
hasWritten := false
encoder := json.NewEncoder(w)

for rcrd := range respChan {
if !hasWritten {
w.Header().Set("Content-Type", mediaTypeNDJson)
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(200)
hasWritten = true
}
prov := drProvFromResult(rcrd)
// if new
if out.append(prov) {
if err := encoder.Encode(prov); err != nil {
return
}
}
}
if len(out.seenProviders) == 0 {
// no response.
w.WriteHeader(http.StatusNotFound)
}
return
default:
}

rcode, resp := dt.be(r.Context(), http.MethodGet, findMethodDelegated, uri, encrypted)
if rcode != http.StatusOK {
http.Error(w, "", rcode)
Expand All @@ -105,63 +149,14 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr

res := parsed.MultihashResults[0]

out := drResp{}
out := &drResp{}

// Records returned from IPNI via Delegated Routing don't have ContextID in them. Becuase of that,
// some records that are valid from the IPNI point of view might look like duplicates from the Delegated Routing point of view.
// To make the Delegated Routing output nicer, deduplicate identical records.
uniqueProviders := map[uint32]struct{}{}
appendIfUnique := func(drp *drProvider) {
capacity := len(drp.ID) + len(drp.Schema)
for _, proto := range drp.Protocols {
capacity += len(proto)
}
for _, meta := range drp.Metadata {
capacity += len(meta)
}
drpb := make([]byte, 0, capacity)
drpb = append(drpb, []byte(drp.ID)...)
for _, proto := range drp.Protocols {
drpb = append(drpb, []byte(proto)...)
}
drpb = append(drpb, []byte(drp.Schema)...)
for _, meta := range drp.Metadata {
drpb = append(drpb, meta...)
}
key := crc32.ChecksumIEEE(drpb)
if _, ok := uniqueProviders[key]; ok {
return
}
uniqueProviders[key] = struct{}{}
out.Providers = append(out.Providers, *drp)
}

for _, p := range res.ProviderResults {
md := metadata.Default.New()
err := md.UnmarshalBinary(p.Metadata)
if err != nil {
appendIfUnique(&drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
})
} else {
provider := &drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
Metadata: make(map[string][]byte),
}

for _, proto := range md.Protocols() {
pl := md.Get(proto)
plb, _ := pl.MarshalBinary()
provider.Protocols = append(provider.Protocols, proto.String())
provider.Metadata[proto.String()] = plb
}

appendIfUnique(provider)
}
out.append(drProvFromResult(p))
}

outBytes, err := json.Marshal(out)
Expand All @@ -174,7 +169,34 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr
}

type drResp struct {
Providers []drProvider
Providers []drProvider
seenProviders map[uint32]struct{}
}

func (dr *drResp) append(drp *drProvider) bool {
capacity := len(drp.ID) + len(drp.Schema)
for _, proto := range drp.Protocols {
capacity += len(proto)
}
for _, meta := range drp.Metadata {
capacity += len(meta)
}
drpb := make([]byte, 0, capacity)
drpb = append(drpb, []byte(drp.ID)...)
for _, proto := range drp.Protocols {
drpb = append(drpb, []byte(proto)...)
}
drpb = append(drpb, []byte(drp.Schema)...)
for _, meta := range drp.Metadata {
drpb = append(drpb, meta...)
}
key := crc32.ChecksumIEEE(drpb)
if _, ok := dr.seenProviders[key]; ok {
return false
}
dr.seenProviders[key] = struct{}{}
dr.Providers = append(dr.Providers, *drp)
return true
}

type drProvider struct {
Expand All @@ -185,6 +207,33 @@ type drProvider struct {
Metadata map[string][]byte
}

func drProvFromResult(p model.ProviderResult) *drProvider {
md := metadata.Default.New()
err := md.UnmarshalBinary(p.Metadata)
if err != nil {
return &drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
}
} else {
provider := &drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
Metadata: make(map[string][]byte),
}

for _, proto := range md.Protocols() {
pl := md.Get(proto)
plb, _ := pl.MarshalBinary()
provider.Protocols = append(provider.Protocols, proto.String())
provider.Metadata[proto.String()] = plb
}
return provider
}
}

func (dp drProvider) MarshalJSON() ([]byte, error) {
m := map[string]interface{}{}
if dp.Metadata != nil {
Expand Down
Loading
Loading