Go/iter でPipelineパターンやってみた


Goのv1.23で追加されたiterパッケージでPipelineパターンをやってみる。

  • Pipelineパターンはiterと併用できる?
  • Pipelineパターンをiterと併用するメリットは?
  • Pipelineパターンやiterはどこで使うべき?

要件

アプリケーションのバックエンド開発を想定。 商品を単価と個数をrepositoryから取り出して、単価x個数の合計金額をserviceで計算、最後handlerで計算結果を確認という流れ。

実装

iterパターンの実装を確認。

type Repository struct {
	items [][2]int // [][2]int{price, num}
}

func (r Repository) Generate() iter.Seq2[int, [2]int] {
    // r.itemsはコンストラクタによってlenは100
	return slices.All(r.items)
}

type Service struct {
	r Repository
}

func (s Service) Iter() iter.Seq[int] {
	return func(yield func(int) bool) {
		for _, item := range s.r.Generate() {
			p := s.sumPrice(item[0], item[1])
			_ = yield(p)
		}
	}
}

type Router struct {
	s Service
}

func (r Router) HandleWithIter() {
	var count int
	for sum := range r.s.Iter() {
		_ = sum
		count++
	}
	if count != 100 {
		log.Println("Iter", count)
	}
}

func BenchmarkRouter(b *testing.B) {
	var router = Router{s: Service{r: NewRepository()}}
	b.Run("Iter", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			router.HandleWithIter()
		}
	})
}

iter

iterの処理イメージ。

  1. i = 0
  2. i < len(repository.items) 限り継続
  3. repository.items[i]repository から取り出し
  4. 合計値をserviceで計算
  5. routerで利用
  6. i++して 2. から繰り返す

だいたいこんな感じ。 階層化している分、複雑だけど、だいたいやっていることは、itemsをループして直列して処理をしてる。

Pipeline

A repositoryからserviceに渡す B serviceで合計値を計算して、routerに受け渡す C routerで利用する。

A,B,Cがそれぞれループして実行する。 カフェのオペレーションに例えれば、A売り子,B珈琲,C受け渡しのイメージ。 それぞれがそれぞれの仕事に集中して、前が詰まらない限り次の仕事に取り掛かる。

Fun-Out

A repositoryからserviceに渡す B serviceで合計値を計算して、routerに受け渡す x コア数 C routerで利用する。

pipelineパターンにBをCPUのコア数だけ並列して実行する。 カフェの例だと、B珈琲の人員をコア数だけ配置するイメージ。 Bも受け取り側が詰まればみんなの作業は止まってしまう。

実装

処理の流れを簡潔に見るため、ctxの考慮、iterがbreakされることを考慮しない実装をした。

コードサンプル: https://github.com/yyyoichi/iter-pipeline-sample

Pipeline

iter.Seq[int,int]を一度、チャネルで受け取る。 こうすることで、repositoryからの受け取りと合計値の計算を別で実行できる。

func (s Service) Pipeline() iter.Seq[int] {
	ch := make(chan [2]int)
	go func() {
		defer close(ch)
		for _, item := range s.r.Generate() {
			ch <- item
		}
	}()
	return func(yield func(int) bool) {
		for item := range ch {
			p := s.sumPrice(item[0], item[1])
			_ = yield(p)
		}
	}
}

Pipelineアンチパターン

参考にしてはいけないコード。 これは、直接ゴルーチンを起動するパターン。最終的なカウント(itemsの数100)に不整合が起こる。

アンチパターン: https://github.com/yyyoichi/iter-pipeline-sample/blob/fbc7e322e7845e3f76e96311c5a0d93d2db8f5dd/service.go#L22

ベンチマーク

軽い処理のとき意味なし

コードサンプル: https://github.com/yyyoichi/iter-pipeline-sample/tree/no-wait-pipeline

