Skip to content

Commit

Permalink
Update to AssignOffsets function to avoid purge of current assignment (
Browse files Browse the repository at this point in the history
…#74)

* Addition of support for AssignOffsets function to augment or modify a current list of assignments

* Update to add additional checking for calls to AssignOffsets with items that already exist or do not

* update to reflect requested change for removing current assignment

Co-authored-by: Conor McCarthy <conormccarthy@brainpool1.mynet>
Co-authored-by: mshimizu-kx <40049399+mshimizu-kx@users.noreply.github.com>
Co-authored-by: Conor McCarthy <conormccarthy@brainpool1.local>
  • Loading branch information
4 people authored Dec 7, 2020
1 parent 19b51e1 commit 932ebed
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
6 changes: 4 additions & 2 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ EXP K1(kfkUnsub){
}

// https://github.com/edenhill/librdkafka/wiki/Manually-setting-the-consumer-start-offset
EXP K3(kfkAssignOffsets){
EXP K3(kfkassignOffsets){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *t_partition;
rd_kafka_resp_err_t err;
Expand All @@ -559,7 +559,9 @@ EXP K3(kfkAssignOffsets){
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
t_partition = rd_kafka_topic_partition_list_new(z->n);
// retrieve the current assignment
if(KFK_OK != (err=rd_kafka_assignment(rk, &t_partition)))
return krr((S)rd_kafka_err2str(err));
plistoffsetdict(y->s,z,t_partition);
if(KFK_OK != (err=rd_kafka_assign(rk,t_partition)))
return krr((S) rd_kafka_err2str(err));
Expand Down
20 changes: 18 additions & 2 deletions kfk.q
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ funcs:(
(`kfkPositionOffsets;3);
// .kfk.CommittedOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
(`kfkCommittedOffsets;3);
// .kfk.AssignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
(`kfkAssignOffsets;3);
// .kfk.assignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
(`kfkassignOffsets;3);
// .kfk.Threadcount[]:i
(`kfkThreadCount;1);
// .kfk.VersionSym[]:s
Expand Down Expand Up @@ -185,6 +185,22 @@ ClientMemberId:{[cid]

// Assignment API logic

// Run assignment of a topic to a particular partition making
/* cid = Integer denoting the client ID
/* top = Topic to be subscribed to as a symbol
/* partoff = Dictionary mapping integer partition to long offset location
AssignOffsets:{[cid;top;partoff]
toppar:(count[partoff]#top)!key partoff;
tplist:distinct(,'/)(key;{"j"$value x})@\:toppar;
// Find locations where the current assigment needs to be overwritten
loc:where i.compAssign[cid;tplist];
if[count loc;
currentAssign:(!). flip tplist loc;
AssignDel[cid;currentAssign]
];
assignOffsets[cid;top;partoff]
}

// Assign a new topic-partition dictionary to be consumed by a designated clientid
/* cid = Integer denoting client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
Expand Down

0 comments on commit 932ebed

Please sign in to comment.