C++공부/Concurrency in Action

6. Designing lock-based concurrent data structures

아헿헿헿 2022. 7. 17. 17:13

 프로그래밍 문제를 해결하기 위해 사용하는 자료 구조의 선택은 솔루션 전체의 중요한 부분이 될 수 있고, 병렬 프로그래밍 문제가 예외 발생하지 않게 해줍니다. 만약 자료 구조가 여러 쓰레드로부터 접근이 가능해야 하며, 완전히 불변이어서 데이터가 절대 변하지 않고, 동기화가 필요하지 않으려면 프로그램이 스레드 간에 변경사항의 완벽한 동기화를 보장도록 설계되어야만 합니다. 한 방법은 챕터 3,4에서 보았던 기술들을 이용하여, 데이터를 보호하기 위해 별도의 외부 mutex 잠금을 사용하는 것이고, 다른 방법은 동시 접근을 위한 자료 구조 자체를 설계하는 방법입니다.

 동시성 자료 구조를 설계 할 때, mutex와 condition variable 같은 이전 챕터의 멀티 쓰레드 기반 응용프로그램의 기본 빌딩 블록을 사용할 수 있으며, 그 예제들을 이미 보았습니다. 이번 챕터에서는 동시성 자료 구조 설계의 일반적인 가이드라인을 보면서 배워나갈 것이며, 복잡한 자료 구조들을 공부하기 전에, condition variable들의 기본적인 빌딩 블록들을 공부한 후, 이러한 기본적인 자료 구조들의 설계를 되짚어 볼 예정입니다.

6.1. What does it mean to design for concurrency?

 기본 수준에서, 동시성을 위한 데이터 구조 디자인이라 함은 여러 스레드가 그 데이터 구조를 동시적으로 접근하는데, 같거나 다른 연산을 수행할 수 있으며, 각 스레드는 자신만의 일관된 데이터 구조를 볼 수 있음을 의미합니다. 데이터는 없어지거나 잘못되지 않으며, 모든 불변성은 유지되고 문제되는 경쟁 조건은 존재하지 않을 때 이러한 데이터 구조는 스레드 안전(thread-safe)하다고 합니다. 

 진정으로 동시성을 위해 디자인 되었다고 하는 것은, 데이터 자료에 접근하는 스레드들에게 동시성의 기회를 제공하는 것입니다. 뮤텍스는 상호 배타성(mutual exclusion)을 제공하여, 오직 하나의 스레드만이 한번에 해당 뮤텍스에 대해 잠금을 획득할 수 있어 보호하는 데이터에 대한 동시적 접근을 명시적으로 금지함으로써 데이터 구조를 보호합니다. 이를 직렬화(serialization)이라고 합니다. 스레드들은 뮤텍스로부터 보호받는 데이터에 번갈아 가며 접근합니다. 스레드들은 데이터에 접근할때 동시에 접근하지 말고, 순차적으로 접근해야 합니다. 따라서 동시 접근을 허용하는 자료 구조를 설계할 때에는 조심해야 합니다. 몇몇 자료 구조들은 다른것들보다 동시성의 범위가 더 넓지만, 모든 방법은 같다. 보호할 영역이 작고, 연산이 적은 것들은 직렬화해야 합니다. 이것들은 동시성의 가능성이 더 큽니다.

몇몇 자료 구조 설계들을 보기 전에, 언제 동시성을 디자인해야하는지 설명하는 가벼운 가이드라인들을 살펴보도록 합시다.

6.1.1. Guidelines for designing data structures for concurrency

 동시적 접근을 위한 데이터 구조를 디자인 할 때 ​접근 안전성 여부와 진정한 동시 접근 여부, 2가지 관점을 고려해야 합니다. 먼저 접근에 안전한지를 확신하는 것은,

  • 어떠한 스레드도 다른 스레드의 행동에 의해 데이터 구조의 불변성이 깨진 상태를 볼 수 있어서는 안됩니다.
  • 동작 과정보다는 완전한 동작을 위한 함수를 제공함으로써 데이터 구조에 대한 인터페이스에 내재된 경쟁 조건을 피하도록 신경써야 합니다.
  • 불변성이 깨지지 않음을 확신하기 위해 데이터 구조가 예외 상황에서 어떻게 동작하는지를 파악해야 합니다.
  • 잠금 범위를 제한하고 중첩된 잠금을 피함으로써 데드락의 기회를 최소화해야 합니다.

 또한, 사용자들에게 어떠한 제약을 제공할지도 생각해야 합니다. 생성 도중에 접근하거나, swap 도중에 접근하거나 하는 등, 어떤 연산이 가능하고 불가능하는 지를 알아두어야만 합니다. 두번째 진정한 동시 접근을 가능하게 하기 위해, 다음의 질문들이 필요합니다.

  • 동작의 어떤 부분이 잠금 바깥에서 수행되도록 하여 잠금의 범위를 제한할 수 있는가?
  • 데이터 구조의 다른 부분은 다른 뮤텍스에 의해 보호될 수 있는가?
  • 모든 동작들이 같은 수준의 보호를 필요로 하는가?
  • 데이터 구조에 대한 단순한 변화가 동작의 의미 변화 없이 동시성을 향상시킬 수 있는가?

 질문들은 반드시 발생하는 직렬화 최소화 및 진정한 동시성의 최대화시킬 지에 대한 고민에서 발생하였습니다. 자료 구조가 읽기만 하는 다중 스레드로부터의 동시 접근을 허용하는 경우는 드물지 않은 반면, 수정할 수 있는 스레드는 독립적인 접근을 가져야만 합니다. 이는 std::shared_mutex같은 구조를 사용하여 가능합니다. 반면에, 곧 보겠지만, 자료 구조가 같은 연산을 수행하는 쓰레드를 직렬화하는 동안 다른 연산을 수행하는 스레드들로 부터 동시 접근을 지원하는 것은 아주 흔한 일입니다.

 간단한 쓰레드 세이프한 자료 구조들은 일반적으로 뮤텍스와 데이터를 보호하기 위한 잠금을 사용합니다. 문제는 존재하지만, 한번에 오직 한 스레드만이 자료 구조에 접근할 수 있도록 보장하는 것은 상대적으로 쉽습니다. 비교적 쉬운 스레드 세이프한 자료 구조의 설계를 위해, 잠금 기반 자료 구조를 먼저 살펴보고 그 후에 잠금 없는 동시성 자료 구조 설계에 대해서 알아보겠습니다.

