Go 安装与使用 mqtt

1. 安装mqtt

  1. 引入mosquitto仓库并更新
    sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
    sudo apt-get update
  2. 执行以下命令安装mosquitto包
    sudo apt-get install mosquitto
  3. 安装mosquitto开发包
    sudo apt-get install mosquitto-dev
  4. 安装mosquitto客户端
    sudo apt-get install mosquitto-clients
  5. 查询mosquitto是否正确运行
    sudo service mosquitto status 

    active则为正常运行

2. 测试

2.1 注册一个top进行接收

mosquitto_sub -h localhost -t "mqtt" -v

2.2 发布消息到刚注册的top

需要另开一个终端,当执行完下面命令后,会在上一个终端出打印出我们发布的消息

mosquitto_pub -h localhost -t "mqtt" -m "Hello MQTT"

3. 配置Mqtt

设置一下用户名和密码,这样我们的mqtt才会比较安全。mqtt总配置在/etc/mosquitto/mosquitto.conf

在该文件末尾添加下面三个配置

# 关闭匿名
allow_anonymous false
# 设置用户名和密码
password_file /etc/mosquitto/pwfile
# 配置访问控制列表(topic和用户的关系)
acl_file /etc/mosquitto/acl
# 配置端口
port 8000

3.1 添加用户

添加一个用户,用户名为pibigstar,这个命令执行之后会让你输入密码,密码自己定义就好

sudo mosquitto_passwd -c /etc/mosquitto/pwfile pibigstar

3.2 添加Topic和用户的关系

新增acl文件

sudo vim /etc/mosquitto/acl

添加下面内容

# 用户test只能发布以test为前缀的主题
# 订阅以mqtt开头的主题
user test
topic write test/#
topic read mqtt/#

user pibigstar
topic write mqtt/#
topic write mqtt/#

3.3 重启mqtt

sudo /etc/init.d/mosquitto restart

3.4 测试

3.4.1 监听消费

mosquitto_sub -h 127.0.0.1 -p 8000 -t "mqtt" -v -u pibigstar -P 123456

3.4.2 发布消息

mosquitto_pub -h 127.0.0.1 -p 8000 -t "mqtt" -m "Hello MQTT" -u pibigstar -P 123456

4. Go语言使用Mqtt

package mqtt

import (
    "encoding/json"
    "errors"
    "fmt"
    "strings"
    "sync"
    "time"

    gomqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
    Host     = "192.168.1.101:8000"
    UserName = "pibigstar"
    Password = "123456"
)

type Client struct {
    nativeClient  gomqtt.Client
    clientOptions *gomqtt.ClientOptions
    locker        *sync.Mutex
    // 消息收到之后处理函数
    observer func(c *Client, msg *Message)
}

type Message struct {
    ClientID string `json:"clientId"`
    Type     string `json:"type"`
    Data     string `json:"data,omitempty"`
    Time     int64  `json:"time"`
}

func NewClient(clientId string) *Client {
    clientOptions := gomqtt.NewClientOptions().
        AddBroker(Host).
        SetUsername(UserName).
        SetPassword(Password).
        SetClientID(clientId).
        SetCleanSession(false).
        SetAutoReconnect(true).
        SetKeepAlive(120 * time.Second).
        SetPingTimeout(10 * time.Second).
        SetWriteTimeout(10 * time.Second).
        SetOnConnectHandler(func(client gomqtt.Client) {
            // 连接被建立后的回调函数
            fmt.Println("Mqtt is connected!", "clientId", clientId)
        }).
        SetConnectionLostHandler(func(client gomqtt.Client, err error) {
            // 连接被关闭后的回调函数
            fmt.Println("Mqtt is disconnected!", "clientId", clientId, "reason", err.Error())
        })

    nativeClient := gomqtt.NewClient(clientOptions)

    return &Client{
        nativeClient:  nativeClient,
        clientOptions: clientOptions,
        locker:        &sync.Mutex{},
    }
}

func (client *Client) GetClientID() string {
    return client.clientOptions.ClientID
}

func (client *Client) Connect() error {
    return client.ensureConnected()
}

