Skip to content

Commit

Permalink
Add python support in integration tests (#9)
Browse files Browse the repository at this point in the history
add python support
  • Loading branch information
maxday authored Jun 22, 2021
1 parent f9ad6c9 commit 63ab3a9
Show file tree
Hide file tree
Showing 29 changed files with 514 additions and 1,204 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
push:
paths:
- 'integration_tests/**'
- '.github/**'

jobs:

Expand Down Expand Up @@ -47,18 +48,14 @@ jobs:
uses: actions/checkout@v2
with:
repository: DataDog/datadog-agent
#ref: refs/heads/release/lambda-extension-v9
ref: refs/heads/maxday/integration-test # todo : change after merge of current PRs
ref: refs/heads/release/lambda-extension-v9 # todo : change after merge of current PRs
path: "datadog-agent"

- name: Build the layer
run: ./scripts/build_binary_and_layer.sh


- name: Run tests
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
# Todo : retrieve this layer version automatically from AWS
NODE_LAYER_VERSION: 55
run: cp -R ../.layers . && ./integration_tests/run.sh
3 changes: 0 additions & 3 deletions integration_tests/recorder-extension/a_recorder

This file was deleted.

9 changes: 9 additions & 0 deletions integration_tests/recorder-extension/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module datadog-lambda-extension/recorder-extension

go 1.14

require (
github.com/DataDog/agent-payload v4.73.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gorilla/mux v1.8.0 // indirect
)
35 changes: 35 additions & 0 deletions integration_tests/recorder-extension/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
github.com/DataDog/agent-payload v4.73.0+incompatible h1:SnHWa/x6fkyEw0ZnlWZOeEfE6caBQtviwoaJx2Rvuy4=
github.com/DataDog/agent-payload v4.73.0+incompatible/go.mod h1:/2RW4IC/2z54jtB6RLgq5UtVI1TsX0joDRjKbkLT+mk=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
264 changes: 264 additions & 0 deletions integration_tests/recorder-extension/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Some parts of this file are taken from : https://github.com/aws-samples/aws-lambda-extensions/tree/main/go-example-extension

package main

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"

"github.com/DataDog/agent-payload/gogen"
)

const extensionName = "recorder-extension" // extension name has to match the filename
var extensionClient = NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API"))

func main() {
ctx, cancel := context.WithCancel(context.Background())

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-sigs
cancel()
}()

err := extensionClient.Register(ctx, extensionName)
if err != nil {
panic(err)
}

// port 8080 is used by the Lambda Invoke API
port := "3333"
Start(port)

// Will block until shutdown event is received or cancelled via the context.
processEvents(ctx)
}

func processEvents(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
res, err := extensionClient.NextEvent(ctx)
if err != nil {
return
}
if res.EventType == Shutdown {
time.Sleep(1900 * time.Millisecond)
return
}
}
}
}

// JSON representation of a message.
type jsonServerlessPayload struct {
Message jsonServerlessMessage `json:"message"`
Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
Hostname string `json:"hostname"`
Service string `json:"service"`
Source string `json:"ddsource"`
Tags string `json:"ddtags"`
}

type jsonServerlessMessage struct {
Message string `json:"message"`
Lambda *jsonServerlessLambda `json:"lambda,omitempty"`
}

type jsonServerlessLambda struct {
ARN string `json:"arn"`
RequestID string `json:"request_id,omitempty"`
}

// NextEventResponse is the response for /event/next
type NextEventResponse struct {
EventType EventType `json:"eventType"`
}

// EventType represents the type of events recieved from /event/next
type EventType string

const (
// Shutdown is a shutdown event for the environment
Shutdown EventType = "SHUTDOWN"

extensionNameHeader = "Lambda-Extension-Name"
extensionIdentiferHeader = "Lambda-Extension-Identifier"
)

// Client is a simple client for the Lambda Extensions API
type Client struct {
baseURL string
httpClient *http.Client
extensionID string
}

