Skip to content

Commit

Permalink
Update existing registry errors and add more detail
Browse files Browse the repository at this point in the history
  • Loading branch information
phillebaba committed May 16, 2024
1 parent 6633882 commit 71d3c55
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package registry

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -142,6 +143,7 @@ func (r *Registry) handle(rw mux.ResponseWriter, req *http.Request) {

if req.URL.Path == "/healthz" && req.Method == http.MethodGet {
r.readyHandler(rw, req)
handler = "ready"
return
}
if strings.HasPrefix(req.URL.Path, "/v2") && (req.Method == http.MethodGet || req.Method == http.MethodHead) {
Expand All @@ -151,10 +153,10 @@ func (r *Registry) handle(rw mux.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusNotFound)
}

func (r *Registry) readyHandler(rw mux.ResponseWriter, req *http.Request) {
func (r *Registry) readyHandler(rw mux.ResponseWriter, _ *http.Request) {
ok, err := r.router.Ready()
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("could not determine router readiness: %w", err))
return
}
if !ok {
Expand All @@ -166,15 +168,16 @@ func (r *Registry) readyHandler(rw mux.ResponseWriter, req *http.Request) {
func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) string {
// Quickly return 200 for /v2 to indicate that registry supports v2.
if path.Clean(req.URL.Path) == "/v2" {
return "registry"
rw.WriteHeader(http.StatusOK)
return "v2"
}

// Parse out path components from request.
originalRegistry := req.URL.Query().Get("ns")
ref, err := parsePathComponents(originalRegistry, req.URL.Path)
if err != nil {
rw.WriteError(http.StatusNotFound, err)
return ""
rw.WriteError(http.StatusNotFound, fmt.Errorf("could not parse path according to OCI distribution spec: %w", err))
return "registry"
}

// Request with mirror header are proxied.
Expand All @@ -193,11 +196,10 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str
case referenceKindBlob:
r.handleBlob(rw, req, ref)
return "blob"
default:
rw.WriteError(http.StatusNotFound, fmt.Errorf("unknown reference kind %s", ref.kind))
return "registry"
}

// If nothing matches return 404.
rw.WriteHeader(http.StatusNotFound)
return ""
}

func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref reference) {
Expand Down Expand Up @@ -237,23 +239,31 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
resolveCtx = logr.NewContext(resolveCtx, log)
peerCh, err := r.router.Resolve(resolveCtx, key, isExternal, r.resolveRetries)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("error occurred when attempting to resolve mirrors: %w", err))
return
}
// TODO: Refactor context cancel and mirror channel closing

mirrorAttempts := 0
for {
select {
// TODO: Refactor context cancel and mirror channel closing
case <-resolveCtx.Done():
// Request has been closed by server or client. No use continuing.
rw.WriteError(http.StatusNotFound, fmt.Errorf("request closed for key: %s", key))
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", key, resolveCtx.Err()))
return
case ipAddr, ok := <-peerCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirror resolve retries exhausted for key: %s", key))
err = fmt.Errorf("mirror with image component %s could not be found", key)
if mirrorAttempts > 0 {
err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts))
}
rw.WriteError(http.StatusNotFound, err)
return
}

mirrorAttempts++

// Modify response returns and error on non 200 status code and NOP error handler skips response writing.
// If proxy fails no response is written and it is tried again against a different mirror.
// If the response writer has been written to it means that the request was properly proxied.
Expand All @@ -269,13 +279,11 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
proxy := httputil.NewSingleHostReverseProxy(u)
proxy.Transport = r.transport
proxy.ErrorHandler = func(_ http.ResponseWriter, _ *http.Request, err error) {
log.Error(err, "proxy failed attempting next")
log.Error(err, "request to mirror failed", "attempt", mirrorAttempts)
}
proxy.ModifyResponse = func(resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status)
log.Error(err, "mirror failed attempting next")
return err
return fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status)
}
succeeded = true
return nil
Expand All @@ -295,13 +303,13 @@ func (r *Registry) handleManifest(rw mux.ResponseWriter, req *http.Request, ref
var err error
ref.dgst, err = r.ociClient.Resolve(req.Context(), ref.name)
if err != nil {
rw.WriteError(http.StatusNotFound, err)
rw.WriteError(http.StatusNotFound, fmt.Errorf("could not get digest for image tag %s: %w", ref.name, err))
return
}
}
b, mediaType, err := r.ociClient.GetManifest(req.Context(), ref.dgst)
if err != nil {
rw.WriteError(http.StatusNotFound, err)
rw.WriteError(http.StatusNotFound, fmt.Errorf("could not get manifest content for digest %s: %w", ref.dgst.String(), err))
return
}
rw.Header().Set("Content-Type", mediaType)
Expand All @@ -312,19 +320,15 @@ func (r *Registry) handleManifest(rw mux.ResponseWriter, req *http.Request, ref
}
_, err = rw.Write(b)
if err != nil {
rw.WriteError(http.StatusNotFound, err)
r.log.Error(err, "error occurred when writing manifest")
return
}
}

func (r *Registry) handleBlob(rw mux.ResponseWriter, req *http.Request, ref reference) {
if ref.dgst == "" {
rw.WriteHeader(http.StatusNotFound)
return
}
size, err := r.ociClient.Size(req.Context(), ref.dgst)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("could not determine size of blob with digest %s: %w", ref.dgst.String(), err))
return
}
rw.Header().Set("Content-Length", strconv.FormatInt(size, 10))
Expand All @@ -338,13 +342,13 @@ func (r *Registry) handleBlob(rw mux.ResponseWriter, req *http.Request, ref refe
}
rc, err := r.ociClient.GetBlob(req.Context(), ref.dgst)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("could not get reader for blob with digest %s: %w", ref.dgst.String(), err))
return
}
defer rc.Close()
_, err = io.Copy(w, rc)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
r.log.Error(err, "error occurred when copying blob")
return
}
}
Expand Down

0 comments on commit 71d3c55

Please sign in to comment.