From b070a1a984e55f30e56116d608f9baa97f14f84b Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Sat, 31 Aug 2024 11:04:28 -0500 Subject: [PATCH] dmsgcurl use multiple dmsg-discoveries in fallback configuration --- cmd/dmsgcurl/commands/dmsgcurl.go | 274 ++++++++++++------------------ 1 file changed, 113 insertions(+), 161 deletions(-) diff --git a/cmd/dmsgcurl/commands/dmsgcurl.go b/cmd/dmsgcurl/commands/dmsgcurl.go index 69b6a7c0..bf1f7638 100644 --- a/cmd/dmsgcurl/commands/dmsgcurl.go +++ b/cmd/dmsgcurl/commands/dmsgcurl.go @@ -1,4 +1,4 @@ -// Package commands cmd/dmsgcurl/commands/dmsgcurl.go +// Package commands cmd/dmsgcurl/commands package commands import ( @@ -30,10 +30,9 @@ import ( ) var ( - dmsgDisc string - dmsgSessions int - dmsgcurlData string - // dmsgcurlHeader string + dmsgDisc []string + dmsgSessions int + dmsgcurlData string sk cipher.SecKey dmsgcurlLog *logging.Logger dmsgcurlAgent string @@ -45,44 +44,37 @@ var ( ) func init() { - RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "c", "", "dmsg discovery url default:\n"+skyenv.DmsgDiscAddr) + RootCmd.Flags().StringSliceVarP(&dmsgDisc, "dmsg-disc", "c", []string{skyenv.DmsgDiscAddr}, "dmsg discovery url(s)") RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "fatal", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "fatal", "[ debug | warn | error | fatal | panic | trace | info ]") RootCmd.Flags().StringVarP(&dmsgcurlData, "data", "d", "", "dmsghttp POST data") - // RootCmd.Flags().StringVarP(&dmsgcurlHeader, "header", "H", "", "Pass custom header(s) to server") RootCmd.Flags().StringVarP(&dmsgcurlOutput, "out", "o", "", "output filepath") - RootCmd.Flags().BoolVarP(&replace, "replace", "r", false, "replace exist file with new downloaded") + RootCmd.Flags().BoolVarP(&replace, "replace", "r", false, "replace existing file with new downloaded") RootCmd.Flags().IntVarP(&dmsgcurlTries, "try", "t", 1, "download attempts (0 unlimits)") RootCmd.Flags().IntVarP(&dmsgcurlWait, "wait", "w", 0, "time to wait between fetches") RootCmd.Flags().StringVarP(&dmsgcurlAgent, "agent", "a", "dmsgcurl/"+buildinfo.Version(), "identify as `AGENT`") if os.Getenv("DMSGCURL_SK") != "" { sk.Set(os.Getenv("DMSGCURL_SK")) //nolint } - RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") + RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified") } -// RootCmd containsa the root dmsgcurl command +// RootCmd contains the root cli command var RootCmd = &cobra.Command{ Use: func() string { return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] }(), - Short: "DMSG curl utility", - Long: ` - ┌┬┐┌┬┐┌─┐┌─┐┌─┐┬ ┬┬─┐┬ - │││││└─┐│ ┬│ │ │├┬┘│ - ─┴┘┴ ┴└─┘└─┘└─┘└─┘┴└─┴─┘ -DMSG curl utility`, + Short: "DMSG curl utility", + Long: `DMSG curl utility`, SilenceErrors: true, SilenceUsage: true, DisableSuggestions: true, DisableFlagsInUseLine: true, Version: buildinfo.Version(), - PreRun: func(cmd *cobra.Command, args []string) { - if dmsgDisc == "" { - dmsgDisc = skyenv.DmsgDiscAddr + RunE: func(_ *cobra.Command, args []string) error { + if len(dmsgDisc) == 0 || dmsgDisc[0] == "" { + dmsgDisc = []string{skyenv.DmsgDiscAddr} } - }, - RunE: func(cmd *cobra.Command, args []string) error { if dmsgcurlLog == nil { dmsgcurlLog = logging.MustGetLogger("dmsgcurl") } @@ -91,144 +83,125 @@ DMSG curl utility`, logging.SetLevel(lvl) } } - ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgcurlLog) defer cancel() - pk, err := sk.PubKey() if err != nil { pk, sk = cipher.GenerateKeyPair() } - - u, err := parseURL(args) + if len(args) == 0 { + return errors.New("no URL(s) provided") + } + if len(args) > 1 { + return errors.New("multiple URLs is not yet supported") + } + parsedURL, err := url.Parse(args[0]) if err != nil { dmsgcurlLog.WithError(err).Fatal("failed to parse provided URL") } if dmsgcurlData != "" { - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("failed to start dmsg") - } - defer closeDmsg() + return handlePostRequest(ctx, pk, parsedURL) + } + return handleDownload(ctx, pk, parsedURL) + }, +} - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} +func handlePostRequest(ctx context.Context, pk cipher.PubKey, parsedURL *url.URL) error { + for _, disco := range dmsgDisc { + dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk, disco) + if err != nil { + dmsgcurlLog.WithError(err).Warnf("Failed to start dmsg with discovery %s", disco) + continue + } + defer closeDmsg() - req, err := http.NewRequest(http.MethodPost, u.URL.String(), strings.NewReader(dmsgcurlData)) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to formulate HTTP request.") - } - req.Header.Set("Content-Type", "text/plain") + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} + req, err := http.NewRequest(http.MethodPost, parsedURL.String(), strings.NewReader(dmsgcurlData)) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to formulate HTTP request.") + } + req.Header.Set("Content-Type", "text/plain") - resp, err := httpC.Do(req) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to execute HTTP request.") - } + resp, err := httpC.Do(req) + if err != nil { + dmsgcurlLog.WithError(err).Warnf("Failed to execute HTTP request with discovery %s", disco) + continue + } + defer closeResponseBody(resp) - defer func() { - if err := resp.Body.Close(); err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to close response body") - } - }() - respBody, err := io.ReadAll(resp.Body) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to read respose body.") - } - fmt.Println(string(respBody)) - } else { - file := os.Stdout - if dmsgcurlOutput != "" { - file, err = parseOutputFile(dmsgcurlOutput, replace) - } - if err != nil { - return fmt.Errorf("failed to prepare output file: %w", err) - } - defer func() { - if fErr := file.Close(); fErr != nil { - dmsgcurlLog.WithError(fErr).Warn("Failed to close output file.") - } - if err != nil { - if rErr := os.RemoveAll(file.Name()); rErr != nil { - dmsgcurlLog.WithError(rErr).Warn("Failed to remove output file.") - } - } - }() + respBody, err := io.ReadAll(resp.Body) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to read response body.") + } + fmt.Println(string(respBody)) + return nil + } + return errors.New("all dmsg discovery addresses failed") +} - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) - if err != nil { - return fmt.Errorf("failed to start dmsg: %w", err) - } - defer closeDmsg() +func handleDownload(ctx context.Context, pk cipher.PubKey, parsedURL *url.URL) error { + file, err := prepareOutputFile() + if err != nil { + return fmt.Errorf("failed to prepare output file: %w", err) + } + defer closeAndCleanFile(file, err) + + for _, disco := range dmsgDisc { + dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk, disco) + if err != nil { + dmsgcurlLog.WithError(err).Warnf("Failed to start dmsg with discovery %s", disco) + continue + } + defer closeDmsg() - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} - for i := 0; i < dmsgcurlTries; i++ { - if dmsgcurlOutput != "" { - dmsgcurlLog.Debugf("Download attempt %d/%d ...", i, dmsgcurlTries) - if _, err := file.Seek(0, 0); err != nil { - return fmt.Errorf("failed to reset file: %w", err) - } + for i := 0; i < dmsgcurlTries; i++ { + if dmsgcurlOutput != "" { + dmsgcurlLog.Debugf("Download attempt %d/%d ...", i, dmsgcurlTries) + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file: %w", err) } - if err := Download(ctx, dmsgcurlLog, &httpC, file, u.URL.String(), 0); err != nil { - dmsgcurlLog.WithError(err).Error() - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Duration(dmsgcurlWait) * time.Second): - continue - } + } + if err := download(ctx, dmsgcurlLog, &httpC, file, parsedURL.String(), 0); err != nil { + dmsgcurlLog.WithError(err).Error() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(dmsgcurlWait) * time.Second): + continue } - - // download successful. - return nil } - return errors.New("all download attempts failed") - + return nil } - return nil - }, -} - -// URL represents a dmsg http URL. -type URL struct { - dmsg.Addr - url.URL -} - -// Fill fills the internal fields from an URL string. -func (du *URL) fill(str string) error { - u, err := url.Parse(str) - if err != nil { - return err } - if u.Scheme == "" { - return errors.New("URL is missing a scheme") - } + return errors.New("all download attempts failed with all dmsg discovery addresses") +} - if u.Host == "" { - return errors.New("URL is missing a host") +func prepareOutputFile() (*os.File, error) { + if dmsgcurlOutput == "" { + return os.Stdout, nil } - - du.URL = *u - return du.Addr.Set(u.Host) + return parseOutputFile(dmsgcurlOutput, replace) } -func parseURL(args []string) (*URL, error) { - if len(args) == 0 { - return nil, errors.New("no URL(s) provided") +func closeAndCleanFile(file *os.File, err error) { + if fErr := file.Close(); fErr != nil { + dmsgcurlLog.WithError(fErr).Warn("Failed to close output file.") } - - if len(args) > 1 { - return nil, errors.New("multiple URLs is not yet supported") + if err != nil && file != os.Stdout { + if rErr := os.RemoveAll(file.Name()); rErr != nil { + dmsgcurlLog.WithError(rErr).Warn("Failed to remove output file.") + } } +} - var out URL - if err := out.fill(args[0]); err != nil { - return nil, fmt.Errorf("provided URL is invalid: %w", err) +func closeResponseBody(resp *http.Response) { + if err := resp.Body.Close(); err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to close response body") } - - return &out, nil } func parseOutputFile(output string, replace bool) (*os.File, error) { @@ -252,8 +225,8 @@ func parseOutputFile(output string, replace bool) (*os.File, error) { return nil, os.ErrExist } -func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { - dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgcurlLog), &dmsg.Config{MinSessions: dmsgSessions}) +func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey, disco string) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(disco, &http.Client{}, dmsgcurlLog), &dmsg.Config{MinSessions: dmsgSessions}) go dmsgC.Serve(context.Background()) stop = func() { @@ -275,8 +248,7 @@ func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC * } } -// Download downloads a file from the given URL into 'w'. -func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { +func download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { req, err := http.NewRequest(http.MethodGet, urlStr, nil) if err != nil { log.WithError(err).Fatal("Failed to formulate HTTP request.") @@ -285,20 +257,14 @@ func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w if err != nil { return fmt.Errorf("failed to connect to HTTP server: %w", err) } - if maxSize > 0 { - if resp.ContentLength > maxSize*1024 { - return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) - } + if maxSize > 0 && resp.ContentLength > maxSize*1024 { + return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) } - n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength) + n, err := cancellableCopy(ctx, w, resp.Body, resp.ContentLength) if err != nil { return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err) } - defer func() { - if err := resp.Body.Close(); err != nil { - log.WithError(err).Warn("HTTP Response body closed with non-nil error.") - } - }() + defer closeResponseBody(resp) return nil } @@ -307,38 +273,25 @@ type readerFunc func(p []byte) (n int, err error) func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } -// CancellableCopy will call the Reader and Writer interface multiple time, in order -// to copy by chunk (avoiding loading the whole file in memory). -func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { - - n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { - - // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations +func cancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { + n, err := io.Copy(io.MultiWriter(w, &progressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { select { - - // if context has been canceled case <-ctx.Done(): - // stop process and propagate "Download Canceled" error return 0, errors.New("Download Canceled") default: - // otherwise just run default io.Reader implementation return body.Read(p) } })) return n, err } -// ProgressWriter prints the progress of a download to stdout. -type ProgressWriter struct { - // atomic requires 64-bit alignment for struct field access +type progressWriter struct { Current int64 Total int64 } -// Write implements io.Writer -func (pw *ProgressWriter) Write(p []byte) (int, error) { +func (pw *progressWriter) Write(p []byte) (int, error) { n := len(p) - current := atomic.AddInt64(&pw.Current, int64(n)) total := atomic.LoadInt64(&pw.Total) pc := fmt.Sprintf("%d%%", current*100/total) @@ -350,11 +303,10 @@ func (pw *ProgressWriter) Write(p []byte) (int, error) { fmt.Print("\n") } } - return n, nil } -// Execute executes root CLI command. +// Execute executes the RootCmd func Execute() { if err := RootCmd.Execute(); err != nil { log.Fatal("Failed to execute command: ", err)