Golang 1.23 Iterator Functions

For a long long long time, Golang have no standard way to represent a iterable sequence.

C++ has range adaptor and iterator (although not strictly typed, only by concept), Python has iterable/iterator by __iter__/__next__, JavaScript has standardized for-of and Symbol.iterator since ES6.

Now it’s time for Golang. Starting from Golang 1.23 Aug., we have iterator functions.

How It Works.

Sample code explains faster.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Iota() func(yield func(idx int) bool) {
return func(yield func(idx int) bool) {
idx := 0
for {
if !yield(idx) {
return
}
idx += 1
}
}
}

func main() {
for idx := range Iota() {
if idx == 3 {
break
}
fmt.Println(idx) // print 0 1 2
}
}

According to Go 1.23 Release Notes Now the range keyword accept three kinds of functions, for which takes another yield function that yield zero/one/two values.

1
2
3
func(func() bool)
func(func(K) bool)
func(func(K, V) bool)

The loop control variable and the body of the for-loop is translated into the yield function by language definition. So we can still write imperative-style loop structure even though we are actually doing some functional-style function composition here.

Why Do We Need This?

Standardize the iterable/iterator interface is a important pre-condition for lazy evaluation. For example, how should we do when we need to iterates through all non-negative integer, and doing some map/filter/reduce on them? It waste space to allocate a list for all these integers (if possible).

Someone may say “we already have channel types”. Well, but that requires a separate coroutine instance. We probably don’t want such heavy cost every time we are doing some iterate operations.

Also a separate coroutine means additional synchronization and lifecycle control. For example, how can we terminate the Count coroutine when we need early break in loop?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Count(start int) chan int {
output := make(chan int)
go func() {
idx := start
for {
output <- idx
idx += 1
}
}()
return output
}

func main() {
for idx := range Count(0) {
if idx == 10 {
break
}
fmt.Println("Loop: ", idx)
}
}

We need some mechanism like context object or another channel right? That’s a burden for such easy task here.

On the other hand, iterator functions are just ordinary function that accept another function to yield/output the iterated values, so it’s much lightweight than a separate coroutine. We want fast program, right? :D

The Stop Design

For languages like Python and JavaScript, the iterator function (or generator in Python terms) is paused and the control is transfer back to the function that iterates the values. When break/return happens and no more value are required, the iterator function just got collected by the runtime since that there are no more references to the function object.

But how do we early break the iteration process, if the control is transfer into the iterator function? Let’s look at the function signature again. (Take one value iterator function for example).

1
func(yield func(idx int) bool)

The yield function returns a bool to indicate that whether the loop body does reach the end, or encounter a break statement. So in normal case, we continue to next possible value after yield return, but if we got false from yield, our iterator function can return immediately.

Ecosystem around Iterator

The beauty of iterator only appears if the ecosystem, or we say, the common operations around iterator are already implemented in standard library. That means:

  • Conversion from and to standard container types, like slice map and chan
  • Operations and compositions of iterators, e.g. map/filter/reduce/chain/take

In Python, there are generator expressions, which evolves implicit map/filter. reduce is a function at global scope, also there are many useful functions in itertools package, e.g. pairwise, batched, chain. Most builtin container types takes iterable as first argument in it’s constructor.

In Golang, the first part is mostly done along the release of Golang 1.23. For example, to convert slice from and to iterator, we can use slices.Collect and slices.Values.

For second part, there is a plan to add x/exp/xiter package under golang.org namespace. There should be at least Concat, Map, Filter, Reduce, Zip … once it’s released. But unfortunately it’s not compete yet.

See: iter: new package for iterators · Issue #61897 · golang/go

Also I create a toy package github.com/wdhongtw/mice/flow to address some important building wheel around iterators

  • Empty/Pack return a iterator for zero/one value
  • Any/All short-circuit lazy evaluation of a predicate on a sequence of values
  • Forward/Backward absorb input and iterate in reversed order.

For example, if we want to define a iterator function for a binary tree in recursive manner, we can use Empty and Pack together with Chain to implement this easily.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Node struct {
val int
left *Node
right *Node
}

func Traverse(node *Node) iter.Seq[int] {
// Empty is useful as base case during recursive generator chaining.
if node == nil {
return Empty[int]()
}
// Pack is useful to promote a single value into a iterable for chaining.
return Chain(
Traverse(node.left),
Pack(node.val),
Traverse(node.right),
)
}

Looks cool, doesn’t it? :D

Golang 1.18 Generics 終於來臨

今天 Golang 1.18 終於正式釋出啦! 我們終於有辦法在 Golang 裡做 generic programming 啦!

