Goのv1.23で追加されたiter
パッケージでPipeline
パターンをやってみる。
Pipeline
パターンはiter
と併用できる?Pipeline
パターンをiter
と併用するメリットは?Pipeline
パターンやiter
はどこで使うべき?
要件
アプリケーションのバックエンド開発を想定。
商品を単価と個数をrepository
から取り出して、単価x個数の合計金額をservice
で計算、最後handler
で計算結果を確認という流れ。
実装
iter
パターンの実装を確認。
type Repository struct {
items [][2]int // [][2]int{price, num}
}
func (r Repository) Generate() iter.Seq2[int, [2]int] {
// r.itemsはコンストラクタによってlenは100
return slices.All(r.items)
}
type Service struct {
r Repository
}
func (s Service) Iter() iter.Seq[int] {
return func(yield func(int) bool) {
for _, item := range s.r.Generate() {
p := s.sumPrice(item[0], item[1])
_ = yield(p)
}
}
}
type Router struct {
s Service
}
func (r Router) HandleWithIter() {
var count int
for sum := range r.s.Iter() {
_ = sum
count++
}
if count != 100 {
log.Println("Iter", count)
}
}
func BenchmarkRouter(b *testing.B) {
var router = Router{s: Service{r: NewRepository()}}
b.Run("Iter", func(b *testing.B) {
for i := 0; i < b.N; i++ {
router.HandleWithIter()
}
})
}
iter
iter
の処理イメージ。
i = 0
i < len(repository.items)
限り継続repository.items[i]
がrepository
から取り出し- 合計値を
service
で計算 router
で利用i++
して 2. から繰り返す
だいたいこんな感じ。 階層化している分、複雑だけど、だいたいやっていることは、itemsをループして直列して処理をしてる。
Pipeline
A repository
からservice
に渡す
B service
で合計値を計算して、router
に受け渡す
C router
で利用する。
A,B,Cがそれぞれループして実行する。 カフェのオペレーションに例えれば、A売り子,B珈琲,C受け渡しのイメージ。 それぞれがそれぞれの仕事に集中して、前が詰まらない限り次の仕事に取り掛かる。
Fun-Out
A repository
からservice
に渡す
B service
で合計値を計算して、router
に受け渡す x コア数
C router
で利用する。
pipeline
パターンにBをCPUのコア数だけ並列して実行する。
カフェの例だと、B珈琲の人員をコア数だけ配置するイメージ。
Bも受け取り側が詰まればみんなの作業は止まってしまう。
実装
処理の流れを簡潔に見るため、ctxの考慮、iterがbreakされることを考慮しない実装をした。
コードサンプル: https://github.com/yyyoichi/iter-pipeline-sample
Pipeline
iter.Seq[int,int]
を一度、チャネルで受け取る。
こうすることで、repository
からの受け取りと合計値の計算を別で実行できる。
func (s Service) Pipeline() iter.Seq[int] {
ch := make(chan [2]int)
go func() {
defer close(ch)
for _, item := range s.r.Generate() {
ch <- item
}
}()
return func(yield func(int) bool) {
for item := range ch {
p := s.sumPrice(item[0], item[1])
_ = yield(p)
}
}
}
Pipeline
アンチパターン
参考にしてはいけないコード。 これは、直接ゴルーチンを起動するパターン。最終的なカウント(itemsの数100)に不整合が起こる。
ベンチマーク
軽い処理のとき意味なし
コードサンプル: https://github.com/yyyoichi/iter-pipeline-sample/tree/no-wait-pipeline
goos: linux
goarch: amd64
pkg: github.com/yyyoichi/iter-pipeline-sample
cpu: 13th Gen Intel(R) Core(TM) i7-1360P
=== RUN BenchmarkRouter
BenchmarkRouter
=== RUN BenchmarkRouter/Iter
BenchmarkRouter/Iter
BenchmarkRouter/Iter-16 1562252 860.9 ns/op 152 B/op 7 allocs/op
=== RUN BenchmarkRouter/Loop
BenchmarkRouter/Loop
BenchmarkRouter/Loop-16 4037108 295.3 ns/op 896 B/op 1 allocs/op
=== RUN BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline-16 54279 21092 ns/op 224 B/op 7 allocs/op
=== RUN BenchmarkRouter/FunOut
BenchmarkRouter/FunOut
BenchmarkRouter/FunOut-16 12694 94362 ns/op 3325 B/op 59 allocs/op
PASS
ok github.com/yyyoichi/iter-pipeline-sample 8.147s
そりゃ遅いよねという。 普通に各階層でループしたほうが速いのね。
では、パイプラインパターンを実装するメリットを発揮するように、各階層の処理を重くする。
重い処理でパイプラインするとGood
合計金額の計算に、無駄な処理を入れて故意に処理を重くする。
コードサンプル: https://github.com/yyyoichi/iter-pipeline-sample
func (s *Service) sumPrice(price, num int) int {
- return price * num
+ n := 100_000
+
+ var sum int
+ for range n {
+ for p := range price {
+ sum += p
+ }
+ for n := range num {
+ sum += n
+ }
+ }
+ for range n {
+ for p := range price {
+ sum -= p
+ }
+ for n := range num {
+ sum -= n
+ }
+ }
+ return sum + price*num
}
goos: linux
goarch: amd64
pkg: github.com/yyyoichi/iter-pipeline-sample
cpu: 13th Gen Intel(R) Core(TM) i7-1360P
=== RUN BenchmarkRouter
BenchmarkRouter
=== RUN BenchmarkRouter/Iter
BenchmarkRouter/Iter
BenchmarkRouter/Iter-16 2 591060918 ns/op 160 B/op 7 allocs/op
=== RUN BenchmarkRouter/Loop
BenchmarkRouter/Loop
BenchmarkRouter/Loop-16 2 595090432 ns/op 896 B/op 1 allocs/op
=== RUN BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline
BenchmarkRouter/Pipeline-16 3 465564291 ns/op 224 B/op 7 allocs/op
=== RUN BenchmarkRouter/FunOut
BenchmarkRouter/FunOut
BenchmarkRouter/FunOut-16 20 55785737 ns/op 8379 B/op 78 allocs/op
PASS
ok github.com/yyyoichi/iter-pipeline-sample 8.420s
Fun-Out
が速い。メモリをうまく使って処理を終了できたよう。iter
やloop
と比較して約1/10のスピードで完了。
Pipeline
が次点。やや速い。
チャネルのみのPipeline
パターンではダメか
従来のように、チャネルを利用したとする場合、エラーの取り扱いが難しくなる。 例えば、各処理結果をエラーと一緒にした構造体にするか、コンテキストのキャンセルを受け取るか、になる。
func pipeline() <-chan struct {
Result int
Err error
} {
// ...
}
func pipeline(cancel func(err)) <-chan int {
// ...
}
ちょっと複雑か。
もう少し煩雑感を減らしたい場合には有効かもという選択肢の一つとしてありでは。
↓↓↓
func pipeline() iter.Seq2[int, error] {
// ...
}
パッケージ化
iter
も戻り値が複雑なので、どっちもどっちか。
どちらも、関数で切り出して呼び出せばわかりやすくなる。
ただ処理を切り出して複雑性を隠蔽しても、引数と戻り値の問題がチャネルのみの場合は残る。
時機にiter
とPipeline
のパッケージ書く予定。
まとめ
Pipeline
パターンはiter
と併用できる?Pipeline
パターンはiter
と併用する方法はある。
Pipeline
パターンをiter
と併用するメリットは?Pipeline
パターンをiter
と併用することで、実装が簡潔になる可能性がある。Pipeline
パターンをiter
と併用することで、処理速度が上がる可能性がある。
Pipeline
パターンやiter
はどこで使うべき?Pipeline
パターンやiter
はベンチマークを測ろう。
https://github.com/yyyoichi/iter-pipeline-sample