Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cloud job #44

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmds/protoc-gen-cloud-job/internal/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.Generated

genFile.
Func().
Id(fmt.Sprintf("Register%s%sAsyncJob", code, info.mth.GoName)).
Id(fmt.Sprintf("Register%s%sCloudJob", code, info.mth.GoName)).
Params(
jen.Id("jobCli").Op("*").Qual(jobPkg, "Client"),
jen.Id("handler").Func().Params(
Expand All @@ -96,7 +96,7 @@ func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.Generated
genFile.Line()

var prefix = fmt.Sprintf("Push%s", code)
var mthName = fmt.Sprintf("%sAsyncJob", info.mth.GoName)
var mthName = fmt.Sprintf("%sCloudJob", info.mth.GoName)
mthName = handlerPushEventName(mthName, prefix)
genFile.
Func().
Expand Down
22 changes: 11 additions & 11 deletions component/cloudjobs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *Client) initConsumer() error {
defer cancel()
for jobOrConsumerName, consumers := range c.p.Cfg.Consumers {
jobName := jobOrConsumerName
assert.If(c.handlers[jobName] == nil, "failed to find job handler: %s, please impl RegisterAsyncJob", jobName)
assert.If(c.handlers[jobName] == nil, "failed to find job handler: %s, please impl RegisterCloudJob", jobName)

consumerName := jobOrConsumerName
for _, cfg := range consumers {
Expand Down Expand Up @@ -180,7 +180,7 @@ func (c *Client) initConsumer() error {
e.Any("stream_name", streamName)
e.Any("consumer_name", consumerName)
e.Any("job_subject", subName)
e.Msg("register async job handler executor")
e.Msg("register cloud job handler executor")
})
c.jobs[streamName][consumerName][subName] = job
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func (c *Client) doHandler(meta *jetstream.MsgMetadata, msg jetstream.Msg, job *
e.Str("timeout", timeout.String())
e.Str("start_time", now.String())
e.Str("job_cost", time.Since(now).String())
e.Msg("failed to do async job handler")
e.Msg("failed to do cloud job handler")
})
}()

Expand All @@ -252,7 +252,7 @@ func (c *Client) doHandler(meta *jetstream.MsgMetadata, msg jetstream.Msg, job *

return errors.WrapFn(job.handler(msgCtx, dst), func() errors.Tags {
return errors.Tags{
errors.T("msg", "failed to do async job handler"),
errors.T("msg", "failed to do cloud job handler"),
errors.T("args", dst),
errors.T("any_pb", pb.String()),
}
Expand All @@ -271,7 +271,7 @@ func (c *Client) doConsume() error {
e.Str("stream", streamName)
e.Str("consumer", consumerName)
e.Any("subjects", lo.MapKeys(jobSubjects, func(_ *jobHandler, key string) string { return key }))
e.Msg("async job do consumer")
e.Msg("cloud job do consumer")
})

var doConsumeHandler = func(msg jetstream.Msg) {
Expand All @@ -288,7 +288,7 @@ func (c *Client) doConsume() error {

logger.Debug().Func(func(e *zerolog.Event) {
addMsgInfo(e)
e.Msg("received async job event")
e.Msg("received cloud job event")
})

handler := jobSubjects[msg.Subject()]
Expand Down Expand Up @@ -341,12 +341,12 @@ func (c *Client) doConsume() error {
Err(err).
Func(addMsgInfo).
Any("metadata", meta).
Msg("retry nats stream async job event")
Msg("retry nats stream cloud job event")
checkErrAndLog(msg.NakWithDelay(backoff), "failed to retry msg with delay nak")
return
}

checkErrAndLog(err, "failed to do handler async job")
checkErrAndLog(err, "failed to do handler cloud job")
checkErrAndLog(msg.Ack(), "failed to do msg ack with handler error")
}

Expand Down Expand Up @@ -392,9 +392,9 @@ func (c *Client) publish(ctx context.Context, key string, args proto.Message, op
}
}
if gErr == nil {
logger.Info(ctx).Func(msgFn).Msg("succeed to publish async job event to stream")
logger.Info(ctx).Func(msgFn).Msg("succeed to publish cloud job event to stream")
} else {
logger.Err(gErr, ctx).Func(msgFn).Msg("failed to publish async job event to stream")
logger.Err(gErr, ctx).Func(msgFn).Msg("failed to publish cloud job event to stream")
}
}()

Expand Down Expand Up @@ -467,6 +467,6 @@ func (c *Client) registerJobHandler(jobName string, topic string, handler JobHan
e.Str("job_name", jobName)
e.Str("topic", topic)
e.Str("job_handler", stack.CallerWithFunc(handler).String())
e.Msg("register async job handler")
e.Msg("register cloud job handler")
})
}
2 changes: 1 addition & 1 deletion component/cloudjobs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cloudjobs

import "github.com/pubgo/funk/errors"

var errReject = errors.New("asyncjob: reject retry and discard msg")
var errReject = errors.New("cloudjobs: reject retry and discard msg")

func Reject(errs ...error) error {
var reason = "reject"
Expand Down
4 changes: 2 additions & 2 deletions internal/example/grpc/pkg/proto/gidpb/id.pb.cloud_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

116 changes: 0 additions & 116 deletions pkg/proto/asyncjobpb/options.pb.go

This file was deleted.

Loading