From 685bb6bd7c78837faae508ba984b5db8950a190a Mon Sep 17 00:00:00 2001 From: Julius Hinze Date: Wed, 16 Oct 2024 17:04:54 +0200 Subject: [PATCH] Limit HTTP server's concurrency using semaphore. Signed-off-by: Julius Hinze --- docs/thick-plugin.md | 4 +- go.mod | 1 + go.sum | 2 + pkg/server/server.go | 24 +++- pkg/server/thick_cni_test.go | 3 +- pkg/server/types.go | 1 + vendor/golang.org/x/sync/LICENSE | 27 ++++ vendor/golang.org/x/sync/PATENTS | 22 +++ .../golang.org/x/sync/semaphore/semaphore.go | 136 ++++++++++++++++++ vendor/modules.txt | 3 + 10 files changed, 217 insertions(+), 6 deletions(-) create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore.go diff --git a/docs/thick-plugin.md b/docs/thick-plugin.md index a8c9eae47..6f739b2ff 100644 --- a/docs/thick-plugin.md +++ b/docs/thick-plugin.md @@ -72,6 +72,7 @@ is provided. - `"logLevel"`: the logging level for the multus daemon logs. - `"logToStderr"`: enable this to have the daemon multus logs echoed to stderr as well. By default, it is disabled. +- `concurrentExecs`: integer that, if specified, defines the amount of parallel chroot plugin executions (optional). In addition, you can add any configuration which is in [configuration reference](https://github.com/k8snetworkplumbingwg/multus-cni/blob/master/docs/configuration.md#multus-cni-configuration-reference). Server configuration override multus CNI configuration (e.g. `/etc/cni/net.d/00-multus.conf`) @@ -89,7 +90,8 @@ Below you can see an example of the daemon configuration: "cniVersion": "0.3.1", "cniConfigDir": "/host/etc/cni/net.d", "multusConfigFile": "auto", - "multusAutoconfigDir": "/host/etc/cni/net.d" + "multusAutoconfigDir": "/host/etc/cni/net.d", + "concurrentExecs": 10 } ``` diff --git a/go.mod b/go.mod index bb29add3b..b67b02c09 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 golang.org/x/net v0.23.0 + golang.org/x/sync v0.4.0 golang.org/x/sys v0.18.0 google.golang.org/grpc v1.58.3 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/go.sum b/go.sum index 8b1f10734..a9dc0b7e2 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/server/server.go b/pkg/server/server.go index d469af23a..a18d90dd8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -29,6 +29,7 @@ import ( "github.com/containernetworking/cni/pkg/skel" cnitypes "github.com/containernetworking/cni/pkg/types" cni100 "github.com/containernetworking/cni/pkg/types/100" + "golang.org/x/sync/semaphore" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -46,7 +47,7 @@ import ( netdefinformerv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions/k8s.cni.cncf.io/v1" kapi "k8s.io/api/core/v1" - meta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -251,18 +252,33 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe logging.Verbosef("server configured with chroot: %s", daemonConfig.ChrootDir) } - return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator) + return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator, daemonConfig.ConcurrentExecs) } -func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) { +func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool, concurrency *int) (*Server, error) { informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME")) netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient) kubeClient.SetK8sClientInformers(podInformer, netdefInformer) router := http.NewServeMux() + handler := http.Handler(router) + + // limit concurrent requests by using a semaphore + if concurrency != nil { + sem := semaphore.NewWeighted(int64(*concurrency)) + handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if err := sem.Acquire(r.Context(), 1); err != nil { + http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError) + return + } + defer sem.Release(1) + router.ServeHTTP(w, r) + }) + } + s := &Server{ Server: http.Server{ - Handler: router, + Handler: handler, }, rundir: rundir, kubeclient: kubeClient, diff --git a/pkg/server/thick_cni_test.go b/pkg/server/thick_cni_test.go index e97612fa9..ceb0550eb 100644 --- a/pkg/server/thick_cni_test.go +++ b/pkg/server/thick_cni_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/tools/record" netfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake" + k8s "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/api" testhelpers "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/testing" @@ -274,7 +275,7 @@ func createFakePod(k8sClient *k8s.ClientInfo, podName string) error { func startCNIServer(ctx context.Context, runDir string, k8sClient *k8s.ClientInfo, servConfig []byte) (*Server, error) { const period = 0 - cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true) + cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true, nil) if err != nil { return nil, err } diff --git a/pkg/server/types.go b/pkg/server/types.go index 81d4d6819..280aee81d 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -77,6 +77,7 @@ type ControllerNetConf struct { LogLevel string `json:"logLevel"` LogToStderr bool `json:"logToStderr,omitempty"` PerNodeCertificate *PerNodeCertificate `json:"perNodeCertificate,omitempty"` + ConcurrentExecs *int `json:"concurrentExecs,omitempty"` MetricsPort *int `json:"metricsPort,omitempty"` diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 000000000..30f632c57 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,136 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "context" + "sync" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking until resources +// are available or ctx is done. On success, returns nil. On failure, returns +// ctx.Err() and leaves the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + isFront := s.waiters.Front() == elem + s.waiters.Remove(elem) + // If we're at the front and there're extra tokens left, notify other waiters. + if isFront && s.size > s.cur { + s.notifyWaiters() + } + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: released more than held") + } + s.notifyWaiters() + s.mu.Unlock() +} + +func (s *Weighted) notifyWaiters() { + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6aaf1c642..b78f9d06b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -224,6 +224,9 @@ golang.org/x/net/trace ## explicit; go 1.17 golang.org/x/oauth2 golang.org/x/oauth2/internal +# golang.org/x/sync v0.4.0 +## explicit; go 1.17 +golang.org/x/sync/semaphore # golang.org/x/sys v0.18.0 ## explicit; go 1.18 golang.org/x/sys/plan9