Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Restperf should stream results while parsing #3356

Merged
merged 5 commits into from
Dec 6, 2024

Conversation

rahulguptajss
Copy link
Contributor

No description provided.

@rahulguptajss
Copy link
Contributor Author

Sample code showing memory retention, which is ~165MB

package main

import (
	"fmt"
	"io/ioutil"
	"math/rand"
	"os"
	"runtime"
	"time"

	"github.com/tidwall/gjson"
)

// Client simulates a REST client; its GetRest reads from disk instead of
// the network so the sample is self-contained.
type Client struct{}

// PerfRecord is one batch of parsed performance records plus the time the
// batch was created.
type PerfRecord struct {
	Records   []gjson.Result // sub-slice of a parsed page; retains the page's backing data
	Timestamp int64          // batch creation time in nanoseconds (UnixNano)
}

// GetRest simulates a REST GET by reading the whole payload from the file
// named by href; headers are accepted for signature parity but ignored.
// NOTE(review): io/ioutil is deprecated — os.ReadFile is the modern
// equivalent (left as-is here to keep this file's imports unchanged).
func (c *Client) GetRest(href string, headers ...map[string]string) ([]byte, error) {
	// Simulate reading a large JSON file
	return ioutil.ReadFile(href)
}

// FetchRestPerfData pages through the endpoint chain starting at href,
// slices each page's records into batches of batchSize, and appends every
// batch to perfRecords.
//
// NOTE(review): each batch is a sub-slice of the parsed page, so every
// PerfRecord keeps the whole page's backing array (and the response bytes
// referenced by the gjson.Results) alive — this is the memory retention
// this sample demonstrates.
func FetchRestPerfData(client *Client, href string, perfRecords *[]PerfRecord, batchSize int, headers ...map[string]string) error {
	// A non-positive batchSize would make the inner loop (i += batchSize)
	// spin forever; reject it up front.
	if batchSize <= 0 {
		return fmt.Errorf("batchSize must be positive, got %d", batchSize)
	}

	var prevLink string
	nextLink := href

	for {
		response, err := client.GetRest(nextLink, headers...)
		if err != nil {
			return fmt.Errorf("error making request: %w", err)
		}

		// extract returned records since paginated records need to be merged into a single list
		output := gjson.ParseBytes(response)
		data := output.Get("records")
		numRecords := output.Get("num_records")
		next := output.Get("_links.next.href")

		if numRecords.Int() > 0 {
			recordsArray := data.Array()
			for i := 0; i < len(recordsArray); i += batchSize {
				end := i + batchSize
				if end > len(recordsArray) {
					end = len(recordsArray)
				}
				batch := recordsArray[i:end]
				p := PerfRecord{Records: batch, Timestamp: time.Now().UnixNano()}
				*perfRecords = append(*perfRecords, p)
			}
		}

		prevLink = nextLink
		nextLink = next.String()

		if nextLink == "" || nextLink == prevLink {
			// no nextLink or nextLink is the same as the previous link, no progress is being made, exit
			break
		}
	}

	return nil
}

// generateLargeJSONFile writes a single JSON document to filename containing
// numRecords objects under "records", a "num_records" count, and an empty
// "_links.next.href" so readers treat it as the last (and only) page.
func generateLargeJSONFile(filename string, numRecords int) error {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	// This defer is the safety net for the early error returns below; the
	// success path closes explicitly so the Close error is reported.
	defer file.Close()

	if _, err = file.WriteString(`{"records": [`); err != nil {
		return err
	}

	for i := 0; i < numRecords; i++ {
		record := fmt.Sprintf(`{"id": %d, "value": %d}`, i, rand.Int())
		if i < numRecords-1 {
			// JSON forbids a trailing comma, so only separate interior elements.
			record += ","
		}
		if _, err = file.WriteString(record); err != nil {
			return err
		}
	}

	if _, err = file.WriteString(`], "num_records": ` + fmt.Sprintf("%d", numRecords) + `, "_links": {"next": {"href": ""}}}`); err != nil {
		return err
	}

	// Propagate the Close error: for a file opened for writing, a failed
	// flush can surface only at Close time, and the deferred Close above
	// discards it.
	return file.Close()
}

