Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
it512 committed Dec 18, 2024
1 parent 7fd57d5 commit f032de3
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions wechat/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mq

import (
"context"
"log"
"log/slog"
"time"

Expand All @@ -12,6 +11,10 @@ import (

const MSGPACK_MIME = "application/msgpack"

func Dial(url string) (*amqp.Connection, error) {
return amqp.Dial(url)
}

func MsgpMsg(m msgp.Marshaler) (amqp.Publishing, error) {
bs, err := m.MarshalMsg(nil)
if err != nil {
Expand All @@ -21,56 +24,58 @@ func MsgpMsg(m msgp.Marshaler) (amqp.Publishing, error) {
return amqp.Publishing{Body: bs, ContentType: MSGPACK_MIME}, nil
}

func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}

type MQ struct {
Conn *amqp.Connection
ExchangeName string
Timeout time.Duration
timeout time.Duration
logger *slog.Logger

Logger *slog.Logger
conn *amqp.Connection
exchangeName string

ch *amqp.Channel

binit bool
}

func New(logger *slog.Logger, timeout time.Duration) *MQ {
return &MQ{
Timeout: timeout,
Logger: logger,
timeout: timeout,
logger: logger,
binit: false,
}
}

func (q *MQ) BuildWith(conn *amqp.Connection) {
func (q *MQ) BuildWith(conn *amqp.Connection, exanme string) error {
if !q.binit {
return nil
}

var err error

q.Conn = conn
q.conn = conn
q.exchangeName = exanme

q.ch, err = q.Conn.Channel()
failOnError(err, "Failed to open a channel")
if q.ch, err = q.conn.Channel(); err != nil {
return err
}

err = q.ch.ExchangeDeclare(
q.ExchangeName, // name
q.exchangeName, // name
amqp.ExchangeTopic, // type
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")

return err
}

func (mq *MQ) Send(ctx context.Context, routingKey string, msg amqp.Publishing) error {
octx, cancel := context.WithTimeout(ctx, mq.Timeout)
octx, cancel := context.WithTimeout(ctx, mq.timeout)
defer cancel()

err := mq.ch.PublishWithContext(octx,
mq.ExchangeName, // exchange
mq.exchangeName, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
Expand All @@ -80,6 +85,5 @@ func (mq *MQ) Send(ctx context.Context, routingKey string, msg amqp.Publishing)
}

func (mq *MQ) Close() error {
mq.ch.Close()
return nil
return mq.ch.Close()
}

0 comments on commit f032de3

Please sign in to comment.