// 确保连接
func (client *Client) ensureConnected() error {
    if !client.nativeClient.IsConnected() {
        client.locker.Lock()
        defer client.locker.Unlock()
        if !client.nativeClient.IsConnected() {
            if token := client.nativeClient.Connect(); token.Wait() && token.Error() != nil {
                return token.Error()
            }
        }
    }
    return nil
}

// 发布消息
// retained: 是否保留信息
func (client *Client) Publish(topic string, qos byte, retained bool, data []byte) error {
    if err := client.ensureConnected(); err != nil {
        return err
    }

    token := client.nativeClient.Publish(topic, qos, retained, data)
    if err := token.Error(); err != nil {
        return err
    }

    // return false is the timeout occurred
    if !token.WaitTimeout(time.Second * 10) {
        return errors.New("mqtt publish wait timeout")
    }

    return nil
}

// 消费消息
func (client *Client) Subscribe(observer func(c *Client, msg *Message), qos byte, topics ...string) error {
    if len(topics) == 0 {
        return errors.New("the topic is empty")
    }

    if observer == nil {
        return errors.New("the observer func is nil")
    }

    if client.observer != nil {
        return errors.New("an existing observer subscribed on this client, you must unsubscribe it before you subscribe a new observer")
    }
    client.observer = observer

    filters := make(map[string]byte)
    for _, topic := range topics {
        filters[topic] = qos
    }
    client.nativeClient.SubscribeMultiple(filters, client.messageHandler)

    return nil
}

func (client *Client) messageHandler(c gomqtt.Client, msg gomqtt.Message) {
    if client.observer == nil {
        fmt.Println("not subscribe message observer")
        return
    }
    message, err := decodeMessage(msg.Payload())
    if err != nil {
        fmt.Println("failed to decode message")
        return
    }
    client.observer(client, message)
}

func decodeMessage(payload []byte) (*Message, error) {
    message := new(Message)
    decoder := json.NewDecoder(strings.NewReader(string(payload)))
    decoder.UseNumber()
    if err := decoder.Decode(&message); err != nil {
        return nil, err
    }
    return message, nil
}

func (client *Client) Unsubscribe(topics ...string) {
    client.observer = nil
    client.nativeClient.Unsubscribe(topics...)
}

4.1 测试

package mqtt

import (
    "encoding/json"
    "fmt"
    "sync"
    "testing"
    "time"
)

func TestMqtt(t *testing.T) {
    var (
        clientId = "pibigstar"
        wg       sync.WaitGroup
    )
    client := NewClient(clientId)
    err := client.Connect()
    if err != nil {
        t.Errorf(err.Error())
    }

    wg.Add(1)
    go func() {
        err := client.Subscribe(func(c *Client, msg *Message) {
            fmt.Printf("接收到消息: %+v \n", msg)
            wg.Done()
        }, 1, "mqtt")

        if err != nil {
            panic(err)
        }
    }()

    msg := &Message{
        ClientID: clientId,
        Type:     "text",
        Data:     "Hello Pibistar",
        Time:     time.Now().Unix(),
    }
    data, _ := json.Marshal(msg)

    err = client.Publish("mqtt", 1, false, data)
    if err != nil {
        panic(err)
    }

    wg.Wait()
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 6

大佬,因为才接触go和mqtt协议,不是很了解。 请问必须先在服务器上面安装了mqtt了才能使用吗? 我本地开发下载了mqtt的包能直接使用吗?以后迁移到服务器上面后需要安装吗?小白求解答 :heart: :heart: :heart:

4年前 评论

@james23 你把它想象成mysql就行了,go只是在操作mqtt,你如果想在服务器上用mqtt肯定要先装下mqtt这个服务的

4年前 评论
james23 3年前
james23 3年前
pibigstar (作者) (楼主) 3年前
james23 3年前
pibigstar (作者) (楼主) 3年前

大佬,请问下Subscribe这个函数的第二个参数 qos是啥意思

3年前 评论
pibigstar (楼主) 3年前

请问大佬这种如何封装成 RESTful API 可以发送 http 请求调用?

3年前 评论

@dengchi 你要请求需要的参数抽成一个结构体,在其之上加一层http router就可以了啊

3年前 评论

http 一般不都是一个请求过来然后给一个返回,这个进程就结束了。但咱们这个好像是不是守护进程?我比较菜描述不知道准不准确不要见怪 :grin:

file屏幕快照 2021-01-27 下午6.10.32

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!