Loading
2022. 8. 6. 13:29 - lazykuna

Thread-safe queue design

꽤 자주 나오는 문제기도 하고, 운영체제 시간에 다루기도 하는 내용이기도 하고 이리저리 알아두면 좋은 내용이라 짤막하게 정리해둡니다.

실제 개발에 이 디자인을 활용하는 경우도 적잖게 있어, 알아두면 상당히 쓸모있는 디자인이라고 생각합니다.

문제 상황

문제 상황을 제대로 이해하는 게 중요합니다.

  • Job의 정의
    • 비동기 수행
    • 끝났는지의 여부를 확인할 필요가 있음
  • Worker queue가 존재
    • queue에는 수행할 job이 들어가 있음
    • queue.enqueue 를 통해 수행할 job을 넣고, queue.dequeue 를 통해 수행할 Job을 가져감.
  • Worker Pool이 존재
    • Worker pool 들은 대기하고 있다가, worker queue에 job이 들어오면 job.Run() 을 수행함
    • Worker pool 들은 여러개가 존재하는 상황 (통상적으로 스레드 개수만큼 존재하겠죠?)

출처: https://www.oreilly.com/library/view/designing-distributed-systems/9781491983638/ch10.html

최근 다수의 코어 기반으로 돌아가는 시스템에서 많이 쓰일수밖에 없는 구조인데, 사실 최근 언어에서는 이를 보이지 않게 포장해두어서 쉽게 쓸 수가 있는 것이 큰 매력입니다. WTHR은 근래 많이 쓰이는 Koroutine / Goroutine과 상당히 비슷한 구조이고, job을 전달하기 위한 구조는 Channel과 비슷한 구조임을 알 수 있습니다. 결국 정답은 원조 디자인에 다 있는 것…

아무튼, 고려해야 할 점들을 간단하게 짚어봅시다.

Naive WTHR

일단 가장 간단한 형태의 worker thread를 생각해 보면, 대충 worker queue로부터 job을 pop하고, job이 있으면 이를 수행하는 것이 전부인 형태가 될 것입니다.

이는 아래와 같을 것입니다.

void Run() {
    while (isRunning) {
        auto *job = wQueue.Dequeue();
        if (!job) {
            continue;
        }
        job->Run();
        Cleanup(job);
    }
}

이제 WTHR들이 수행되고, wQueue에 Enqueue를 하면 자연스럽게 job들이 수행되는 구조가 그려지게 될 것입니다.

>> 주의할 점은 while 구문이 있어야 WTHR이 됩니다!

한 가지 개인적으로 생각하면 좋은 점은 Cleanup() 입니다. job의 소유권이 WTHR에게 넘어가는 구조라면 cleanup을 저기서 해주면 좋기도 하고, 동시에 job finish broadcast를 할수 있다는 점도 좋은 점이 됩니다.

여기에서 고려해야 할 점들을 짚어나가보면…

queue 관련 명령을 atomic하게 하기

queue에서 아이템을 빼가는 행위가 atomic 할 수 있도록, critical section으로 만들어 줄 필요가 있습니다. 예를 들면, 아래와 같이 만들 수 있을 것입니다.

class Queue {
    ...

    void Enqueue(Job *j) {
        lock_guard<mutex> g(m);
        jobQueue.push(j);
    }

    Job* Dequeue() {
        lock_guard<mutex> g(m);
        return jobQueue.pop();
    }
}

하지만 아직 빠진 것이 있습니다.

empty check

WTHR은 계속해서 수행할 job을 가져오기 위해 loop를 돌 것입니다. 이때 queue가 비어있다면 문제가 발생할 것입니다. 따라서, queue가 비어 있는지 먼저 확인한 후에 dequeue를 수행해야 합니다.

하지만 이것 또한 그런데 empty한 상태를 두 스레드에서 동시에 파악하고 동시에 dequeue 하려고 할 수 있다.

즉 그러므로, empty 확인과 dequeue를 atomic하게 수행할 수 있어야 합니다. 그래서, 이거 두 작업을 lock으로 묶어봅시다.