6.2.  Lock-based concurrent data structures

 잠금 기반 동시성 자료 구조 설계는 데이터에 접근하려 할때 올바른 뮤텍스를 잠그고, 최소한의 시간 동안 유지되도록 하는 것입니다. 데이터가 뮤텍스의 보호 밖에서 접근되지 못하도록 보장하고 내재된 경쟁 상태가 없도록 해야합니다. 만약 자료 구조의 부분을 위한 각각의 뮤텍스를 사용할때, 문제는 악화되며, 자료 구조의 연산이 잠긴 뮤텍스를 하나 이상 필요로 할때 교착 상태가 존재할 수도 있습니다. 그러므로 단일 뮤텍스를 사용하는 데이터 구조의 설계보다 다중 뮤텍스를 사용하는 자료 구조의 설계인 경우 더 세심히 고려할 필요가 있습니다.

 이번 세션에서 뮤텍스와 데이터를 보호하기 위한 잠금을 사용한 몇몇 간단한 자료 구조를 설계하며 가이드 라인을 적용해보도록 합니다. 각각의 케이스에서 데이터구조가 스레드 안전하면서도 더 나은 동시성을 찾아내도록 합니다.

6.2.1. A thread-safe stack using locks

 3장에서 구현한 스택은 단일 뮤텍스를 통해 간단하게 구현하였는데 이를 살펴보겠습니다.

Listing 6.1 A class definition for a thread-safe stack

#include <exception>
#include <stack>
#include <__mutex_base>

struct empty_stack : std::exception
{
    const char* what() const throw;
};

template<typename T>
class threadsafe_stack {
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack() { }

    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;                             ----- /* 1 */
    }

    threadsafe_stack& operator=(const threadsafe_stack) = delete;

    void push(T new_value) {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));
    }

    std::shared_ptr<T> pop() {                    ------     /* 2 */
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();      
        std::shared_ptr<T> const res(std::make_shared(std::move(data.top())));  
        data.pop();                     
        return res;
    }

    void pop(T& value) {                            -----  /* 3 */
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        value = std::move(data.top());         
        data.pop();          
    }

    bool empty() const {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};
  • 기본적인 스레드 세이프를 제공하는 방법은 위와 같이 각각의 멤버 함수를 뮤텍스로 잠궈, 한번에 오직 한 스레드가 데이터에 접근할 수 있도록 하는 것입니다. 이를 통해 모든 멤버 함수들은 불변을 유지하고 어떤한 쓰레드도 불변이 깨지는 것을 볼 수 없게 됩니다.
  • empty()와 pop() 함수들은 경쟁 상태가 발생할 가능성이 있는데, pop()에 잠금이 걸려있을 때 스택이 비어있는 지 명시적으로 확인하기 때문에, 문제가 되지 않습니다. pop()을 호출하는 부분에서 꺼낸 데이터를 바로 반환해 줌으로써 std::stack의 top(), pop() 멤버 함수에서 발생할 수 있는 잠재적인 경쟁 상태를 피합니다.
  • 예외가 발생할 가능성 남아있는데, 뮤텍스를 잠그는 것은 예외를 던질 수 있지만, 극히 드뭅니다. 각각의 멤버 함수에서 뮤텍스를 처음에 잠구어, 어느 데이터도 수정되지 않기때문에 안전합니다. 뮤텍스를 잠금 해제하는것은 실패하지 않고 항상 안전하고, std::lock_guard는 뮤텍스가 잠긴 채로 남아있지 않게 방지해줍니다.
  • 1번의 data.push()에서는 데이터를 복사/이동할 때 예외를 던지거나 아래 자료구조를 확장하기에 충분하지 않은 메모리가 할당되어 예외를 던질 수 있습니다. 이는 std::stack에서 해결하기에 문제가 되지는 않습니다.
  • 2번의 경우는 스스로 empty_stack 예외를 던져, 비어있는 상태일 때 안전하지만, std::make_shared 호출은 새 객체를 메모리에 할당하지 못하면 예외를 던지며, 내부 데이터는 레퍼런스 카운팅을 필요로 하거나 반환될 아이템의 복사 생성자 혹은 이동 생성자는 새로 할당된 메모리로 복사/이동할때 예외를 던집니다. 두 경우 모두 C++ 런타임과 표준 라이브러리에서 메모리 누수를 막고, 제대로 새 객체가 소멸되도록 보장합니다. 결국 반환으로 예외를 던지지 못하게 보장하므로 exception-safe 합니다.
  • 3번의 경우도 비슷하여, 새 객체와 std::shared_ptr 인스턴스를 생성하는 것보다 복사 할당 또는 이동 할당 연산자는 예외를 던질 수 있습니다. data.pop()이 호출되기 전까지 자료 구조를 수정하지 않는 것은 예외 발생을 막는 것이기에 따라서 이 또한 exception-safe 합니다.
  • 마지막으로 empty()는 어떠한 데이터도 수정하지 않는다. 그러므로 이것도 exception-safe 하다.
  • 잠금이 걸려있는 동안 다른 코드를 호출하면 교착상태가 발생할 수도 있습니다. 스택에서 복사, 이동, 할당 연산을 이용할 때, 만약 이 함수들이 스택에 삽입, 삭제같이 잠금을 필요로하는 멤버 함수가 겹쳐서 호출되면 교착상태가 발생할 가능성이 존재하지만 이는 사용자가 스택에 아이템 삽입, 삭제를 복사나 메모리 할당 없이 할 수 없도록 보장해야만 합니다.
  • 모든 멤버 함수가 데이터를 보호하기위해 std::lock_guard<>를 사용하기 때문에 thread-safe합니다. 이를 사용하지 않는 생성자와 소멸자는 안전하지 않지만, 큰 문제는 되지 않는다. 객체는 오직 한번 생성되고 소멸될 수 있기 때문입니다. 완전히 생성되지 않거나 파괴된 객체의 멤버함수를 호출하는것은 concurrency든 아니든 좋은 생각이 아닙니다. 결과적으로, 사용자는 반드시 객체가 완벽하게 생성되기 전까지는 다른 스레드들이 스택에 접근하지 못하도록 보장하고, 스택이 소멸되기전에 스택의 접근을 중단시켜야 합니다.
  • 잠금을 사용해서 다중 스레드가 멤버함수를 동시에 호출해도 안전할 지라도, 한번에 오직 한 스레드만 스택 자료구조에서 일을 할 수 있습니다. 이러한 직렬화는 어플리케이션의 성능을 제한하는데, 기다리는 데에 기약이 존재하지 않아 여러 추가적인 기능을 필요로 합니다. 이렇게 대기하는 쓰레드는 데이터를 확인하면서 자원을 소비하거나 사용자가 외부 대기 및 알림코드를 작성해야 하기 때문에 좋은 구조가 되지 못합니다. 4장의 큐는 자료구조 안에서 상태변수를 사용해서 스스로 대기하는 모습을 보여주기에 이를 다음으로 구현해 보았습니다.

6.2.2. A thread-safe queue using locks and condition variables

Listing 6.2 The full class definition for a thread-safe queue using condition variables

#include <queue>
#include <__mutex_base>

template<typename T>
class threadsafe_queue {
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue() { }

    void push(T new_value) {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();                                     ------- /* 1 */
    }

    void wait_and_pop(T& value) {                                   ------- /* 2 */
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
    }


