Skip to content

Commit

Permalink
Merge remote-tracking branch 'gitlab/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dot-Liu committed Sep 26, 2023
2 parents 54cc66c + 4cb4e85 commit 5954192
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 256 deletions.
56 changes: 26 additions & 30 deletions drivers/counter/nsq/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *producerPool) PublishAsync(topic string, body []byte) error {
if err := producerNode.producer.Publish(topic, body); err != nil {
//发送失败,将该节点状态置为disconnected,等待check重新连接
producerNode.status = disconnected
log.Errorf("log output nsqd is invalid. nsqd_addr:%s error:%s", producerNode.producer.String(), err)
log.Errorf("fail to publish. nsqd_addr:%s error:%s", producerNode.producer.String(), err)
continue
}
return
Expand All @@ -99,44 +99,40 @@ func (p *producerPool) PublishAsync(topic string, body []byte) error {
return nil
}

func (p *producerPool) check() {
for _, n := range p.nodes {
if err := n.producer.Ping(); err != nil {

oldProducer := n.producer
newProducer, _ := nsq.NewProducer(oldProducer.String(), p.config)
if err = newProducer.Ping(); err != nil {
if n.status == connecting {
n.status = disconnected
log.Errorf("log output nsqd is invalid. nsqd_addr:%s error:%s", oldProducer.String(), err)
}
newProducer.Stop()
continue
}
n.producer = newProducer
n.status = connecting
oldProducer.Stop()
}
n.status = connecting

}
}

// Check 检查节点状态
func (p *producerPool) Check() {

ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
ctx, cancelFunc := context.WithCancel(context.Background())
p.cancelFunc = cancelFunc
for {

select {
case <-ticker.C:
for _, n := range p.nodes {
if err := n.producer.Ping(); err != nil {
//解决断线重连的问题
//n.producer.Stop()
//if err = n.producer.Ping();err != nil{
// continue
//}
//n.status = connecting
//continue

oldProducer := n.producer
newProducer, _ := nsq.NewProducer(oldProducer.String(), p.config)
if err = newProducer.Ping(); err != nil {
if n.status == connecting {
n.status = disconnected
log.Errorf("log output nsqd is invalid. nsqd_addr:%s error:%s", oldProducer.String(), err)
}
newProducer.Stop()
continue
}
n.producer = newProducer
n.status = connecting
oldProducer.Stop()
}
n.status = connecting

}
p.check()
case <-ctx.Done():
return
}
Expand Down
6 changes: 4 additions & 2 deletions drivers/output/nsq/output.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package nsq

import (
scope_manager "github.com/eolinker/apinto/scope-manager"
"reflect"

scope_manager "github.com/eolinker/apinto/scope-manager"

"github.com/eolinker/apinto/drivers"
"github.com/eolinker/apinto/output"
"github.com/eolinker/eosc"
Expand All @@ -22,7 +23,8 @@ type NsqOutput struct {
func (n *NsqOutput) Output(entry eosc.IEntry) error {
w := n.write
if w != nil {
return w.output(entry)
w.output(entry)
return nil
}
return eosc.ErrorWorkerNotRunning
}
Expand Down
90 changes: 41 additions & 49 deletions drivers/output/nsq/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,79 +62,71 @@ func CreateProducerPool(addrs []string, authSecret string, conf map[string]inter
return pool, nil
}

func (p *producerPool) PublishAsync(topic string, body []byte) error {
func (p *producerPool) Publish(topic string, body [][]byte) error {
if p.isClose {
return errNoValidProducer
}

//使用round-robin进行负载均衡
n := int(atomic.AddUint32(&p.next, 1))

go func(n int) {
for attempt := 0; attempt < p.size; attempt++ {
//轮询
index := (n + attempt - 1) % p.size
producerNode := p.nodes[index]
//若该节点不可用
if producerNode.status == disconnected {
continue
}

for attempt := 0; attempt < p.size; attempt++ {
//轮询
index := (n + attempt - 1) % p.size
producerNode := p.nodes[index]
//若该节点不可用
if producerNode.status == disconnected {
continue
}
//发送消息
if err := producerNode.producer.MultiPublish(topic, body); err != nil {
//发送失败,将该节点状态置为disconnected,等待check重新连接
producerNode.status = disconnected
log.Errorf("fail to publish. nsqd_addr:%s error:%s", producerNode.producer.String(), err)
continue
}
return nil
}
//log.Errorf("no available nsqd node. data: %s", fmt.Sprintf("topic:%s data:%s", topic, body))
return fmt.Errorf("no available nsqd node, data: %s", fmt.Sprintf("topic:%s data:%s", topic, body))
}

//发送消息
if err := producerNode.producer.Publish(topic, body); err != nil {
//发送失败,将该节点状态置为disconnected,等待check重新连接
producerNode.status = disconnected
log.Errorf("log output nsqd is invalid. nsqd_addr:%s error:%s", producerNode.producer.String(), err)
func (p *producerPool) check() {
for _, n := range p.nodes {
if err := n.producer.Ping(); err != nil {

oldProducer := n.producer
newProducer, _ := nsq.NewProducer(oldProducer.String(), p.config)
if err = newProducer.Ping(); err != nil {
if n.status == connecting {
n.status = disconnected
log.Errorf("log output nsqd is invalid. nsqd_addr:%s error:%s", oldProducer.String(), err)
}
newProducer.Stop()
continue
}
return
n.producer = newProducer
n.status = connecting
oldProducer.Stop()
}
log.Errorf("no available nsqd node. data: %s", fmt.Sprintf("topic:%s data:%s", topic, body))
}(n)
n.status = connecting

return nil
}
}

// Check 检查节点状态
func (p *producerPool) Check() {

ticker := time.NewTicker(time.Second * 30)
p.check()
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
ctx, cancelFunc := context.WithCancel(context.Background())
p.cancelFunc = cancelFunc
for {

select {
case <-ticker.C:
for _, n := range p.nodes {

if err := n.producer.Ping(); err != nil {
//解决断线重连的问题
//n.producer.Stop()
//if err = n.producer.Ping();err != nil{
// continue
//}
//n.status = connecting
//continue

oldProducer := n.producer
newProducer, _ := nsq.NewProducer(oldProducer.String(), p.config)
if err = newProducer.Ping(); err != nil {
if n.status == connecting {
n.status = disconnected
log.Errorf("log output nsqd is invalid. nsqd_addr:%s error:%s", oldProducer.String(), err)
}
newProducer.Stop()
continue
}
n.producer = newProducer
n.status = connecting
oldProducer.Stop()
}
n.status = connecting

}
p.check()
case <-ctx.Done():
return
}
Expand Down
Loading

0 comments on commit 5954192

Please sign in to comment.