Golang 是一個近 10 年快速竄紅的程式語言,但在很多面向其實還是非常土炮。 得靠後續社群不斷的討論與貢獻才達到一個比較完善的水準。 像是..

  • context package in Golang 1.7: 解決 long job cancelling 的問題
  • errors package in Golang 1.13: 滿足其他語言常見的 error 嵌套需求和提供統一的判斷方式
  • Generic support in Golang 1.18: 提供開發者實作各種型無關演算法的機會

一直以來,在 Golang 的標準函式庫中,碰到類型無關的抽象問題時,最後給出來的解法大概就兩種

  1. 所有 input / output 參數都定義成 interface{},大家一起把型別檢查往 run-time 丟
  2. 同一個 class / function 對於常用的 data type 通通實作一遍

前者最典型的大概就是 sync 這個函示庫,後者.. 大家應該都看過那個慘不忍睹的 sort..

不過這些都是過去式了,從今天開始,大家都可以寫自己想要的 generic code / library / framework。 :D

Usage

基本語法很簡單,只要在想做成 generic 的 function / struct 後面多加一個 [T TypeName] 即可, TypeName 是用原本就有的 interface{...} 語法來表示,可以自己描述這個 generic function / struct 支援的型態必須滿足什麼樣的介面。

以 Python style 的 sort by key 當例子。 我們可以定義一個 generic 的 sort function,並且明定送進來的 list 內的每個元素需要支援一個 Key 函示, 作為排序時的根據。

範例 code 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

type Keyable interface {
Key() int
}

func Sort[T Keyable](items []T) []T {
if len(items) <= 1 {
return items
}

pivot := items[0]
less, greater := []T{}, []T{}
for _, item := range items[1:] {
if item.Key() < pivot.Key() {
less = append(less, item)
} else {
greater = append(greater, item)
}
}

return append(append(less, pivot), greater...)
}

type Person struct {
Name string
Age int
}

func (n Person) Key() int {
return n.Age
}

func main() {
persons := []Person{{Name: "alice", Age: 33}, {Name: "bob", Age: 27}}

persons = Sort(persons)
}

在這個範例中,Sort 要求送進來的 []T 當中的 T 要實作 Keyable 介面 (提供 Key method)。 當我們想排序一堆 Person 時,我們可以在這個 Person 物件上定義 Key method,取出 Person 的年齡。 完成之後,我們就可以依年齡來排序 []Person 了。

期許自己未來可以多加利用這個遲來的功能.. XD

References

Google 更新 Go 的社群行為準則

昨天 Go blog 上出新文章,說要更新 Code of Conduct。

一直一來覺得每個社群的 CoC 都寫得差不多,不外乎是要互相尊重、開放透明、建設性發言等等。 也因為都差不多,平常也不會去細看。反正就是些正常人該有的道德觀。 因此,看到說要更新 Code of Conduct 讓我感到有點好奇。

讀一讀讀下去,其實這次 Go community 的 CoC 就是新增一條:

Be responsible. What you say and do matters. Take responsibility …

沒想到連這個都要寫進 CoC …。可能 Go 的核心開發團隊看 issue 真的看到心累了? XD

See: Code of Conduct Updates - go.dev

NATS 與 JetStream 簡易介紹

最近因公司業務在玩一套相對新的 MQ: NATS。 因為官方文件不慎清楚且有些地方與直覺不同,造成起步緩慢。

以下簡單紀錄一下剛入門時應知道的事情。

Guide

相較 RabbitMQ, Kafka 等,NATS 是一套較為年輕的 MQ。 雖然有部分子專案的版本未達 v1.0,但官方宣稱已經接近 production ready。

NATS 從一開始就是針對 cloud service 設計,cluster mode 的水平擴展, node 之間的身分驗證及 TLS 通訊設計看起來都還不錯。

NATS 的 message 並無特別限制,在 client library 內任何的 byte sequence 都可以成為 message。

NATS 有以下三個模式(以及其對應的 client library)。

NATS (NATS Core)

NATS 專案從一開始發展時的基本模式。 支援 Pub/Sub pattern 並提供 at-most-once 語意。

NATS Streaming

NATS Streaming 是一套疊在 NATS 上面形成的 solution。

因為設計上的問題,後來又有了 JetStream,所以我們基本上不用理它,只要知道 NATS Streaming 和 JetStream 不一樣,翻文件的時候不要翻錯即可。

JetStream

JetStream 是後來做在 NATS 內,可選擇是否啟用的子系統。 藉由 JetStream, 可以實作 Producer/Consumer pattern 並提供 at-least-once 語意。

