From bae9bc71e231e10dd2f8908fd65e0bd829b69f80 Mon Sep 17 00:00:00 2001 From: Albert Date: Tue, 29 Mar 2022 15:37:27 +0800 Subject: [PATCH] feat: kafka lab --- modules/video/service/service.go | 13 +++------ modules/video/stream/stream.go | 23 ++++----------- pkg/otelkit/pgkit/client.go | 49 ++++++++++++++++++++++++++++++++ pkg/otelkit/pgkit/client_test.go | 47 ++++++++++++++++++++++++++++++ pkg/otelkit/pgkit/main_test.go | 13 +++++++++ 5 files changed, 119 insertions(+), 26 deletions(-) create mode 100644 pkg/otelkit/pgkit/client.go create mode 100644 pkg/otelkit/pgkit/client_test.go create mode 100644 pkg/otelkit/pgkit/main_test.go diff --git a/modules/video/service/service.go b/modules/video/service/service.go index 75e914d..5773c8f 100644 --- a/modules/video/service/service.go +++ b/modules/video/service/service.go @@ -120,12 +120,8 @@ func (s *service) UploadVideo(stream pb.Video_UploadVideoServer) error { return err } - if err := s.produceVideoCreatedEvent(&pb.HandleVideoCreatedRequest{ - Id: id.Hex(), - Url: path.Join(s.storage.Endpoint(), s.storage.Bucket(), objectName), - }); err != nil { - return err - } + // [Kafka TODO] + // [Describe] Video now is uploaded successfully, try to take advantage of produceVideoCreatedEvent here to send messages. if err := stream.SendAndClose(&pb.UploadVideoResponse{ Id: id.Hex(), @@ -169,9 +165,8 @@ func (s *service) produceVideoCreatedEvent(req *pb.HandleVideoCreatedRequest) er {Value: valueBytes}, } - if err := s.producer.SendMessages(msgs); err != nil { - return err - } + // [Kafka TODO] + // [Describe] Send message to kafka via sarama producer return nil } diff --git a/modules/video/stream/stream.go b/modules/video/stream/stream.go index 7c52848..29ca1a6 100644 --- a/modules/video/stream/stream.go +++ b/modules/video/stream/stream.go @@ -37,24 +37,14 @@ func (s *stream) HandleVideoCreated(ctx context.Context, req *pb.HandleVideoCrea if req.GetScale() != 0 { variant := strconv.Itoa(int(req.GetScale())) - if err := s.handleVideoWithVariant(ctx, id, variant, req.GetUrl()); err != nil { - return nil, &saramakit.HandlerError{Retry: true, Err: err} - } + // [Kafka TODO] + // [Describe] Transcode video if get message with scale != 0, you can handle error occurance like above primitive.ObjectIDFromHex(req.GetId()). return &emptypb.Empty{}, nil } - // fanout create events to each variant - variants := []int32{1080, 720, 480, 320} - for _, scale := range variants { - if err := s.produceVideoCreatedWithScaleEvent(&pb.HandleVideoCreatedRequest{ - Id: req.GetId(), - Url: req.GetUrl(), - Scale: scale, - }); err != nil { - return nil, &saramakit.HandlerError{Retry: true, Err: err} - } - } + // [Kafka TODO] + // [Describe] Fanout create events to each variant [1080, 720, 480, 320], you can handle error occurance like above primitive.ObjectIDFromHex(req.GetId()). return &emptypb.Empty{}, nil } @@ -80,9 +70,8 @@ func (s *stream) produceVideoCreatedWithScaleEvent(req *pb.HandleVideoCreatedReq {Value: valueBytes}, } - if err := s.producer.SendMessages(msgs); err != nil { - return err - } + // [Kafka TODO] + // [Describe] Send message to kafka return nil } diff --git a/pkg/otelkit/pgkit/client.go b/pkg/otelkit/pgkit/client.go new file mode 100644 index 0000000..73132c1 --- /dev/null +++ b/pkg/otelkit/pgkit/client.go @@ -0,0 +1,49 @@ +package pgkit + +import ( + "context" + "os" + + "github.com/NTHU-LSALAB/NTHU-Distributed-System/pkg/logkit" + "github.com/go-pg/pg/v10" + "go.uber.org/zap" +) + +type PGConfig struct { + URL string `long:"url" env:"URL" description:"the URL of PostgreSQL" required:"true"` +} + +type PGClient struct { + *pg.DB + closeFunc func() +} + +func (c *PGClient) Close() error { + if c.closeFunc != nil { + c.closeFunc() + } + return c.DB.Close() +} + +func NewPGClient(ctx context.Context, conf *PGConfig) *PGClient { + if url := os.ExpandEnv(conf.URL); url != "" { + conf.URL = url + } + + logger := logkit.FromContext(ctx).With(zap.String("url", conf.URL)) + opts, err := pg.ParseURL(conf.URL) + if err != nil { + logger.Fatal("failed to parse PostgreSQL url", zap.Error(err)) + } + + db := pg.Connect(opts).WithContext(ctx) + if err := db.Ping(ctx); err != nil { + logger.Fatal("failed to ping PostgreSQL", zap.Error(err)) + } + + logger.Info("create PostgreSQL client successfully") + + return &PGClient{ + DB: db, + } +} diff --git a/pkg/otelkit/pgkit/client_test.go b/pkg/otelkit/pgkit/client_test.go new file mode 100644 index 0000000..6e2e10b --- /dev/null +++ b/pkg/otelkit/pgkit/client_test.go @@ -0,0 +1,47 @@ +package pgkit + +import ( + "context" + "os" + + "github.com/NTHU-LSALAB/NTHU-Distributed-System/pkg/logkit" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("PGClient", func() { + Describe("NewPGClient", func() { + var ( + ctx context.Context + pgConf *PGConfig + pgClient *PGClient + ) + + BeforeEach(func() { + ctx = logkit.NewLogger(&logkit.LoggerConfig{ + Development: true, + }).WithContext(context.Background()) + + pgConf = &PGConfig{ + URL: "postgres://postgres@postgres:5432/postgres?sslmode=disable", + } + if url := os.Getenv("POSTGRES_URL"); url != "" { + pgConf.URL = url + } + }) + + JustBeforeEach(func() { + pgClient = NewPGClient(ctx, pgConf) + }) + + AfterEach(func() { + Expect(pgClient.Close()).NotTo(HaveOccurred()) + }) + + When("success", func() { + It("returns new PGClient without error", func() { + Expect(pgClient).NotTo(BeNil()) + }) + }) + }) +}) diff --git a/pkg/otelkit/pgkit/main_test.go b/pkg/otelkit/pgkit/main_test.go new file mode 100644 index 0000000..b390e54 --- /dev/null +++ b/pkg/otelkit/pgkit/main_test.go @@ -0,0 +1,13 @@ +package pgkit + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestPGKit(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Test PG Kit") +}