Skip to content

Commit

Permalink
Refactored the code
Browse files Browse the repository at this point in the history
- messages keeps a dirty flag to indicate if it's changed. If message is
  clean then encoding is a direct copy of the byte slice, rather than
  going through the encoding process.
- some updates to messages are done directly to the buffer (thus keeping
  it clean, i.e., dirty flag not set).
- QoS 1 and 2 messages are copied into ackqueue while waiting so the
  incoming/outgoing buffer can keep moving along (major issue in the
  last commit)
- Performance is slightly increased (~5-10%) overall
  • Loading branch information
zhenjl committed Dec 14, 2014
1 parent 3dd9e23 commit 19e7a4f
Show file tree
Hide file tree
Showing 15 changed files with 342 additions and 166 deletions.
20 changes: 18 additions & 2 deletions connack.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func NewConnackMessage() *ConnackMessage {

// String returns a string representation of the CONNACK message
func (this ConnackMessage) String() string {
return fmt.Sprintf("%s, Session Present=%t, Return code=%s\n",
this.header, this.sessionPresent, this.returnCode)
return fmt.Sprintf("%s, Session Present=%t, Return code=%q\n", this.header, this.sessionPresent, this.returnCode)
}

// SessionPresent returns the session present flag value
Expand All @@ -58,6 +57,8 @@ func (this *ConnackMessage) SetSessionPresent(v bool) {
} else {
this.sessionPresent = false
}

this.dirty = true
}

// ReturnCode returns the return code received for the CONNECT message. The return
Expand All @@ -68,9 +69,14 @@ func (this *ConnackMessage) ReturnCode() ConnackCode {

func (this *ConnackMessage) SetReturnCode(ret ConnackCode) {
this.returnCode = ret
this.dirty = true
}

func (this *ConnackMessage) Len() int {
if !this.dirty {
return len(this.dbuf)
}

ml := this.msglen()

if err := this.SetRemainingLength(int32(ml)); err != nil {
Expand Down Expand Up @@ -108,10 +114,20 @@ func (this *ConnackMessage) Decode(src []byte) (int, error) {
this.returnCode = ConnackCode(b)
total++

this.dirty = false

return total, nil
}

func (this *ConnackMessage) Encode(dst []byte) (int, error) {
if !this.dirty {
if len(dst) < len(this.dbuf) {
return 0, fmt.Errorf("connack/Encode: Insufficient buffer size. Expecting %d, got %d.", len(this.dbuf), len(dst))
}

return copy(dst, this.dbuf), nil
}

// CONNACK remaining length fixed at 2 bytes
hl := this.header.msglen()
ml := this.msglen()
Expand Down
50 changes: 42 additions & 8 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
var clientIdRegexp *regexp.Regexp

func init() {
clientIdRegexp, _ = regexp.Compile("^[0-9a-zA-Z ]*$")
clientIdRegexp = regexp.MustCompile("^[0-9a-zA-Z ]*$")
}

// After a Network Connection is established by a Client to a Server, the first Packet
Expand Down Expand Up @@ -69,7 +69,7 @@ func NewConnectMessage() *ConnectMessage {

// String returns a string representation of the CONNECT message
func (this ConnectMessage) String() string {
return fmt.Sprintf("%s, Connect Flags=%08b, Version=%d, KeepAlive=%d, Client ID=%s, Will Topic=%s, Will Message=%s, Username=%s, Password=%s",
return fmt.Sprintf("%s, Connect Flags=%08b, Version=%d, KeepAlive=%d, Client ID=%q, Will Topic=%q, Will Message=%q, Username=%q, Password=%q",
this.header,
this.connectFlags,
this.Version(),
Expand All @@ -96,6 +96,8 @@ func (this *ConnectMessage) SetVersion(v byte) error {
}

this.version = v
this.dirty = true

return nil
}

Expand All @@ -114,6 +116,8 @@ func (this *ConnectMessage) SetCleanSession(v bool) {
} else {
this.connectFlags &= 253 // 11111101
}

this.dirty = true
}

// WillFlag returns the bit that specifies whether a Will Message should be stored
Expand All @@ -132,6 +136,8 @@ func (this *ConnectMessage) SetWillFlag(v bool) {
} else {
this.connectFlags &= 251 // 11111011
}

this.dirty = true
}

// WillQos returns the two bits that specify the QoS level to be used when publishing
Expand All @@ -148,6 +154,8 @@ func (this *ConnectMessage) SetWillQos(qos byte) error {
}

this.connectFlags = (this.connectFlags & 231) | (qos << 3) // 231 = 11100111
this.dirty = true

return nil
}

Expand All @@ -165,6 +173,8 @@ func (this *ConnectMessage) SetWillRetain(v bool) {
} else {
this.connectFlags &= 223 // 11011111
}

this.dirty = true
}

// UsernameFlag returns the bit that specifies whether a user name is present in the
Expand All @@ -181,6 +191,8 @@ func (this *ConnectMessage) SetUsernameFlag(v bool) {
} else {
this.connectFlags &= 127 // 01111111
}

this.dirty = true
}

// PasswordFlag returns the bit that specifies whether a password is present in the
Expand All @@ -197,6 +209,8 @@ func (this *ConnectMessage) SetPasswordFlag(v bool) {
} else {
this.connectFlags &= 191 // 10111111
}

this.dirty = true
}

// KeepAlive returns a time interval measured in seconds. Expressed as a 16-bit word,
Expand All @@ -211,6 +225,8 @@ func (this *ConnectMessage) KeepAlive() uint16 {
// alive.
func (this *ConnectMessage) SetKeepAlive(v uint16) {
this.keepAlive = v

this.dirty = true
}

// ClientId returns an ID that identifies the Client to the Server. Each Client
Expand All @@ -228,6 +244,8 @@ func (this *ConnectMessage) SetClientId(v []byte) error {
}

this.clientId = v
this.dirty = true

return nil
}

Expand All @@ -246,6 +264,8 @@ func (this *ConnectMessage) SetWillTopic(v []byte) {
} else if len(this.willMessage) == 0 {
this.SetWillFlag(false)
}

this.dirty = true
}