goos: linux
goarch: amd64
pkg: github.com/yyyoichi/iter-pipeline-sample
cpu: 13th Gen Intel(R) Core(TM) i7-1360P
=== RUN   BenchmarkRouter
BenchmarkRouter
=== RUN   BenchmarkRouter/Iter
BenchmarkRouter/Iter
BenchmarkRouter/Iter-16                  1562252               860.9 ns/op           152 B/op          7 allocs/op
=== RUN   BenchmarkRouter/Loop
BenchmarkRouter/Loop
BenchmarkRouter/Loop-16                  4037108               295.3 ns/op           896 B/op          1 allocs/op
=== RUN   BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline-16                54279             21092 ns/op             224 B/op          7 allocs/op
=== RUN   BenchmarkRouter/FunOut
BenchmarkRouter/FunOut
BenchmarkRouter/FunOut-16                  12694             94362 ns/op            3325 B/op         59 allocs/op
PASS
ok      github.com/yyyoichi/iter-pipeline-sample        8.147s

そりゃ遅いよねという。 普通に各階層でループしたほうが速いのね。

では、パイプラインパターンを実装するメリットを発揮するように、各階層の処理を重くする。

重い処理でパイプラインするとGood

合計金額の計算に、無駄な処理を入れて故意に処理を重くする。

コードサンプル: https://github.com/yyyoichi/iter-pipeline-sample

func (s *Service) sumPrice(price, num int) int {
-       return price * num
+       n := 100_000
+
+       var sum int
+       for range n {
+               for p := range price {
+                       sum += p
+               }
+               for n := range num {
+                       sum += n
+               }
+       }
+       for range n {
+               for p := range price {
+                       sum -= p
+               }
+               for n := range num {
+                       sum -= n
+               }
+       }
+       return sum + price*num
 }
goos: linux
goarch: amd64
pkg: github.com/yyyoichi/iter-pipeline-sample
cpu: 13th Gen Intel(R) Core(TM) i7-1360P
=== RUN   BenchmarkRouter
BenchmarkRouter
=== RUN   BenchmarkRouter/Iter
BenchmarkRouter/Iter
BenchmarkRouter/Iter-16                        2         591060918 ns/op             160 B/op          7 allocs/op
=== RUN   BenchmarkRouter/Loop
BenchmarkRouter/Loop
BenchmarkRouter/Loop-16                        2         595090432 ns/op             896 B/op          1 allocs/op
=== RUN   BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline-16                    3         465564291 ns/op             224 B/op          7 allocs/op
=== RUN   BenchmarkRouter/FunOut
BenchmarkRouter/FunOut
BenchmarkRouter/FunOut-16                     20          55785737 ns/op            8379 B/op         78 allocs/op
PASS
ok      github.com/yyyoichi/iter-pipeline-sample        8.420s

Fun-Outが速い。メモリをうまく使って処理を終了できたよう。iterloopと比較して約1/10のスピードで完了。

Pipelineが次点。やや速い。

チャネルのみのPipelineパターンではダメか

従来のように、チャネルを利用したとする場合、エラーの取り扱いが難しくなる。 例えば、各処理結果をエラーと一緒にした構造体にするか、コンテキストのキャンセルを受け取るか、になる。

func pipeline() <-chan struct {
    Result int
    Err error
} {
    // ...
}
func pipeline(cancel func(err)) <-chan int {
    // ...
}

ちょっと複雑か。

もう少し煩雑感を減らしたい場合には有効かもという選択肢の一つとしてありでは。

↓↓↓

func pipeline() iter.Seq2[int, error] {
    // ...
}

パッケージ化

iterも戻り値が複雑なので、どっちもどっちか。

どちらも、関数で切り出して呼び出せばわかりやすくなる。

ただ処理を切り出して複雑性を隠蔽しても、引数と戻り値の問題がチャネルのみの場合は残る。

時機にiterPipelineのパッケージ書く予定。

まとめ

  • Pipelineパターンはiterと併用できる?
    • Pipelineパターンはiterと併用する方法はある。
  • Pipelineパターンをiterと併用するメリットは?
    • Pipelineパターンをiterと併用することで、実装が簡潔になる可能性がある。
    • Pipelineパターンをiterと併用することで、処理速度が上がる可能性がある。
  • Pipelineパターンやiterはどこで使うべき?
    • Pipelineパターンやiterはベンチマークを測ろう。

https://github.com/yyyoichi/iter-pipeline-sample