    std::shared_ptr<T> wait_and_pop() {                             ------- /* 3 */
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); }); ------- /* 4 */
        std::shared_ptr<T> res(std::make_shared(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }

    std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
        return std::shared_ptr<T>();                                ------- /* 5 */
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};
  • 위는 listing 6.1 스택과 비슷하지만 data_cond.notify_one()과 wait_and_pop()을 추가하였습니다. try_pop() 메서드는 이전과 유사하지만, 큐가 비어있어도 예외를 던지지 않는 대신에 데이터가 있는지 알려주는 bool 값을 반환하거나, NULL 포인터를 반환합니다.
  • 새로운 wait_and_pop() 함수들은 스택에서 보았던 큐 항목에 대한 대기문제의 해결책으로, empty()를 계속 호출하기보다 대기하는 쓰레드가 wait_and_pop()을 한번 호출하고 자료구조가 조건 변수을 통해 값이 들어올 때 까지 기다리게 하였습니다. 큐가 비지 않을때 까지 반환하지 않기에 비어있는 큐에서 작동하거나 계속 보호 받고 있는 지에 걱정이 필요 없어 경쟁 상태나 데드락의 가능성이 없어 불변성을 유지합니다.
  • exception-safety 에 관해 약간의 차이가 존재하는데, 여러 스레드들이 push를 기다릴 때, 오직 한 쓰레드만이 조건 변수에 의해 깨어납니다. 하지만 쓰레드가 4번줄의 wait_and_pop()에서 새 std::shared_ptr<> 생성으로 예외를 던지져 큐에는 push되어 있지만 다른 쓰레드들은 깨어나지 않게 됩니다. data_cond.notify_all()를 사용하는 방법도 있어 모든 스레드를 깨우지만 하나를 제외한 대부분은 큐가 비어있는것을 확인하고 다시 자러갑니다. 두번째 대안은 wait_and_pop()이 예외를 던질때 notify_one()을 호출하도록 하는 것으로 notify_one을 호출함으로써 다른 쓰레드는 저장된 값을 가져올 수 있습니다. 세번재 대안은 std::shared_ptr<> 생성자를 push() 호출할 때로 옮기고 std::shared_ptr<> 인스턴스를 데이터 값들 대신 저장하는 것입니다. std::shared_ptr<>을 내부 std::queue<> 밖에서 복사를 하게 되면 예외를 던지지 않아서, wait_and_pop()은 안전하게 됩니다. 다음 볼 큐는 이를 염두해 두고 구현한 것이다.

Listing 6.3 A thread-safe queue holding std::shared_ptr<> Instances

#include <__mutex_base>
#include <queue>

template<typename T>
class threadsafe_queue {
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue() { }

    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = std::move(*data_queue.front());                  ----- /* 1 */
        data_queue.pop();
    }

    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(*data_queue.front());                  ----- /* 2 */
        data_queue.pop();
        return true;
    }

    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        std::shared_ptr<T> res = data_queue.front();             ----- /* 3 */
        data_queue.pop();
        return res;
    }

    std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res = data_queue.front();             ----- /* 4 */
        data_queue.pop();
        return res;
    }

    void push(T new_value) {
        std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value))); ----- /* 5 */
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }

    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

 

std::shared_ptr<>로 데이터 가지게 되어 변한 것은 새 데이터를 변수에 담는 pop() 함수들은 저장된 값을 이동시키고(1, 2번), std::shared_ptr<> 인스턴스를 반환하는 pop() 함수들은 호출자에게 반환하기 전에 큐에서 복사하여 반환 합니다.(3, 4번)
 만약 std::shared_ptr<>이 데이터를 잡고있다면 장점이 있다. 새 인스턴스 할당이 push()의 잠금 밖에서 가능하기에, 비싼 할당 연산 시간동안 뮤텍스에 잡혀있지 않아 동시성이 더 확보 되며 성능에서 이점을 지닙니다. 
 뮤텍스 하나로 전체 자료구조에 사용한 것은 스택 예제처럼 큐가 지원하는 동시성을 제약하빈다. 비록 여러 쓰레드들이 큐의 다양한 함수들에서 막혔을지라도, 한번에 한 쓰레드만이 일할 수 있습니다. std::queue<>와 같은 표준 컨테이너를 사용함으로써 한 아이템만을 가지기에 이러한 제한을 지니게 되었습니다. 자료 구조의 세부 구현을 좀 더 제어한다면, 보다 세밀한 잠금과 더 높은 수준의 동시성이 이루어낼 수 있습니다.

 

6.2.3. A thread-safe queue using fine-grained locks and condition variables

 위의 예제에서는 보호받는 아이템과 뮤텍스를 사용하였는데 좀 더 세밀하게 제어하기 위해서는 큐 내부를 들여다보고 부분마다 뮤텍스를 할당해야 합니다.

 가장 간단한 큐 자료구조는 연결 리스트로, 큐는 첫번째 원소를 가리키는 head 포인터를 포함하고 있고 각 원소는 다음 원소를 가리킵니다. 삭제 될떄는 원소들이 가리키는 주소가 바뀌면서 삭제 됩니다. 큐에 값을 넣기 위해서는 리스트의 마지막 원소를 참조하는 tail 포인터도 가지고 있어야 합니다. 리스트가 비어있을때는 head 와 tail 포인터 둘다 NULL입니다. 밑의 예제는 간단한 큐의 구현으로 싱글 스레드를 가정하게 만들었기에 try_pop() 함수만 있고 wait_and_pop() 함수는 없습니다.

Listing 6.4 A simple single-threaded queue Implementation

#include <memory>

template<typename T>
class queue {
private:
    struct node {
        T data;
        std::unique_ptr<node> next;

        node(T data_) : data(std::move(data_)) { }
    };

    std::unique_ptr<node> head;   
    node* tail;                 

public:
    queue() { }

    queue(const queue& other) = delete;

    queue& operator=(const queue& other) = delete;

    std::shared_ptr<T> try_pop() {
        if (!head)
            return std::shared_ptr<T>();
        std::shared_ptr<T> const res(std::make_shared<T>(std::move(head->data)));
        std::unique_ptr<node> const old_head = std::move(head);
        head = std::move(old_head->next);
        return res;
    }

    void push(T new_value) {
        std::unique_ptr<node> p(new node(std::move(new_value)));
        node* const new_tail = p.get();
        if (tail) {
            tail->next = std::move(p);
        } else {
            head = std::move(p);
        }
            tail = new_tail;
    }
};

 우선 노드들을 메모리 관리를 위해 std::unique_ptr을 사용하며, 소유권이 head에서 tail로 이어지게 됩니다. 따라서 tail은 포인터만을 들고 있습니다.
 이 구현은 싱글 스레드에서는 잘 돌아가지만, 멀티스레드에서 잠금을 사용하려면 문제점을 발생시킵니다. 먼저 2가지 아이템을 지니고 있기에 2가지의 뮤텍스를 사용해야할 것 같지만 이는 몇가지의 문제를 발생시킵니다. 가장 명백한 문제는 push()가 head와 tail을 수정하기에 두 뮤텍스를 모두 잠궈야 하는데, push()와 pop() 둘 다 노드의 next 포인터에 접근한다는 것이다. push가 tail->next를 업데이트하고, try_pop은 head->next를 읽어 들이는데, 하나의 아이템만 존재한다면, head와 tail은 같은 것을 가리키고, next도 필연적으로 같게 되어, 보호가 필요하게 됩니다. 결국 경쟁 조건이 발생하게 되기에 head와 tail은 같은 뮤텍스가 필요하다는 결론이 나게 됩니다. 이에 대한 해결 방안은 아래에서 설명하겠습니다.

 

