Skip to content

Commit

Permalink
Merge pull request #18 from cmccarthy1/dev
Browse files Browse the repository at this point in the history
Additional functionality and type checking update
  • Loading branch information
awilson-kx authored Sep 20, 2019
2 parents c0d7136 + b28af06 commit 6e6b55d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 27 deletions.
32 changes: 31 additions & 1 deletion kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ K decodeParList(rd_kafka_topic_partition_list_t *t){
rd_kafka_topic_partition_list_t* plistoffsetdict(S topic,K partitions){
K dk=kK(partitions)[0],dv=kK(partitions)[1];
I*p;J*o,i;
if(dk->n==0) return NULL; // empty dicts for offsetless commit
p=kI(dk);o=kJ(dv);
rd_kafka_topic_partition_list_t *t_partition=
rd_kafka_topic_partition_list_new(dk->n);
Expand Down Expand Up @@ -425,6 +424,8 @@ EXP K3(kfkAssignOffsets){
rd_kafka_resp_err_t err;
if(!checkType("is!", x,y,z))
return KNL;
if(!checkType("IJ",kK(z)[0],kK(z)[1]))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
partitions = plistoffsetdict(y->s,z);
Expand All @@ -440,6 +441,8 @@ EXP K4(kfkCommitOffsets){
rd_kafka_t *rk;rd_kafka_topic_partition_list_t *t_partition;
if(!checkType("is!b", x, y, z, r))
return KNL;
if(!checkType("IJ",kK(z)[0],kK(z)[1]))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
t_partition = plistoffsetdict(y->s,z);
Expand All @@ -457,6 +460,8 @@ EXP K3(kfkCommittedOffsets){
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
if(!checkType("IJ",kK(z)[0],kK(z)[1]))
return KNL;
t_partition = plistoffsetdict(y->s,z);
if(KFK_OK != (err= rd_kafka_committed(rk, t_partition,5000)))
return krr((S) rd_kafka_err2str(err));
Expand All @@ -471,6 +476,8 @@ EXP K3(kfkPositionOffsets){
rd_kafka_t *rk;rd_kafka_topic_partition_list_t *t_partition;
if(!checkType("is!", x, y, z))
return KNL;
if(!checkType("IJ",kK(z)[0],kK(z)[1]))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
t_partition = plistoffsetdict(y->s,z);
Expand Down Expand Up @@ -553,8 +560,31 @@ EXP K1(kfkOutQLen){
return ki(rd_kafka_outq_len(rk));
}

// logger level is set based on Severity levels in syslog https://en.wikipedia.org/wiki/Syslog#Severity_level
EXP K2(kfkSetLoggerLevel){
rd_kafka_t *rk;
I qy=0;
if(!checkType("i[hij]",x,y))
return KNL;
if(!(rk=clientIndex(x)))
return KNL;
SW(y->t){
CS(-KH,qy=y->h);
CS(-KI,qy=y->i);
CS(-KJ,qy=y->j);
}
rd_kafka_set_log_level(rk, qy);
return KNL;
}

// Returns the number of threads currently being used by librdkafka
EXP K kfkThreadCount(K UNUSED(x)){return ki(rd_kafka_thread_cnt());}

EXP K kfkVersion(K UNUSED(x)){return ki(rd_kafka_version());}

// Returns the human readable librdkafka version
EXP K kfkVersionSym(K UNUSED(x)){return ks((S)rd_kafka_version_str());}

EXP K kfkExportErr(K UNUSED(dummy)){
const struct rd_kafka_err_desc *errdescs;
size_t i,n;
Expand Down
61 changes: 35 additions & 26 deletions kfk.q
Original file line number Diff line number Diff line change
@@ -1,55 +1,58 @@
\d .kfk
LIBPATH:`:libkfk 2:
funcs:(
// .kfk.init[]:i
// .kfk.init[]:i
(`kfkInit;1);
// .kfk.Client[client_type:c;conf:S!S]:i
// .kfk.Client[client_type:c;conf:S!S]:i
(`kfkClient;2);
// .kfk.ClientDel[client_id:i]:_
// .kfk.ClientDel[client_id:i]:_
(`kfkClientDel;1);
// .kfk.ClientName[client_id:i]:s
// .kfk.ClientName[client_id:i]:s
(`kfkClientName;1);
// .kfk.ClientMemberId[client_id:i]:s
// .kfk.ClientMemberId[client_id:i]:s
(`kfkClientMemberId;1);
// .kfk.Topic[client_id:i;topicname:s;conf:S!S]:i
// .kfk.Topic[client_id:i;topicname:s;conf:S!S]:i
(`kfkTopic;3);
// .kfk.TopicDel[topic_id:i]:_
// .kfk.TopicDel[topic_id:i]:_
(`kfkTopicDel;1);
// .kfk.TopicName[topic_id:i]:s
// .kfk.TopicName[topic_id:i]:s
(`kfkTopicName;1);
// .kfk.Metadata[client_id:i]:S!()
// .kfk.Metadata[client_id:i]:S!()
(`kfkMetadata;1);
// PRODUCER API
// .kfk.Pub[topic_id:i;partid:i;data;key]:_
// .kfk.Pub[topic_id:i;partid:i;data;key]:_
(`kfkPub;4);
// .kfk.OutQLen[client_id:i]:i
// .kfk.OutQLen[client_id:i]:i
(`kfkOutQLen;1);
// CONSUMER API
// .kfk.Sub[client_id:i;topicname:s;partition_list|partition_offsets:I!J]:()
// .kfk.Sub[client_id:i;topicname:s;partition_list|partition_offsets:I!J]:()
(`kfkSub;3);
// .kfk.Unsub[client_id:i]:()
// .kfk.Unsub[client_id:i]:()
(`kfkUnsub;1);
// .kfk.Subscription[client_id:i]
// .kfk.Subscription[client_id:i]
(`kfkSubscription;1);
// .kfk.Poll[client_id:i;timeout;max_messages]
// .kfk.Poll[client_id:i;timeout;max_messages]
(`kfkPoll;3);
// .kfk.Version[]:i
// .kfk.Version[]:i
(`kfkVersion;1);
// .kfk.Flush[producer_id:i;timeout_ms:i]
// .kfk.Flush[producer_id:i;timeout_ms:i]:()
(`kfkFlush;2);
// .kfk.ExportErr[]:T
// .kfk.ExportErr[]:T
(`kfkExportErr;1);
// .kfk.CommitOffsets[client_id;topic:s;partition_offsets:I!J;async:b]:()
// .kfk.CommitOffsets[client_id;topic:s;partition_offsets:I!J;async:b]:()
(`kfkCommitOffsets;4);
// .kfk.PositionOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
// .kfk.PositionOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
(`kfkPositionOffsets;3);
// .kfk.CommittedOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
// .kfk.CommittedOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
(`kfkCommittedOffsets;3);
// .kfk.AssignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
(`kfkAssignOffsets;3)
// .kfk.AssignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
(`kfkAssignOffsets;3);
// .kfk.Threadcount[]:i
(`kfkThreadCount;1);
// .kfk.VersionSym[]:s
(`kfkVersionSym;1);
// .kfk.SetLoggerLevel[client_id:i;int_level:i]:()
(`kfkSetLoggerLevel;2)
);


// binding functions from dictionary funcs using rule
// kfk<Name> -> .kfk.<Name>
.kfk,:(`$3_'string funcs[;0])!LIBPATH@/:funcs
Expand All @@ -62,6 +65,12 @@ Version:Version[];
// Table with all errors return by kafka with codes and description
Errors:ExportErr[];

// projection function for handling int/long lists of partitions for offset functions
osetp:{[cf;x;y;z]cf[x;y;$[99h=type z;z;("i"$z,())!count[z]#0]]}
// Allow Offset functionality to take topics as a list in z argument
CommittedOffsets:osetp[CommittedOffsets;;]
PositionOffsets :osetp[PositionOffsets;;]

// Unassigned partition.
// The unassigned partition is used by the producer API for messages
// that should be partitioned using the configured or default partitioner.
Expand Down

0 comments on commit 6e6b55d

Please sign in to comment.