Skip to content

Commit

Permalink
allow string/char etc for config (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
sshanks-kx authored May 10, 2024
1 parent dfc6970 commit 15361b4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 35 deletions.
6 changes: 3 additions & 3 deletions docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ Where

- `id` is a consumer or producer ID
- `topic` is a name to be assigned to the topic (symbol)
- `cfg` is a user-defined topic configuration (dictionary) where both keys and values must be of symbol type: default: `()!()`
- `cfg` is a user-defined topic configuration (dictionary) for adding/overloading configuration. Both keys and values must be of symbol,string,char or symbol list type. Default: `()!()`

returns the topic ID (integer).

Expand Down Expand Up @@ -508,7 +508,7 @@ _Create a consumer according to user-defined configuration_
.kfk.Consumer cfg
```

Where `cfg` is a dictionary user-defined configuration where the keys must be of symbol type and the values must be of symbol or symbol list type. Returns the ID of the consumer as an integer.
Where `cfg` is a dictionary for adding/overloading configuration. Both keys and values must be of symbol,string,char or symbol list type. Returns the ID of the consumer as an integer.

```q
q)kfk_cfg
Expand All @@ -532,7 +532,7 @@ _Create a producer according to user-defined configuration_
.kfk.Producer cfg
```

Where `cfg` is a user-defined dictionary configuration where the keys must be of symbol type and the values must be of symbol or symbol list type. Returns the ID of the producer as an integer.
Where `cfg` is a dictionary for adding/overloading configuration. Both keys and values must be of symbol,string,char or symbol list type. Returns the ID of the producer as an integer.

```q
q)kfk_cfg
Expand Down
68 changes: 36 additions & 32 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,39 +161,43 @@ static I createStr(K x,char* b,I bLen){
return str-b+1;
}

// client api
// x - config dict sym->sym
static int loadConf(rd_kafka_conf_t *conf, K x){
static char b[512];
char o[1024],*v;
J i;
if(xy->t!=KS&&xy->t)
return krr("bad cfg values type"),0;
for(i= 0; i < xx->n; ++i){
if(xy->t==KS)
v=kS(xy)[i];
else if(!xy->t)
if(kK(xy)[i]->t==-KS)
v=kK(xy)[i]->s;
else if(kK(xy)[i]->t==KS){
if(!createStr(kK(xy)[i],o,sizeof(o)))return krr("cfg value too long"),0;
v=o;
}
else
return krr("bad cfg value type"),0;
else
return krr("bad cfg value type"),0;
if(RD_KAFKA_CONF_OK !=rd_kafka_conf_set(conf, kS(xx)[i], v, b, sizeof(b)))
return krr((S) b),0;
static char* getConf(K x,I i,S buf,I bufLen){
if(xt==KS)
return kS(x)[i];
else if(!xt){
x=kK(x)[i];
if(xt==KC && xn<bufLen){
buf[xn]=0;
return memcpy(buf,xG,xn);
}else if(xt==-KC){
buf[0]=xg;
buf[1]=0;
return buf;
}else if(xt==-KS){
return x->s;
}else if(xt==KS){
if(!createStr(x,buf,bufLen))return 0;
return buf;
}
}
return 1;
return 0;
}
static int loadTopConf(rd_kafka_topic_conf_t *conf, K x){
static char b[512];
J i;

static int loadConf(void*conf, K x,I topic){
static char err[128];
char nbuff[128],vbuff[512];
char *name,*val;
J i;I r;
for(i= 0; i < xx->n; ++i){
if(RD_KAFKA_CONF_OK !=rd_kafka_topic_conf_set(conf, kS(xx)[i], kS(xy)[i], b, sizeof(b)))
return krr((S) b),0;
name=getConf(xx,i,nbuff,sizeof(nbuff));
val=getConf(xy,i,vbuff,sizeof(vbuff));
if(!name||!val)return krr("bad cfg value type"),0;
if(topic)
r=rd_kafka_topic_conf_set((rd_kafka_topic_conf_t*)conf, name, val, err, sizeof(err));
else
r=rd_kafka_conf_set((rd_kafka_conf_t*)conf, name, val, err, sizeof(err));
if(RD_KAFKA_CONF_OK!=r)
return krr((S) err),0;
}
return 1;
}
Expand All @@ -211,7 +215,7 @@ EXP K2(kfkClient){
return krr("type: unknown client type");
type= 'p' == xg ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER;
conf=rd_kafka_conf_new();
if(!loadConf(conf, y)){
if(!loadConf(conf, y, 0)){
rd_kafka_conf_destroy(conf);
return KNL;
}
Expand Down Expand Up @@ -274,7 +278,7 @@ EXP K3(kfkgenerateTopic){
if(!checkType("is!",x ,y ,z)||!(rk= clientIndex(x)))
return KNL;
rd_topic_conf= rd_kafka_topic_conf_new();
if(!loadTopConf(rd_topic_conf, z)){
if(!loadConf(rd_topic_conf, z, 1)){
rd_kafka_topic_conf_destroy(rd_topic_conf);
return KNL;
}
Expand Down

0 comments on commit 15361b4

Please sign in to comment.