[Enabling Concurrency By Seperating Data]

 위의 문제점은 값이 없는 더미 노드를 미리 할당함으로 해결 가능합니다. 더미 노드를 미리 할당하면 큐에는 항상 적어도 1개 이상의 노드(head 이면서 tail)가 존재하기에, head 와 tail이 NULL이 아니라 더미 노드를 가리키며, 큐가 비어있을 때는 try_pop()이 head->next에 접근하지 않기때문에 안전하게 됩니다. 큐에 노드를 추가한다면, head와 tail은 각각 다른 노드들을 가리키게 되어 앞서 말한 문제가 사라집니다. 단점은 더미 노드들을 위해 포인터로 데이터를 저장하는 추가의 우회가 필요한데 이는 아래에서 어떻게 구현된 지를 볼 수 있습니다.

Listing 6.5 A simple queue with a dummy node

#include <memory>

template<typename T>
class queue {
private:
    struct node {
/* 1 */ std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::unique_ptr<node> head;
    node* tail;

public:
    /* 2 */
    queue() : head(new node), tail(head.get()) { }

    queue(const queue& other) = delete;

    queue& operator=(const queue& other) = delete;

    std::shared_ptr<T> try_pop() {
/* 3 */ if (head.get() == tail) {
            return std::shared_ptr<T>();
        }
/* 4 */ std::shared_ptr<T> const res(head->data);
        std::unique_ptr<node> old_head = std::move(head);
/* 5 */ head = std::move(old_head->next);
/* 6 */ return res;
    }

    void push(T new_value) {
/* 7 */ std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
/* 8 */ std::unique_ptr<node> p(new node);
/* 9 */ tail->data = new_data;
        node* const new_tail = p.get();
        tail->next = std::move(p);
        tail = new_tail;
    }
};

먼저 try_pop()이 약간 변하였는데, 이전의 NULL 체크에서 head와 tail을 비교하여 비어 있는 지를 확인합니다. 두번째로, try_pop에서 node가 포인터로 데이터를 저장하기에, T의 새 인스턴스를 만드는것보다 포인터로 바로 접근하는게 더 좋습니다. push()에서는 큰 변화가 있어 다음과 같이 작동합니다.

  • [7] 힙에 T 인스턴스를 먼저 만들고 메모리 재할당 오버헤드를 피하기 위해 std::shared_ptr<>로 가져야 옵니다. 
  • [8] 더미 노드를 새로 만들어 줍니다.
  • [9] 대신 이전의 더미 노드에 new_value를 복사해서 넣어줍니다.
  • [2] 이를 위해 처음 생성할 때, 더미노드를 가지기 위해서, 생성자에서 만들어주어야만 합니다.

 push()는 head가 아니라 tail에 접근한다. try_pop()은 head와 tail에 모두 접근하지만, tail은 초기비교에만 필요하고, 잠금은 일시적이며, 더미노드 때문에 try_pop()과 push()가 같은 노드에서 연산하지 않아서 더이상 뮤텍스가 필요하지 않습니다. 따라서 head용, tail용 뮤텍스가 하나씩 존재하면 되는데 이를 어디다 사용할 지가 쟁점이 됩니다. push()의 경우는 뮤텍스는 tail에 대한 모든 접근에서 잠겨야 하기에 8과 9번 사이에 넣어주고 끝까지 유지하면 됩니다.
 try_pop()은 복잡한데, 먼저 head 뮤텍스를 잠궈, 어느 스레드에서 작업이 이루어질 지를 정합니다. head가 변경되고 나면 뮤텍스가 필요 없기에 반환되기 전에 풀어야 합니다 따라서 5,6 번사이에 들어가게 됩니다. 남은 것은 tail에 관한 것인데, 오직 한번만 tail에 접근해야하기 때문에, 읽는 시간 동안 뮤텍스를 얻을 수 있도록 합니다. 사실 head 뮤텍스를 잠궈야 하는 코드가 멤버들의 집합이라서, 함수를 감싸는게 명확합니다. 이를 적용한 코드는 다음과 같습니다.

 

Listing 6.6 A thread-safe queue with fine-grained locking

#include <memory>
#include <__mutex_base>

template<typename T>
class threadsafe_queue {
private:
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;

    node* get_tail() {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    std::unique_ptr<node> pop_head() {
        std::lock_guard<std::mutex> head_lock(head_mutex);

        if (head.get() == get_tail()) {
            return nullptr;
        }
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }

public:
    threadsafe_queue() : head(new node), tail(head.get()) { }

    threadsafe_queue(const threadsafe_queue& other) = delete;

    threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

    std::shared_ptr<T> try_pop() {
        std::unique_ptr<node> old_head = pop_head();
        return old_head ? old_head : std::shared_ptr<T>();
    }

    void push(T new_value) {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        node* const new_tail = p.get();
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        tail->data = new_data;
        tail->next = std::move(p);
        tail = new_value;
    }
};

 이전의 가이드라인을 생각하며 이 코드를 보아, 불변성이 망가지기 전에 다음 사항을 짚고 넘어가야 합니다.

  • tail->next == nullptr
  • tail->data == nullptr
  • head == tail 은 리스드가 비었음을 의미
  • 1개의 원소가 있을때는 head->next == tail 이다.
  • 리스트의 각 노드 x는 tail이 아닌 경우, 'x->data'는 T 인스턴스를 가리키며 x->next는 리스트의 다음 노드를 가리킵니다.
  • head의 next 노드를 따가가다보면 결국 tail이 나옵니다.

push()는 직관적입니다. 자료구조의 수정은 tail_mutex에 의해 보호받고, 불변을 유지합니다. 흥미로운 부분은 try_pop()으로, tail 읽기를 보호하기위해, tail_mutex의 잠금만이 필요한 것이 아니라, head와 중첩되는 경우를 막아 경쟁 상태를 방지합니다. 만약 뮤텍스가 존재하지 않아 동시 접근이 된다면 정의되지 않은 행동을 하게 될 것 입니다. 
 get_tail()의 호출이 head_mutex 잠금안에서 일어나는것도 중요합니다. 만약 그렇지 않으면, pop_head()호출은 get_tail()과 head_mutex의 잠금 사이에서 갇힐 수 있습니다.

std::unique_ptr<node> pop_head() {                     // 이건 잘못된 구현이다.
    node* const old_tail = get_tail();                 // [1] head_mutex잠금 밖에서 old tail의 값을 가져온다.
    std::lock_guard<std::mutex> head_lock(head_mutex);

    if (head.get() == old_tail) {                      // [2]
        return nullptr;
    }
    std::unique_ptr<node> old_head = std::move(head);
    head = std::move(old_head->next);                  // [3]
    return old_head;
}

