kgo: add TopicID to the FetchTopic type #794

Merged
twmb merged 1 commit into master from 790 on Oct 15, 2024

Conversation

twmb (Owner) commented Jul 29, 2024

Closes #790.

twmb added the minor label on Jul 29, 2024
@@ -1041,6 +1041,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

 		fetchTopic := FetchTopic{
 			Topic:   topic,
+			TopicID: rt.TopicID,

Nice, so if version >= 13, I should be assured that this is set?

twmb (Owner, Author)

It's set if the broker returns it -- it's a new field in fetch response v13, yes -- and this will be all 0s if it is unset.


Gotcha, I assume all 0s is an invalid topic id.

@@ -274,6 +274,9 @@ func (p *FetchPartition) EachRecord(fn func(*Record)) {
 type FetchTopic struct {
 	// Topic is the topic this is for.
 	Topic string
+	// TopicID is the ID of the topic, if your cluster supports returning
+	// topic IDs in fetch responses (Kafka 3.1+).
+	TopicID [16]byte


nit: maybe add fetch request version? Will make it easier for someone using the API, I think.

twmb (Owner, Author)

End users generally don't actually know which version of Kafka introduced which version of a request. It isn't documented anywhere -- a person has to first know the version a field was introduced in, then figure out which KIP introduced that version, then figure out which version of Kafka that KIP was actually released in.

Mentioning the version of Kafka that introduced topic IDs in the fetch response circumvents that process and gets directly to the answer, IMO.

Imran-imtiaz48 left a comment

The addition of the TopicID field enhances the FetchTopic struct, aligning it with newer Kafka versions. However, the initialization of this field with an empty value in EachTopic may need reconsideration based on its actual usage and availability. Ensuring proper initialization and documentation will improve the code's robustness and clarity.

twmb (Owner, Author) commented Jul 31, 2024

Topics do not have all-0 topic IDs if the cluster supports topic IDs. The user can be sure that a topic ID is present based on the fact that it is not all 0s. I use this same comparison myself:

var noID [16]byte
if newTP.cursor.topicID == noID && oldTP.cursor.topicID != noID {
	cl.cfg.logger.Log(LogLevelWarn, "metadata update is missing the topic ID when we previously had one, ignoring update",
		"topic", topic,
		"partition", part,
	)
	retryWhy.add(topic, int32(part), errMissingTopicID)
	continue
}

franz-go/pkg/kgo/source.go

Lines 1895 to 1898 in 6b61d17

var noID [16]byte
if c.topicID == noID {
	f.disableIDs = true
}
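
For a caller of the new field, here is a minimal sketch of the same all-zeros check applied from the consumer side. The seed broker and topic name are placeholders, not from this PR:

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder seed broker and topic.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("example-topic"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	fetches := cl.PollFetches(context.Background())
	fetches.EachTopic(func(ft kgo.FetchTopic) {
		// An all-zeros ID means the broker did not return one
		// (fetch response below v13, i.e. Kafka older than 3.1).
		var noID [16]byte
		if ft.TopicID == noID {
			fmt.Printf("topic %s: no topic ID returned\n", ft.Topic)
			return
		}
		fmt.Printf("topic %s has ID %x\n", ft.Topic, ft.TopicID)
	})
}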

twmb (Owner, Author) commented Oct 15, 2024

Bit of a delay on my side ... releasing this evening.

twmb merged commit 6028e72 into master on Oct 15, 2024
8 checks passed
twmb deleted the 790 branch on October 15, 2024 at 00:36
ortuman pushed a commit to grafana/franz-go that referenced this pull request Oct 17, 2024
kgo: add TopicID to the FetchTopic type
ortuman added a commit to grafana/franz-go that referenced this pull request Oct 17, 2024
* Fix typo in kgo.Client.ResumeFetchTopics() docs

Signed-off-by: Mihai Todor <todormihai@gmail.com>

* add `NewOffsetFromRecord` helper function

* Fix typo in Record.ProducerID doc comment.

* Don't set nil config when seeding topics in kfake cluster

Setting the configs to `nil` causes a panic later when trying to alter the topic configs, because the code only checks whether the map entry is present, not whether it is nil

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
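
A tiny standalone illustration of the "present but nil" pitfall this commit describes; the map and struct below are placeholders, not kfake's actual configuration types:

package main

import "fmt"

type topicConfig struct{ value string }

func main() {
	// Seeding with a nil entry: the key exists, but the value is nil.
	configs := map[string]*topicConfig{"seeded-topic": nil}

	if c, ok := configs["seeded-topic"]; ok {
		// The presence check passes, but c is nil, so dereferencing
		// it panics -- the same shape as the kfake bug.
		fmt.Println(c.value)
	}
}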

* Add Opts method for sr.Client

* Merge pull request twmb#826 from colega/don-t-set-nil-config-when-seeding-topics-in-kfake-cluster

Don't set nil config when seeding topics in kfake cluster

* Merge pull request twmb#821 from seizethedave/davidgrant/producer-doc

Fix typo in Record.ProducerID doc comment.

* Merge pull request twmb#812 from mihaitodor/fix-doc-typo

Fix typo in kgo.Client.ResumeFetchTopics() docs

* kgo: fix potential deadlock when reaching max buffered (records|bytes)

Problem:
* Record A exceeds max, is on path to block
* Record B finishes concurrently
* Record A's context cancels
* Record A's goroutine waiting to be unblocked returns, leaves accounting mutex in locked state
* Record A's select statement chooses context-canceled case, trying to grab the accounting mutex lock

See twmb#831 for more details.

Closes twmb#831.
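
A simplified, standalone sketch of the shape of this deadlock; the names below (accounting, waitForSpace) are illustrative stand-ins, not the client's actual buffering code:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// accounting stands in for the client's buffered-records bookkeeping.
type accounting struct {
	mu    sync.Mutex
	freed chan struct{} // closed when another record frees space
}

// waitForSpace mirrors the buggy shape: the waiter goroutine acquires the
// mutex and returns while still holding it, expecting the caller to take
// the "unblocked" path. If the caller instead takes the context-canceled
// path, it tries to lock the same mutex and can block forever.
func (a *accounting) waitForSpace(ctx context.Context) {
	unblocked := make(chan struct{})
	go func() {
		a.mu.Lock() // deliberately left locked to show the bug
		<-a.freed   // "record B finishes", freeing space
		close(unblocked)
	}()

	select {
	case <-unblocked:
		a.mu.Unlock() // intended path: caller releases the lock
	case <-ctx.Done():
		a.mu.Lock() // buggy path: the waiter may already hold the lock
		fmt.Println("unreachable if the waiter won the race")
		a.mu.Unlock()
	}
}

func main() {
	a := &accounting{freed: make(chan struct{})}
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		time.Sleep(10 * time.Millisecond)
		close(a.freed) // record B finishes concurrently
		cancel()       // record A's context cancels around the same time
	}()

	// Depending on which select case wins, this either returns or deadlocks.
	a.waitForSpace(ctx)
	fmt.Println("returned without deadlocking this run")
}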

* all: unlint what is now cropping up

gosec ones are deliberate; govet ones are now randomly showing up (and are
also deliberate)

* Merge pull request twmb#832 from twmb/831

kgo: fix potential deadlock when reaching max buffered (records|bytes)

* kgo: misc doc update

* kgo: ignore OOOSN where possible

See embedded comment. This precedes handling KIP-890.

Closes twmb#805.

* kip-890 definitions

A bunch of version bumps to indicate TransactionAbortable is supported
as an error return.

* kip-848 more definitions

Added in Kafka 3.8:
* ListGroups.TypesFilter
* ConsumerGroupDescribe request

* kip-994 proto

Only ListTransactions was modified in 3.8

* sr: add StatusCode to ResponseError, and message if the body is empty

Closes twmb#819.

* generate / kmsg: update GroupMetadata{Key,Value}

Not much changed here.

Closes twmb#804.

* kgo: do not add all topics to internal tps map when regex consuming

The internal tps map is meant to hold the topicPartitions that are
candidates to be consumed. This is filtered in assignPartitions to opt in
only the partitions that are actually being consumed.

It's not BAD if we store all topics in that map, but it's not the
intent. The rest of the client worked fine even with extra topics in the
map.

When regex consuming, the metadata function previously put all topics
into the map always. Now, we move the regex evaluation logic --
duplicated in both the direct and group consumers -- into one function
and use that for filtering within metadata.

This introduces a required sequence of filtering THEN finding
assignments, which is fine and was the way things operated anyway.

Moving the filtering to metadata (only in the regex consuming logic)
means that we no longer store information for topics we are not
consuming. Indirectly, this fixes a bug where `GetConsumeTopics` would
always return ALL topics when regex consuming, because
`GetConsumeTopics` always just returned what was in the `tps` field.

This adds a test for the fixed behavior, as well as tests that NOT regex
consuming always returns all topics the user is interested in.

Closes twmb#810.
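
A minimal sketch of the kind of shared regex filter this commit describes, so both consumer paths evaluate topics in one place; the function name and signature are illustrative, not the client's internal API:

package main

import (
	"fmt"
	"regexp"
)

// filterRegexTopics keeps only the topic names that match at least one of
// the configured regexes. Centralizing the evaluation is the point of the
// change: metadata filters first, then assignments are found.
func filterRegexTopics(all []string, res []*regexp.Regexp) []string {
	var keep []string
	for _, topic := range all {
		for _, re := range res {
			if re.MatchString(topic) {
				keep = append(keep, topic)
				break
			}
		}
	}
	return keep
}

func main() {
	res := []*regexp.Regexp{regexp.MustCompile(`^orders\..*`)}
	all := []string{"orders.eu", "orders.us", "audit.log"}
	fmt.Println(filterRegexTopics(all, res)) // [orders.eu orders.us]
}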

* Merge pull request twmb#833 from twmb/proto-3.8.0

Proto 3.8.0

* kgo: support Kafka 3.8's kip-890 modifications

STILL NOT ALL OF KIP-890, despite what I originally coded.
Kafka 3.8 only added support for TransactionAbortable.
Producers still need to send AddPartitionsToTxn.

* kversion: note kip-848 additions for kafka 3.8

* kversion: note kip-994 added in 3.8, finalize 3.8

* kversion: ignore API keys 74,75 when guessing versions

These are in KRaft only, and are two requests from two separate KIPs
that aren't fully supported yet. Not sure why only these two were
stabilized.

* README: note 3.8 KIPs

* kgo: bump kmsg pinned dep

* Merge pull request twmb#840 from twmb/kafka-3.8.0

Kafka 3.8.0

* Merge pull request twmb#760 from twmb/753

kgo: add AllowRebalance and CloseAllowingRebalance to GroupTransactSession

* Merge pull request twmb#789 from sbuliarca/errgroupsession-export-err

kgo: export the wrapped error from ErrGroupSession

* Merge pull request twmb#794 from twmb/790

kgo: add TopicID to the FetchTopic type

* Merge pull request twmb#814 from noamcohen97/new-offset-helper

kadm: add `NewOffsetFromRecord` helper function

* Merge pull request twmb#829 from andrewstucki/sr-client-opts

Add Opts method for sr.Client

* Merge pull request twmb#834 from twmb/805

kgo: ignore OOOSN where possible

* Merge pull request twmb#835 from twmb/819

sr: add StatusCode to ResponseError, and message if the body is empty

* Merge pull request twmb#838 from twmb/810

kgo: do not add all topics to internal tps map when regex consuming

* CHANGELOG: note incoming release

* Merge pull request twmb#841 from twmb/1.18-changelog

CHANGELOG: note incoming release

* pkg/sr: require go 1.22

No real reason, no real reason not to. This also allows one commit after
the top level franz tag.

* Merge pull request twmb#842 from twmb/sr-1.22

pkg/sr: require go 1.22

* pkg/kadm: bump go deps

* Merge pull request twmb#843 from twmb/kadm

pkg/kadm: bump go deps

---------

Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
Co-authored-by: Mihai Todor <todormihai@gmail.com>
Co-authored-by: Noam Cohen <noam@noam.me>
Co-authored-by: David Grant <seizethedave@gmail.com>
Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>
Co-authored-by: Andrew Stucki <andrew.stucki@redpanda.com>
Co-authored-by: Travis Bischel <travis.bischel+github@gmail.com>