Job* Dequeue() {
    lock_guard<mutex> g(m);
    if (jobQueue.empty()) return null;
    return jobQueue.pop();
}

이 정도까지 하면 정상 작동하는 worker queue/runner 디자인이 완성되었습니다! 하지만 아직 naive solution이기 때문에 더 고려해야 할 점들이 있습니다.

동시성 극대화

이렇게 하면 queue가 비어 있어도 무한히 loop을 돌게 되는데, 굉장한 CPU의 낭비입니다. 더 좋은 방법이 없을까요?

해법은 여러 가지가 있을 수 있을 것입니다.

  • Sleep을 건다
    • 특정한 상황에서는 좋은 답변이 될 수 있음 (오랫동안 들어오지 않는 자료라던가 …)
    • 그러나 sleep의 시간을 정확히 정할 수가 없는 게 단점. 너무 짧으면 spinlock 처럼 cpu 잡아먹는 건 동일. (사실 그 정도까지는 안 가더라도, 상대를 일부러 아래 방법으로 유도하기 위해 이 처럼 이야기하는 경우 많습니다 ㅎ)
  • Semaphore
    • worker queue에 들어있는 아이템을 기준으로 세마포어를 만들어서, queue가 모두 비면 멈추도록 만들 수 있을 것입니다.
  • Conditional Variable
    • 특정 변수 조건이 성립하지 않으면 스레드가 sleep하고, signal을 받을 때까지 대기하게 되는 방식입니다.

(개인적 취향) Conditional variable을 많이 쓰니까, 이를 설계에 적용하면 아래와 같은 방식이 될 겁니다.

class SafeQueue {
    ...

    void Enqueue(Job *j) {
        lock_guard<mutex> g(m);
        jobQueue.push(j);
        condVar.increaseAndNotify();
    }

    Job* Dequeue() {
        condVar.decrease();
        lock_guard<mutex> g(m);
        return jobQueue.pop();
    }
}

void Run() {
    while (isRunning) {
        Job *job = job->Dequeue();
        job->Run();
    }
}

즉, enqueue 하면 conditional variable에 signal을 주고, 그러면 WTHR이 깨어나서 일을 가져가게 되는 구조가 됩니다.

>> 한가지 조심할 것은 lock을 잡은 상태로 job->Run() 을 실행하게 만들면 안됩니다! 오직 enqueue, dequeue 할 때만 lock을 잡도록 최소화해야 합니다.

Job finish event

작업이 끝났으면, 끝났다는 신호나 혹은 작업의 결과값을 전달받고자 하는 객체가 있기 마련입니다. 그럼 전달을 해줘야 할 텐데, 어떻게 전달해줄수 있을지에 대해서 생각을 해 볼 필요가 있습니다.

쉽지 않은 점

이벤트가 비동기라는 점이 가장 큽니다. 비동기이기 때문에 job의 소유는 WTHR에게 넘어가기 되게 마련인데, 그러면 아래와 같은 간단한 예를 들어도 여러 상황이 발생할 수 있습니다.

Job *job = new Job();
wQueue.Enqueue(job);
// 1. 여기서 job이 수행이 끝남
job->Wait();
// 2. 여기서 job이 수행이 끝남

보통 우리는 (2)의 경우를 상정하고 만들지만, (1)의 경우에 걸리면 아주 골치가 아파집니다.

소유권이 WTHR에 넘어가는 경우, 이미 (1)에서 job이 삭제되므로 job의 결과값을 가져올 방법도 없을 뿐더러, job에 접근할 경우 잘못된 메모리 접근 오류를 맞게 됩니다.

소유권이 WTHR에 넘어가지 않아도 문제인 것이, conditional variable에 wait 걸기도 전에 finish signal이 들어오면, wait 상태에서 hang이 걸리게 됩니다.

  • 그래서 이런 경우는 conditional variable만으로는 설계가 어렵고, 별도 변수를 설계해야 이런 상황에 대비가 가능합니다.
  • 다만 그러더라도 소유권을 WTHR에 넘겨주지 않으면 무거운 shared_ptr이나 실수하기 딱 좋은 manual deleting을 써야 하는데, 굉장히 좋지 않다고 봅니다.