 해당 시나리오에서 get_tail()을 잠금 밖에서 호출하고 있는데, head_mutex에 대한 작업을 하는 동안 수정된다면, tail에 대한 정보가 옳지 않게 되거나 리스트 밖에 존재할 수도 있고, 비교도 실패하게 됩니다. 이를 통해 head를 업데이트할 때 head를 tail 너머의 리스트로 보내질 수도 있어 자료구조가 망가집니다. 따라서 get_tail은 잠금 내부에서 이루어져 불변을 유지하도록 해야합니다.
 예외들은 흥미롭습니다. 데이터 할당 패턴을 변경하여, 예외들은 다른 곳에서 발생합니다. 예외를 던질 수 있는 try_pop()의 연산은 뮤텍스 잠금에 해당하여, 데이터는 잠금을 요청할 때까지 수정되지 않기에 try_pop()은 exception-safe합니다. 반면에 push()는 힙에 새 T 인스턴스를 할당하고, 새 노드 인스턴스 할당하는 과정에서 예외를 던질 수 있지만, 둘 다 스마트포인터를 사용하기 때문에, 예외가 발생하여도 문제가 생기지 않아 push()도 exception-safe합니다.

 인터페이스를 변경하지 않았기 때문에 교착 상태에 대한 생길 여지는 존재하지 않습니다. 두 개의 잠금이 획득되는 유일한 장소는 pop_head()이기에 데드락이 발생하지 않습니다.
 남은 문제는 동시성 여부에 관한 것으로, 이 데이터 구조는 더 세밀한 잠금과 밖에서 더 많은 작업을 수행하기에 이전보다 훨씬 더 넓은 동시성 범위를 지니고 있습니다. push()에서 잠금 없이 할당되어 할당은 여러 스레드에서 진행 될 수 있어 시간을 절약할 수 있습니다. try_pop()은 tail에서 읽기를 보호하기 위한 tail_mutex를 짧은 시간 동안만 유지하여 결과적으로 try_pop()에 대한 호출 거의 전체가 push() 호출과 동시에 발생할 수 있습니다. 또한 head_mutex를 보유하는 동안 수행되는 작업은 매우 미미하고 값비싼 삭제는 잠금 밖에 존재합니다.

 

 

 

[Waiting For An Item To Pop]

 앞의 예시는 잠금 존재하는 스레드 안전한 큐를 제공하지만 try_pop()만 지원합니다. 이를 wait_and_pop() 함수로 확장 하고자 합니다. push()를 수정하는 것은 함수의 끝에 data_cond.notify_one() 호출을 추가하면 될 것 같아 보이지만, 최대한의 동시성을 원하기 때문에 세밀한 잠금을 필요로 되기에 쉽지 않습니다 notify_one()을 호출하는 동안 뮤텍스를 잠군다면, 뮤텍스가 잠금 해제되기 전에 깨어난 스레드가 해제되면 뮤텍스를 기다려야 하고 뮤텍스의 잠금을 해제 해놓는다면, 대기 중인 스레드가 깨어날 때 획득 가능합니다.
 wait_and_pop 어디서 기다리고, 조건이 무엇이고 어떤 뮤텍스가 잠겨야하는 지를 고려해야 하기에 더 복잡합니다. 조건은 head != tail로 큐가 비지 않았음을 기다리는데 이는 head_mutex와 tail_mutex를 모두 잠궈야할 것 같지만, get_tail로 가져올 때만 잠궈도 되는 것을 이미 알고 있습니다. 따라서 head_mutex만 유지한 채로 head != get_tail()을 실행시키고, 이후 data_cond.wait()에 대한 호출을 사용하여 대기 로직을 추가하면 구현은 try_pop()과 동일합니다.

 try_pop()의 두 번째 오버로드와 그에 해당하는 wait_and_pop() 오버로드에 대해서는 신중한 고려가 필요합니다. old_head에서 얻은 std::shared_ptr<> 반환값을 값 매개 변수에 대한 복사 할당으로 바꾸면 잠재적인 예외-안전 문제가 존재합니다. 이 시점에서 데이터 항목은 대기열에서 제거되고 뮤텍스가 잠금 해제됩니다. 남은 것은 데이터를 호출자에게 반환하기만 하면 되는데 복사 할당에 예외가 발생할 경우 데이터 항목은 동일한 위치의 대기열로 반환될 수 없기 때문에 사라지게 됩니다.
 또한, 템플릿 인수에 사용된 실제 유형 T가 예외를 던지지 않는 이동 할당 연산자 또는 스왑 연산자가 있다면, 이를 사용할 수 있지만 모든 유형 T에 대한 일반적인 해법을 원하기에, 이 경우 노드가 목록에서 제거되기 전에, 잠재적으로 예외를 던지는 작업을 잠금된 영역 내로 이동시켜야 합니다. 즉, 목록을 수정하기 전에 저장된 값을 검색하는 pop_head()의 오버로드가 추가로 필요하게 됩니다. empty는 head == get_tail()을 통해 간단히 구현됩니다. 이를 구현한 결과는 다음과 같습니다.

 

Listing 6.7 A thread-safe queue with locking and waiting: internals and interface

template<typename T>
class threadsafe_queue
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    
    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;
public:
    threadsafe_queue():
        head(new node),tail(head.get())
    {}
    threadsafe_queue(const threadsafe_queue& other)=delete;
    threadsafe_queue& operator=(const threadsafe_queue& other)=delete;

    std::shared_ptr<T> try_pop();
    bool try_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    void wait_and_pop(T& value);
    void push(T new_value);
    bool empty();
};

 

Listing 6.8 A thread-safe queue with locking and waiting: pushing new values

template<typename T>
void threadsafe_queue<T>::push(T new_value)
{
    std::shared_ptr<T> new_data(
        std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        tail->data=new_data;
        node* const new_tail=p.get();
        tail->next=std::move(p);
        tail=new_tail;
    }
    data_cond.notify_one();
}

 

Listing 6.9 A thread-safe queue with locking and waiting: wait_and_pop()

template<typename T>
class threadsafe_queue
{
private:

    std::atomic<node*> tail;
    std::atomic<node*> head;

    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> const old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }

    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock,[&]{return head!=get_tail();});
        return std::move(head_lock);
    }

    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }

    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value=std::move(*head->data);
        return pop_head();
    }
        
public:
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head=wait_pop_head();
        return old_head->data;
    }

    void wait_and_pop(T& value)
    {
        std::unique_ptr<node> const old_head=wait_pop_head(value);
    }
};

 

Listing 6.10 A thread-safe queue with locking and waiting: try_pop() and empty()

