forked from shanna/mosquitto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
141 lines (119 loc) · 3.25 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package mosquitto
/*
#cgo LDFLAGS: -lmosquitto
#include "mosquitto_ext.h"
*/
import "C"
import (
"fmt"
"net"
"strconv"
"sync"
"unsafe"
)
// init performs libmosquitto's global initialisation once, when this package
// is loaded, so that every later C.mosquitto_* call finds the library ready.
// The value returned by mosquitto_lib_init is not inspected.
func init() {
	C.mosquitto_lib_init()
}
// Conn is a single client connection to an MQTT broker, backed by a
// libmosquitto client handle.
//
// NOTE(review): Conn embeds a sync.WaitGroup by value, so copying a Conn
// yields an independent WaitGroup; prefer passing *Conn around.
type Conn struct {
	// Id is the client id the connection was created with.
	Id string
	// mosq is the underlying libmosquitto client handle.
	mosq *C.struct_mosquitto
	// handlers maps a subscription string to the handler registered for it.
	handlers map[string]Handler
	// wg counts handler goroutines that are still running; Close waits on it.
	wg sync.WaitGroup
}
// Dial creates a mosquitto client identified by id and connects it to the
// broker at address, which must be in "host:port" form.
//
// clean is forwarded to mosquitto_new2 (presumably the MQTT clean-session
// flag, as in libmosquitto's mosquitto_new - confirm against mosquitto_ext.h).
//
// A malformed address or a non-numeric port is reported through the returned
// error together with a zero Conn. If the client handle cannot be allocated,
// or the connect call fails, the partially initialised Conn is returned with
// the error so the caller can still Close it.
//
// NOTE(review): the address of the local Conn is handed to C as callback user
// data while the caller receives a *copy* of that Conn. The map and the C
// handle are shared between both, but the WaitGroup is not, so Close on the
// returned value does not wait for goroutines started by on_message. The local
// Conn also contains Go pointers, which violates the cgo pointer-passing rules
// on modern Go toolchains. Fixing either requires an interface change
// (returning *Conn and/or using a handle registry).
func Dial(id string, address string, clean bool) (Conn, error) {
	host, portText, err := net.SplitHostPort(address)
	if err != nil {
		return Conn{}, err
	}
	port, err := strconv.Atoi(portText)
	if err != nil {
		return Conn{}, err
	}

	// C strings are allocated with malloc and must be released explicitly.
	cid := C.CString(id)
	defer C.free(unsafe.Pointer(cid))
	chost := C.CString(host)
	defer C.free(unsafe.Pointer(chost))
	cport := C.int(port)

	// TODO: Bug https://code.google.com/p/go/issues/detail?id=4417
	// cclean := C.bool(clean)
	cclean := bool_to_cint(clean)

	// Setup obj early so mosquitto can pass it around.
	c := Conn{Id: id, handlers: make(map[string]Handler)}

	// TODO: Bug https://code.google.com/p/go/issues/detail?id=4417
	// c.mosq = C.mosquitto_new(cid, cclean, unsafe.Pointer(&c))
	c.mosq = C.mosquitto_new2(cid, cclean, unsafe.Pointer(&c))
	if c.mosq == nil {
		return c, ccode_to_error(C.MOSQ_ERR_ERRNO)
	}

	// TODO: Keepalive. The final argument is currently fixed at 60.
	err = ccode_to_error(C.mosquitto_connect(c.mosq, chost, cport, C.int(60)))
	return c, err
}
// HandleFunc registers hf as the handler for the subscription sub at the
// given qos. The handler is built with NewHandler; if that fails, its error is
// returned and nothing is registered. A later registration for the same sub
// replaces the earlier one.
//
// NOTE(review): within this file, subscriptions are only sent to the broker
// from on_connect, so a handler added afterwards appears to take effect on the
// next (re)connect - confirm.
func (c *Conn) HandleFunc(sub string, qos int, hf HandlerFunc) error {
	h, err := NewHandler(sub, qos, hf)
	if err != nil {
		return err
	}
	c.handlers[sub] = h
	return nil
}
// Close waits for the handler goroutines tracked by this Conn, disconnects
// from the broker and frees the underlying libmosquitto client.
//
// Close is safe to call more than once: after the first call the C handle is
// cleared and later calls return nil without touching C memory. An error from
// the disconnect call is returned, except for "not connected", which is the
// state Close is meant to reach anyway. The client is destroyed regardless.
func (c *Conn) Close() error {
	c.wg.Wait()

	if c.mosq == nil {
		// Never created or already closed; nothing to release.
		return nil
	}

	var err error
	if rc := C.mosquitto_disconnect(c.mosq); rc != C.MOSQ_ERR_NO_CONN {
		err = ccode_to_error(rc)
	}

	C.mosquitto_destroy(c.mosq)
	// Drop the dangling handle so a second Close cannot double-free it.
	c.mosq = nil
	return err
}
// Publish sends m to the broker using m.Topic, m.Payload, m.Qos and m.Retain,
// and returns the library's result code converted by ccode_to_error.
//
// An empty payload is sent as a nil pointer with length 0 rather than
// indexing into the empty slice, which would panic.
//
// TODO: Message.Id
func (c *Conn) Publish(m Message) error {
	ctopic := C.CString(m.Topic)
	defer C.free(unsafe.Pointer(ctopic))

	// Only take the address of the first byte when there is one.
	var cpayload unsafe.Pointer
	if len(m.Payload) > 0 {
		cpayload = unsafe.Pointer(&m.Payload[0])
	}
	cpayloadlen := C.int(len(m.Payload))
	cqos := C.int(m.Qos)

	// TODO: Bug https://code.google.com/p/go/issues/detail?id=4417
	// cretain := C.bool(m.Retain)
	cretain := bool_to_cint(m.Retain)

	// TODO: Bug https://code.google.com/p/go/issues/detail?id=4417
	// return code_to_error(C.mosquitto_publish(c.mosq, nil, ctopic, cpayloadlen, cpayload, cqos, cretain))
	return ccode_to_error(C.mosquitto_publish2(c.mosq, nil, ctopic, cpayloadlen, cpayload, cqos, cretain))
}
// Listen hands control to libmosquitto's mosquitto_loop_forever for this
// connection and returns its result code converted by ccode_to_error.
// It blocks for as long as the C call does.
func (c *Conn) Listen() error {
	const (
		// Values passed straight through to mosquitto_loop_forever; see the
		// libmosquitto documentation for their meaning.
		timeout    = -1
		maxPackets = 1
	)
	rc := C.mosquitto_loop_forever(c.mosq, C.int(timeout), C.int(maxPackets))
	return ccode_to_error(rc)
}
// on_connect is exported to C and receives the Conn pointer that Dial handed
// to mosquitto_new2 (presumably invoked as the library's connect callback -
// confirm in mosquitto_ext). It issues a subscribe request for every
// registered handler so subscriptions are set up again on each connect.
// The result of each subscribe call is not inspected.
//
//export on_connect
func on_connect(cconn unsafe.Pointer) {
	conn := (*Conn)(cconn)

	// Setup handlers again.
	for _, h := range conn.handlers {
		pattern := C.CString(h.Sub)
		C.mosquitto_subscribe(conn.mosq, nil, pattern, C.int(h.Qos))
		// Freed inside the loop on purpose: a defer would pile up until return.
		C.free(unsafe.Pointer(pattern))
	}
}
// on_message is exported to C and receives an incoming message as a topic
// string plus a raw payload buffer. Topic and payload are copied into Go
// memory, wrapped with NewMessage, and every handler whose Match accepts the
// topic is run in its own goroutine. Each goroutine is tracked by the Conn's
// WaitGroup. A message that NewMessage rejects is dropped silently.
//
//export on_message
func on_message(cconn unsafe.Pointer, ctopic *C.char, cpayload unsafe.Pointer, cpayloadlen C.int) {
	conn := (*Conn)(cconn)
	topic := C.GoString(ctopic)
	payload := C.GoBytes(cpayload, cpayloadlen)

	msg, err := NewMessage(topic, payload)
	if err != nil {
		// TODO: Log error.
		return
	}

	for _, handler := range conn.handlers {
		if handler.Match(topic) {
			// Per-iteration copy so the goroutine does not observe later
			// values of the loop variable.
			h := handler
			conn.wg.Add(1)
			go func() {
				h.Call(conn, msg)
				conn.wg.Done()
			}()
		}
	}
}
// on_log is exported to C and receives a log level and a message. For now the
// message is written to standard output followed by a newline; the level and
// the connection pointer are ignored.
//
//export on_log
func on_log(cconn unsafe.Pointer, clevel C.int, cmessage *C.char) {
	// TODO: Logging.
	// c := (*Conn)(cconn)
	text := C.GoString(cmessage)
	fmt.Println(text)
}