Skip to content

Commit

Permalink
Merge branch 'cmccarthy1-dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
awilson-kx committed Sep 10, 2019
2 parents 6d1277d + 443f11e commit c0d7136
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 23 deletions.
12 changes: 5 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ else ifeq ($(shell uname),Darwin)
OSXOPTS = -undefined dynamic_lookup -mmacosx-version-min=10.12
endif

KH = curl -s -O -J -L https://github.com/KxSystems/kdb/raw/master/c/c/k.h

QARCH = $(OSFLAG)$(MS)
Q = $(QHOME)/$(QARCH)

all:
$(KH)
all: k.h
$(CC) kfk.c -m$(MS) $(OPTS) $(LDOPTS_DYNAMIC) $(LD_COMMON) -I$(KFK_INCLUDE) $(LNK) -o $(TGT) $(OSXOPTS)
static:
$(KH)
$(CC) kfk.c -m$(MS) $(OPTS) $(LDOPTS_STATIC) $(LD_COMMON) -I$(KFK_INCLUDE) $(LNK) -o $(TGT) $(OSXOPTS)
static: k.h
$(CC) kfk.c -m$(MS) $(OPTS) $(LDOPTS_STATIC) $(LD_COMMON) -I$(KFK_INCLUDE) $(LNK) -o $(TGT) $(OSXOPTS)
k.h:
curl -s -O -L https://github.com/KxSystems/kdb/raw/master/c/c/k.h
install:
install $(TGT) $(Q)
clean:
Expand Down
49 changes: 33 additions & 16 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ static K loadConf(rd_kafka_conf_t *conf, K x){
char b[512];
J i;
for(i= 0; i < xx->n; ++i){
if(RD_KAFKA_CONF_OK !=
rd_kafka_conf_set(conf, kS(xx)[i], kS(xy)[i], b, sizeof(b))){
if(RD_KAFKA_CONF_OK !=rd_kafka_conf_set(conf, kS(xx)[i], kS(xy)[i], b, sizeof(b))){
return krr((S) b);
}
}
Expand Down Expand Up @@ -350,6 +349,24 @@ rd_kafka_topic_partition_list_t* plistoffsetdict(S topic,K partitions){
return t_partition;
}

EXP K2(kfkFlush){
rd_kafka_t *rk;
I qy=0;
if(!checkType("i[hij]",x,y))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
SW(y->t){
CS(-KH,qy=y->h);
CS(-KI,qy=y->i);
CS(-KJ,qy=y->j);
}
rd_kafka_resp_err_t err= rd_kafka_flush(rk,qy);
if(KFK_OK != err)
return krr((S) rd_kafka_err2str(err));
return KNL;
}

// producer api
EXP K4(kfkPub){
rd_kafka_topic_t *rkt;
Expand Down Expand Up @@ -465,20 +482,20 @@ EXP K3(kfkPositionOffsets){
}

EXP K1(kfkSubscription){
K r;
rd_kafka_topic_partition_list_t *t;
rd_kafka_t *rk;
rd_kafka_resp_err_t err;
if (!checkType("i", x))
return KNL;
if (!(rk = clientIndex(x)))
return KNL;
err = rd_kafka_subscription(rk, &t);
if (KFK_OK != err)
return krr((S)rd_kafka_err2str(err));
r = decodeParList(t);
rd_kafka_topic_partition_list_destroy(t);
return r;
K r;
rd_kafka_topic_partition_list_t *t;
rd_kafka_t *rk;
rd_kafka_resp_err_t err;
if (!checkType("i", x))
return KNL;
if (!(rk = clientIndex(x)))
return KNL;
err = rd_kafka_subscription(rk, &t);
if (KFK_OK != err)
return krr((S)rd_kafka_err2str(err));
r = decodeParList(t);
rd_kafka_topic_partition_list_destroy(t);
return r;
}

static J pu(J u){return 1000000LL*(u-10957LL*86400000LL);}
Expand Down
2 changes: 2 additions & 0 deletions kfk.q
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ funcs:(
(`kfkPoll;3);
// .kfk.Version[]:i
(`kfkVersion;1);
// .kfk.Flush[producer_id:i;timeout_ms:i]
(`kfkFlush;2);
// .kfk.ExportErr[]:T
(`kfkExportErr;1);
// .kfk.CommitOffsets[client_id;topic:s;partition_offsets:I!J;async:b]:()
Expand Down

0 comments on commit c0d7136

Please sign in to comment.