-
Notifications
You must be signed in to change notification settings - Fork 1
/
kafkatcl.tcl
163 lines (122 loc) · 2.85 KB
/
kafkatcl.tcl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#
# kafkatcl support functions
#
# simple usage:
#
#   kafka::brokers $brokerList
#   kafka::topic_producer commandName topic
#   kafka::topic_consumer commandName topic
#
namespace eval ::kafka {
    # one-shot guards: each flips to 1 once the matching object has been
    # created by setup, setup_consumer or setup_producer
    variable consumerIsSetup 0
    variable producerIsSetup 0
    variable masterIsSetup 0

    # broker list used when the consumer, producer or a subscriber is created
    variable brokers 127.0.0.1

    # set to 1 to have logger write its messages to stderr
    variable loggingEnabled 0

    #
    # logger - write a message prefixed with "kafka: " to stderr, but only
    #  when loggingEnabled is true; otherwise do nothing
    #
    proc logger {message} {
        variable loggingEnabled

        if {!$loggingEnabled} {
            return
        }

        puts stderr "kafka: $message"
    }

    #
    # handle_args - process a flat list of "-option value" pairs
    #
    #  list - alternating option names and values; the only option
    #   recognized is -brokers, which replaces the namespace's broker list
    #
    #  raises an error if the last option has no value, if an option name
    #  doesn't start with a dash, or if the option is not recognized
    #
    proc handle_args {list} {
        variable brokers

        # foreach pads a short final group with empty strings, so a
        # trailing "-brokers" with no value would silently blank the
        # broker list; reject that case before changing anything
        if {[llength $list] % 2 != 0} {
            error "argument '[lindex $list end]' is missing a value"
        }

        foreach {key value} $list {
            # ne forces a string comparison, which is what is meant here
            if {[string index $key 0] ne "-"} {
                error "argument '$key' doesn't start with a dash"
            }

            switch -exact -- $key {
                "-brokers" {
                    set brokers $value
                }

                default {
                    error "argument '$key' unrecognized, must be one of '-brokers'"
                }
            }
        }
    }

    #
    # setup - create a kafka master object if one hasn't already been created
    #
    #  args - optional "-option value" pairs, see handle_args; they are
    #   processed on every call, even when the master already exists
    #
    proc setup {args} {
        variable masterIsSetup

        handle_args $args

        if {$masterIsSetup} {
            return
        }

        # ::kafka::kafka is not defined in this file; presumably it is
        # supplied by the kafkatcl extension -- TODO confirm
        ::kafka::kafka create ::kafka::master
        logger "created ::kafka::master"
        set masterIsSetup 1
    }

    #
    # setup_consumer - create a kafka consumer object if one hasn't already
    #  been created.  perform basic setup if necessary.
    #
    #  args - passed through to setup; ignored entirely when the consumer
    #   has already been set up
    #
    proc setup_consumer {args} {
        variable consumerIsSetup
        variable brokers

        if {$consumerIsSetup} {
            return
        }

        setup {*}$args

        # unqualified "master" and "consumer" resolve to ::kafka::master and
        # ::kafka::consumer because this proc runs in the ::kafka namespace
        master consumer_creator ::kafka::consumer

        # NOTE(review): brokers is passed as-is here but comma-joined in
        # subscriber -- confirm which form add_brokers expects
        consumer add_brokers $brokers

        set consumerIsSetup 1
        logger "created consumer-creator with brokers $brokers"
    }

    #
    # setup_producer - create a kafka producer object if one hasn't already
    #  been created.  perform basic setup if necessary.
    #
    #  args - passed through to setup; ignored entirely when the producer
    #   has already been set up
    #
    proc setup_producer {args} {
        variable producerIsSetup
        variable brokers

        if {$producerIsSetup} {
            return
        }

        setup {*}$args

        # unqualified "master" and "producer" resolve to ::kafka::master and
        # ::kafka::producer because this proc runs in the ::kafka namespace
        master producer_creator ::kafka::producer

        # NOTE(review): brokers is passed as-is here but comma-joined in
        # subscriber -- confirm which form add_brokers expects
        producer add_brokers $brokers

        set producerIsSetup 1
        logger "created producer-creator with brokers $brokers"
    }

    #
    # subscriber - create and return a subscriber object
    #
    #  groupid - value stored in the master's group.id config setting
    #  args - optional "-option value" pairs, passed through to setup
    #
    #  unlike the consumer and producer, nothing is cached here: every call
    #  reconfigures the master and asks it for another subscriber
    #
    proc subscriber {groupid args} {
        variable brokers

        setup {*}$args

        master config group.id $groupid

        # brokers is treated as a Tcl list and flattened to a
        # comma-separated string for bootstrap.servers
        master config bootstrap.servers [join $brokers ,]

        # "#auto" presumably asks the master to pick the command name --
        # TODO confirm against the extension
        set subscriber [master subscriber #auto]
        logger "created subscriber with brokers $brokers"

        return $subscriber
    }

    #
    # brokers - specify a list of brokers
    #
    #  brokerList - replaces the namespace's broker list; the master object
    #   is created as a side effect if it doesn't exist yet
    #
    #  the list is only handed to the consumer and producer when they are
    #  first set up, so call this before topic_consumer or topic_producer
    #
    proc brokers {brokerList} {
        setup -brokers $brokerList
        #logger "set brokers to $brokerList"
    }

    #
    # topic_producer - given a name and a topic create a kafka command that can
    #  produce to the topic
    #
    #  the shared producer is set up on first use; returns whatever the
    #  producer's new_topic method returns
    #
    proc topic_producer {name topic} {
        setup_producer

        #logger "creating producer $name for topic $topic"
        return [producer new_topic $name $topic]
    }

    #
    # topic_consumer - given a name and a topic create a kafka command that can
    #  consume from the topic
    #
    #  the shared consumer is set up on first use; returns whatever the
    #  consumer's new_topic method returns
    #
    proc topic_consumer {name topic} {
        setup_consumer

        #logger "creating consumer $name for topic $topic"
        return [consumer new_topic $name $topic]
    }

} ;# namespace ::kafka
# vim: set ts=4 sw=4 sts=4 noet :