Yazılarımız

Veri Akademi

GO GOROUTİNE VE CHANNEL PATTERN’LERİ: FAN-İN / FAN-OUT ÖRNEKLERLE

Go’nun concurrency yaklaşımı “çok çekirdeği kullanayım” motivasyonundan çok daha fazlasını sunar: sistemin yük altında tahmin edilebilir davranmasını sağlamak, iş akışlarını sadeleştirmek ve hata/iptal senaryolarını yönetilebilir kılmak. Bu noktada goroutine’ler ve channel’lar birer yapı taşıdır; asıl gücü ise onları doğru pattern’lerle bir araya getirdiğinizde görürsünüz.

Bu makalede fan-out ile işi paralelleştirmeyi, fan-in ile çoklu akışları tek kanalda birleştirmeyi, worker pool ve pipeline gibi pratik düzenleri gerçekçi örneklerle ele alacağız. Sadece “çalışan kod” değil, aynı zamanda kapanmayan goroutine (leak), backpressure ve iptal sinyali gibi üretimde can yakan ayrıntılara da odaklanacağız.

Hedef: Elinizdeki işi goroutine’lere dağıtırken kontrolü kaybetmemek; sonuçları toplarken sistemin dengesini bozmamak; kapanış/iptal disiplinini oturtmak. Bu disiplin oturduğunda Go kodu hem okunur hem de ölçeklenebilir hale gelir.

Goroutine’lerle paralel çalışan worker havuzu ve işlerin kanalla dağıtılması yaklaşımı

Primary Keyword: Go goroutine ve channel pattern’leri nedir, ne işe yarar?

Bu makalenin ana odağı Go goroutine ve channel pattern’leri. Pattern derken “kütüphane” değil, tekrarlayan concurrency problemlerine karşı denenmiş bir düzeni kastediyoruz. Örneğin aynı anda çok sayıda işi çalıştırmak istiyorsunuz; ama sınırsız goroutine açmak CPU’yu, belleği ve dış servisleri boğabilir. Ya da çok kaynaktan gelen sonuçları tek noktada toplamak istiyorsunuz; ama hangi kanalı ne zaman kapatacağınızı bilemezseniz goroutine’ler asılı kalabilir.

Pattern’lerin ortak amacı üç şeye hizmet eder: (1) yük kontrolü (fan-out/worker pool), (2) akış birleştirme (fan-in/merge), (3) yaşam döngüsü yönetimi (close, context ile iptal, timeout). Bu üçü birlikte ele alınmadığında sistem genellikle “bazen takılıyor” veya “nadir ama ölümcül” bug’lara davetiye çıkarır.

Goroutine maliyeti küçük, ama sınırsız değil

Goroutine’ler işletim sistemi thread’lerine göre daha hafif olsa da “bedava” değildir. Her goroutine bir stack ile başlar, büyür; scheduler üzerinde maliyeti vardır; ayrıca çoğu zaman kanal üzerinden veri taşırken ek yük oluşur. Bu yüzden fan-out yaparken çoğunlukla bounded concurrency yani sınırlı eşzamanlılık isteriz. Sınırsız fan-out; CPU dalgalanması, GC baskısı ve downstream servislerde kuyruk taşması olarak geri döner.

Channel semantiği: veri akışı + senkronizasyon

Channel sadece veri taşımaz; aynı zamanda senkronizasyon aracıdır. Unbuffered channel, gönderici ve alıcıyı buluşturur; buffered channel ise geçici bir kuyruk sağlar. Bu fark “backpressure” davranışını belirler. Pattern tasarlarken “bu noktada sistem yavaşlamalı mı, yoksa buffer’la mı tolere etmeli?” sorusunu açıkça cevaplamak gerekir.


Fan-out: İşleri paralel worker’lara dağıtma

Fan-out, tek bir iş kaynağından gelen işleri birden fazla worker goroutine’e dağıtma yaklaşımıdır. Tipik senaryo: bir API’den gelen istekleri işleyip dış servise çağrı yapmak, dosya parçalarını işlemek, mesaj kuyruğundan okunan event’leri dönüştürmek. Fan-out size throughput kazandırır; ama kontrolsüz fan-out sistemi hızlandırmak yerine kilitleyebilir.

Worker pool ile bounded concurrency

En yaygın fan-out uygulaması worker pool’dur: N tane worker belirlenir; iş kanalı üzerinden gelen işleri worker’lar paylaşır. Böylece eşzamanlılık üst sınırla belirlenir. İki kritik nokta: (1) İş kanalını kim kapatacak? (2) Sonuçlar nasıl toplanacak ve ne zaman kapanacak?

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Job struct {
	ID   int
	Data string
}

type Result struct {
	JobID int
	Value string
	Err   error
}