따라서, 이러한 경우에 사용할 수 있는 방법이 두가지가 있다고 봅니다.

콜백 및 event-driven 기반 구현

아주 고전적인 방식으로 job이 끝나면 콜백을 호출하는 방법이 있습니다.

그런데 보통 콜백을 보낼 때 여러개의 객체에 동시에 통보가 들어가야 하는 경우가 종종 생깁니다. 그래서 콜백을 관리하는 EventManager을 만들어(옵저버 패턴) “Event” 라는 개념을 만들어서, 스레드가 끝났을 때 결과값을 등록한 이벤트 핸들러에 쏴주어서 받아올 수 있도록 하는 식으로 자주 만들었던 기억이 있네요.

void func(...) {
    ...
}

EventManager.RegisterHandler("Event1", func);

// 나중에 비동기 함수가 끝났을 때
res = SomethingProcessed{ ... };
EventManager.SendEvent("Event1", res);

EventManager이 핸들러 정보를 모두 가지고 있어 소유권 통제 등에서도 비교적 깔끔하게 구현 가능한 편이지만, 몇가지 단점이 있다고 생각합니다.

  • 이벤트 명을 지정하는 구조라 컴파일타임 검증이 어렵고,
  • dangling(orphan)의 위험: 콜백함수가 더 이상 사용될 수 없을 경우 deregister을 잘 해 주어야 함.
  • 콜백 함수를 별도로 등록해야 하는 점 등이 있어 구현이나 사용이 다소 복잡한 편이 단점이라고 생각합니다. 특히 결과값을 받아올 때까지 대기해야 하는 경우라면 콜백+condVar로 구현해야 할텐데 상당히 귀찮을 테고…

Channeling

최근 프로그래밍 언어들이 쓰는 비동기 상태를 전달하기 위한 방법으로 Channel이 있습니다. 채널을 통해서 결과값을 통보받거나, 비동기 함수가 끝날때까지 기다리거나 등의 작업을 할 수 있습니다.

별도 콜백을 두지 않고 결과값을 받아와야 하는 함수에서 바로 동기로 기다리는 방식으로 쉽게 구현이 가능해서, 콜백 방식보다 깔끔하게 구현이 가능한 게 장점이라고 생각합니다. (물론 condVar 같은 건 각 채널에 내장해야 함)

구현할 때 Channel destroy 될 때 플래그를 설정해서 receiver/sender의 상태를 공유하도록 하게 할 수 있어, dangling/orphan의 위험을 줄일 수 있습니다.

// 1. Channel을 shared_ptr, 혹은 manager 기반으로 구현할 경우
Channel::~Channel() {
    // closed 되면 sender/receiver이 아무런 작동을 하지 않도록 막음
    // sender/receiver이 모두 죽으면 deregister/destroy 되도록 하면 됨.
    _close = true;
}

// 2. 또는, 별도 소유권을 가져가려 할 경우 (Rust처럼) send/recv 객체를 다르게 가져가는 방법
send, recv = CreateChannel();

...

Sender::~Sender() {
    _recv->sender = nullptr;
}

Receiver::~Receiver() {
    _sender->recv = nullptr;
}

또 그리고 named channel 개념을 도입하면 event-trigger 구조에도 사용할 수 있을테니, 이래저래 유용한 구현 방법으로 보이네요 ㅎㅎ.

또 생각해 볼 것들?

이건 상황에 따라 다르지만, WTHR에 manager을 추가하면 좋은 경우들이 있을 수 있습니다. 보통 특정한 상황이 정해지면 고려해 볼만한 내용인데… 예를 들면,

  • job의 우선순위를 결정해서 돌려야 하는 경우
  • capacity보다 많은 job이 들어올 때에 대한 대처법 (starvation 문제 등)
  • job 사이 deadlock이 발생할 수 있어 이들의 순서를 결정해야 하는 경우
  • 수행하지 않고 바로 리턴하기 위한 캐시 등

이러한 관점에서, WTHR manager에 대한 설계를 추가적으로 해볼 수도 있겠네요.