Server side 沒什麼需要注意的,只要用較新版的 NATS image 並啟用設定即可。 Client 開發則需要注意一些概念。

  • Subject: NATS 最初的概念,代表一些 message 的集合。
  • Stream: 建立於一或多個 Subject 之上,可將這些 subject 內的 message 統整起來,並放入 persistent storage。
  • Consumer: 建立在某個 Stream 之下,可以依序的 consume 屬於此 stream 的特定 message。

需要注意的是,不只 Subject 與 Stream,Consumer 本身也是建立在 NATS server 中的一個物件。 當利用 client library create 一個 Consumer 時,並不是該 process 本身成為一個 consumer, 而是 NATS server 中被創了一個 Consumer 物件,準備去使用 Stream 裡面的 message。

JetStream client library 並沒有提供一個對稱的 producer/consumer API。 基於術語的限制以及為了避免誤會,以下在稱呼一般所稱的 producer/consumer 時, 會特別加上 role 後綴來表示。

Producer role: 要使用 NATS library 內的 Publish API,將產生的 message 推送至 某個 Subject 內。

Consumer role: 要使用 JetStream library 內的 Stream API,在 NATS server 上對目標 Subject 建立 Stream,接著使用 JetStream Consumer API,在 NATS server 中 建立屬於該 Stream 的 Consumer。以上都完成之後,即可利用 Consumer 上的 NextMsg 來 消耗 message。

Conclusion

JetStream 的 API 設計並不常見,需要先認知到與既有設計的差別之處才能開始開發。 不過其 cloud native 的架構設計或許可以在維運上面勝過其他老牌的 MQ solution。

今天就先寫到這裡,如果有哪天有興趣再補吧。 :D

Reference

Appendix

Golang sample code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main

import (
"context"
"fmt"
"log"
"math/rand"
"os"
"testing"
"time"

"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go"
)

var fullSubject string = "report_task.scheduled"
var wildcardSubject string = "report_task.*"

func consumeOne(doneChan chan bool) {
msg, err := consumer.NextMsg()
if err != nil {
fmt.Printf("Fail to get message: %v\n", err)
}
fmt.Printf("Consume get task: %s\n", string(msg.Data))
time.Sleep(2 * time.Second)
if err := msg.Ack(); err != nil {
fmt.Printf("Fail to ack the message %s: %v\n", string(msg.Data), err)
}
doneChan <- true
}

func TestProduceAndConsume(t *testing.T) {
producerStopChan := make(chan bool)
consumerStopChan := make(chan bool)
var taskCount int = 1

go func() {
for idx := 0; idx < taskCount; idx++ {
taskName := fmt.Sprintf("task #%d", rand.Int())
nc.Publish(fullSubject, []byte(taskName))

fmt.Printf("Producer produce: %s\n", taskName)
}
producerStopChan <- true
}()

for idx := 0; idx < taskCount; idx++ {
go func() {
consumeOne(consumerStopChan)
}()
}

<-producerStopChan
for idx := 0; idx < taskCount; idx++ {
<-consumerStopChan
}

fmt.Println("Done")
}

var ctx context.Context
var cancel context.CancelFunc
var nc *nats.Conn
var stream *jsm.Stream
var consumer *jsm.Consumer

func setup() {
var err error

ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
nc, err = nats.Connect(nats.DefaultURL, nats.UserInfo("name", "password"), nats.UseOldRequestStyle())
if err != nil {
log.Fatal(err)
}

jsmgr, err := jsm.New(nc)
if err != nil {
log.Fatal(err)
}

streamName := "ReportTask"
stream, err = jsmgr.LoadOrNewStreamFromDefault(streamName,
api.StreamConfig{
Subjects: []string{wildcardSubject},
Storage: api.FileStorage,
Retention: api.LimitsPolicy,
Discard: api.DiscardOld,
MaxConsumers: -1,
MaxMsgs: -1,
MaxBytes: -1,
MaxAge: 24 * time.Hour,
MaxMsgSize: -1,
Replicas: 1,
NoAck: false,
})
if err != nil {
log.Fatal(err)
}

consumerName := "Generator"
consumer, err = stream.LoadOrNewConsumerFromDefault(consumerName,
api.ConsumerConfig{
Durable: consumerName,
DeliverPolicy: api.DeliverNew,
FilterSubject: fullSubject,
AckPolicy: api.AckExplicit,
AckWait: 30 * time.Second,
MaxDeliver: 5,
ReplayPolicy: api.ReplayInstant,
SampleFrequency: "0%",
})
if err != nil {
log.Fatal(err)
}

}

func shutdown() {
cancel()
}

func TestMain(m *testing.M) {
setup()
code := m.Run()
shutdown()
os.Exit(code)
}