golang Mqtt服务端

2022/4/21 6:16:36

本文主要是介绍golang Mqtt服务端,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

package main

import (
    "context"
    "fmt"
    "net"
    _ "net/http/pprof"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go.uber.org/zap"

    "github.com/DrmagicE/gmqtt"
    _ "github.com/DrmagicE/gmqtt/persistence"
    "github.com/DrmagicE/gmqtt/persistence/subscription"
    "github.com/DrmagicE/gmqtt/pkg/packets"
    "github.com/DrmagicE/gmqtt/server"
    _ "github.com/DrmagicE/gmqtt/topicalias/fifo"
)

func main() {

    ln, err := net.Listen("tcp", ":1883")
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    cfg := zap.NewDevelopmentConfig()
    cfg.Level.SetLevel(zap.InfoLevel)
    l, _ := cfg.Build()
    srv := server.New(
        server.WithTCPListener(ln),
        server.WithLogger(l),
    )

    var subService server.SubscriptionService
    err = srv.Init(server.WithHook(server.Hooks{
        OnConnected: func(ctx context.Context, client server.Client) {
            // add subscription for a client when it is connected
            subService.Subscribe(client.ClientOptions().ClientID, &gmqtt.Subscription{
                TopicFilter: "topic",
                QoS:         packets.Qos0,
            })
        },
    }))
    subService = srv.SubscriptionService()

    if err != nil {
        fmt.Println(err.Error())
        return
    }

    // retained service
    retainedService := srv.RetainedService()

    // publish service
    pub := srv.Publisher()

    // add a retained message
    retainedService.AddOrReplace(&gmqtt.Message{
        QoS:      packets.Qos1,
        Retained: true,
        Topic:    "a/b/c",
        Payload:  []byte("retained message"),
    })

    // publish service
    go func() {
        for {
            <-time.NewTimer(5 * time.Second).C
            // iterate all topics
            subService.Iterate(func(clientID string, sub *gmqtt.Subscription) bool {
                fmt.Printf("client id: %s, topic: %v \n", clientID, sub.TopicFilter)
                return true
            }, subscription.IterationOptions{
                Type: subscription.TypeAll,
            })
            // publish a message to the broker
            pub.Publish(&gmqtt.Message{
                Topic:   "topic",
                Payload: []byte("abc"),
                QoS:     packets.Qos1,
            })

        }

    }()

    go func() {
        signalCh := make(chan os.Signal, 1)
        signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
        <-signalCh
        srv.Stop(context.Background())
    }()
    err = srv.Run()
    if err != nil {
        panic(err)
    }

}

 



这篇关于golang Mqtt服务端的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程