Skip to content

Commit

Permalink
PutLogs with ingest processor (#309)
Browse files Browse the repository at this point in the history
* feature: putlogs with ingest processor

* feature: producer with ingest processor

* chore: update examples

* feature: PutLogs and PostLogStoreLogs not support ingest processor

* feature: optimize processor param
  • Loading branch information
syaning authored Dec 23, 2024
1 parent ac186b4 commit 270ed16
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 13 deletions.
11 changes: 6 additions & 5 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ func (c *Client) PutLogs(project, logstore string, lg *LogGroup) (err error) {
// The callers should transform user logs into LogGroup.
func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKey *string) (err error) {
ls := convertLogstore(c, project, logstore)
return ls.PostLogStoreLogs(lg, hashKey)
req := &PostLogStoreLogsRequest{
LogGroup: lg,
HashKey: hashKey,
}
return ls.PostLogStoreLogs(req)
}

func (c *Client) PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error) {
Expand All @@ -111,10 +115,7 @@ func (c *Client) PutLogsWithMetricStoreURL(project, logstore string, lg *LogGrou

func (c *Client) PostLogStoreLogsV2(project, logstore string, req *PostLogStoreLogsRequest) (err error) {
ls := convertLogstore(c, project, logstore)
if err := ls.SetPutLogCompressType(req.CompressType); err != nil {
return err
}
return ls.PostLogStoreLogs(req.LogGroup, req.HashKey)
return ls.PostLogStoreLogs(req)
}

// PostRawLogWithCompressType put raw log data to log service, no marshal
Expand Down
111 changes: 111 additions & 0 deletions example/ingest_processor/producer_with_processor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"fmt"
"math/rand"
"os"
"os/signal"
"strconv"
"sync"
"time"

"github.com/aliyun/aliyun-log-go-sdk/producer"
)

// variables you should fill
//
// sample processor spl:
// * | parse-regexp content, '(\S+)\s-\s(\S+)\s\[(\S+)\]\s"(\S+)\s(\S+)\s(\S+)"\s(\d+)\s(\d+)\s(\d+)\s(\d+)\s(\S+)\s(\S+)\s"(.*)"' as remote_addr, remote_user, time_local, request_method, request_uri, http_protocol, request_time, request_length, status, body_bytes_sent, host, referer, user_agent | project-away content
var (
accessKeyId = os.Getenv("ACCESS_KEY_ID")
accessKeySecret = os.Getenv("ACCESS_KEY_SECRET")
endpoint = ""
project = ""
logstore = ""
processor = ""
)

// mock data config
var (
remoteUsers = []string{"Alice", "Bob", "Candy", "David", "Elisa"}
requestMethods = []string{"GET", "POST", "PUT", "DELETE", "HEAD"}
statuses = []string{"200", "301", "302", "400", "401", "403", "500", "501", "502"}
httpProtocol = "HTTP/1.1"
userAgent = "Mozilla/5.0 (Windows NT 5.2; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.1"
)

func mockNginxLog() string {
var (
remoteAddr = fmt.Sprintf("192.168.1.%d", rand.Intn(100))
remoteUser = remoteUsers[rand.Intn(len(remoteUsers))]
timeLocal = time.Now().Format(time.RFC3339)
requestMethod = requestMethods[rand.Intn(len(requestMethods))]
requestUri = fmt.Sprintf("/request/path-%d/file-%d", rand.Intn(10), rand.Intn(10))
requestTime = strconv.Itoa(rand.Intn(1000))
requestLength = strconv.Itoa(rand.Intn(100000))
status = statuses[rand.Intn(len(statuses))]
bodyBytesSent = strconv.Itoa(rand.Intn(100000))
host = fmt.Sprintf("www.test%d.com", rand.Intn(10))
referer = fmt.Sprintf("www.test%d.com", rand.Intn(10))
)

content := fmt.Sprintf(
`%s - %s [%s] "%s %s %s" %s %s %s %s %s %s "%s"`,
remoteAddr,
remoteUser,
timeLocal,
requestMethod,
requestUri,
httpProtocol,
requestTime,
requestLength,
status,
bodyBytesSent,
host,
referer,
userAgent,
)
return content
}

func main() {
config := producer.GetDefaultProducerConfig()
config.Endpoint = endpoint
config.AccessKeyID = accessKeyId
config.AccessKeySecret = accessKeySecret
config.GeneratePackId = true
config.Processor = processor

producerInstance, err := producer.NewProducer(config)
if err != nil {
panic(err)
}
producerInstance.Start()

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
log := producer.GenerateLog(
uint32(time.Now().Unix()),
map[string]string{"content": mockNginxLog()},
)
err := producerInstance.SendLog(project, logstore, "producer", "", log)
if err != nil {
fmt.Println(err)
}
}
}()
}
wg.Wait()
fmt.Println("Send completion")

term := make(chan os.Signal)
signal.Notify(term, os.Kill, os.Interrupt)
if _, ok := <-term; ok {
fmt.Println("Get the shutdown signal and start to shut down")
producerInstance.Close(60000)
}
}
103 changes: 103 additions & 0 deletions example/ingest_processor/putlogs_with_processor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package main

import (
"fmt"
"math/rand"
"os"
"strconv"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/gogo/protobuf/proto"
)

// variables you should fill
//
// sample processor spl:
// * | parse-regexp content, '(\S+)\s-\s(\S+)\s\[(\S+)\]\s"(\S+)\s(\S+)\s(\S+)"\s(\d+)\s(\d+)\s(\d+)\s(\d+)\s(\S+)\s(\S+)\s"(.*)"' as remote_addr, remote_user, time_local, request_method, request_uri, http_protocol, request_time, request_length, status, body_bytes_sent, host, referer, user_agent | project-away content
var (
accessKeyId = os.Getenv("ACCESS_KEY_ID")
accessKeySecret = os.Getenv("ACCESS_KEY_SECRET")
endpoint = ""
project = ""
logstore = ""
processor = ""
)

