Skip to content

Commit

Permalink
make StreamerV2 itself an io.Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
hayeah committed May 18, 2024
1 parent aa1d9b0 commit 67fe23e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 45 deletions.
75 changes: 31 additions & 44 deletions stream_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,47 +49,6 @@ type DeltaImageURL struct {
Detail string `json:"detail"`
}

// streamTextReader is an io.Reader of the text deltas of thread.message.delta events
type streamTextReader struct {
streamer *StreamerV2
buffer []byte
}

func newStreamTextReader(streamer *StreamerV2) io.Reader {
return &streamTextReader{
streamer: streamer,
}
}

func (r *streamTextReader) Read(p []byte) (int, error) {
// If we have data in the buffer, copy it to p first.
if len(r.buffer) > 0 {
n := copy(p, r.buffer)
r.buffer = r.buffer[n:]
return n, nil
}

for r.streamer.Next() {
// Read only text deltas
text, ok := r.streamer.MessageDeltaText()
if !ok {
continue
}

r.buffer = []byte(text)
n := copy(p, r.buffer)
r.buffer = r.buffer[n:]
return n, nil
}

// Check for streamer error
if err := r.streamer.Err(); err != nil {
return 0, err
}

return 0, io.EOF
}

func NewStreamerV2(r io.Reader) *StreamerV2 {
var rc io.ReadCloser

Expand All @@ -111,6 +70,9 @@ type StreamerV2 struct {

scanner *SSEScanner
next any

// buffer for implementing io.Reader
buffer []byte
}

// Close closes the underlying io.ReadCloser

Check failure on line 78 in stream_v2.go

View workflow job for this annotation

GitHub Actions / Sanity check

Comment should end in a period (godot)
Expand Down Expand Up @@ -144,9 +106,34 @@ func (s *StreamerV2) Next() bool {
return true
}

// Reader returns io.Reader of the text deltas of thread.message.delta events
func (s *StreamerV2) Reader() io.Reader {
return newStreamTextReader(s)
// Read implements io.Reader of the text deltas of thread.message.delta events

Check failure on line 109 in stream_v2.go

View workflow job for this annotation

GitHub Actions / Sanity check

Comment should end in a period (godot)
func (r *StreamerV2) Read(p []byte) (int, error) {

Check warning on line 110 in stream_v2.go

View workflow job for this annotation

GitHub Actions / Sanity check

receiver-naming: receiver name r should be consistent with previous receiver name s for StreamerV2 (revive)
// If we have data in the buffer, copy it to p first.
if len(r.buffer) > 0 {
n := copy(p, r.buffer)
r.buffer = r.buffer[n:]
return n, nil
}

for r.Next() {
// Read only text deltas
text, ok := r.MessageDeltaText()
if !ok {
continue
}

r.buffer = []byte(text)
n := copy(p, r.buffer)
r.buffer = r.buffer[n:]
return n, nil
}

// Check for streamer error
if err := r.Err(); err != nil {
return 0, err
}

return 0, io.EOF
}

func (s *StreamerV2) Event() any {
Expand Down
2 changes: 1 addition & 1 deletion stream_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ data: {"id":"msg_KFiZxHhXYQo6cGFnGjRDHSee","object":"thread.message.delta","delt
event: done
data: [DONE]
`
reader := NewStreamerV2(strings.NewReader(raw)).Reader()
reader := NewStreamerV2(strings.NewReader(raw))

expected := "helloworld"
buffer := make([]byte, len(expected))
Expand Down

0 comments on commit 67fe23e

Please sign in to comment.