func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			// İptal geldi; goroutine'i temiz kapat.
			return
		case job, ok := <-jobs:
			if !ok {
				// İş kanalı kapandı; çık.
				return
			}

			// Simülasyon: işleme süresi
			time.Sleep(50 * time.Millisecond)

			// Üretimde burada I/O olabilir; hata taşıyın.
			results <- Result{
				JobID: job.ID,
				Value: fmt.Sprintf("worker-%d processed %s", id, job.Data),
				Err:   nil,
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	jobs := make(chan Job)
	results := make(chan Result)

	const workerCount = 4
	var wg sync.WaitGroup
	wg.Add(workerCount)

	for i := 1; i <= workerCount; i++ {
		go worker(ctx, i, jobs, results, &wg)
	}

	// Sonuç kanalını doğru kapatmak için: worker'lar bitince kapat.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Üretici: işleri gönder, sonra jobs kanalını kapat.
	go func() {
		defer close(jobs)
		for i := 1; i <= 20; i++ {
			j := Job{ID: i, Data: fmt.Sprintf("item-%d", i)}
			select {
			case <-ctx.Done():
				return
			case jobs <- j:
			}
		}
	}()

	// Tüketici: results kapanana kadar oku.
	for r := range results {
		if r.Err != nil {
			fmt.Println("error:", r.Err)
			continue
		}
		fmt.Println(r.Value)
	}
}

Bu örnekte fan-out düzenini “kapanış disiplini” ile birleştirdik: jobs kanalını üretici kapatıyor; results kanalını ise worker’lar bittikten sonra kapatan ayrı bir goroutine var. Bu ayrım, “kanalı kim kapatır?” sorusuna net yanıt verir. Kanalı kapatma sorumluluğu üreticide olmalı; alıcı tarafın kapatması genellikle yarış koşullarına yol açar.

Backpressure ve kuyruk davranışı seçimi

Fan-out tasarımında buffered channel boyutu kritik bir ayardır. Buffer, kısa süreli dalgalanmaları yumuşatır; ancak çok büyük buffer sorunları gizleyebilir. Unbuffered jobs kanalı “her iş bir worker’a teslim edilmeden ilerleme” şeklinde doğal bir fren mekanizması sağlar. Buffered jobs kanalı ise üreticinin bir süre önden gitmesine izin verir. Seçimi yaparken şu sorular işinizi kolaylaştırır:

  • İş üretimi patlamalı mı? Kısa süreli spike’ları tolere etmek için küçük bir buffer yeterli olabilir.
  • Downstream servis hassas mı? Dış servis rate limit’liyse fan-out sayısını ve buffer’ı agresif tutmayın.
  • Gecikme mi, throughput mu? Bazı sistemlerde düşük gecikme, bazı sistemlerde yüksek throughput önceliklidir.
Sınırlı sayıda worker ile işleri sırayla alıp sonuçları ayrı bir kanala yazma yaklaşımı

Fan-in: Çoklu kanalı tek akışta birleştirme

Fan-in, birden fazla goroutine veya kaynaktan gelen sonuçları tek bir çıktı kanalında toplama yaklaşımıdır. Worker pool kullandığınızda doğal olarak fan-in ihtiyacı ortaya çıkar: Her worker sonuç üretir; tüketici ise “tek bir yerden” okumak ister. Fan-in doğru yapılmazsa en sık görülen problem şudur: bazı üreticiler bitmiş olsa bile çıktı kanalı kapanmaz ve tüketici sonsuza kadar bekler.

Merge (fan-in) fonksiyonu: Kapanışı garanti altına alma

Fan-in için yaygın yaklaşım, her giriş kanalını okuyan birer goroutine açmak ve sonuçları tek bir out kanalına aktarmaktır. Sonra bu goroutine’lerin tamamı bittiğinde out kanalını kapatırsınız. Burada WaitGroup en net yöntemlerden biridir. Ayrıca context ile iptal desteği eklemek, üretimde “kuyruk birikti, artık vazgeç” senaryolarında hayati olur.

package main

import (
	"context"
	"sync"
	"time"
)