func main() {
	client := &Client{}
	var perfRecords []PerfRecord

	// Generate a large JSON file
	filename := "large.json"
	numRecords := 1000000 // 1 million records
	err := generateLargeJSONFile(filename, numRecords)
	if err != nil {
		fmt.Println("Error generating large JSON file:", err)
		return
	}
	runtime.GC()
	var m runtime.MemStats

	// Capture memory stats before
	runtime.ReadMemStats(&m)
	fmt.Printf("Memory before: %v KB\n", m.Alloc/1024)

	// Fetch performance data with batching
	batchSize := 50
	err = FetchRestPerfData(client, filename, &perfRecords, batchSize)
	if err != nil {
		fmt.Println("Error:", err)
	}
	keepInMemory(perfRecords)
	runtime.GC()

	runtime.ReadMemStats(&m)
	fmt.Printf("Memory after: %v KB\n", m.Alloc/1024)

	time.Sleep(2 * time.Minute)

	runtime.ReadMemStats(&m)
	fmt.Printf("Memory after processing: %v KB\n", m.Alloc/1024)

	time.Sleep(10 * time.Minute)
}

// perfRecords pins the fetched records at package scope so the garbage
// collector cannot reclaim them once main's local goes out of scope.
var perfRecords []PerfRecord

// keepInMemory retains records globally, simulating a caller that holds
// on to all parsed results.
func keepInMemory(records []PerfRecord) {
	perfRecords = records
}

@rahulguptajss
Copy link
Contributor Author

rahulguptajss commented Dec 5, 2024

After switching to streaming, memory retention is ~200KB

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	_ "net/http/pprof"
	"runtime"
	"time"

	"github.com/tidwall/gjson"
)

// Client simulates a REST client; its GetRest reads paginated files from
// disk instead of the network so the sample is self-contained.
type Client struct{}

// PerfRecord is one batch of parsed performance records plus the time the
// batch was created.
type PerfRecord struct {
	Records   []gjson.Result // sub-slice of the current page's parsed records
	Timestamp int64          // batch creation time in nanoseconds (UnixNano)
}

// Page is the on-disk shape of one generated pagination page: this page's
// records, how many there are, and the filename of the next page ("next"
// is empty on the final page).
type Page struct {
	Records    []map[string]interface{} `json:"records"`
	NumRecords int                      `json:"num_records"`
	NextLink   string                   `json:"next"`
}

// GetRest simulates a REST GET by reading the whole payload from the file
// named by href; headers are accepted for signature parity but ignored.
// NOTE(review): io/ioutil is deprecated — os.ReadFile is the modern
// equivalent (left as-is here to keep this file's imports unchanged).
func (c *Client) GetRest(href string, headers ...map[string]string) ([]byte, error) {
	// Simulate reading a paginated JSON file
	return ioutil.ReadFile(href)
}

// FetchRestPerfData walks the page chain starting at href, slices each
// page's records into batches of batchSize, and streams every batch to
// processBatch immediately, so no more than one page of results needs to
// be resident at a time.
func FetchRestPerfData(client *Client, href string, batchSize int, headers ...map[string]string) error {
	// A non-positive batchSize would make the inner loop (i += batchSize)
	// spin forever; reject it up front.
	if batchSize <= 0 {
		return fmt.Errorf("batchSize must be positive, got %d", batchSize)
	}

	var prevLink string
	nextLink := href

	for {
		response, err := client.GetRest(nextLink, headers...)
		if err != nil {
			return fmt.Errorf("error making request: %w", err)
		}

		// extract returned records since paginated records need to be merged into a single list
		output := gjson.ParseBytes(response)
		data := output.Get("records")
		numRecords := output.Get("num_records")
		next := output.Get("next")

		if numRecords.Int() > 0 {
			recordsArray := data.Array()
			for i := 0; i < len(recordsArray); i += batchSize {
				end := i + batchSize
				if end > len(recordsArray) {
					end = len(recordsArray)
				}

				batch := recordsArray[i:end]
				p := PerfRecord{Records: batch, Timestamp: time.Now().UnixNano()}

				// Hand the batch off right away instead of accumulating it.
				processBatch(p)
			}
		}

		prevLink = nextLink
		nextLink = next.String()

		if nextLink == "" || nextLink == prevLink {
			// no nextLink or nextLink is the same as the previous link, no progress is being made, exit
			break
		}
	}

	return nil
}

