diff --git a/toxics/mars.go b/toxics/mars.go index daba7e2a..aee63668 100644 --- a/toxics/mars.go +++ b/toxics/mars.go @@ -1,8 +1,12 @@ package toxics import ( + "fmt" "math" "time" + + "github.com/rs/zerolog/log" + "github.com/Shopify/toxiproxy/v2/stream" ) // The MarsToxic simulates the communication delay to Mars based on current orbital positions. @@ -11,13 +15,19 @@ import ( // Further possibilities here: // * drop packets entirely during solar conjunction // * corrupt frames in the liminal period before/after conjunction +// * buffering through the disk (maybe a FIFO, idk) would model data in flight better // // We could to the hard block but we're kind of at the wrong layer to do corruption. type MarsToxic struct { // Optional additional latency in milliseconds ExtraLatency int64 `json:"extra_latency"` + // Rate in KB/s (0 means unlimited) + Rate int64 `json:"rate"` // Reference time for testing, if zero current time is used ReferenceTime time.Time `json:"-"` + // Speed of light in km/s (defaults to 299792.458 if 0) It's (probably?) + // obvious you won't want to change this. It's useful for testing. + SpeedOfLight float64 `json:"speed_of_light"` } // Since we're buffering for several minutes, we need a large buffer. @@ -56,8 +66,11 @@ func (t *MarsToxic) Delay() time.Duration { // Calculate current distance in kilometers distanceKm := meanDistance - amplitude*math.Cos(phase) - // Speed of light is exactly 299,792.458 km/s - speedOfLight := 299792.458 // km/s + // Speed of light is exactly 299,792.458 km/s by default + speedOfLight := t.SpeedOfLight + if speedOfLight <= 0 { + speedOfLight = 299792.458 // km/s + } // One-way time = distance / speed of light // Convert to milliseconds @@ -70,23 +83,94 @@ func (t *MarsToxic) Delay() time.Duration { } func (t *MarsToxic) Pipe(stub *ToxicStub) { + logger := log.With(). + Str("component", "MarsToxic"). + Str("method", "Pipe"). + Str("toxic_type", "mars"). + Str("addr", fmt.Sprintf("%p", t)). + Logger() + + var sleep time.Duration = 0 for { select { case <-stub.Interrupt: + logger.Trace().Msg("MarsToxic was interrupted") return case c := <-stub.Input: if c == nil { stub.Close() return } - sleep := t.Delay() - time.Since(c.Timestamp) + + // Set timestamp when we receive the chunk + if c.Timestamp.IsZero() { + c.Timestamp = time.Now() + } + + // Calculate Mars delay once for this chunk + marsDelay := t.Delay() + + // Calculate bandwidth delay if rate is set + if t.Rate > 0 { + bytesPerSecond := t.Rate * 1024 + + // If chunk is too large, split it + if int64(len(c.Data)) > bytesPerSecond/10 { // 100ms worth of data + bytesPerInterval := bytesPerSecond/10 // bytes per 100ms + remainingData := c.Data + chunkStart := c.Timestamp + + // First, wait for Mars delay + select { + case <-time.After(marsDelay): + case <-stub.Interrupt: + return + } + + for len(remainingData) > 0 { + chunkSize := int(bytesPerInterval) + if chunkSize > len(remainingData) { + chunkSize = len(remainingData) + } + + chunk := &stream.StreamChunk{ + Data: remainingData[:chunkSize], + Timestamp: chunkStart, + } + + select { + case <-time.After(100 * time.Millisecond): + chunkStart = chunkStart.Add(100 * time.Millisecond) + stub.Output <- chunk + remainingData = remainingData[chunkSize:] + case <-stub.Interrupt: + logger.Trace().Msg("MarsToxic was interrupted during writing data") + return + } + } + continue + } + + // For small chunks, calculate bandwidth delay + sleep = time.Duration(float64(len(c.Data)) / float64(bytesPerSecond) * float64(time.Second)) + } + + // Apply both Mars delay and bandwidth delay + totalDelay := marsDelay + if sleep > 0 { + totalDelay += sleep + } + select { - case <-time.After(sleep): - c.Timestamp = c.Timestamp.Add(sleep) - stub.Output <- c + case <-time.After(totalDelay): + c.Timestamp = c.Timestamp.Add(totalDelay) + stub.Output <- c case <-stub.Interrupt: - // Exit fast without applying latency. - stub.Output <- c // Don't drop any data on the floor + logger.Trace().Msg("MarsToxic was interrupted during writing data") + err := stub.WriteOutput(c, 5*time.Second) + if err != nil { + logger.Warn().Err(err).Msg("Could not write last packets after interrupt") + } return } } diff --git a/toxics/mars_test.go b/toxics/mars_test.go index 605c5286..164650fe 100644 --- a/toxics/mars_test.go +++ b/toxics/mars_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/Shopify/toxiproxy/v2/toxics" + "github.com/Shopify/toxiproxy/v2/stream" ) func TestMarsDelayCalculation(t *testing.T) { @@ -54,7 +55,111 @@ func TestMarsExtraLatencyCalculation(t *testing.T) { expected := 242 * time.Second // ~4 minutes (3 min base + 1 min extra) delay := marsToxic.Delay() - tolerance := time.Duration(float64(expected) * 0.03) // 3% tolerance + tolerance := time.Duration(float64(expected) * 0.04) // 4% tolerance + if diff := delay - expected; diff < -tolerance || diff > tolerance { + t.Errorf("Expected delay of %v (±%v), got %v (%.1f%% difference)", + expected, + tolerance, + delay, + float64(diff) / float64(expected) * 100, + ) + } +} + +func TestMarsBandwidth(t *testing.T) { + marsToxic := &toxics.MarsToxic{ + ReferenceTime: time.Date(2018, 7, 27, 0, 0, 0, 0, time.UTC), // At opposition + Rate: 100, // 100 KB/s + SpeedOfLight: 299792.458 * 1000, // 1000x normal speed for faster testing + } + + input := make(chan *stream.StreamChunk) + output := make(chan *stream.StreamChunk) + stub := toxics.NewToxicStub(input, output) + done := make(chan bool) + + go func() { + marsToxic.Pipe(stub) + done <- true + }() + + // Send 50KB of data + dataSize := 50 * 1024 // 50KB + + // At 100 KB/s, 50KB should take exactly 0.5 seconds + // Expected timing: + // - Bandwidth delay: 500ms (50KB at 100KB/s) + // - Mars delay: ~182ms (at opposition, with 1000x speed of light) + expectedDelay := 500*time.Millisecond + time.Duration(float64(182*time.Second)/1000) + + start := time.Now() + + testData := make([]byte, dataSize) + for i := range testData { + testData[i] = byte(i % 256) // Fill with recognizable pattern + } + + select { + case input <- &stream.StreamChunk{ + Data: testData, + }: + case <-time.After(5 * time.Second): + t.Fatal("Timeout while sending data") + } + + // Collect all chunks + var receivedData []byte + timeout := time.After(5 * time.Second) + + for len(receivedData) < dataSize { + select { + case chunk := <-output: + receivedData = append(receivedData, chunk.Data...) + case <-timeout: + t.Fatalf("Timeout while receiving data. Got %d of %d bytes", len(receivedData), dataSize) + } + } + + elapsed := time.Since(start) + + // Should take at least 0.5 seconds (50KB at 100KB/s) plus reduced Mars delay + tolerance := time.Duration(float64(expectedDelay) * 0.04) // 4% tolerance for timing + + if elapsed < expectedDelay-tolerance || elapsed > expectedDelay+tolerance { + t.Errorf("Expected total delay of %v (±%v), got %v", expectedDelay, tolerance, elapsed) + } + + if len(receivedData) != dataSize { + t.Errorf("Expected %d bytes, got %d", dataSize, len(receivedData)) + } + + // Verify data integrity + for i := range receivedData { + if receivedData[i] != byte(i%256) { + t.Errorf("Data corruption at byte %d: expected %d, got %d", i, byte(i%256), receivedData[i]) + break + } + } + + close(input) + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("Timeout waiting for toxic to finish") + } +} + +func TestMarsSpeedOfLight(t *testing.T) { + // Test with 1000x speed of light to reduce delays + marsToxic := &toxics.MarsToxic{ + ReferenceTime: time.Date(2018, 7, 27, 0, 0, 0, 0, time.UTC), // At opposition + SpeedOfLight: 299792.458 * 1000, // 1000x normal speed + } + + delay := marsToxic.Delay() + expected := time.Duration(float64(182*time.Second) / 1000) // ~182ms (normal 182s / 1000) + + tolerance := time.Duration(float64(expected) * 0.04) // 4% tolerance if diff := delay - expected; diff < -tolerance || diff > tolerance { t.Errorf("Expected delay of %v (±%v), got %v (%.1f%% difference)", expected,