最近因公司業務在玩一套相對新的 MQ: NATS。
因為官方文件不慎清楚且有些地方與直覺不同,造成起步緩慢。
以下簡單紀錄一下剛入門時應知道的事情。
Guide
相較 RabbitMQ, Kafka 等,NATS 是一套較為年輕的 MQ。
雖然有部分子專案的版本未達 v1.0,但官方宣稱已經接近 production ready。
NATS 從一開始就是針對 cloud service 設計,cluster mode 的水平擴展,
node 之間的身分驗證及 TLS 通訊設計看起來都還不錯。
NATS 的 message 並無特別限制,在 client library 內任何的 byte sequence 都可以成為 message。
NATS 有以下三個模式(以及其對應的 client library)。
NATS (NATS Core)
NATS 專案從一開始發展時的基本模式。 支援 Pub/Sub pattern 並提供 at-most-once
語意。
NATS Streaming
NATS Streaming 是一套疊在 NATS 上面形成的 solution。
因為設計上的問題,後來又有了 JetStream,所以我們基本上不用理它,只要知道 NATS Streaming 和
JetStream 不一樣,翻文件的時候不要翻錯即可。
JetStream
JetStream 是後來做在 NATS 內,可選擇是否啟用的子系統。 藉由 JetStream,
可以實作 Producer/Consumer pattern 並提供 at-least-once
語意。
Server side 沒什麼需要注意的,只要用較新版的 NATS image 並啟用設定即可。
Client 開發則需要注意一些概念。
- Subject: NATS 最初的概念,代表一些 message 的集合。
- Stream: 建立於一或多個 Subject 之上,可將這些 subject 內的 message 統整起來,並放入 persistent storage。
- Consumer: 建立在某個 Stream 之下,可以依序的 consume 屬於此 stream 的特定 message。
需要注意的是,不只 Subject 與 Stream,Consumer 本身也是建立在 NATS server 中的一個物件。
當利用 client library create 一個 Consumer 時,並不是該 process 本身成為一個 consumer,
而是 NATS server 中被創了一個 Consumer 物件,準備去使用 Stream 裡面的 message。
JetStream client library 並沒有提供一個對稱的 producer/consumer API。
基於術語的限制以及為了避免誤會,以下在稱呼一般所稱的 producer/consumer 時,
會特別加上 role 後綴來表示。
Producer role: 要使用 NATS library 內的 Publish API,將產生的 message 推送至
某個 Subject 內。
Consumer role: 要使用 JetStream library 內的 Stream API,在 NATS server 上對目標
Subject 建立 Stream,接著使用 JetStream Consumer API,在 NATS server 中
建立屬於該 Stream 的 Consumer。以上都完成之後,即可利用 Consumer 上的 NextMsg
來
消耗 message。
Conclusion
JetStream 的 API 設計並不常見,需要先認知到與既有設計的差別之處才能開始開發。
不過其 cloud native 的架構設計或許可以在維運上面勝過其他老牌的 MQ solution。
今天就先寫到這裡,如果有哪天有興趣再補吧。 :D
Reference
Appendix
Golang sample code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
| package main
import ( "context" "fmt" "log" "math/rand" "os" "testing" "time"
"github.com/nats-io/jsm.go" "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats.go" )
var fullSubject string = "report_task.scheduled" var wildcardSubject string = "report_task.*"
func consumeOne(doneChan chan bool) { msg, err := consumer.NextMsg() if err != nil { fmt.Printf("Fail to get message: %v\n", err) } fmt.Printf("Consume get task: %s\n", string(msg.Data)) time.Sleep(2 * time.Second) if err := msg.Ack(); err != nil { fmt.Printf("Fail to ack the message %s: %v\n", string(msg.Data), err) } doneChan <- true }
func TestProduceAndConsume(t *testing.T) { producerStopChan := make(chan bool) consumerStopChan := make(chan bool) var taskCount int = 1
go func() { for idx := 0; idx < taskCount; idx++ { taskName := fmt.Sprintf("task #%d", rand.Int()) nc.Publish(fullSubject, []byte(taskName))
fmt.Printf("Producer produce: %s\n", taskName) } producerStopChan <- true }()
for idx := 0; idx < taskCount; idx++ { go func() { consumeOne(consumerStopChan) }() }
<-producerStopChan for idx := 0; idx < taskCount; idx++ { <-consumerStopChan }
fmt.Println("Done") }
var ctx context.Context var cancel context.CancelFunc var nc *nats.Conn var stream *jsm.Stream var consumer *jsm.Consumer
func setup() { var err error
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) nc, err = nats.Connect(nats.DefaultURL, nats.UserInfo("name", "password"), nats.UseOldRequestStyle()) if err != nil { log.Fatal(err) }
jsmgr, err := jsm.New(nc) if err != nil { log.Fatal(err) }
streamName := "ReportTask" stream, err = jsmgr.LoadOrNewStreamFromDefault(streamName, api.StreamConfig{ Subjects: []string{wildcardSubject}, Storage: api.FileStorage, Retention: api.LimitsPolicy, Discard: api.DiscardOld, MaxConsumers: -1, MaxMsgs: -1, MaxBytes: -1, MaxAge: 24 * time.Hour, MaxMsgSize: -1, Replicas: 1, NoAck: false, }) if err != nil { log.Fatal(err) }
consumerName := "Generator" consumer, err = stream.LoadOrNewConsumerFromDefault(consumerName, api.ConsumerConfig{ Durable: consumerName, DeliverPolicy: api.DeliverNew, FilterSubject: fullSubject, AckPolicy: api.AckExplicit, AckWait: 30 * time.Second, MaxDeliver: 5, ReplayPolicy: api.ReplayInstant, SampleFrequency: "0%", }) if err != nil { log.Fatal(err) }
}
func shutdown() { cancel() }
func TestMain(m *testing.M) { setup() code := m.Run() shutdown() os.Exit(code) }
|