diff --git a/generic/kafkatcl.c b/generic/kafkatcl.c index 0e6eaca..9fff1d7 100644 --- a/generic/kafkatcl.c +++ b/generic/kafkatcl.c @@ -25,6 +25,9 @@ kafkatcl_subscriber_poll(kafkatcl_handleClientData *kh); void kafkatcl_consume_stop_all_partitions (kafkatcl_topicClientData *kt); +int +kafkatcl_handleObjectObjCmd(ClientData cData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[]); + // DEBUG #ifdef DEBUGPRINTF void kafkatcl_dump_topic_partition_list(rd_kafka_topic_partition_list_t *topics) @@ -2160,13 +2163,15 @@ kafkatcl_handle_topic_info (Tcl_Interp *interp, kafkatcl_topicClientData *kt, in "name", "partitions", "consistent_partition", + "output_queue_length", NULL }; enum subOptions { SUBOPT_NAME, SUBOPT_PARTITIONS, - SUBOPT_CONSISTENT_PARTITION + SUBOPT_CONSISTENT_PARTITION, + SUBOPT_OUTPUT_QUEUE_LENGTH }; // argument must be one of the subOptions defined above @@ -2229,6 +2234,16 @@ kafkatcl_handle_topic_info (Tcl_Interp *interp, kafkatcl_topicClientData *kt, in Tcl_SetObjResult (interp, Tcl_NewIntObj (whichPartition)); break; } + + case SUBOPT_OUTPUT_QUEUE_LENGTH: { + if (objc != 3) { + Tcl_WrongNumArgs (interp, 3, objv, ""); + return TCL_ERROR; + } + + Tcl_SetObjResult (interp, Tcl_NewIntObj (rd_kafka_outq_len (kh->rk))); + break; + } } return TCL_OK; } @@ -2530,6 +2545,7 @@ kafkatcl_topicConsumerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj "start", "start_queue", "stop", + "creator", "delete", "consume_start", "consume_start_queue", @@ -2544,6 +2560,7 @@ kafkatcl_topicConsumerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj OPT_CONSUME_START, OPT_CONSUME_START_QUEUE, OPT_CONSUME_STOP, + OPT_CREATOR, OPT_DELETE, OPT_LEGACY_CONSUME_START, OPT_LEGACY_CONSUME_START_QUEUE, @@ -2670,6 +2687,10 @@ kafkatcl_topicConsumerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj break; } + case OPT_CREATOR: { + return kafkatcl_handleObjectObjCmd(kt->kh, interp, objc-1, objv+1); + } + case OPT_INFO: { return kafkatcl_handle_topic_info (interp, kt, objc, objv); } @@ -2800,6 +2821,7 @@ kafkatcl_topicProducerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj "produce", "produce_batch", "info", + "creator", "delete", NULL }; @@ -2808,6 +2830,7 @@ kafkatcl_topicProducerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj OPT_PRODUCE, OPT_PRODUCE_BATCH, OPT_INFO, + OPT_CREATOR, OPT_DELETE }; @@ -2924,6 +2947,10 @@ kafkatcl_topicProducerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj break; } + case OPT_CREATOR: { + return kafkatcl_handleObjectObjCmd(kt->kh, interp, objc-1, objv+1); + } + case OPT_INFO: { return kafkatcl_handle_topic_info (interp, kt, objc, objv); }