func merge(ctx context.Context, inputs ...<-chan int) <-chan int {
	out := make(chan int)

	var wg sync.WaitGroup
	wg.Add(len(inputs))

	pipe := func(ch <-chan int) {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-ch:
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case out <- v:
				}
			}
		}
	}

	for _, ch := range inputs {
		go pipe(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func producer(ctx context.Context, start int, count int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < count; i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- (start + i):
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	a := producer(ctx, 0, 30)
	b := producer(ctx, 100, 30)
	c := producer(ctx, 1000, 30)

	out := merge(ctx, a, b, c)

	for v := range out {
		_ = v
		// Burada tüketim yapılır (log, DB, agregasyon vb.)
	}
}

Bu fan-in düzeninde iki ayrı “select katmanı” var: girişten okurken iptali dinliyoruz; out’a yazarken de iptali dinliyoruz. Böylece tüketici taraf yavaşladığında veya context iptal olduğunda goroutine’ler asılı kalmıyor. Bu yaklaşım, goroutine leak riskini ciddi biçimde azaltır.

select ile timeout, öncelik ve kapanış senaryoları

Fan-in/fan-out örneklerinde select çoğu zaman kontrol mekanizmasıdır. Timeout, “en fazla şu kadar bekle” demektir; fakat timeout kullanırken kaynak goroutine’lerin de durduğundan emin olun. Sadece tüketici timeout yaparsa üreticiler arkada üretmeye devam edebilir. Bu yüzden timeout genellikle context ile birlikte ele alınır: tek bir iptal sinyali, tüm akışın yaşam döngüsünü belirler.


Pipeline: Aşamalı işleme ile okunabilir concurrency

Pipeline pattern, veriyi aşamalar (stages) halinde işlemek için kullanılır: üretici bir kanala yazar, sonraki aşama okur-dönüştürür-yazar, bir sonraki aşama devam eder. Pipeline özellikle veri dönüşümü, filtreleme, zenginleştirme, batch işleme gibi işlerde çok okunaklı olur. Ayrıca her aşamanın concurrency düzeyini ayrı ayarlayabilmeniz büyük esneklik sağlar.

Stage tasarımı: input, output ve sorumluluk sınırları

Bir stage’in iyi tasarımı üç özelliğe sahip olur: (1) input kanalını okur, (2) output kanalına yazar, (3) output’u kapatma sorumluluğu stage’e aittir. Yani stage bir “üretici” gibi davranır. Bu disiplin korunursa pipeline’ın her halkası izole ve test edilebilir olur. Üstelik aşamalar arası buffer miktarı, sistemi ayarlamak için güçlü bir kaldıraçtır.

Hata yönetimi: sonuçla birlikte error taşımak

Pipeline’da hatayı “panic” ile yükseltmek yerine, veriyle birlikte hata taşıyan bir tip kullanmak daha güvenlidir. Özellikle fan-out ile paralelleştirdiğiniz aşamalarda bazı işler başarısız olabilir; tüm akışı durdurmak mı yoksa sadece hatalı işi raporlamak mı istediğinizi açıkça belirleyin. Birçok üretim senaryosunda “partial success” kabul edilebilir; ama bu kararı pattern belirlemez, iş kuralı belirler.

Üretici-tüketici zincirinde veri işleme aşamalarını iptal sinyaliyle güvenli biçimde durdurma

Pipeline kurarken aşağıdaki adımlar pratik bir kontrol listesi gibi düşünülebilir:

  1. Stage sınırlarını belirleyin: I/O mu yapıyor, CPU mu harcıyor, filtre mi uyguluyor?
  2. Her stage için kapanış sorumluluğunu netleştirin: Output’u kim kapatıyor?
  3. İptal sinyalini (context) uçtan uca taşıyın; “yarıda bırakma” senaryosunu tasarlayın.
  4. Gözlemlenebilirlik ekleyin: gecikme, kuyruk, hata oranı metriklerini yakalayın.

Cancellation ve leak önleme: En pahalı bug’ları erken yakalayın

Concurrency dünyasında en can sıkıcı problemler “nadiren olur” ve “tekrarlaması zordur”. Go’da bunun sık örneği goroutine leak’tir: bir goroutine, bir kanaldan okuma/yazma bekler ve hiçbir zaman ilerleyemez; testte yakalanmaz, üretimde yavaş yavaş kaynak tüketir. Bu yüzden pattern tasarımının merkezinde kapanış ve iptal olmalı.

Close disiplini: Kanalı kapatacak tek bir yer olsun

Kanalı kapatma sorumluluğu, o kanala yazan tarafta olmalıdır. Birden fazla üretici varsa ya (1) tek bir “kapatıcı” goroutine üreticilerin bitişini izleyip kapatır, ya da (2) üreticiler tek bir noktadan fan-in edilip o nokta kapatır. “Tüketici kapatsın” yaklaşımı çoğu zaman yarış koşullarına kapı açar. Ayrıca kapalı kanala yazmak panic üretir; bu nedenle kapanış sorumluluğunu dağıtmak risklidir.

Buffered vs unbuffered: Performans değil davranış seçimi

Buffer, sadece performans değil, sistem davranışı seçimidir. Unbuffered channel, üretici-tüketici hızını birbirine kilitler; bu çoğu zaman doğal bir backpressure sağlar. Buffered channel, kısa süreli dalgalanmaları yutar; ama tüketici uzun süre yavaşsa buffer dolacak ve yine bloklayacaktır. Bu “nereye kadar” sorusunun yanıtı işin doğasına bağlıdır: örneğin log işleme pipeline’ında daha büyük buffer makul olabilirken, ödeme sisteminde gecikmeyi büyütecek buffer’lar riskli olabilir.


Fan-in / fan-out kombinasyonu: Üretimde en sık görülen senaryo

Gerçek sistemlerde fan-out çoğu zaman tek başına değildir; sonuçları toplamak için fan-in gerekir. Tipik akış: işleri sıraya koy (jobs), worker pool ile fan-out et, sonuçları tek kanala fan-in et, ardından bir tüketici ile yaz (DB, cache, API, dosya). Bu kombinasyonda iki yerde tıkanma yaşanabilir: iş dağıtımında ve sonuç tüketiminde. Bu nedenle kanal boyutlarını ve worker sayısını birlikte düşünmek gerekir.

Pratik bir yaklaşım olarak şu heuristikler iş görür: CPU-ağır işlerde worker sayısını çekirdek sayısına yakın tutmak; I/O-ağır işlerde worker sayısını biraz artırmak; fakat dış servis limitlerini dikkate almak. Yine de en sağlıklısı ölçmektir: kuyruk uzunluğu, işlem süresi, hata oranı ve timeout’lar size doğru ayarı söyler.

Rate limiting ve yük koruması eklemek

Worker sayısını sınırlamak çoğu zaman yeterli değildir; özellikle dış servislere çağrı yapıyorsanız rate limiting gerekebilir. Bu, token bucket veya zamanlayıcı tabanlı bir kanal ile uygulanabilir. Önemli olan, “yük altındayken ne olacak?” sorusuna sistemin net bir cevabı olmasıdır: Yavaşlayacak mı, reddedecek mi, kuyruğa mı alacak? Pattern’ler bu kararı görünür kılar.


Test edilebilirlik ve okunabilirlik: Pattern’leri ekip standardına dönüştürün

Pattern’ler sadece performans için değil, ekip içi iletişim için de değerlidir. “Burada worker pool var”, “şu fonksiyon fan-in merge yapıyor” dediğinizde herkes aynı zihinsel modeli paylaşır. Bu, kod review süresini düşürür ve hata olasılığını azaltır. Üstelik her stage’i ayrı test etmek kolaylaşır; mock channel’lar ile deterministik senaryolar yazabilirsiniz.

İzolasyon: saf fonksiyonlar ve küçük goroutine’ler

Concurrency kodu karmaşıklaşmaya meyillidir. Bunu azaltmanın yolu aşamaları küçük tutmak, yan etkileri sınırlandırmak ve mümkünse “işleme” kısmını saf fonksiyonlara ayırmaktır. Goroutine’ler sadece orkestrasyon yapsın, iş mantığı ayrı fonksiyonda olsun. Böylece veri yarışı ve kilitlenme riskini azaltırsınız.

Gözlemlenebilirlik: metrik, log ve izleme kancaları

Üretimde concurrency problemleri çoğu zaman “neden yavaşladı?” sorusuyla başlar. Bu yüzden kanal kuyruklarını (buffer doluluk), işlem sürelerini ve hata oranlarını gözlemlemek değerlidir. Basit bir ölçüm bile (ör. stage latency histogram’ı) bottleneck’i hızla buldurur. Pattern’leri standardize ettiğinizde, bu ölçüm noktalarını da standardize etmiş olursunuz.


Ne zaman hangi pattern? Kısa bir karar rehberi

Özetlemek gerekirse: fan-out, işi paralelleştirir; fan-in, sonuçları birleştirir; pipeline ise adımları sade bir akışa dönüştürür. Bu üçü çoğu sistemde birlikte kullanılır. Eğer “N istek geldi, hepsini aynı anda çalıştırayım” diyorsanız fan-out ile başlayın ama mutlaka bounded concurrency kurun. Eğer “birden fazla kaynaktan veri geliyor, tek tüketiciye akıtacağım” diyorsanız fan-in merge uygulayın. Eğer “veri birkaç aşamadan geçiyor” diyorsanız pipeline kurgulayın ve kapanış/iptal disiplinini en baştan tasarlayın.

Bu konuları daha sistematik ve uygulamalı çalışmak isterseniz, kapsamlı örneklerle ilerleyen Go eğitimi içeriğinde concurrency tasarımına özel bölümlerle ilerlemek öğrenme eğrisini belirgin şekilde kısaltır.

Son mesaj: Go concurrency’de “çalışıyor” yetmez; kapanıyor mu, iptal olunca duruyor mu, yük altında dengeli mi sorularına da yanıt vermelidir. Fan-in / fan-out ve pipeline pattern’lerini bu bakışla kullandığınızda, goroutine ve channel’lar birer araç olmaktan çıkıp sürdürülebilir bir mimari diline dönüşür.

 VERİ AKADEMİ