template<typename T>
class threadsafe_queue
{
private:
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        value=std::move(*head->data);
        return pop_head();
    }

public:
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> const old_head=try_pop_head();
        return old_head?old_head->data:std::shared_ptr<T>();
    }

    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head=try_pop_head(value);
        return old_head;
    }

    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head==get_tail());
    }
};

 이는 메모리가 허하는 한 새로운 값을 계속 넣을 수 있는 무한 큐입니다. 이에 대한 대안으로는 최대 길이를 제한하는 경계 큐이며, 큐가 가득 찬 상태에서 더 넣을려고 한다면 실패하거나 공간이 빌 때까지 차단됩니다. 경계 큐는 수행할 작업에 따라 작업을 스레드 간에 나눌 때 작업의 균등한 확산을 보장하는 데 유용할 수 있고, 대기열을 채우는 스레드가 대기열에서 항목을 읽는 스레드보다 너무 앞서 실행되지 않습니다.
 여기에 표시된 무한 대기열 구현은 push()에서 조건 변수를 기다림으로써 대기열 길이를 제한하기 위해 쉽게 확장할 수 있습니다. pop()에서처럼 대기열에 항목이 있을 때까지 기다리지 않고 최대 항목 수보다 적은 항목이 있을 때까지 대기하게 하면 됩니다.

 

6.3. Designing more complex lock-based data structures

 스택과 큐는 인터페이스가 매우 제한적이며 한정된 곳에 사용되기 때문에 간단하게 구현 가능하지만, 모든 데이터 구조가 그렇게 간단한 것은 아닙니다. 대부분의 데이터 구조는 다양한 작업을 지원하는데, 이는 동시성을 위한 더 많은 기회를 가져올 수 있지만, 또한 다중 접근을 고려해야 하기 때문에 데이터를 보호하는 일을 훨씬 더 어렵게 만듭니다. 동시 접근을 위한 데이터 구조를 설계할 때 수행할 수 있는 다양한 작업의 정밀도가 중요합니다. 이를 룩업 테이블의 설계를 통해 살펴보겠습니다.

6.3.1. Writing a thread-safe lookup table using locks

 룩업 테이블은 키를 통해 값과 연결합니다. 일반적으로 이러한 구조의 목적은 코드가 주어진 키로 관련된 데이터를 찾도록 하는 것입니다.

 스택과 큐의 메소드는 상태 변경을 자주 하는 반면, 룩업 테이블은 거의 수정 않습니다. 표준 컨테이너의 인터페이스는 내재된 경쟁 조건을 만들기에 여러 스레드에서 동시에 데이터 구조에 액세스해야 하는 경우 적합하지 않습니다.

 동시성 관점에서 std::map 인터페이스의 가장 큰 문제는 반복자로, 다른 스레드가 컨테이너에 액세스 및 수정할 수 있는 경우에도 컨테이너에 대한 안전한 액세스를 제공하는 반복자가 있을 수 있지만, 이는 상당히 까다롭습니다. 반복자를 올바르게 처리하려면 반복자가 참조하는 요소를 삭제하는 다른 스레드와 같은 문제를 처리해야 해야 하는데 이는 오히려 더 말려들 수도 있습니다. 따라서 스레드 세이프 룩업 테이블 인터페이스에서 std::map의 인터페이스가 반복자에 상당히 의존하고 있지만 일단은 이를 배제한 채 설계하도록 하겠습니다. 따라서 다음과 같은 기능들이 기본 기능으로 존재하게 됩니다

  • key/value 쌍 추가
  • 주어진 key에 따른 value 변경
  • 주어진 key와 관련된 값 제거
  • 주어진 key와 관련된 값 획득

 스레드 안전성을 위한 가이드 라인을 고집한다면, 참조를 반환하지 않고 각 멤버 함수 전체에 하나의 뮤텍스로 잠그면 됩니다. 경쟁 조건의 가능성이 높은 곳은 새 키/값 쌍이 추가되는 경우입니다. 두 개의 스레드가 새 값을 추가하면, 첫 번째 스레드만 되고 두 번째 스레드는 실패할 것입니다. 한 가지 가능성은 add와 change를 단일 멤버 함수로 결합하는 것입니다. 인터페이스 관점에서 유일한 다른 흥미로운 점은 연관된 값을 얻어오는 부분으로, 한 가지 옵션은 키가 없는 경우 반환되는 "기본" 결과를 사용자가 제공할 수 있도록 하는 것입니다.

mapped_type get_value(key_type const& key, mapped_type default_value);

 

 default_value가 명시적으로 제공되지 않은 경우, mapped_type의 기본값으로 구성된 인스턴스를 사용할 수 있습니다. 또한 mapped_type 인스턴스 대신 std::pair<mapped_type,bool>을 반환하여 bool은 값의 존재 여부를 나타냅니다. 다른 옵션은 값을 참조하는 스마트 포인터를 반환하고 값이 NULL이면 값이 존재하지 않음을 나타냅니다.
 이미 언급했듯이, 일단 내재된 경쟁 조건이 없는 인터페이스가 정해지면, 기본 데이터 구조를 보호하기 위해 모든 멤버 함수 주위에 단일 뮤텍스와 간단한 잠금을 사용함으로써 스레드 안전을 보장할 수 있습니다. 하지만 이는 데이터 구조를 읽고 수정쓰는 동시성의 가능성을 낭비하게 됩니다. 따라서 한 가지 옵션은 std::shared_mutex를 통한 사용한 다수의 읽는 스레드와 단일 쓰는 스레드를 지원하는 것입니다. 이렇게 하면 동시 액세스 가능성이 향상되지만 한 번에 하나의 스레드만 데이터 구조를 수정할 수 있습니다. 하지만 이보다 나은 방법이 존재합니다.

 

[Designing a Map Data Structure For Fine-grained Locking]

 

섹션 6.2.3에서 설명한 대기열과 마찬가지로, 세분화된 잠금을 허용하려면 std::map<과 같은 기존 컨테이너를 포장하는 대신 데이터 구조의 세부 사항을 주의 깊게 살펴볼 필요가 있습니다. 룩업 테이블과 같은 연관 컨테이너를 구현하는 세 가지 일반적인 방법은 다음과 같습니다.

  • 레드 블랙 트리과 같은 이진 탐색 트리
  • 정렬된 배열
  • 해쉬 테이블

 이진 트리는 색인 및 접근을 모두 잠금이 필요한 루트 노드에서 시작해야 하기 때문에, 동시성에 별로 좋은 구조가 아닙니다. 정렬된 배열은 배열에서 지정된 데이터 값이 어디에 있는지 미리 알 수 없기 때문에 전체 배열에 대해 단일 잠금이 필요하여 더욱 별로입니다.

 따라서 해시 테이블이 남습니다. 고정된 수의 버킷을 가정할 때, 키가 속하는 버킷은 순전히 키와 해시 함수에 의존하기에, 버킷당 별도의 잠금을 안전하게 유지할 수 있습니다. 다수의 읽기 또는 단일 쓰기를 지원하는 뮤텍스를 다시 사용할 경우 동시성이 버킷 수에 비례하여 증가합니다. 단점은 키를 위해 좋은 해시 기능이 필요하다는 점입니다. C++ 표준 라이브러리는 이 용도로 사용할 수 있는 std::hash 템플릿을 제공하며, 이미 int와 std::string과 같은 타입에 맞게 특수화되어 있으며, 다른 키 유형에 대해서도 쉽게 특수화하여 만들 수 있습니다. 이를 통해 사용자는 std::hash를 특수화할지 아니면 별도의 해시 함수를 제공할지 선택할 수 있습니다. 따라서 스레드 안전한 룩업 테이블의 구현은 다음과 같이 될 수 있습니다.

 

