Skip to content

Commit

Permalink
fix: barry 2024-08-28 23:26:36
Browse files Browse the repository at this point in the history
  • Loading branch information
kooksee committed Aug 28, 2024
1 parent 8b1a312 commit a8eca3b
Show file tree
Hide file tree
Showing 21 changed files with 329 additions and 422 deletions.
30 changes: 13 additions & 17 deletions cmds/protoc-gen-cloud-job/internal/gen.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package internal

import (
"context"
"fmt"
"strings"

"github.com/dave/jennifer/jen"
"github.com/pubgo/lava/pkg/proto/cloudjobpb"
"google.golang.org/protobuf/compiler/protogen"
"google.golang.org/protobuf/proto"
)

const asyncPkg = "github.com/aginetwork7/portal-server/internal/component/asyncjob"
const jobPkg = "github.com/pubgo/lava/component/cloudjobs"

type eventInfo struct {
srv *protogen.Service
Expand All @@ -19,11 +19,11 @@ type eventInfo struct {

// GenerateFile generates a .errors.pb.go file containing service definitions.
func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.GeneratedFile {
filename := file.GeneratedFilenamePrefix + ".pb.asyncjob.go"
filename := file.GeneratedFilenamePrefix + ".pb.cloud_job.go"
genFile := jen.NewFile(string(file.GoPackageName))
genFile.HeaderComment("Code generated by protoc-gen-asyncjob. DO NOT EDIT.")
genFile.HeaderComment("Code generated by protoc-gen-cloud-job. DO NOT EDIT.")
genFile.HeaderComment("versions:")
genFile.HeaderComment(fmt.Sprintf("- protoc-gen-asyncjob %s", version))
genFile.HeaderComment(fmt.Sprintf("- protoc-gen-cloud-job %s", version))
genFile.HeaderComment(fmt.Sprintf("- protoc %s", protocVersion(gen)))
if file.Proto.GetOptions().GetDeprecated() {
genFile.HeaderComment(fmt.Sprintf("%s is a deprecated file.", file.Desc.Path()))
Expand All @@ -40,7 +40,7 @@ func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.Generated

var events = make(map[string]map[string]*eventInfo)
for _, srv := range file.Services {
name, ok := proto.GetExtension(srv.Desc.Options(), asyncjobpb.E_JobName).(string)
name, ok := proto.GetExtension(srv.Desc.Options(), cloudjobpb.E_JobName).(string)
if !ok || name == "" {
continue
}
Expand All @@ -50,13 +50,13 @@ func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.Generated
}

for _, m := range srv.Methods {
jobSubjectName, ok := proto.GetExtension(m.Desc.Options(), asyncjobpb.E_SubjectName).(string)
jobSubjectName, ok := proto.GetExtension(m.Desc.Options(), cloudjobpb.E_SubjectName).(string)
if !ok || jobSubjectName == "" {
continue
}

if events[name][jobSubjectName] != nil {
gen.Error(fmt.Errorf("async job:%s subject:%s exists", name, jobSubjectName))
gen.Error(fmt.Errorf("cloud job:%s subject:%s exists", name, jobSubjectName))
return g
}

Expand All @@ -79,20 +79,20 @@ func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.Generated
Id(keyName).
Op("=").
Lit(subName)
genFile.Var().Id("_").Op("=").Qual(asyncPkg, "RegisterSubject").
genFile.Var().Id("_").Op("=").Qual(jobPkg, "RegisterSubject").
Call(jen.Id(keyName), jen.New(jen.Id(info.mth.Input.GoIdent.GoName))).Line()

genFile.
Func().
Id(fmt.Sprintf("Register%s%sAsyncJob", code, info.mth.GoName)).
Params(
jen.Id("jobCli").Op("*").Qual(asyncPkg, "Client"),
jen.Id("jobCli").Op("*").Qual(jobPkg, "Client"),
jen.Id("handler").Func().Params(
jen.Id("ctx").Op("*").Qual(asyncPkg, "Context"),
jen.Id("ctx").Op("*").Qual(jobPkg, "Context"),
jen.Id("req").Op("*").Id(info.mth.Input.GoIdent.GoName),
).Error(),
).
Block(jen.Qual(asyncPkg, "RegisterJobHandler").Call(jen.Id("jobCli"), jen.Lit(name), jen.Id(keyName), jen.Id("handler")))
Block(jen.Qual(jobPkg, "RegisterJobHandler").Call(jen.Id("jobCli"), jen.Lit(name), jen.Id(keyName), jen.Id("handler")))
genFile.Line()

var prefix = fmt.Sprintf("Push%s", code)
Expand All @@ -102,7 +102,7 @@ func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.Generated
Func().
Id(mthName).
Params(
jen.Id("jobCli").Op("*").Qual(asyncPkg, "Client"),
jen.Id("jobCli").Op("*").Qual(jobPkg, "Client"),
jen.Id("ctx").Qual("context", "Context"),
jen.Id("req").Op("*").Id(info.mth.Input.GoIdent.GoName),
).
Expand All @@ -127,10 +127,6 @@ func protocVersion(gen *protogen.Plugin) string {
return fmt.Sprintf("v%d.%d.%d%s", v.GetMajor(), v.GetMinor(), v.GetPatch(), suffix)
}

func PushS3DeleteObjectEvent(jobCli *eventjob.Client, ctx context.Context, req *pbv1.PushS3DeleteObjectEventReq) error {
return jobCli.Publish(ctx, pbv1.S3PushS3DeleteObjectEventKey, req)
}

func handlerPushEventName(name string, prefix string) string {
if strings.HasPrefix(name, prefix) {
return name
Expand Down
2 changes: 1 addition & 1 deletion cmds/protoc-gen-cloud-job/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"flag"

"github.com/pubgo/lava/cmds/protoc-gen-asyncjob/internal"
"github.com/pubgo/lava/cmds/protoc-gen-cloud-job/internal"
"google.golang.org/protobuf/compiler/protogen"
"google.golang.org/protobuf/types/pluginpb"
)
Expand Down
6 changes: 3 additions & 3 deletions component/cloudjobs/aaa.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package eventjob
package cloudjobs

import (
"github.com/pubgo/funk/log"
"google.golang.org/protobuf/proto"
)

var logger = log.GetLogger("eventjob")
var logger = log.GetLogger("cloud_jobs")

type Register interface {
RegisterAsyncJob(jobCli *Client)
RegisterCloudJobs(jobCli *Client)
}

type JobHandler[T proto.Message] func(ctx *Context, args T) error
2 changes: 1 addition & 1 deletion component/cloudjobs/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eventjob
package cloudjobs

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion component/cloudjobs/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eventjob
package cloudjobs

import (
"time"
Expand Down
14 changes: 14 additions & 0 deletions component/cloudjobs/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
jobs:
# stream name: stream config
streams:
gid:
storage: "file"
subjects: ["gid.>"]
consumers:
gid:
- consumer: "test:gid"
stream: "gid"
subjects: "gid.proxy.exec"
job:
timeout: "1m"
max_retries: 10
2 changes: 1 addition & 1 deletion component/cloudjobs/context.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eventjob
package cloudjobs

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion component/cloudjobs/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eventjob
package cloudjobs

import "github.com/pubgo/funk/errors"

Expand Down
2 changes: 1 addition & 1 deletion component/cloudjobs/registry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eventjob
package cloudjobs

import "google.golang.org/protobuf/proto"

Expand Down
2 changes: 1 addition & 1 deletion component/cloudjobs/util.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eventjob
package cloudjobs

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion component/cloudjobs/util_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eventjob
package cloudjobs

import (
"context"
Expand Down
Loading

0 comments on commit a8eca3b

Please sign in to comment.