diff --git a/docs/reference.md b/docs/reference.md index 9e1d499..9ba5f09 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -166,7 +166,7 @@ Where - `id` is a consumer or producer ID - `topic` is a name to be assigned to the topic (symbol) -- `cfg` is a user-defined topic configuration (dictionary) where both keys and values must be of symbol type: default: `()!()` +- `cfg` is a user-defined topic configuration (dictionary) for adding/overloading configuration. Both keys and values must be of symbol,string,char or symbol list type. Default: `()!()` returns the topic ID (integer). @@ -508,7 +508,7 @@ _Create a consumer according to user-defined configuration_ .kfk.Consumer cfg ``` -Where `cfg` is a dictionary user-defined configuration where the keys must be of symbol type and the values must be of symbol or symbol list type. Returns the ID of the consumer as an integer. +Where `cfg` is a dictionary for adding/overloading configuration. Both keys and values must be of symbol,string,char or symbol list type. Returns the ID of the consumer as an integer. ```q q)kfk_cfg @@ -532,7 +532,7 @@ _Create a producer according to user-defined configuration_ .kfk.Producer cfg ``` -Where `cfg` is a user-defined dictionary configuration where the keys must be of symbol type and the values must be of symbol or symbol list type. Returns the ID of the producer as an integer. +Where `cfg` is a dictionary for adding/overloading configuration. Both keys and values must be of symbol,string,char or symbol list type. Returns the ID of the producer as an integer. ```q q)kfk_cfg diff --git a/kfk.c b/kfk.c index 3ef6454..bee6484 100644 --- a/kfk.c +++ b/kfk.c @@ -161,39 +161,43 @@ static I createStr(K x,char* b,I bLen){ return str-b+1; } -// client api -// x - config dict sym->sym -static int loadConf(rd_kafka_conf_t *conf, K x){ - static char b[512]; - char o[1024],*v; - J i; - if(xy->t!=KS&&xy->t) - return krr("bad cfg values type"),0; - for(i= 0; i < xx->n; ++i){ - if(xy->t==KS) - v=kS(xy)[i]; - else if(!xy->t) - if(kK(xy)[i]->t==-KS) - v=kK(xy)[i]->s; - else if(kK(xy)[i]->t==KS){ - if(!createStr(kK(xy)[i],o,sizeof(o)))return krr("cfg value too long"),0; - v=o; - } - else - return krr("bad cfg value type"),0; - else - return krr("bad cfg value type"),0; - if(RD_KAFKA_CONF_OK !=rd_kafka_conf_set(conf, kS(xx)[i], v, b, sizeof(b))) - return krr((S) b),0; +static char* getConf(K x,I i,S buf,I bufLen){ + if(xt==KS) + return kS(x)[i]; + else if(!xt){ + x=kK(x)[i]; + if(xt==KC && xns; + }else if(xt==KS){ + if(!createStr(x,buf,bufLen))return 0; + return buf; + } } - return 1; + return 0; } -static int loadTopConf(rd_kafka_topic_conf_t *conf, K x){ - static char b[512]; - J i; + +static int loadConf(void*conf, K x,I topic){ + static char err[128]; + char nbuff[128],vbuff[512]; + char *name,*val; + J i;I r; for(i= 0; i < xx->n; ++i){ - if(RD_KAFKA_CONF_OK !=rd_kafka_topic_conf_set(conf, kS(xx)[i], kS(xy)[i], b, sizeof(b))) - return krr((S) b),0; + name=getConf(xx,i,nbuff,sizeof(nbuff)); + val=getConf(xy,i,vbuff,sizeof(vbuff)); + if(!name||!val)return krr("bad cfg value type"),0; + if(topic) + r=rd_kafka_topic_conf_set((rd_kafka_topic_conf_t*)conf, name, val, err, sizeof(err)); + else + r=rd_kafka_conf_set((rd_kafka_conf_t*)conf, name, val, err, sizeof(err)); + if(RD_KAFKA_CONF_OK!=r) + return krr((S) err),0; } return 1; } @@ -211,7 +215,7 @@ EXP K2(kfkClient){ return krr("type: unknown client type"); type= 'p' == xg ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER; conf=rd_kafka_conf_new(); - if(!loadConf(conf, y)){ + if(!loadConf(conf, y, 0)){ rd_kafka_conf_destroy(conf); return KNL; } @@ -274,7 +278,7 @@ EXP K3(kfkgenerateTopic){ if(!checkType("is!",x ,y ,z)||!(rk= clientIndex(x))) return KNL; rd_topic_conf= rd_kafka_topic_conf_new(); - if(!loadTopConf(rd_topic_conf, z)){ + if(!loadConf(rd_topic_conf, z, 1)){ rd_kafka_topic_conf_destroy(rd_topic_conf); return KNL; }