Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: content moderation for provider based on deployer address #172

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 71 additions & 5 deletions bidengine/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
"github.com/akash-network/provider/event"
"github.com/akash-network/provider/session"

clusterClient "github.com/akash-network/provider/cluster/kube"
providerflags "github.com/akash-network/provider/cmd/provider-services/cmd/flags"
"github.com/spf13/viper"
)

// order manages bidding and general lifecycle handling of an order.
Expand Down Expand Up @@ -402,11 +406,64 @@ loop:

offer := mtypes.ResourceOfferFromRU(reservation.GetAllocatedResources())

// Begin submitting fulfillment
msg = mtypes.NewMsgCreateBid(o.orderID, o.session.Provider().Address(), price, o.cfg.Deposit, offer)
bidch = runner.Do(func() runner.Result {
return runner.NewResult(o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
})
// Check if the provider's address matches the allowed or denied wallet addresses.
var allowWalletAddress, denyWalletAddress []string
ownerAddress := o.orderID.Owner

// Fetching configuration path and namespace from viper.
configPath, ns := viper.GetString(providerflags.FlagKubeConfig), viper.GetString(providerflags.FlagK8sManifestNS)

// Initialize a new client for cluster operations.
client, err := clusterClient.NewClient(ctx, o.log, ns, configPath)
if err != nil {
o.log.Error("Failed to initialize cluster client", "err", err)
break loop
}

// Retrieving all moderation filters.
allModerationFilters, err := client.AllModerationFilters(ctx)
if err != nil {
o.log.Error("Failed to retrieve moderation filters", "error", err)
break loop
}

// Filtering allowed and denied wallet addresses based on moderation filters.
for _, allFilters := range allModerationFilters {
if allFilters.Type == "TenantAddress" && allFilters.Allow {
allowWalletAddress = append(allowWalletAddress, allFilters.Pattern)
}
if allFilters.Type == "TenantAddress" && !allFilters.Allow {
denyWalletAddress = append(denyWalletAddress, allFilters.Pattern)
}
}

// Check if both allowWalletAddress and denyWalletAddress are empty
if len(allowWalletAddress) == 0 && len(denyWalletAddress) == 0 {
// Both lists are empty, proceeding to create and submit the bid
// Begin submitting fulfillment
msg = mtypes.NewMsgCreateBid(o.orderID, o.session.Provider().Address(), price, o.cfg.Deposit, offer)
bidch = runner.Do(func() runner.Result {
return runner.NewResult(o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
})
} else {
// Checking if the owner's address is in the allow or deny list and acting accordingly.
if ContainsString(allowWalletAddress, ownerAddress) {
// Address is allowed, proceeding to create and submit the bid.
// Begin submitting fulfillment
msg = mtypes.NewMsgCreateBid(o.orderID, o.session.Provider().Address(), price, o.cfg.Deposit, offer)
bidch = runner.Do(func() runner.Result {
return runner.NewResult(o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
})
} else if ContainsString(denyWalletAddress, ownerAddress) {
o.log.Info("Wallet Address Check", "denyWalletAddress", denyWalletAddress, "foundAddress", ownerAddress)
o.log.Error("Bid not placed: deployer's address is denied", "address", ownerAddress)
break loop
} else {
o.log.Info("Wallet Address Check", "allowedAddresses", allowWalletAddress, "foundAddress", ownerAddress)
o.log.Error("Bid not placed: deployer's address is not in the allow list", "address", ownerAddress)
break loop
}
}

case result := <-bidch:
bidch = nil
Expand Down Expand Up @@ -488,6 +545,15 @@ loop:
}
}

func ContainsString(slice []string, target string) bool {
for _, element := range slice {
if element == target {
return true
}
}
return false
}

func (o *order) shouldBid(group *dtypes.Group) (bool, error) {
// does provider have required attributes?
if !group.GroupSpec.MatchAttributes(o.session.Provider().Attributes) {
Expand Down
6 changes: 6 additions & 0 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type ReadClient interface {
AllHostnames(context.Context) ([]ctypes.ActiveHostname, error)
GetManifestGroup(context.Context, mtypes.LeaseID) (bool, crd.ManifestGroup, error)

AllModerationFilters(context.Context) ([]ctypes.ActiveFilters, error)

ObserveHostnameState(ctx context.Context) (<-chan ctypes.HostnameResourceEvent, error)
GetHostnameDeploymentConnections(ctx context.Context) ([]ctypes.LeaseIDHostnameConnection, error)

Expand Down Expand Up @@ -607,6 +609,10 @@ func (c *nullClient) AllHostnames(context.Context) ([]ctypes.ActiveHostname, err
return nil, nil
}

func (c *nullClient) AllModerationFilters(context.Context) ([]ctypes.ActiveFilters, error) {
return nil, nil
}

func (c *nullClient) KubeVersion() (*version.Info, error) {
return nil, nil
}
Expand Down
45 changes: 45 additions & 0 deletions cluster/kube/client_moderationfilters_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kube

import (
"context"
"fmt"

"github.com/akash-network/provider/cluster/kube/builder"
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/pager"

crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
)

func (c *client) AllModerationFilters(ctx context.Context) ([]ctypes.ActiveFilters, error) {
result := make([]ctypes.ActiveFilters, 0)

filterPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return c.ac.AkashV2beta2().ModerationFilters(c.ns).List(ctx, opts)
})

listOptions := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=true", builder.AkashManagedLabelName),
}

err := filterPager.EachListItem(ctx, listOptions, func(obj runtime.Object) error {
fp := obj.(*crd.ModerationFilter)

for _, filter := range fp.Spec {
result = append(result, ctypes.ActiveFilters{
Allow: filter.Allow,
Type: filter.Type,
Pattern: filter.Pattern,
})
}
return nil

})

if err != nil {
return nil, err
}
return result, nil
}
54 changes: 54 additions & 0 deletions cluster/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions cluster/types/v1beta3/moderationfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package v1beta3

type ActiveFilters struct {
Allow bool
Pattern string
Type string
}
46 changes: 46 additions & 0 deletions pkg/apis/akash.network/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -526,3 +526,49 @@ spec:
type: string
sharing_key:
type: string
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
# name must match the spec fields below, and be in the form: <plural>.<group>
name: moderationfilters.akash.network
# DO NOT REMOVE resource-policy annotation!
annotations:
"helm.sh/resource-policy": keep
spec:
names:
# plural name to be used in the URL: /apis/<group>/<version>/<plural>
plural: moderationfilters
# singular name to be used as an alias on the CLI and for display
singular: moderationfilter
# kind is normally the CamelCased singular type. Your resource manifests use this.
kind: ModerationFilter
# shortNames allow shorter string to match your resource on the CLI
shortNames:
- mfilter
# group name to use for REST API: /apis/<group>/<version>
# list of versions supported by this CustomResourceDefinition
group: akash.network
scope: Namespaced
versions:
- name: v2beta2
# Each version can be enabled/disabled by Served flag.
served: true
# One and only one version must be marked as the storage version.
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: array
items:
type: object
properties:
type:
type: string
enum: [TenantAddress, Hostname, Image]
pattern:
type: string
allow:
type: boolean
30 changes: 30 additions & 0 deletions pkg/apis/akash.network/v2beta2/moderationfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package v2beta2

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ModerationFilter store metadata, specifications and status of the ModerationFilter
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ModerationFilter struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`

Spec []ModerationFilterSpec `json:"spec,omitempty"`
}

// ModerationFilterList stores metadata and items list of moderationfilter
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ModerationFilterList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []ModerationFilter `json:"items"`
}

// ModerationFilterSpec stores LeaseID, Group and metadata details
type ModerationFilterSpec struct {
Type string `json:"type"`
Allow bool `json:"allow"`
Pattern string `json:"pattern"`
}
10 changes: 10 additions & 0 deletions pkg/apis/akash.network/v2beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,13 @@ func resourcesFromAkash(aru types.Resources) (Resources, error) {

return res, nil
}

type FilterName struct {
Rules []Rules `json:"rules"`
}

type Rules struct {
Type string `json:"type"`
Allow bool `json:"allow"`
Pattern string `json:"size"`
}
Loading