Skip to content

Commit

Permalink
(segmentio#1308) Rename *Id -> *ID, following Go naming convention.
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Ivanov committed Dec 20, 2024
1 parent fa69231 commit a35bd8d
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 27 deletions.
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (batch *Batch) ReadMessage() (Message, error) {
msg.Time = makeTime(timestamp)
msg.Headers = headers
if batch.conn != nil {
msg.GenerationId = batch.conn.generationId
msg.GenerationID = batch.conn.generationID
}

return msg, err
Expand Down
4 changes: 2 additions & 2 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type commit struct {
topic string
partition int
offset int64
generationId int32
generationID int32
}

// makeCommit builds a commit value from a message, the resulting commit takes
Expand All @@ -16,7 +16,7 @@ func makeCommit(msg Message) commit {
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
generationId: msg.GenerationId,
generationID: msg.GenerationID,
}
}

Expand Down
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
"time"
)

const undefinedGenerationID int32 = -1

var (
errInvalidWriteTopic = errors.New("writes must NOT set Topic on kafka.Message")
errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
)

const undefinedGenerationId int32 = -1

// Conn represents a connection to a kafka broker.
//
// Instances of Conn are safe to use concurrently from multiple goroutines.
Expand Down Expand Up @@ -68,7 +68,7 @@ type Conn struct {

transactionalID *string

generationId int32
generationID int32
}

type apiVersionMap map[apiKey]ApiVersion
Expand Down Expand Up @@ -186,7 +186,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
offset: FirstOffset,
requiredAcks: -1,
transactionalID: emptyToNullable(config.TransactionalID),
generationId: undefinedGenerationId,
generationID: undefinedGenerationID,
}

c.wb.w = &c.wbuf
Expand Down Expand Up @@ -393,7 +393,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
return joinGroupResponseV1{}, Error(response.ErrorCode)
}

c.generationId = response.GenerationID
c.generationID = response.GenerationID
return response, nil
}

Expand Down
2 changes: 1 addition & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Message struct {

// If the message has been sent by a consumer group, it contains the
// generation's id. Value is -1 if not using consumer groups.
GenerationId int32
GenerationID int32

// This field is used to hold arbitrary data you wish to include, so it
// will be available when handle it on the Writer's `Completion` method,
Expand Down
18 changes: 9 additions & 9 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
// another consumer to avoid such a race.
}

func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
func (r *Reader) subscribe(generationID int32, allAssignments map[string][]PartitionAssignment) {
offsets := make(map[topicPartition]int64)
for topic, assignments := range allAssignments {
for _, assignment := range assignments {
Expand All @@ -134,7 +134,7 @@ func (r *Reader) subscribe(generationId int32, allAssignments map[string][]Parti
}

r.mutex.Lock()
r.start(generationId, offsets)
r.start(generationID, offsets)
r.mutex.Unlock()

r.withLogger(func(l Logger) {
Expand Down Expand Up @@ -215,7 +215,7 @@ func (o offsetStash) merge(commits []commit) {
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
offsetsByPartition[c.partition] = offsetEntry{
offset: c.offset,
generationID: c.generationId,
generationID: c.generationID,
}
}
}
Expand Down Expand Up @@ -874,7 +874,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
r.mutex.Lock()

if !r.closed && r.version == 0 {
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
r.start(undefinedGenerationID, r.getTopicPartitionOffset())
}

version := r.version
Expand Down Expand Up @@ -1095,7 +1095,7 @@ func (r *Reader) SetOffset(offset int64) error {
r.offset = offset

if r.version != 0 {
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
r.start(undefinedGenerationID, r.getTopicPartitionOffset())
}

r.activateReadLag()
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func (r *Reader) readLag(ctx context.Context) {
}
}

func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
func (r *Reader) start(generationID int32, offsetsByPartition map[topicPartition]int64) {
if r.closed {
// don't start child reader if parent Reader is closed
return
Expand Down Expand Up @@ -1271,7 +1271,7 @@ func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
}).run(ctx, generationId, offset)
}).run(ctx, generationID, offset)
}(ctx, key, offset, &r.join)
}
}
Expand Down Expand Up @@ -1308,7 +1308,7 @@ type readerMessage struct {
error error
}

func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
func (r *reader) run(ctx context.Context, generationID int32, offset int64) {
// This is the reader's main loop, it only ends if the context is canceled
// and will keep attempting to reader messages otherwise.
//
Expand Down Expand Up @@ -1361,7 +1361,7 @@ func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
}
continue
}
conn.generationId = generationId
conn.generationID = generationID

// Resetting the attempt counter ensures that if a failure occurs after
// a successful initialization we don't keep increasing the backoff
Expand Down
18 changes: 9 additions & 9 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,44 +1462,44 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
Offsets offsetStash
Config ReaderConfig
ExpectedOffsets offsetStash
GenerationId int32
GenerationID int32
}{
"happy path": {
Invocations: 1,
Offsets: offsets(),
ExpectedOffsets: offsets(),
GenerationId: 1,
GenerationID: 1,
},
"1 retry": {
Fails: 1,
Invocations: 2,
Offsets: offsets(),
ExpectedOffsets: offsets(),
GenerationId: 1,
GenerationID: 1,
},
"out of retries": {
Fails: defaultCommitRetries + 1,
Invocations: defaultCommitRetries,
HasError: true,
Offsets: offsets(),
ExpectedOffsets: offsets(),
GenerationId: 1,
GenerationID: 1,
},
"illegal generation error only 1 generation": {
Fails: 1,
Invocations: 1,
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}},
ExpectedOffsets: offsetStash{},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: false},
GenerationId: 2,
GenerationID: 2,
},
"illegal generation error only 2 generations": {
Fails: 1,
Invocations: 1,
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}},
ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: false},
GenerationId: 2,
GenerationID: 2,
},
"illegal generation error only 1 generation - error propagation": {
Fails: 1,
Expand All @@ -1508,7 +1508,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
ExpectedOffsets: offsetStash{},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: true},
HasError: true,
GenerationId: 2,
GenerationID: 2,
},
"illegal generation error only 2 generations - error propagation": {
Fails: 1,
Expand All @@ -1517,7 +1517,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: true},
HasError: true,
GenerationId: 2,
GenerationID: 2,
},
}

Expand All @@ -1530,7 +1530,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) {
requests = append(requests, r)
count++
if r.GenerationID != test.GenerationId {
if r.GenerationID != test.GenerationID {
return offsetCommitResponseV2{}, IllegalGeneration
}
if count <= test.Fails {
Expand Down

0 comments on commit a35bd8d

Please sign in to comment.