From 99b793676d36fb7fdcafa433004a5c2b25954969 Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Wed, 28 Aug 2019 19:56:15 +0100 Subject: [PATCH 1/7] Addition of flush functionality for messages in buffer (provide access to rd_kafka_flush) --- kfk.c | 12 ++++++++++++ kfk.q | 2 ++ 2 files changed, 14 insertions(+) diff --git a/kfk.c b/kfk.c index f518146..aeefa84 100644 --- a/kfk.c +++ b/kfk.c @@ -350,6 +350,18 @@ rd_kafka_topic_partition_list_t* plistoffsetdict(S topic,K partitions){ return t_partition; } +EXP K2(kfkFlush){ + rd_kafka_t *rk; + if(!checkType("ii",x , y)) + return KNL; + if(!(rk= clientIndex(x))) + return KNL; + rd_kafka_resp_err_t err= rd_kafka_flush(rk,(I)y); + if(KFK_OK != err) + return krr((S) rd_kafka_err2str(err)); + return KNL; + } + // producer api EXP K4(kfkPub){ rd_kafka_topic_t *rkt; diff --git a/kfk.q b/kfk.q index 9d021fb..030262b 100644 --- a/kfk.q +++ b/kfk.q @@ -35,6 +35,8 @@ funcs:( (`kfkPoll;3); // .kfk.Version[]:i (`kfkVersion;1); + // .kfk.Flush[producer_id:i;timeout_ms:i] + (`kfkFlush;2); // .kfk.ExportErr[]:T (`kfkExportErr;1); // .kfk.CommitOffsets[client_id;topic:s;partition_offsets:I!J;async:b]:() From 38730e6495239967371e8ce5c2fc612d3f94686c Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Thu, 29 Aug 2019 13:26:18 +0200 Subject: [PATCH 2/7] Flush function now should handle long, short and int as input for timeout --- kfk.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/kfk.c b/kfk.c index aeefa84..0e347b6 100644 --- a/kfk.c +++ b/kfk.c @@ -352,11 +352,19 @@ rd_kafka_topic_partition_list_t* plistoffsetdict(S topic,K partitions){ EXP K2(kfkFlush){ rd_kafka_t *rk; - if(!checkType("ii",x , y)) + I qy; + if(!checkType("i",x)) return KNL; if(!(rk= clientIndex(x))) return KNL; - rd_kafka_resp_err_t err= rd_kafka_flush(rk,(I)y); + SW(y->t){ + CS(-KH,qy=y->h); + CS(-KI,qy=y->i); + CS(-KJ,qy=y->j); + CS(-KE,qy=y->e); + CD:R krr("timeout type"); + } + rd_kafka_resp_err_t err= rd_kafka_flush(rk,qy); if(KFK_OK != err) return krr((S) rd_kafka_err2str(err)); return KNL; From 291d22be0c724134ceb8986a781ae1211dfec06c Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Thu, 29 Aug 2019 15:36:45 +0200 Subject: [PATCH 3/7] modification to Makefile to remove repeated download of k.h --- Makefile | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 2f366ef..a6c5ada 100644 --- a/Makefile +++ b/Makefile @@ -21,17 +21,15 @@ else ifeq ($(shell uname),Darwin) OSXOPTS = -undefined dynamic_lookup -mmacosx-version-min=10.12 endif -KH = curl -s -O -J -L https://github.com/KxSystems/kdb/raw/master/c/c/k.h - QARCH = $(OSFLAG)$(MS) Q = $(QHOME)/$(QARCH) -all: - $(KH) +all: k.h $(CC) kfk.c -m$(MS) $(OPTS) $(LDOPTS_DYNAMIC) $(LD_COMMON) -I$(KFK_INCLUDE) $(LNK) -o $(TGT) $(OSXOPTS) -static: - $(KH) - $(CC) kfk.c -m$(MS) $(OPTS) $(LDOPTS_STATIC) $(LD_COMMON) -I$(KFK_INCLUDE) $(LNK) -o $(TGT) $(OSXOPTS) +static: k.h + $(CC) kfk.c -m$(MS) $(OPTS) $(LDOPTS_STATIC) $(LD_COMMON) -I$(KFK_INCLUDE) $(LNK) -o $(TGT) $(OSXOPTS) +k.h: + curl -s -O -J -L https://github.com/KxSystems/kdb/raw/master/c/c/k.h install: install $(TGT) $(Q) clean: From 6bd137a8b767c650ef2645917eda8e5d4f923945 Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Tue, 3 Sep 2019 16:19:59 +0100 Subject: [PATCH 4/7] Removal of -J flag in make to allow for Rhel6 support --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index a6c5ada..c0e4442 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ all: k.h static: k.h $(CC) kfk.c -m$(MS) $(OPTS) $(LDOPTS_STATIC) $(LD_COMMON) -I$(KFK_INCLUDE) $(LNK) -o $(TGT) $(OSXOPTS) k.h: - curl -s -O -J -L https://github.com/KxSystems/kdb/raw/master/c/c/k.h + curl -s -O -L https://github.com/KxSystems/kdb/raw/master/c/c/k.h install: install $(TGT) $(Q) clean: From 35762fe6e15ac3ca119d2466395612fa366d607b Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Mon, 9 Sep 2019 14:59:05 +0100 Subject: [PATCH 5/7] Change to indentation for switch case --- kfk.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/kfk.c b/kfk.c index 0e347b6..f30bdd0 100644 --- a/kfk.c +++ b/kfk.c @@ -357,13 +357,13 @@ EXP K2(kfkFlush){ return KNL; if(!(rk= clientIndex(x))) return KNL; - SW(y->t){ - CS(-KH,qy=y->h); - CS(-KI,qy=y->i); - CS(-KJ,qy=y->j); - CS(-KE,qy=y->e); - CD:R krr("timeout type"); - } + SW(y->t){ + CS(-KH,qy=y->h); + CS(-KI,qy=y->i); + CS(-KJ,qy=y->j); + CS(-KE,qy=y->e); + CD:R krr("timeout type"); + } rd_kafka_resp_err_t err= rd_kafka_flush(rk,qy); if(KFK_OK != err) return krr((S) rd_kafka_err2str(err)); From 43d1fb110752cd61fa46f43e3c3b62558d16aa27 Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Mon, 9 Sep 2019 17:11:44 +0100 Subject: [PATCH 6/7] Code clean up in line with set standard within remainder of code --- kfk.c | 59 +++++++++++++++++++++++++++++------------------------------ 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/kfk.c b/kfk.c index f30bdd0..809b8ed 100644 --- a/kfk.c +++ b/kfk.c @@ -142,8 +142,7 @@ static K loadConf(rd_kafka_conf_t *conf, K x){ char b[512]; J i; for(i= 0; i < xx->n; ++i){ - if(RD_KAFKA_CONF_OK != - rd_kafka_conf_set(conf, kS(xx)[i], kS(xy)[i], b, sizeof(b))){ + if(RD_KAFKA_CONF_OK !=rd_kafka_conf_set(conf, kS(xx)[i], kS(xy)[i], b, sizeof(b))){ return krr((S) b); } } @@ -351,20 +350,20 @@ rd_kafka_topic_partition_list_t* plistoffsetdict(S topic,K partitions){ } EXP K2(kfkFlush){ - rd_kafka_t *rk; - I qy; - if(!checkType("i",x)) - return KNL; - if(!(rk= clientIndex(x))) - return KNL; - SW(y->t){ - CS(-KH,qy=y->h); - CS(-KI,qy=y->i); - CS(-KJ,qy=y->j); - CS(-KE,qy=y->e); - CD:R krr("timeout type"); - } - rd_kafka_resp_err_t err= rd_kafka_flush(rk,qy); + rd_kafka_t *rk; + I qy; + if(!checkType("i",x)) + return KNL; + if(!(rk= clientIndex(x))) + return KNL; + SW(y->t){ + CS(-KH,qy=y->h); + CS(-KI,qy=y->i); + CS(-KJ,qy=y->j); + CS(-KE,qy=y->e); + CD:R krr("timeout type"); + } + rd_kafka_resp_err_t err= rd_kafka_flush(rk,qy); if(KFK_OK != err) return krr((S) rd_kafka_err2str(err)); return KNL; @@ -485,20 +484,20 @@ EXP K3(kfkPositionOffsets){ } EXP K1(kfkSubscription){ - K r; - rd_kafka_topic_partition_list_t *t; - rd_kafka_t *rk; - rd_kafka_resp_err_t err; - if (!checkType("i", x)) - return KNL; - if (!(rk = clientIndex(x))) - return KNL; - err = rd_kafka_subscription(rk, &t); - if (KFK_OK != err) - return krr((S)rd_kafka_err2str(err)); - r = decodeParList(t); - rd_kafka_topic_partition_list_destroy(t); - return r; + K r; + rd_kafka_topic_partition_list_t *t; + rd_kafka_t *rk; + rd_kafka_resp_err_t err; + if (!checkType("i", x)) + return KNL; + if (!(rk = clientIndex(x))) + return KNL; + err = rd_kafka_subscription(rk, &t); + if (KFK_OK != err) + return krr((S)rd_kafka_err2str(err)); + r = decodeParList(t); + rd_kafka_topic_partition_list_destroy(t); + return r; } static J pu(J u){return 1000000LL*(u-10957LL*86400000LL);} From 443f11e8fe87e846545cee9619eccc2716fb97e3 Mon Sep 17 00:00:00 2001 From: awilson-kx Date: Tue, 10 Sep 2019 15:17:39 +0100 Subject: [PATCH 7/7] check y type using checkType --- kfk.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/kfk.c b/kfk.c index 809b8ed..5d18756 100644 --- a/kfk.c +++ b/kfk.c @@ -351,8 +351,8 @@ rd_kafka_topic_partition_list_t* plistoffsetdict(S topic,K partitions){ EXP K2(kfkFlush){ rd_kafka_t *rk; - I qy; - if(!checkType("i",x)) + I qy=0; + if(!checkType("i[hij]",x,y)) return KNL; if(!(rk= clientIndex(x))) return KNL; @@ -360,8 +360,6 @@ EXP K2(kfkFlush){ CS(-KH,qy=y->h); CS(-KI,qy=y->i); CS(-KJ,qy=y->j); - CS(-KE,qy=y->e); - CD:R krr("timeout type"); } rd_kafka_resp_err_t err= rd_kafka_flush(rk,qy); if(KFK_OK != err)