在agent开启时初始化了3个UDP服务
每个服务对应处理不同的数据格式
官方推荐使用6831端口接收数据
github.com/uber/jaeger/cmd/agent/app/flags.go #35
var defaultProcessors = []struct {
model model
protocol protocol
hostPort string
}{
{model: "zipkin", protocol: "compact", hostPort: ":5775"},
{model: "jaeger", protocol: "compact", hostPort: ":6831"},
{model: "jaeger", protocol: "binary", hostPort: ":6832"},
}
github.com/uber/jaeger/cmd/agent/app/servers/thriftudp/transport.go #73
func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) {
addr, err := net.ResolveUDPAddr("udp", hostPort)
if err != nil {
return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
}
conn, err := net.ListenUDP(addr.Network(), addr)
if err != nil {
return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
}
return &TUDPTransport{addr: conn.LocalAddr(), conn: conn}, nil
}
github.com/uber/jaeger/cmd/agent/app/processors/thrift_processor.go #78
// Serve initiates the readers and starts serving traffic
func (s *ThriftProcessor) Serve() {
s.processing.Add(s.numProcessors)
for i := 0; i < s.numProcessors; i++ {
go s.processBuffer()
}
s.server.Serve()
}
github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go
func NewTBufferedServer(
transport thrift.TTransport,
maxQueueSize int,
maxPacketSize int,
mFactory metrics.Factory,
) (*TBufferedServer, error) {
//数据队列
dataChan := make(chan *ReadBuf, maxQueueSize)
//对象池初始化
var readBufPool = &sync.Pool{
New: func() interface{} {
return &ReadBuf{bytes: make([]byte, maxPacketSize)}
},
}
res := &TBufferedServer{dataChan: dataChan,
transport: transport,
maxQueueSize: maxQueueSize,
maxPacketSize: maxPacketSize,
readBufPool: readBufPool,
}
metrics.Init(&res.metrics, mFactory, nil)
return res, nil
}
github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go #80
func (s *TBufferedServer) Serve() {
atomic.StoreUint32(&s.serving, 1)
for s.IsServing() {
readBuf := s.readBufPool.Get().(*ReadBuf)
n, err := s.transport.Read(readBuf.bytes)
if err == nil {
readBuf.n = n
s.metrics.PacketSize.Update(int64(n))
select {
case s.dataChan <- readBuf:
s.metrics.PacketsProcessed.Inc(1)
s.updateQueueSize(1)
default:
//这里需要注意,如果写比处理快,agent将会扔掉超出的部分数据
s.metrics.PacketsDropped.Inc(1)
}
} else {
s.metrics.ReadError.Inc(1)
}
}
}
注意点:agent的队列默认存放1000条消息,超过部分会被扔掉。
基于服务器的承载能力可以适当调节参数,减少数据的丢失,保持调用链的完整。
- 增加队列长度(default 1000) --processor.zipkin-compact.server-queue-size
- 增加处理协成数 (default 50) --processor.zipkin-compact.workers
- 增加agent服务节点
github.com/uber/jaeger/cmd/agent/app/processors/thrift_processor.go #104
func (s *ThriftProcessor) processBuffer() {
for readBuf := range s.server.DataChan() {
protocol := s.protocolPool.Get().(thrift.TProtocol)
protocol.Transport().Write(readBuf.GetBytes())
s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
if ok, _ := s.handler.Process(protocol, protocol); !ok {
// TODO log the error
s.metrics.HandlerProcessError.Inc(1)
}
s.protocolPool.Put(protocol)
}
s.processing.Done()
}
github.com/uber/jaeger/thrift-gen/jaeger/agent.go #105
func (p *AgentProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return false, err
}
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId, iprot, oprot)
}
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x7 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x7.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, x7
}
github.com/uber/jaeger/thrift-gen/agent/agent.go #187
func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := AgentEmitBatchArgs{}
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
return false, err
}
iprot.ReadMessageEnd()
var err2 error
if err2 = p.handler.EmitBatch(args.Batch); err2 != nil {
return true, err2
}
return true, nil
}
github.com/uber/jaeger/thrift-gen/jaeger/tchan-jaeger.go #39
func (c *tchanCollectorClient) SubmitBatches(ctx thrift.Context, batches []*Batch) ([]*BatchSubmitResponse, error) {
var resp CollectorSubmitBatchesResult
args := CollectorSubmitBatchesArgs{
Batches: batches,
}
success, err := c.client.Call(ctx, c.thriftService, "submitBatches", &args, &resp)
if err == nil && !success {
}
return resp.GetSuccess(), err
}