// WillMessage returns the Will Message that is to be published to the Will Topic.
Expand All @@ -262,6 +282,8 @@ func (this *ConnectMessage) SetWillMessage(v []byte) {
} else if len(this.willTopic) == 0 {
this.SetWillFlag(false)
}

this.dirty = true
}

// Username returns the username from the payload. If the User Name Flag is set to 1,
Expand All @@ -280,6 +302,8 @@ func (this *ConnectMessage) SetUsername(v []byte) {
} else {
this.SetUsernameFlag(false)
}

this.dirty = true
}

// Password returns the password from the payload. If the Password Flag is set to 1,
Expand All @@ -298,9 +322,15 @@ func (this *ConnectMessage) SetPassword(v []byte) {
} else {
this.SetPasswordFlag(false)
}

this.dirty = true
}

func (this *ConnectMessage) Len() int {
if !this.dirty {
return len(this.dbuf)
}

ml := this.msglen()

if err := this.SetRemainingLength(int32(ml)); err != nil {
Expand Down Expand Up @@ -331,10 +361,20 @@ func (this *ConnectMessage) Decode(src []byte) (int, error) {
}
total += n

this.dirty = false

return total, nil
}

func (this *ConnectMessage) Encode(dst []byte) (int, error) {
if !this.dirty {
if len(dst) < len(this.dbuf) {
return 0, fmt.Errorf("connect/Encode: Insufficient buffer size. Expecting %d, got %d.", len(this.dbuf), len(dst))
}

return copy(dst, this.dbuf), nil
}

if this.Type() != CONNECT {
return 0, fmt.Errorf("connect/Encode: Invalid message type. Expecting %d, got %d", CONNECT, this.Type())
}
Expand Down Expand Up @@ -530,12 +570,6 @@ func (this *ConnectMessage) decodeMessage(src []byte) (int, error) {
}
}

/*
if len(src[total:]) > 0 {
return total, fmt.Errorf("connect/decodeMessage: Invalid buffer size. Still has %d bytes at the end.", len(src[total:]))
}
*/

return total, nil
}

Expand Down
10 changes: 10 additions & 0 deletions disconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package message

import "fmt"

// The DISCONNECT Packet is the final Control Packet sent from the Client to the Server.
// It indicates that the Client is disconnecting cleanly.
type DisconnectMessage struct {
Expand All @@ -35,5 +37,13 @@ func (this *DisconnectMessage) Decode(src []byte) (int, error) {
}

func (this *DisconnectMessage) Encode(dst []byte) (int, error) {
if !this.dirty {
if len(dst) < len(this.dbuf) {
return 0, fmt.Errorf("disconnect/Encode: Insufficient buffer size. Expecting %d, got %d.", len(this.dbuf), len(dst))
}

return copy(dst, this.dbuf), nil
}

return this.header.encode(dst)
}
Loading

0 comments on commit 19e7a4f

Please sign in to comment.