// generateLargeJSONFile writes numRecords synthetic records split into
// pages of batchSize records each. Pages are written as page_<offset>.json
// and chained via each page's "next" field (empty on the final page).
//
// NOTE(review): the filename parameter is unused — page names are always
// page_%d.json; it is kept only for signature compatibility.
// NOTE(review): io/ioutil is deprecated; os.WriteFile is the modern
// equivalent (left as-is to keep this file's imports unchanged).
func generateLargeJSONFile(filename string, numRecords int, batchSize int) error {
	// A non-positive batchSize would make this loop (i += batchSize) spin
	// forever; reject it up front.
	if batchSize <= 0 {
		return fmt.Errorf("batchSize must be positive, got %d", batchSize)
	}

	for i := 0; i < numRecords; i += batchSize {
		end := i + batchSize
		if end > numRecords {
			end = numRecords
		}

		page := Page{
			Records:    make([]map[string]interface{}, end-i),
			NumRecords: end - i,
			NextLink:   "",
		}

		for j := i; j < end; j++ {
			page.Records[j-i] = map[string]interface{}{
				"id":    j,
				"value": rand.Int(),
			}
		}

		// Link to the following page unless this is the last one.
		if end < numRecords {
			page.NextLink = fmt.Sprintf("page_%d.json", end)
		}

		pageData, err := json.Marshal(page)
		if err != nil {
			return err
		}

		pageFilename := fmt.Sprintf("page_%d.json", i)
		err = ioutil.WriteFile(pageFilename, pageData, 0644)
		if err != nil {
			return err
		}
	}

	return nil
}

// main generates paginated JSON fixtures, streams them through
// FetchRestPerfData, and prints heap usage before and after; a pprof
// server on :6060 allows live inspection of memory.
func main() {
	client := &Client{}

	// Generate a large JSON file with multiple pages
	numRecords := 1000000 // 1 million records
	batchSize := 50
	err := generateLargeJSONFile("large.json", numRecords, batchSize)
	if err != nil {
		fmt.Println("Error generating large JSON file:", err)
		return
	}

	go func() {
		fmt.Println("Starting pprof server on :6060")
		// Report startup failures (e.g. port already in use) instead of
		// silently discarding the error and losing the pprof endpoint.
		if err := http.ListenAndServe(":6060", nil); err != nil {
			fmt.Println("pprof server error:", err)
		}
	}()

	var m runtime.MemStats

	// Baseline heap usage before any parsing happens.
	runtime.ReadMemStats(&m)
	fmt.Printf("Memory before: %v KB\n", m.Alloc/1024)

	// The page chain starts at the first generated page.
	err = FetchRestPerfData(client, "page_0.json", batchSize)
	if err != nil {
		fmt.Println("Error:", err)
	}

	runtime.GC()

	runtime.ReadMemStats(&m)
	fmt.Printf("Memory after: %v KB\n", m.Alloc/1024)

	time.Sleep(2 * time.Minute)

	runtime.ReadMemStats(&m)
	fmt.Printf("Memory after processing: %v KB\n", m.Alloc/1024)

	time.Sleep(10 * time.Minute)
}

// perf holds the most recently processed batch at package scope so the
// sample keeps a live reference to streamed data (a stand-in for real
// downstream processing).
var perf PerfRecord

// processBatch consumes one streamed batch; here it simply records it
// globally.
func processBatch(batch PerfRecord) {
	perf = batch
}

@rahulguptajss rahulguptajss linked an issue Dec 5, 2024 that may be closed by this pull request
Hardikl
Hardikl previously approved these changes Dec 5, 2024
cmd/tools/rest/rest.go Outdated Show resolved Hide resolved
@rahulguptajss rahulguptajss merged commit f7fdf52 into main Dec 6, 2024
8 checks passed
@rahulguptajss rahulguptajss deleted the rg2-restperf-stream branch December 6, 2024 05:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Restperf should stream results while parsing
3 participants