// mock data config
var (
remoteUsers = []string{"Alice", "Bob", "Candy", "David", "Elisa"}
requestMethods = []string{"GET", "POST", "PUT", "DELETE", "HEAD"}
statuses = []string{"200", "301", "302", "400", "401", "403", "500", "501", "502"}
httpProtocol = "HTTP/1.1"
userAgent = "Mozilla/5.0 (Windows NT 5.2; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.1"
)

func mockNginxLog() string {
var (
remoteAddr = fmt.Sprintf("192.168.1.%d", rand.Intn(100))
remoteUser = remoteUsers[rand.Intn(len(remoteUsers))]
timeLocal = time.Now().Format(time.RFC3339)
requestMethod = requestMethods[rand.Intn(len(requestMethods))]
requestUri = fmt.Sprintf("/request/path-%d/file-%d", rand.Intn(10), rand.Intn(10))
requestTime = strconv.Itoa(rand.Intn(1000))
requestLength = strconv.Itoa(rand.Intn(100000))
status = statuses[rand.Intn(len(statuses))]
bodyBytesSent = strconv.Itoa(rand.Intn(100000))
host = fmt.Sprintf("www.test%d.com", rand.Intn(10))
referer = fmt.Sprintf("www.test%d.com", rand.Intn(10))
)

content := fmt.Sprintf(
`%s - %s [%s] "%s %s %s" %s %s %s %s %s %s "%s"`,
remoteAddr,
remoteUser,
timeLocal,
requestMethod,
requestUri,
httpProtocol,
requestTime,
requestLength,
status,
bodyBytesSent,
host,
referer,
userAgent,
)
return content
}

func mockLogGroup(lines int, topic string) *sls.LogGroup {
logs := []*sls.Log{}
for i := 0; i < lines; i++ {
log := &sls.Log{
Time: proto.Uint32(uint32(time.Now().Unix())),
Contents: []*sls.LogContent{
&sls.LogContent{
Key: proto.String("content"),
Value: proto.String(mockNginxLog()),
},
},
}
logs = append(logs, log)
}
return &sls.LogGroup{
Topic: proto.String(topic),
Logs: logs,
}
}

func main() {
client := sls.CreateNormalInterface(endpoint, accessKeyId, accessKeySecret, "")

for {
req := &sls.PostLogStoreLogsRequest{
LogGroup: mockLogGroup(10, "PostLogStoreLogsV2"),
Processor: processor,
}
err := client.PostLogStoreLogsV2(project, logstore, req)
fmt.Println(time.Now(), "PostLogStoreLogsV2", err)

time.Sleep(time.Second)
}
}
29 changes: 22 additions & 7 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,21 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) {

// PostLogStoreLogs put logs into Shard logstore by hashKey.
// The callers should transform user logs into LogGroup.
func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) {
if len(lg.Logs) == 0 {
func (s *LogStore) PostLogStoreLogs(req *PostLogStoreLogsRequest) (err error) {
if err = s.SetPutLogCompressType(req.CompressType); err != nil {
return err
}

if req.LogGroup == nil || len(req.LogGroup.Logs) == 0 {
// empty log group or empty hashkey
return nil
}

if hashKey == nil || *hashKey == "" || s.useMetricStoreURL {
// empty hash call PutLogs
return s.PutLogs(lg)
if s.useMetricStoreURL {
return s.PutLogs(req.LogGroup)
}

body, err := proto.Marshal(lg)
body, err := proto.Marshal(req.LogGroup)
if err != nil {
return NewClientError(err)
}
Expand Down Expand Up @@ -409,7 +412,19 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) {
outLen = len(out)
}

uri := fmt.Sprintf("/logstores/%v/shards/route?key=%v", s.Name, *hashKey)
var uri = fmt.Sprintf("/logstores/%s", s.Name)
var params = url.Values{}
if req.HashKey != nil && *req.HashKey != "" {
params.Set("key", *req.HashKey)
uri = fmt.Sprintf("/logstores/%s/shards/route", s.Name)
}
if req.Processor != "" {
params.Set("processor", req.Processor)
}
if len(params) > 0 {
uri = fmt.Sprintf("%s?%s", uri, params.Encode())
}

r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
Expand Down
1 change: 1 addition & 0 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ type PostLogStoreLogsRequest struct {
LogGroup *LogGroup
HashKey *string
CompressType int
Processor string
}

type StoreView struct {
Expand Down
1 change: 1 addition & 0 deletions producer/io_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
LogGroup: producerBatch.logGroup,
HashKey: producerBatch.getShardHash(),
CompressType: ioWorker.producer.producerConfig.CompressType,
Processor: ioWorker.producer.producerConfig.Processor,
}
err = ioWorker.client.PostLogStoreLogsV2(producerBatch.getProject(), producerBatch.getLogstore(), req)
}
Expand Down
3 changes: 2 additions & 1 deletion producer/producer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type ProducerConfig struct {
AccessKeySecret string // Deprecated: use CredentialsProvider instead
Region string
AuthVersion sls.AuthVersionType
CompressType int // only work for logstore now
CompressType int // only work for logstore now
Processor string // ingest processor
}

func GetDefaultProducerConfig() *ProducerConfig {
Expand Down

0 comments on commit 270ed16

Please sign in to comment.