전공공부
Go의 동시성 패턴 본문
클라우드 네이티브 서비스는 확장에 따른 비용 증가와 문제를 겪지 않으면서 여러 프로세스를 효율적으로 조정하고 높은 수준의 부하를 견뎌 낼 수 있도록 요청이 됩니다. 따라서, 고도의 동시성을 제공해야 하고 다수의 사용자로부터 유입되는 요청을 관리하여야 합니다.
Fan-In
- 다수의 입력 채널을 하나의 출력 채널로 다중화 하는 것
적용성
- 출력을 만들어 내는 워커를 가진 서비스는 워커의 출력을 하나의 단일화된 스트림으로 합치는 것이 유용할 수 있습니다.
*워커
워커 : 특정 작업을 수행하는 고루틴 함수의 단위로 보고 이해 할 수 있습니다.
아래 코드를 통해서 설명드리겠습니다.
우선 dest는 목적지이며 출력 채널이고 이는 하나의 단일화된 스트림입니다.
Funnel은 소스로 부터 받은 값을 수신하여 목적지로 바로 반환합니다.
Source 최초 보내는 요청의 값이고 입력 채널의 집합체 입니다.
*WaitGroup
WaitGroup : Go 언어의 sync 패키지에 포함된 동시성 제어 도구로, 여러 고루틴의 완료를 기다릴 수 있도록 해줍니다.
WaitGroup은 Add, Done, Wait 세 가지 주요 메서드를 사용하여 동작합니다.
- Add(delta int): WaitGroup의 카운터를 delta만큼 증가시킵니다. 일반적으로 고루틴을 시작할 때마다 1씩 증가시킵니다.
- Done(): WaitGroup의 카운터를 1만큼 감소시킵니다. 일반적으로 고루틴이 종료될 때 호출합니다.
- Wait(): WaitGroup의 카운터가 0이 될 때까지 블록(block)합니다. 모든 고루틴이 완료될 때까지 대기합니다.
package main
import (
"fmt"
"sync"
"time"
)
func Funnel(sources ...<-chan int) <-chan int {
dest := make(chan int) //공유 출력 채널 선언
//모든 sources의 채널이 닫혔을 때 출력 채널을 자동으로 닫기 위해 사용됩니다.
//입력 채널의 갯수 만큼 생성되고 설정 되기 때문에 입력 채널이 닫히면 해당 채널과
//연결된 고루틴이 종료되면서 wg.Done 을 호출하여 카운터가 -1이 됩니다.
var wg sync.WaitGroup
wg.Add(len(sources)) //WaitGroup의 크기를 지정합니다.
for _, ch := range sources { //각 채널에 대해서 고루틴을 시작합니다.
go func(c <-chan int) {
defer wg.Done() //채널이 닫히면 WaitGroup으로 알려줍니다.
for n := range c {
dest <- n
}
}(ch)
}
go func() { //모든 입력 채널이 닫힌 후
wg.Wait() //출력 채널을 닫기 위한 고루틴 시작
close(dest)
}()
return dest
}
func main() {
sources := make([]<-chan int, 0) //빈 채널 슬라이스 생성
for i := 0; i < 3; i++ {
ch := make(chan int)
sources = append(sources, ch) //채널을 생성하여 sources에 추가합니다.
go func() { //각 채널에 대해 간단한 고루틴을 실행 합시다.
defer close(ch) // 고루틴 실행이 끝나면 채널을 닫습니다.
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(time.Second)
}
}()
}
dest := Funnel(sources...)
for d := range dest {
fmt.Println(d)
}
}
위 코드에 고루틴을 사용하게 되므로 동시적으로 실행이 되어 아래와 같은 출력이 나오게 됩니다.
1.
1
2
3
4
5
1
2
3
4
5
1
2
3
4
5
2.
1
1
1
2
2
2
3
3
3
4
4
4
5
5
5
동시성 패턴의 FanIn에 대해서는 위와 같이 간단하게만 알아보겠습니다.
퓨처
퓨처는 아직 알지 못하는 값의 플레이스 홀더입니다.
Promises나 Delays라고 알려진 퓨처는 비동기 프로세스에 의해 생성되는 값에 플레이스 홀더로 두는 동기화 생성자입니다.
플레이스홀더 : 미래의 완료될 결과를 나타냄
동기화 생성자 : 비동기 작업의 결과를 기다리고 이에 따라서, 동기화된 방식으로 비동기 작업을 처리 할 수 있음
package main
import (
"fmt"
"time"
)
// Matrix 구조체 정의
type Matrix struct {
values [][]float64
}
// BlockingInverse 함수는 긴 시간 동안 수행되는 블로킹 함수입니다.
func BlockingInverse(m Matrix) Matrix {
// 여기서 인버스 계산을 시뮬레이션합니다.
time.Sleep(2 * time.Second)
// 실제 인버스 계산은 생략하고, 원본 행렬을 반환합니다.
return m
}
// ConcurrentInverse 함수는 비동기적으로 행렬의 인버스를 계산합니다.
func ConcurrentInverse(m Matrix) <-chan Matrix {
out := make(chan Matrix)
go func() {
out <- BlockingInverse(m)
close(out)
}()
return out
}
// Product 함수는 두 행렬의 곱을 계산합니다.
func Product(a, b Matrix) Matrix {
// 간단한 행렬 곱 계산
// 실제 구현은 생략하고, 첫 번째 행렬을 반환합니다.
return a
}
// InverseProduct 함수는 두 행렬의 인버스를 비동기적으로 계산하고, 그 결과를 곱합니다.
func InverseProduct(a, b Matrix) Matrix {
inva := ConcurrentInverse(a)
invb := ConcurrentInverse(b)
return Product(<-inva, <-invb)
}
func main() {
start := time.Now() // 시작 시간 기록
// 두 행렬 정의
matrixA := Matrix{
values: [][]float64{
{1, 2},
{3, 4},
},
}
matrixB := Matrix{
values: [][]float64{
{5, 6},
{7, 8},
},
}
result := InverseProduct(matrixA, matrixB)
// 결과 출력
fmt.Println("Result matrix:")
for _, row := range result.values {
fmt.Println(row)
}
fmt.Println("Calculating inverse products...")
fmt.Println("Calculation complete.")
elapsed := time.Since(start) // 경과 시간 계산
fmt.Printf("Execution time: %s\n", elapsed)
}
결과물
Result matrix:
[1 2]
[3 4]
Calculating inverse products...
Calculation complete.
Execution time: 2.010494s
그런데 말입니다. 위 퓨처를 잘못 쓰게 되면 어떤 결과를 초래하게 될까요?
퓨처는 우선 비동기의 결과물을 동기적으로 내 뱉는 함수임을 기억합시다.
func InverseProduct(a, b Matrix) Matrix {
return Product(<-ConcurrentInverse(a), <-ConcurrentInverse(b))
}
이렇게하면 아래와 같은 결과물이 나옵니다. 같은 것을 정의한 것 같은데 훨씬 딜레이가 걸려버립니다.
무슨 문제나면 ConcurrentInverse 함수가 실제로 호출되기 전까지 계산이 시작되지 않았기 때문에 이 생성자는 연속적으로 해당 함수를 실행하며 실행 시간이 두배가 되어 버립니다.
Result matrix:
[1 2]
[3 4]
Calculating inverse products...
Calculation complete.
Execution time: 4.0178238s
그 뿐만 아니라 위 코드 처럼 사용하면 한 개 이상의 반환 값을 가진 함수들은 각 반환 값에 대해서 전용 채널을 할당 해야 합니다. 따라서, 반환값의 리스트가 증가하거나 하나 이상의 고루틴에 의해서 값을 읽게 되면 난처한 상황이 벌어 질 수 있습니다.
위의 문제점을 퓨처를 사용하므로써 해결 할 수 있습니다. 복잡한 채널 할당 문제나 실행 시간이 길어지는 API 사용과 같은 문제점을요.
구현을 통해서 알아봅시다.
Future를 사용한 구현
Future : 잠재적인 결과값을 받기 위한 인터페이스
SlowFunction : 비동기적으로 실행되며 퓨쳐를 제공합니다.
InnerFuture : 인터페이스의 조건을 충족하며 접근 결과 로직을 제공하는 메서드를 포함함.
import (
"context"
"sync"
"time"
)
type Future interface {
Result() (string, error)
}
type InnerFuture struct {
//sync.Once는 Result 메서드가 한 번만 실행되도록 보장합니다.
once sync.Once
//sync.WaitGroup은 최초 함수 호출 이후의 결과를 기다리기 위해 사용됩니다. (Thread-Safe)
wg sync.WaitGroup
//res string, err error: 비동기 작업의 결과와 오류를 저장합니다.
res string
err error
//비동기 작업의 결과와 오류를 받을 채널입니다.
resCh <-chan string
errCh <-chan error
}
//Go의 인터페이스 구현은 자동으로 컴파일러가 알아 듣습니다.
//InnerFuture 구현체가 Result 메서드를 구현하고 있으므로
//Future 인터페이스에 정의된 Result 메서드를 사용합니다.
func (f *InnerFuture) Result() (string, error) {
f.once.Do(func() {
f.wg.Add(1)
defer f.wg.Done()
f.res = <-f.resCh
f.err = <-f.errCh
})
f.wg.Wait()
return f.res, f.err
}
func SlowFunction(ctx context.Context) Future {
resCh := make(chan string)
errCh := make(chan error)
go func() {
select {
case <-time.After(time.Second * 2):
resCh <- "I slept for 2 seconds"
errCh <- nil
case <-ctx.Done():
resCh <- ""
errCh <- ctx.Err()
}
}()
//퓨처 구현체를 반환 함
return &InnerFuture{resCh: resCh, errCh: errCh}
}
func main() {
ctx := context.Background()
future := SlowFunction(ctx)
res, err := future.Result()
if(err != nil){
fmt.Println("error : ", err)
return
}
fmt.Println(res)
}
샤딩
샤딩은 대규모 데이터 구조를 여러 개의 파티션으로 나눠 읽기 및 쓰기 락의 영향 범위를 최소화 합니다.
수직적 샤딩 : 단일 인스턴스 내에서 파티셔닝 진행, 결과로 프로세스 내부에서는 읽기 쓰기 경쟁이 줄어들지만 확장성이 낮고 데이터의 중복 저장 따위 제공되지 않습니다.
수평적 샤딩 : 여러 서비스의 인스턴스로 파티셔닝하여 데이터 중복성을 제공하고 인스턴스들 사이에 부하가 분배되지만 데이터 분산으로 인한 지연과 복잡도 자체의 증가 문제가 있습니다.
구현
맵과 같은 구조는 동시에 접근하기에 안전하지 않은 타입이기 때문에 락을 통한 동기화 구조를 사용해야 합니다. 이때 sync.RXMutex를 사용합니다.
그런데 아래와 같은 코드는 맵을 부를때마다 전부 맵에 락을 걸어서 락 경합이 발생하기 쉽습니다.
따라서, 동시성을 많이 해치게 되는데요 이를 해결 해준 것이 수직적 샤딩입니다.
var items = struct{
sync.RWMutex
m map[string]int
} {m: make(map[string]int)}
func ThreadSafeRead(key string) int {
items.RLock()
value := items.m[key]
itemes.RUnlock()
return value
}
func ThreadSafeWrite(key string, value int){
items.Lock()
items.m[key] = value
itemes.Unlock()
}
package main
import (
"crypto/sha1"
"fmt"
"sync"
)
type Shard struct{
sync.RWMutex //sync.RWMutex 사용
m map[string]interface{} //m은 샤드의 데이터를 가지고 있습니다.
}
type ShardedMap []*Shard
//ShardedMap은 *Shard 값의 슬라이스 타입입니다.
//위 처럼 타입으로 정의해 메서드를 붙일 수 있습니다.
//func (sm ShardedMap) ThreadSafeRead (key string) int 이런식으로 쓰면 해당 메서드는 위 타입에 붙은 것이라 합니다.
//Java와 달리 생성자가 없어서 생성자 역할을 하는 메서드 제작
func NewShardedMap(nshards int) ShardedMap{
shards := make([]*Shard, nshards) //*Shard 슬라이스를 nshards개 만큼 초기화 합니다.
for i := 0; i < nshards; i++{
shard := make(map[string] interface{})
shards[i] = &Shard{m: shard}
//왜 이따위로 초기화를 할까요?
// -> shards[i].m = shard 이게 안되는 이유는 슬라이스를 위와 같이 초기화하면 바라 보는 곳이 nil이기 때문입니다.
//(객체의 초기 값 참고)
//따라서, 객체 자체의 주소 위치값을 주고 이 객체가 바라보는 interface는 이 것으로 초기화 한다고 선언해야 합니다.
}
return shards //ShardedMap은 *Shards 슬라이스입니다.
}
func (m ShardedMap) getShardIndex(key string) int {
checksum := sha1.Sum([]byte(key)) // crypto/sha1 패키지의 Sum 메서드를 씁니다.
hash := int(checksum[17]) // 해시로 사용할 임의 바이트를 고릅니다.
return hash % len(m) // 인덱스를 얻기 위해서 len(m)으로 나머지를 고릅니다.
}
func (m ShardedMap) getShard(key string) *Shard {
index := m.getShardIndex(key)
return m[index]
}
func (m ShardedMap) Get(key string) interface{} {
shard := m.getShard(key)
shard.RLock()
defer shard.RUnlock()
return shard.m[key]
}
func (m ShardedMap) Set(key string, value interface{}) {
shard := m.getShard(key)
shard.Lock()
defer shard.Unlock()
shard.m[key] = value
}
func (m ShardedMap) Delete(key string) {
shard := m.getShard(key)
shard.Lock()
defer shard.Unlock()
delete(shard.m, key)
}
func (m ShardedMap) Contains(key string) bool {
shard := m.getShard(key)
shard.RLock()
defer shard.RUnlock()
_, ok := shard.m[key]
return ok
}
func (m ShardedMap) Keys() []string {
var keys []string // Declare an empty keys slice
var mutex sync.Mutex // Mutex for write safety to keys
var wg sync.WaitGroup // Create a wait group and add a
wg.Add(len(m)) // wait value for each slice
for _, shard := range m { // Run a goroutine for each slice in m
go func(s *Shard) {
s.RLock() // Establish a read lock on s
defer wg.Done() // Release of the read lock
defer s.RUnlock() // Tell the WaitGroup it's done
for key := range s.m { // Get the slice's keys
mutex.Lock()
keys = append(keys, key)
mutex.Unlock()
}
}(shard)
}
wg.Wait() // Block until all goroutines are done
return keys // Return combined keys slice
}
func main() {
sm := NewShardedMap(4) // 4개의 샤드를 가지는 ShardedMap 생성
// ShardedMap에 값을 쓰기
sm.Set("a", 1)
sm.Set("b", 2)
sm.Set("c", 3)
sm.Set("d", 4)
// ShardedMap에서 값을 읽기
fmt.Println(sm.Get("a")) // 출력: 1
fmt.Println(sm.Get("b")) // 출력: 2
fmt.Println(sm.Get("c")) // 출력: 3
fmt.Println(sm.Get("d")) // 출력: 4
// ShardedMap에서 값을 삭제
sm.Delete("a")
fmt.Println(sm.Get("a")) // 출력: <nil>
// ShardedMap에 키가 존재하는지 확인
fmt.Println(sm.Contains("b")) // 출력: true
fmt.Println(sm.Contains("a")) // 출력: false
// 모든 키를 가져오기
keys := sm.Keys()
fmt.Println("All keys:", keys)
}
출력
1
2
3
4
<nil>
true
false
All keys: [b d c]
위 코드를 대강 이해했다면 보이실거 같습니다. 해당 코드의 샤딩된 맵에 대해서만 락이 걸리니 락 경합이 현저하게 줄어 들 겁니다. 특히나