Listing 6.11 A thread-safe lookup table

template<typename Key,typename Value,typename Hash=std::hash<Key> >
class threadsafe_lookup_table
{
private:
    class bucket_type
    {
    private:
        typedef std::pair<Key,Value> bucket_value;
        typedef std::list<bucket_value> bucket_data;
        typedef typename bucket_data::iterator bucket_iterator;

        bucket_data data;
        mutable std::shared_mutex mutex;

        bucket_iterator find_entry_for(Key const& key) const
        {
            return std::find_if(data.begin(),data.end(),
                [&](bucket_value const& item)
                {return item.first==key;});
        }
    public:
        Value value_for(Key const& key,Value const& default_value) const
        {
            std::shared_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            return (found_entry==data.end())?
                default_value : found_entry->second;
        }

        void add_or_update_mapping(Key const& key,Value const& value)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry==data.end())
            {
                data.push_back(bucket_value(key,value));
            }
            else
            {
                found_entry->second=value;
            }
        }
    
        void remove_mapping(Key const& key)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry!=data.end())
            {
                data.erase(found_entry);
            }
        }
    };
    
    std::vector<std::unique_ptr<bucket_type> > buckets;
    Hash hasher;

    bucket_type& get_bucket(Key const& key) const
    {
        std::size_t const bucket_index=hasher(key)%buckets.size();
        return *buckets[bucket_index];
    }

public:
    typedef Key key_type;
    typedef Value mapped_type;
    typedef Hash hash_type;
    
    threadsafe_lookup_table(
        unsigned num_buckets=19, Hash const& hasher_=Hash()):
        buckets(num_buckets),hasher(hasher_)
    {
        for(unsigned i=0;i<num_buckets;++i)
        {
            buckets[i].reset(new bucket_type);
        }
    }

    threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
    threadsafe_lookup_table& operator=(
        threadsafe_lookup_table const& other)=delete;
    
    Value value_for(Key const& key,
        Value const& default_value=Value()) const
    {
        return get_bucket(key).value_for(key,default_value);
    }
    
    void add_or_update_mapping(Key const& key,Value const& value)
    {
        get_bucket(key).add_or_update_mapping(key,value);
    }
    
    void remove_mapping(Key const& key)
    {
        get_bucket(key).remove_mapping(key);
    }
};

 이 구현에서는 버킷을 고정하는 데 std::vector<std::unique_ptr<bucket_type>>를 사용하여 생성자에서 버킷 수를 지정합니다. 각 버킷은 std::shared_mutex로 보호되어 버킷당 동시에 다수가 읽거나 한 명이 수정하는 것을 허용합니다.
 버킷의 수가 고정되어 있기 때문에 get_bucket() 함수는 잠금 없이 호출될 수 있으며 버킷 뮤텍스는 읽기만 하는 경우는 shared_lock을, 쓰는 경우는 unique_lock로 잠급니다. find_entry_for() 멤버 함수는 값이 버킷에 있는지 여부를 확인하며, 각 버킷은 list로 키/값 형태로 들어있기에 항목을 쉽게 추가하고 제거할 수 있습니다.
 예외적인 안전성에 대해 말하면, value_for는 아무것도 수정하지 않으므로 예외가 발생하더라도 데이터 구조에 영향을 주지 않습니다. remove_messages는 지울 호출을 사용하여 목록을 수정하지만, 이 호출을 삭제하지 않도록 보장되므로 안전합니다. add_or_update_mapping의 경우는, push_back은 예외적으로 안전하며, 예외를 던지더라도 목록이 그대로이므로 문제가 존재하지 않습니다. 문제는 기존 값을 대체하는 경우, 할당이 취소되면 원래 값을 변경하지 않고 그대로 두는 것으로 생각되는데 이는 전체적으로 데이터 구조에 영향을 미치지 않으며 전적으로 사용자가 제공한 유형의 속성이기 때문에 안전하게 이 작업을 사용자에게 맡겨야 합니다.
 이 섹션의 시작 부분에서 이러한 룩업 테이블의 한 가지 편리한 점은 현재 상태를 다른 곳으로 복사하는 기능의 존재입니다. 이렇게 하려면 일관된 상태의 복사본을 검색하기 위해 전체 컨테이너를 잠가야 하며, 이를 위해서는 모든 버킷을 잠가야 합니다. 룩업 테이블의 일반  작업에는 한 번에 하나의 버킷에 대한 잠금만 필요하므로 모든 버킷에 대한 잠금이 필요한 작업은 이 작업뿐입니다. 따라서 버킷 인덱스 증가 등과 같이 동일한 순서로 잠근다면 교착 상태가 발생하지 않습니다. 이 구현은 다음 목록에 나와 있습니다.

 

Listing 6.12 Obtaining contents of a threadsafe_lookup_table as std::map<>

std::map<Key,Value> threadsafe_lookup_table::get_map() const
{
    std::vector<std::unique_lock<std::shared_mutex> > locks;
    for(unsigned i=0;i<buckets.size();++i)
    {
        locks.push_back(
            std::unique_lock<std::shared_mutex>(buckets[i].mutex));
    }
    std::map<Key,Value> res;
    for(unsigned i=0;i<buckets.size();++i)
    {
        for(bucket_iterator it=buckets[i].data.begin();
            it!=buckets[i].data.end();
            ++it)
        {
            res.insert(*it);
        }
    }
    return res;
}

 각 버킷을 개별적으로 잠그고 std::shared_mutex를 사용하여 각 버킷에 대한 판독기 동시성을 허용함으로써 전체적으로 룩업 테이블의 동시성을 증가시켰습니다. 다음 섹션에서는 버킷의 동시성을 반복자를 포함한 스레드 세이프 list 컨테이너를 통해 보다 세밀한 잠금으로 동시성을 높이도록 하겠습니다.

 

6.3.2 Writing a thread-safe list using locks

