Skip to content

Commit

Permalink
Removal of kfkPartitionAvailable in line with https://docs.confluent.…
Browse files Browse the repository at this point in the history
…io/2.0.0/clients/librdkafka/rdkafka_8h.html#ad24c6cc7f37271e292f8105c64d77758 , handling of longs and shorts for offset function lists
  • Loading branch information
cmccarthy1 committed Sep 20, 2019
1 parent a361b7d commit b28af06
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 19 deletions.
17 changes: 2 additions & 15 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,25 +560,12 @@ EXP K1(kfkOutQLen){
return ki(rd_kafka_outq_len(rk));
}

EXP K2(kfkPartitionAvailable){
rd_kafka_topic_t *rkt;
I qy=0;
if(!checkType("[hij]",y))
return KNL;
if(!(rkt=topicIndex(x)))
return KNL;
SW(y->t){
CS(-KH,qy=y->h);
CS(-KI,qy=y->i);
CS(-KJ,qy=y->j);
}
return kb(rd_kafka_topic_partition_available(rkt, qy));
}

// logger level is set based on Severity levels in syslog https://en.wikipedia.org/wiki/Syslog#Severity_level
EXP K2(kfkSetLoggerLevel){
rd_kafka_t *rk;
I qy=0;
if(!checkType("i[hij]",x,y))
return KNL;
if(!(rk=clientIndex(x)))
return KNL;
SW(y->t){
Expand Down
8 changes: 4 additions & 4 deletions kfk.q
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ funcs:(
(`kfkCommittedOffsets;3);
// .kfk.AssignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
(`kfkAssignOffsets;3);
// .kfk.PartitionAvailable[topic_id:i]:i
(`kfkPartitionAvailable;2);
// .kfk.Threadcount[]:i
(`kfkThreadCount;1);
// .kfk.VersionSym[]:s
Expand All @@ -67,9 +65,11 @@ Version:Version[];
// Table with all errors return by kafka with codes and description
Errors:ExportErr[];

// projection function for handling int/long lists of partitions for offset functions
osetp:{[cf;x;y;z]cf[x;y;$[99h=type z;z;("i"$z,())!count[z]#0]]}
// Allow Offset functionality to take topics as a list in z argument
CommittedOffsets:{[cf;x;y;z]cf[x;y;$[99h=type z;z;(z,())!count[z]#0]]}CommittedOffsets
PositionOffsets:{[cf;x;y;z]cf[x;y;$[99h=type z;z;(z,())!count[z]#0]]}PositionOffsets
CommittedOffsets:osetp[CommittedOffsets;;]
PositionOffsets :osetp[PositionOffsets;;]

// Unassigned partition.
// The unassigned partition is used by the producer API for messages
Expand Down

0 comments on commit b28af06

Please sign in to comment.