diff --git a/wechat/mq/mq.go b/wechat/mq/mq.go index 0cf0797..510ba89 100644 --- a/wechat/mq/mq.go +++ b/wechat/mq/mq.go @@ -2,7 +2,6 @@ package mq import ( "context" - "log" "log/slog" "time" @@ -12,6 +11,10 @@ import ( const MSGPACK_MIME = "application/msgpack" +func Dial(url string) (*amqp.Connection, error) { + return amqp.Dial(url) +} + func MsgpMsg(m msgp.Marshaler) (amqp.Publishing, error) { bs, err := m.MarshalMsg(nil) if err != nil { @@ -21,39 +24,42 @@ func MsgpMsg(m msgp.Marshaler) (amqp.Publishing, error) { return amqp.Publishing{Body: bs, ContentType: MSGPACK_MIME}, nil } -func failOnError(err error, msg string) { - if err != nil { - log.Panicf("%s: %s", msg, err) - } -} - type MQ struct { - Conn *amqp.Connection - ExchangeName string - Timeout time.Duration + timeout time.Duration + logger *slog.Logger - Logger *slog.Logger + conn *amqp.Connection + exchangeName string ch *amqp.Channel + + binit bool } func New(logger *slog.Logger, timeout time.Duration) *MQ { return &MQ{ - Timeout: timeout, - Logger: logger, + timeout: timeout, + logger: logger, + binit: false, } } -func (q *MQ) BuildWith(conn *amqp.Connection) { +func (q *MQ) BuildWith(conn *amqp.Connection, exanme string) error { + if !q.binit { + return nil + } + var err error - q.Conn = conn + q.conn = conn + q.exchangeName = exanme - q.ch, err = q.Conn.Channel() - failOnError(err, "Failed to open a channel") + if q.ch, err = q.conn.Channel(); err != nil { + return err + } err = q.ch.ExchangeDeclare( - q.ExchangeName, // name + q.exchangeName, // name amqp.ExchangeTopic, // type true, // durable true, // auto-deleted @@ -61,16 +67,15 @@ func (q *MQ) BuildWith(conn *amqp.Connection) { false, // no-wait nil, // arguments ) - failOnError(err, "Failed to declare an exchange") - + return err } func (mq *MQ) Send(ctx context.Context, routingKey string, msg amqp.Publishing) error { - octx, cancel := context.WithTimeout(ctx, mq.Timeout) + octx, cancel := context.WithTimeout(ctx, mq.timeout) defer cancel() err := mq.ch.PublishWithContext(octx, - mq.ExchangeName, // exchange + mq.exchangeName, // exchange routingKey, // routing key false, // mandatory false, // immediate @@ -80,6 +85,5 @@ func (mq *MQ) Send(ctx context.Context, routingKey string, msg amqp.Publishing) } func (mq *MQ) Close() error { - mq.ch.Close() - return nil + return mq.ch.Close() }