list는 가장 기본적인 데이터 구조 중 하나로 스레드 안전하게 작성하는 것이 간단해야 합니다. 반복자를 제공받고 싶은데, STL 스타일의 반복자의 경우 내부 데이터에 대한 참조를 유지하고 있어야하며, 이는 다른 스레드에서 잠금을 걸며 수정한 경우에도 참조는 유효하게 남아있어야 합니다. 이러한 STL식 반복자는 컨테이너의 제어 밖에 있기에 별로 좋지 않습니다.
 대안으로는 for_each와 같이 반복하는 함수를 제공하는 것입니다. 이는 컨테이너가 반복과 잠금을 직접적으로 담당하게 되지만, 교착상태 회피 지침에는 반하게 됩니다. for_each가 유용한 작업을 수행하려면 내부 잠금을 유지하면서 사용자 제공 함수를 호출해야 하며 사용자 제공 함수가 이 항목에서 작동하기 위해 해당 참조를 전달해야 합니다. 복사본의 경우는 비용이 비쌀수도 있습니다.
 따라서 사용자가 교착 상태 및 데이터 레이스를 발생시키지 않는 코드를 짜지 않도록 맡겨야만 합니다. 이 경우, 룩업 테이블은 완벽하게 안전하게 됩니다. 이후 목록에는 다음과 같은 작업을 메서드를 가지도록 할 수 있습니다.

  • list에 항목 추가
  • 특정 조건을 만족하면 ist에서 항목 제거
  • 특정 조건을 만족하면 list에서 항목 찾기
  • 특정 조건을 만족하면 항목 업데이트
  • 다른 컨테이너에서 list로 항목 복사

 이를 좋은 범용 목록 컨테이너로 만들려면 위치 삽입과 같은 추가 작업을 필요하지만, 이는 현재 필요하지는 않습니다. 링크된 목록에 대해 잠금을 사용하는 기본 아이디어는 노드당 하나의 뮤텍스를 갖는 것이지만 이는 너무 많은 뮤텍스를 요구합니다. 만약 이렇게 만든다면 각 작업은 관심 있는 노드의 잠금만 유지하고 다음 노드로 이동할 때 각 노드의 잠금을 해제하는 식으로 구성되며 이는 아래와 같이 구성됩니다.

 

Listing 6.13 A thread-safe list with iteration support

template<typename T>
class threadsafe_list
{
    struct node
    {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;

        node():
            next()
        {}
        
        node(T const& value):
            data(std::make_shared<T>(value))
        {}
    };
    
    node head;

public:
    threadsafe_list()
    {}

    ~threadsafe_list()
    {
        remove_if([](T const&){return true;});
    }

    threadsafe_list(threadsafe_list const& other)=delete;
    threadsafe_list& operator=(threadsafe_list const& other)=delete;
    
    void push_front(T const& value)
    {
        std::unique_ptr<node> new_node(new node(value));
        std::lock_guard<std::mutex> lk(head.m);
        new_node->next=std::move(head.next);
        head.next=std::move(new_node);
    }

    template<typename Function>
    void for_each(Function f)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            f(*next->data);
            current=next;
            lk=std::move(next_lk);
        }
    }

    template<typename Predicate>
    std::shared_ptr<T> find_first_if(Predicate p)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            if(p(*next->data))
            {
                return next->data;
            }
            current=next;
            lk=std::move(next_lk);
        }
        return std::shared_ptr<T>();
    }

    template<typename Predicate>
    void remove_if(Predicate p)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            if(p(*next->data))
            {
                std::unique_ptr<node> old_next=std::move(current->next);
                current->next=std::move(next->next);
                next_lk.unlock();
            }
            else
            {
                lk.unlock();
                current=next;
                lk=std::move(next_lk);
            }
        }
    }
};
  •  threadsafe_list는 단일 연결 list이며, 여기서 각 항목은 노드로 구성됩니다. 기본 생성된 노드는 목록의 맨 앞에 사용됩니다. 새로운 노드는 push_front 함수를 통해 추가되며, 먼저 새로운 노드는 힙에 저장된 데이터를 할당하고 다음 포인터는 NULL로 남겨둡니다. list의 head에 노드를 삽입하려면 헤드 노드에 대한 mutex의 잠금을 획득하고 head.next가 새 노드를 가리키도록 설정합니다. 목록에 새 항목을 추가하려면 뮤텍스를 하나만 잠그면 되므로 교착 상태에 빠질 위험이 없습니다. 또한 느린 메모리 할당은 잠금 외부에서 발생하므로 잠금은 실패할 수 없는 몇 가지 포인터 값의 업데이트만 보호합니다.
  • 먼저 for_each()를 살펴보면, 각 요소에 적용할 함수를 필요로 하는데, 이 함수는 값에 사용되며 호출할 수 있는 경우 작동합니다. 이번 for_each에서 함수는 T형 값을 단독 인자로 가지고 있어야합니다. 작업은 먼저 head에 대한 잠금으로 시작하여 현재 노드에 대한 잠금을 지닌 채로 시작합니다. 현재 노드의 다음 노드가 존재한다면, 다음 노드를 잠그고 현재 노드는 잠금 해제합니다. 그 이후에 지정된 함수를 호출하고 다음으로 진행시키도록 변수와 잠금을 적절히 이동시킵니다. 작업하는 동안, 노드에 대한 잠금이 유지되기 때문에 기능이 제대로 작동하면 완전히 안전하게 됩니다.
  • find_first_if()은 for_each()와 유사하지만 조건에 일치하는 경우(Predicate가 true인 경우)는 찾은 값을 리턴하고, 그렇지 않은 경우는 null을 반환 합니다.
  • remove_if도 비슷하지만 다른 부분으로는 list에 대한 변경을 수행합니다. 만약 조건에 해당하는 값을 찾으면, 찾은 노드의 이전과 다음을 연결시켜주는 작업을 수행하고, 값을 제거합니다.
  • 다음으로, 안전성을 고려하여, 데드락과 경쟁 상태가 있는 지를 확인해봅시다. 제공된 조건과 함수를 잘 사용한다면 문제가 없습니다. head에서만 출발하고 한 방향으로만 반복이 일어나며 현재 뮤텍스를 해제하기 전에 항상 다음 뮤텍스를 잠그기 때문에 서로 다른 스레드에서 다른 잠금 순서가 발생할 가능성이 없습니다. 만약 mutex를 잠금 해제한 후에 삭제하기에 존재한다면 remove_if 부분이 의심스러울 수는 있지만, 이전 노드에서 뮤텍스를 계속 유지하므로 삭제 중인 노드의 잠금을 새 스레드가 획득할 수 없기에 실제로는 안전합니다.
  •  이번 구현의 핵심은 단일 뮤텍스에 대한 동시성 가능성을 개선하는 것이었습니다. 서로 다른 스레드가 어떤 작업을 수행하든 list의 서로 다른 노드에서 동시에 작동할 수 있어 동시성은 크게 개선되었으나 아쉬운 부분은 각 노드에 대한 뮤텍스는 차례로 잠겨 있어야 하므로 스레드가 서로 통과할 수 없습니다. 한 스레드가 특정 노드를 처리하는 데 오랜 시간을 소비하는 경우, 다른 스레드는 해당 노드에 도달할 때까지 기다려야만 합니다.