Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Oct 29, 2024
1 parent f62fc6e commit 4f62762
Show file tree
Hide file tree
Showing 11 changed files with 722 additions and 9 deletions.
222 changes: 222 additions & 0 deletions dialer/recorder/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package recorder

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"time"

"github.com/gocql/gocql"
"github.com/gocql/gocql/dialer"
)

func NewRecordDialer(dir string) *RecordDialer {
return &RecordDialer{
dir: dir,
}
}

type RecordDialer struct {
dir string
net.Dialer
}

func (d *RecordDialer) DialContext(ctx context.Context, network, addr string) (conn net.Conn, err error) {
fmt.Println("Dial Context Record Dialer")
sourcePort := gocql.ScyllaGetSourcePort(ctx)
fmt.Println("Source port: ", sourcePort)
// if sourcePort == 0 {
// return d.Dialer.DialContext(ctx, network, addr)
// }
dialerWithLocalAddr := d.Dialer
dialerWithLocalAddr.LocalAddr, err = net.ResolveTCPAddr(network, fmt.Sprintf(":%d", sourcePort))
if err != nil {
fmt.Println(err)
return nil, err
}

conn, err = dialerWithLocalAddr.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}

return NewConnectionRecorder(path.Join(d.dir, fmt.Sprintf("%s-%d", addr, sourcePort)), conn)
}

func NewConnectionRecorder(fname string, conn net.Conn) (net.Conn, error) {
fd_writes, err := os.OpenFile(fname+"Writes", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
fd_reads, err2 := os.OpenFile(fname+"Reads", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err2 != nil {
return nil, err2
}
return &ConnectionRecorder{fd_writes: fd_writes, fd_reads: fd_reads, orig: conn, write_record: &RecordWriter{new: true}, read_record: &RecordWriter{new: true}}, nil
}

type RecordWriter struct {
new bool
to_record int
record map[string]interface{}
}

type ConnectionRecorder struct {
fd_writes *os.File
fd_reads *os.File
orig net.Conn
read_record *RecordWriter
write_record *RecordWriter
}

func (c *ConnectionRecorder) Read(b []byte) (n int, err error) {
n, err = c.orig.Read(b)
if err != nil && err != io.EOF {
return n, err
}

if c.read_record.new {
p := 4
stream_id := int(b[2])
if b[0] > 0x02 {
p = 5
stream_id = int(b[2])<<8 | int(b[3])
}

c.read_record.record = make(map[string]interface{})
c.read_record.to_record = int(b[p+0])<<24 | int(b[p+1])<<16 | int(b[p+2])<<8 | int(b[p+3])
c.read_record.record["stream_id"] = stream_id
b_copy := make([]byte, n)
_ = copy(b_copy, b[:n])
c.read_record.record["data"] = b_copy

if n < c.read_record.to_record {
c.read_record.new = false
c.read_record.to_record = c.read_record.to_record - n
} else {
// Write JSON record to file
jsonData, marshalErr := json.Marshal(c.read_record.record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_reads.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record read: %w", writeErr)
}
}

return n, err
}

if currentData, ok := c.read_record.record["data"].([]byte); ok {
c.read_record.record["data"] = append(currentData, b[:n]...)
} else {
return n, err
}

c.read_record.to_record = c.read_record.to_record - n
if c.read_record.to_record <= 0 {
c.read_record.new = true
// Write JSON record to file
jsonData, marshalErr := json.Marshal(c.read_record.record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_reads.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record read: %w", writeErr)
}
}

return n, err
}

func (c *ConnectionRecorder) Write(b []byte) (n int, err error) {
n, err = c.orig.Write(b)

if c.write_record.new {
p := 4
stream_id := int(b[2])
if b[0] > 0x02 {
p = 5
stream_id = int(b[2])<<8 | int(b[3])
}

c.write_record.record = make(map[string]interface{})
c.write_record.to_record = int(b[p+0])<<24 | int(b[p+1])<<16 | int(b[p+2])<<8 | int(b[p+3])
c.write_record.record["stream_id"] = stream_id
b_copy := make([]byte, n)
_ = copy(b_copy, b[:n])
c.write_record.record["data"] = b_copy

if n < c.write_record.to_record {
c.write_record.new = false
c.write_record.to_record = c.write_record.to_record - n
} else {
// Write JSON record to file
c.write_record.record["hash"] = dialer.GetFrameHash(c.write_record.record["data"].([]byte))
jsonData, marshalErr := json.Marshal(c.write_record.record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_writes.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record write: %w", writeErr)
}
}

return n, err
}

c.write_record.record["data"] = append(c.write_record.record["data"].([]byte), b[:n]...)

c.write_record.to_record = c.write_record.to_record - n
if c.write_record.to_record <= 0 {
c.write_record.new = true
// Write JSON record to file
c.write_record.record["hash"] = dialer.GetFrameHash(c.write_record.record["data"].([]byte))
jsonData, marshalErr := json.Marshal(c.write_record.record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_writes.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record write: %w", writeErr)
}
}
return n, err
}

func (c ConnectionRecorder) Close() error {
if err := c.fd_writes.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
if err := c.fd_reads.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
return c.orig.Close()
}

func (c ConnectionRecorder) LocalAddr() net.Addr {
return c.orig.LocalAddr()
}

func (c ConnectionRecorder) RemoteAddr() net.Addr {
return c.orig.RemoteAddr()
}

func (c ConnectionRecorder) SetDeadline(t time.Time) error {
return c.orig.SetDeadline(t)
}

func (c ConnectionRecorder) SetReadDeadline(t time.Time) error {
return c.orig.SetReadDeadline(t)
}

func (c ConnectionRecorder) SetWriteDeadline(t time.Time) error {
return c.orig.SetWriteDeadline(t)
}
Loading

0 comments on commit 4f62762

Please sign in to comment.