Skip to content

Commit

Permalink
Merge pull request #3 from CalvoM/client
Browse files Browse the repository at this point in the history
Client can disconnect with SIGINT
  • Loading branch information
CalvoM authored Dec 5, 2020
2 parents 7af5938 + 34e2d54 commit 1143f0d
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 86 deletions.
52 changes: 42 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"fmt"
"log"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
)

Expand All @@ -15,16 +18,17 @@ const (

//Client implements the mqtt client
type Client struct {
Host string
Port string
options *ConnectOptions
conn net.Conn
done chan int
readPong chan bool
commLock sync.Mutex
topic string
qos QOS
OnMessage MessageHandler
Host string
Port string
options *ConnectOptions
conn net.Conn
done chan int
readPong chan bool
getSignals chan os.Signal
commLock sync.Mutex
topic string
qos QOS
OnMessage MessageHandler
}

type client interface {
Expand All @@ -46,6 +50,8 @@ func (c *Client) Init(options *ConnectOptions) {
c.options = options
c.done = make(chan int)
c.readPong = make(chan bool)
c.getSignals = make(chan os.Signal)
signal.Notify(c.getSignals, syscall.SIGINT)
addr := net.JoinHostPort(c.Host, c.Port)
c.conn, err = net.Dial("tcp", addr)
if err != nil {
Expand Down Expand Up @@ -102,6 +108,10 @@ func (c *Client) AwaitMessages() {
var i int
for {
select {
case <-c.done:
c.conn.Close()
os.Exit(0)
break
case pong := <-c.readPong:
for pong == true {
pong = <-c.readPong
Expand Down Expand Up @@ -144,6 +154,8 @@ func (c *Client) SendPing() {
select {
case <-c.done:
c.conn.Close()
os.Exit(0)
break
default:
time.Sleep(d * time.Second)
c.readPong <- true
Expand Down Expand Up @@ -198,6 +210,7 @@ func (c *Client) Subscribe(topic string, qos QOS) error {
}
go c.SendPing()
go c.AwaitMessages()
go c.handleSignals()
return nil
}

Expand Down Expand Up @@ -239,3 +252,22 @@ func (c *Client) DecodePublish(data []byte) (MessageData, error) {
msg.Payload = string(message)
return msg, nil
}

//Disconnect send disconnect packet
func (c *Client) Disconnect() error {
pkt := packet{}
pkt.configureDisconnect()
sendBytes := pkt.FormulateMQTTOutputData()
c.conn.Write(sendBytes)
return nil
}

func (c *Client) handleSignals() {
select {
case signal := <-c.getSignals:
if signal == syscall.SIGINT {
c.Disconnect()
c.done <- 1
}
}
}
158 changes: 82 additions & 76 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type packet struct {
variableHeader []byte
payload []byte
}

//ConnectOptions These correspond to the connect flags
type ConnectOptions struct {
Username string
Expand All @@ -19,27 +20,28 @@ type ConnectOptions struct {
WillFlag bool
CleanSession bool
KeepAlive uint16
WillTopic string
WillMessage string
ClientId string
WillTopic string
WillMessage string
ClientId string
}

//FormulateMQTTOutputData
func(pkt *packet) FormulateMQTTOutputData()[]byte{
func (pkt *packet) FormulateMQTTOutputData() []byte {
var s []byte
s = append(s,pkt.fixedHeader...)
s = append(s,pkt.variableHeader...)
s = append(s,pkt.payload...)
s = append(s, pkt.fixedHeader...)
s = append(s, pkt.variableHeader...)
s = append(s, pkt.payload...)
return s
}

func (pkt *packet) configureConnectPackets(options *ConnectOptions) {
pkt.fixedHeader = append(pkt.fixedHeader,ControlPktConnect)
pkt.fixedHeader = append(pkt.fixedHeader, ControlPktConnect)
//Protocol Name
pkt.variableHeader = append(pkt.variableHeader,0x0)
pkt.variableHeader = append(pkt.variableHeader,byte(len("MQTT")))
pkt.variableHeader=append(pkt.variableHeader,[]byte("MQTT")...)
pkt.variableHeader = append(pkt.variableHeader, 0x0)
pkt.variableHeader = append(pkt.variableHeader, byte(len("MQTT")))
pkt.variableHeader = append(pkt.variableHeader, []byte("MQTT")...)
//Protocol Level
pkt.variableHeader= append(pkt.variableHeader,byte(MqttProtocolLvl))
pkt.variableHeader = append(pkt.variableHeader, byte(MqttProtocolLvl))
//Connect Flags
connectFlag := byte(0)
if options.CleanSession {
Expand All @@ -66,90 +68,94 @@ func (pkt *packet) configureConnectPackets(options *ConnectOptions) {
connectFlag |= FlagPassword
}
}
pkt.variableHeader=append(pkt.variableHeader,connectFlag)
pkt.variableHeader = append(pkt.variableHeader, connectFlag)
//KeepAlive bytes
var keepAlive uint16 = options.KeepAlive
k:=make([]byte,2)
binary.BigEndian.PutUint16(k,keepAlive)
pkt.variableHeader = append(pkt.variableHeader,k...)
k := make([]byte, 2)
binary.BigEndian.PutUint16(k, keepAlive)
pkt.variableHeader = append(pkt.variableHeader, k...)
//configure the payload
pkt.SetConnectPayload(options)
length:=len(pkt.variableHeader)+len(pkt.payload)
remLength:=encodeRemainingLength(uint64(length))
pkt.fixedHeader = append(pkt.fixedHeader,remLength...)
length := len(pkt.variableHeader) + len(pkt.payload)
remLength := encodeRemainingLength(uint64(length))
pkt.fixedHeader = append(pkt.fixedHeader, remLength...)
}

func(pkt *packet) SetConnectPayload(options *ConnectOptions){
func (pkt *packet) SetConnectPayload(options *ConnectOptions) {
var data uint16 = uint16(len(options.ClientId))
b :=make([]byte,2)
binary.BigEndian.PutUint16(b,data)
pkt.payload = append(pkt.payload,b...)
pkt.payload = append(pkt.payload,[]byte(options.ClientId)...)
if options.WillFlag{
b := make([]byte, 2)
binary.BigEndian.PutUint16(b, data)
pkt.payload = append(pkt.payload, b...)
pkt.payload = append(pkt.payload, []byte(options.ClientId)...)
if options.WillFlag {
data = uint16(len(options.WillTopic))
binary.BigEndian.PutUint16(b,data)
pkt.payload = append(pkt.payload,b...)
pkt.payload = append(pkt.payload,[]byte(options.WillTopic)...)
binary.BigEndian.PutUint16(b, data)
pkt.payload = append(pkt.payload, b...)
pkt.payload = append(pkt.payload, []byte(options.WillTopic)...)
data = uint16(len(options.WillMessage))
binary.BigEndian.PutUint16(b,data)
pkt.payload = append(pkt.payload,b...)
pkt.payload = append(pkt.payload,[]byte(options.WillMessage)...)
binary.BigEndian.PutUint16(b, data)
pkt.payload = append(pkt.payload, b...)
pkt.payload = append(pkt.payload, []byte(options.WillMessage)...)
}
if len(options.Username)>0{
if len(options.Username) > 0 {
data = uint16(len(options.Username))
binary.BigEndian.PutUint16(b,data)
pkt.payload = append(pkt.payload,b...)
pkt.payload = append(pkt.payload,[]byte(options.Username)...)
if len(options.Password)>0{
binary.BigEndian.PutUint16(b, data)
pkt.payload = append(pkt.payload, b...)
pkt.payload = append(pkt.payload, []byte(options.Username)...)
if len(options.Password) > 0 {
data = uint16(len(options.Password))
binary.BigEndian.PutUint16(b,data)
pkt.payload = append(pkt.payload,b...)
pkt.payload = append(pkt.payload,[]byte(options.Password)...)
binary.BigEndian.PutUint16(b, data)
pkt.payload = append(pkt.payload, b...)
pkt.payload = append(pkt.payload, []byte(options.Password)...)
}
}

}
func (pkt *packet) configureSubscribePackets(topic string, qos QOS){
pkt.fixedHeader = append(pkt.fixedHeader,ControlPktSubscribe|0x02)
func (pkt *packet) configureSubscribePackets(topic string, qos QOS) {
pkt.fixedHeader = append(pkt.fixedHeader, ControlPktSubscribe|0x02)
var packetId uint16 = PacketIdentifier
k:=make([]byte,2)
binary.BigEndian.PutUint16(k,packetId)
pkt.variableHeader = append(pkt.variableHeader,k...)
n:=make([]byte,2)
k := make([]byte, 2)
binary.BigEndian.PutUint16(k, packetId)
pkt.variableHeader = append(pkt.variableHeader, k...)
n := make([]byte, 2)
var topicLen uint16 = uint16(len(topic))
binary.BigEndian.PutUint16(n,topicLen)
pkt.payload = append(pkt.payload,n...)
pkt.payload = append(pkt.payload,[]byte(topic)...)
pkt.payload = append(pkt.payload,byte(qos))
length:=len(pkt.variableHeader)+len(pkt.payload)
remLength:=encodeRemainingLength(uint64(length))
pkt.fixedHeader = append(pkt.fixedHeader,remLength...)
binary.BigEndian.PutUint16(n, topicLen)
pkt.payload = append(pkt.payload, n...)
pkt.payload = append(pkt.payload, []byte(topic)...)
pkt.payload = append(pkt.payload, byte(qos))
length := len(pkt.variableHeader) + len(pkt.payload)
remLength := encodeRemainingLength(uint64(length))
pkt.fixedHeader = append(pkt.fixedHeader, remLength...)
}
func (pkt *packet) configurePingRequest(){
pkt.fixedHeader = append(pkt.fixedHeader,ControlPktPingReq)
pkt.fixedHeader = append(pkt.fixedHeader,0)
func (pkt *packet) configurePingRequest() {
pkt.fixedHeader = append(pkt.fixedHeader, ControlPktPingReq)
pkt.fixedHeader = append(pkt.fixedHeader, 0)
}
func (pkt *packet) configurePublish(topic string,message string,dup bool,qos QOS){
fixed:=ControlPktPublish
if dup{
fixed|=0x08
func (pkt *packet) configurePublish(topic string, message string, dup bool, qos QOS) {
fixed := ControlPktPublish
if dup {
fixed |= 0x08
}
if qos == QOS1{
fixed|=0x02
}else if qos == QOS2{
fixed|=0x06
if qos == QOS1 {
fixed |= 0x02
} else if qos == QOS2 {
fixed |= 0x06
}
pkt.fixedHeader = append(pkt.fixedHeader,byte(fixed))
k:=make([]byte,2)
binary.BigEndian.PutUint16(k,uint16(len(topic)))
pkt.variableHeader = append(pkt.variableHeader,k...)
pkt.variableHeader = append(pkt.variableHeader,[]byte(topic)...)
k=make([]byte,2)
pkt.fixedHeader = append(pkt.fixedHeader, byte(fixed))
k := make([]byte, 2)
binary.BigEndian.PutUint16(k, uint16(len(topic)))
pkt.variableHeader = append(pkt.variableHeader, k...)
pkt.variableHeader = append(pkt.variableHeader, []byte(topic)...)
k = make([]byte, 2)
var packetId uint16 = PacketIdentifier
binary.BigEndian.PutUint16(k,packetId)
pkt.variableHeader = append(pkt.variableHeader,k...)
pkt.variableHeader = append(pkt.variableHeader,[]byte(message)...)
length:=len(pkt.variableHeader)+len(pkt.payload)
remLength:=encodeRemainingLength(uint64(length))
pkt.fixedHeader = append(pkt.fixedHeader,remLength...)
}
binary.BigEndian.PutUint16(k, packetId)
pkt.variableHeader = append(pkt.variableHeader, k...)
pkt.variableHeader = append(pkt.variableHeader, []byte(message)...)
length := len(pkt.variableHeader) + len(pkt.payload)
remLength := encodeRemainingLength(uint64(length))
pkt.fixedHeader = append(pkt.fixedHeader, remLength...)
}
func (pkt *packet) configureDisconnect() {
pkt.fixedHeader = append(pkt.fixedHeader, ControlPktDisconnect)
pkt.fixedHeader = append(pkt.fixedHeader, 0)
}

0 comments on commit 1143f0d

Please sign in to comment.