// NewClient returns a Lambda Extensions API client
func NewClient(awsLambdaRuntimeAPI string) *Client {
baseURL := fmt.Sprintf("http://%s/2020-01-01/extension", awsLambdaRuntimeAPI)
return &Client{
baseURL: baseURL,
httpClient: &http.Client{},
}
}

// Register will register the extension with the Extensions API
func (e *Client) Register(ctx context.Context, filename string) error {
const action = "/register"
url := e.baseURL + action

reqBody, err := json.Marshal(map[string]interface{}{
"events": []EventType{Shutdown},
})
if err != nil {
return err
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody))
if err != nil {
return err
}
httpReq.Header.Set(extensionNameHeader, filename)
httpRes, err := e.httpClient.Do(httpReq)
if err != nil {
return err
}
if httpRes.StatusCode != 200 {
return fmt.Errorf("request failed with status %s", httpRes.Status)
}
defer httpRes.Body.Close()
e.extensionID = httpRes.Header.Get(extensionIdentiferHeader)
return nil
}

// NextEvent blocks while long polling for the next lambda invoke or shutdown
func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) {
const action = "/event/next"
url := e.baseURL + action

httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
httpReq.Header.Set(extensionIdentiferHeader, e.extensionID)
httpRes, err := e.httpClient.Do(httpReq)
if err != nil {
return nil, err
}
if httpRes.StatusCode != 200 {
return nil, fmt.Errorf("request failed with status %s", httpRes.Status)
}
defer httpRes.Body.Close()
body, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
return nil, err
}
res := NextEventResponse{}
err = json.Unmarshal(body, &res)
if err != nil {
return nil, err
}
return &res, nil
}

// Start is starting the http server to receive logs, traces and metrics
func Start(port string) {
go startHTTPServer(port)
}

func startHTTPServer(port string) {
http.HandleFunc("/api/beta/sketches", func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("Error while reading HTTP request body: %s \n", err)
return
}
pl := new(gogen.SketchPayload)
if err := pl.Unmarshal(body); err != nil {
fmt.Printf("Error while unmarshalling sketches %s \n", err)
return
}

for _, sketch := range pl.Sketches {
jsonSketch, err := json.Marshal(sketch)
if err != nil {
fmt.Printf("Error while JSON encoding the sketch")
}
fmt.Printf("[sketch] %s \n", string(jsonSketch))
}
})

http.HandleFunc("/v1/input", func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
decompressedBody, err := decompress(body)
if err != nil {
return
}
var messages []jsonServerlessPayload
if err := json.Unmarshal(decompressedBody, &messages); err != nil {
return
}
for _, log := range messages {
sortedTags := strings.Split(log.Tags, ",")
sort.Strings(sortedTags)
log.Tags = strings.Join(sortedTags, ",")
jsonLog, err := json.Marshal(log)
if err != nil {
fmt.Printf("Error while JSON encoding the Log")
}
stringJsonLog := string(jsonLog)
// if we log an unwanted log, it will be available in the next log api payload -> infinite loop
if !strings.Contains(stringJsonLog, "[log]") && !strings.Contains(stringJsonLog, "[metric]") {
fmt.Printf("[log] %s\n", stringJsonLog)
}
}
})

http.HandleFunc("/api/v1/series", func(w http.ResponseWriter, r *http.Request) {
})

http.HandleFunc("/api/v1/check_run", func(w http.ResponseWriter, r *http.Request) {
})

err := http.ListenAndServe(":"+port, nil)
if err != nil {
panic(err)
}
}

func decompress(payload []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(payload))
if err != nil {
return nil, err
}

var buffer bytes.Buffer
_, err = buffer.ReadFrom(reader)
if err != nil {
return nil, err
}

return buffer.Bytes(), nil
}
10 changes: 0 additions & 10 deletions integration_tests/recorder-extension/package.json

This file was deleted.

Loading

0 comments on commit 63ab3a9

Please sign in to comment.