C++공부/Concurrency in Action

2. Managing threads

아헿헿헿 2022. 5. 13. 04:00

C++ 표준 라이브러리에서 제공하는 std::thread 객체를 통해 thread 관리를 쉽게 할 수 있도록 한다. 이를 통해 간단하지 않은 작업들을 기본적인 단위에서부터 쌓아올릴 수 있다. 이 장에서는 기본적인 thread의 시작, 종료 대기, 그리고 실행을 배운다. 이를 심화시켜 몇몇 parameter를 넘기거나 std::thread 객체를 다른 곳으로 소유권을 어떻게 넘기는 지에 대한 알아볼 것이다.

2.1. Basic thread management

2.1.1. Launching a thread 

parameter를 가지지 않는 void 반환 형태의 함수와 같이 간단한 경우든 파라미터를 받으며 종료 조건도 복잡한 함수든, 한 thread에서 이를 실행시키면 함수에서 return할 때까지 함수를 실행시키고 이후 thread가 종료될 것이다. 이에 관계없이 모든 thread들은 std::thread 객체를 생성하는 것으로부터 시작된다.

// function
void do_some_work();
std::thread my_thread(do_some_work);

// function object
class background_task
{
public:
	void operator() const
    {
    	do_something();
        do_something_else();
    }
};
background_task f;
std::thread my_thread(f);

 do_some_work와 같은 함수 혹은 background_task와 같은 함수 객체처럼 callable type를 std::thread 생성자의 인수로 전달할 수 있다. 이를 통해, 전달 받은 callable type 인수는 새로운 thread의 공간에 복사되어 저장되며 시행되게 된다. 따라서 원본과 동등하게 복사되어야만 한다. 그리고 이때 주의해야하는 것으로 thread 생성자에 함수 객체를 넘겨줄 때 임시 객체를 넘겨준다면, 이는 함수 선언과 같은 문맥이 되어, 객체 정의보다는 background_task 객체를 반환하는 함수를 인자로 가지고 std::thread를 반환하는 함수 선언으로 받아들인다. 따라서, 여분의 소괄호를 사용하거나, 중괄호로 초기화를 해주어야한다. 혹은 람다 함수를 활용하여 사용할 수도 있다.

std::thread my_thread(background_task()); // syntax can be interpreted as a function declaration
std::thread my_thread((background_task())); // using extra set of parentheses
std::thread my_thread{background_task()}; // using a set of braces
std::thread my_hthread([]{
	do_something();
    do_something_else();
});

위와 같이 한번 thread를 실행시킨 뒤, 우리는 join으로 thread가 끝날 때까지 기다릴지 혹은 detach로 각자 수행되도록 할지를 정해야한다. 이를 std::thread 객체가 파괴되기 전에 지정하지 못한다면, std::thread 소멸자가 std::terminate 함수를 호출하여 시행되고 있는 thread 프로그램이 강제 종료 되어버린다. 따라서 join과 detach의 선택은 필수적이다.

 만약 thread의 종료를 기다리지 않는다면, 해당 thread가 접근하는 데이터의 유효성이 보장되어야만 하고, 그렇지 않다면 미정의 행동을 할 것이다. 다음의 코드를 보자.

 

Listing 2.1. A function that returns while a thread still has access to local variables

struct func
{
    int& i;
    func(int& i_):i(i_){}
    void operator()()
    {
        for(unsigned j=0;j<1000000;++j)
        {
            do_something(i); // Potential access to dangling reference
        }
    }
};


void oops()
{
    int some_local_state=0;
    func my_func(some_local_state);
    std::thread my_thread(my_func);
    my_thread.detach(); // Don't wait and my_thread might still be running
}

이 경우, my_thread는 some_local_state를 참조하지만, oops의 함수가 my_thread보다 먼저 끝나 some_local_state의 메모리가 먼저 해제되어 my_thread에서 이를 접근할 수도 있기에 위험한 코드가 된다.이렇게 멀티스레딩에서 포인터나 참조를 사용하는 코드는 고심해서 사용해야한다. 이를 해결하기 위해서는 self-contained(함수 범위 안에서 전부 해결한다는 의미)하거나 복사를 이용한 코드를 짜거나, join으로 대체하는 것이 좋다.

2.1.2. Waiting for a thread to complete

std::thread 객체에 join 함수를 호출하여 thread가 종료될 때까지 기다릴 수 있다. Listing 2.1에서는 detach로 기다리지 않았지만, Join을 사용한다면 my_thread가 종료될 때까지 기다리기 떄문에 위에서 발생한 문제점이 생기지 않을 것이다. 하지만 이 경우는 my_thread가 수행되는 동안 oops()에서는 이의 종료를 기다리기에 능률이 떨어지게 된다.

 join 함수의 호출은 thread와 관련된 모든 저장 공간을 비우게 된다. 따라서 한 thread 객체에 있어 join은 한번만 부를 수 있으며 joinable 함수를 통해 이를 확인할 수 있다. 

2.1.3. Waiting in exceptional circumstances

detach의 경우는 thread 객체가 생성되자마자 바로 선언하면 되지만, join의 경우는 선언하는 위치에 크게 영향을 받게 된다. 특히 Join 호출 전에 예외를 던지는 경우에 대해서 고민을 할 필요가 있다. 다음의 코드를 보자.

 

Listing 2.2. Waiting for a thread to finish

struct func{};

void do_something_in_current_thread()
{}

void f()
{
    int some_local_state=0;
    func my_func(some_local_state);
    std::thread t(my_func);
    try
    {
        do_something_in_current_thread();
    }
    catch(...)
    {
        t.join();
        throw;
    }
    t.join();
}

try catch 문을 통해 f 함수에서 thread 생성 후 할 일을 하다가 exception을 발생하던 하지 않던 간에 적절한 조치를 취하고 있다. 하지만 이와 같은 try catch 문은 코드를 복잡하게 하며 조금이라도 실수한다면 큰 문제를 불러 일으킬 수 있다. 따라서 우리는 다음과 같이 RAII 형식으로 안전하게 처리하는 방법을 소개하고자 한다.

 

Listing 2.3. Using RAII to wait for a thread to complete

class thread_guard
{
    std::thread& t;
public:
    explicit thread_guard(std::thread& t_): // 생성시 thread를 보관
        t(t_)
    {}
    ~thread_guard() // 소멸 시 joinable하다면 join을 호출 후 소멸.
    {
        if(t.joinable())
        {
            t.join();
        }
    }
    thread_guard(thread_guard const&)=delete;
    thread_guard& operator=(thread_guard const&)=delete;
};

void f()
{
    int some_local_state;
    func my_func(some_local_state);
    std::thread t(my_func);
    thread_guard g(t);
    do_something_in_current_thread();
}

함수 f에서 예외가 발생하여 탈출한다고 하더라도, 지역 변수인 thread_guard가 join 가능 여부 판단 후 호출하는 식으로 만들어져 있으며, 중복된 thread 처리가 필요 없기 떄문에 복사 생성자와 복사 할당자가 삭제되어있다.

2.1.4. Running threads in the background

 std::thread 객체에 detach 함수를 호출하는 것으로 background에서 작업을 수행할 수 있도록 만든다. 하지만 이는 thread의 소유권을 가져가거나 thread가 종료하는 것을 기다릴 수 없게 된다. 이때 detach된 thread를 daemon thread라 부르며, 이들의 소유권 및 제어권은 thread 종료 시에 thread의 자원을 적절하게 반환하게 하는 C++ Runtime Library가 가지게 된다. daemon thread들은 보통 오래 살아남아 지속적으로 background에서 작업을 수행하게 된다.

 또한 detach를 할 때 주의해야할 점으로는 detach된 thread들은 joinable하지 않지만, joinable하지 않은 thread들도 detach 불가능하다. 즉 쌍방향으로 작용하는 셈이다.

 여러개의 문서를 동시에 편집하는 워드 프로세서를 생각해보자. 여러 개의 문서를 동시에 작업하기 위해서는 다양한 방법이 존재할 것인데, 일반적으로 생각할 수 있는 방법은 하나의 작업마다 독립적인 창을 가지며, 독립적인 자료 및 수행을 가지도록 하는 방법이다. 그리고 이러한 하나의 작업을 하나의 Thread에 대응시키는 것으로 구현이 가능하다. 따라서 이를 다음과 같이 구현하였다.

Listing 2.4. Detaching a thread to handle other documents

void edit_document(std::string const& filename)
{
    open_document_and_display_gui(filename);
    while(!done_editing())
    {
        user_command cmd=get_user_input();
        if(cmd.type==open_new_document)
        {
            std::string const new_name=get_filename_from_user();
            std::thread t(edit_document,new_name);
            t.detach();
        }
        else
        {
            process_user_input(cmd);
        }
    }
}

 

2.2. Passing arguments to a thread function

위의 Listing 2.4에서 우리가 여지껏 봤던 것과 다른 점은 인자값을 하나가 아닌 2개를 보냈다는 점이다. 인자를 여러 개 보낸다면, thread에서 실행시키고자 하는 함수의 인자로서 복사되어 들어가게 된다. 그 예시는 다음과 같다.

void f(int i, std::string const& s);

void oops(int some_param)
{
    char buffer[1024];
    sprintf(buffer, "%i", some_param);
    std::thread t0(f, 3, "hello"); // Might make dangling pointer
    std::thread t1(f, 3, buffer); // Might make dangling pointer
    std::thread t2(f, 3, std::string(buffer)) // Using std::string avoids dangling pointer
    t0.detach();
    t1.detach();
    t2.detach();
}

다음과 같이 인자 2개를 받는 함수에 대해서 thread 생성 시 인자를 3개를 보내주어, 첫번째는 함수로, 2, 3번째는 함수의 인자값이 된다. 이 떄 t0 thread의 경우는 리터럴 문자를 그대로 넣어주었고, t1의 경우는 buffer를 넣어주었는데, t1의 경우는 buffer 자체가 oops 함수 스택의 변수로 저장되어 있고 모든 thread에 있어 detach를 수행하였기 때문에 t1이 background에서 돌아가는 와중에 buffer의 메모리가 해제될 수도 있어 t1으로 들어간 포인터가 dangling pointer가 되어버릴 수도 있다. 이를 방지하기 위해서 t2에서는 std::string으로 변환 후에 이를 넣어주게 된다면 문제를 미리 방지할 수 있다. 이와 반대의 경우도 상정할 수도 있다.

void update_data_for_widget(widget_id w, widget_data& data);
void oops_again(widget_id w)
{
    widget_data data;
    std::thread t1(update_data_for_widget, w, data); // non_const reference can't take rvalue
    std::thread t2(update_data_for_widget, w, std::ref(data)); // passing reference
    display_status();
    t1.join();
    t2.join();
    process_widget_data(data);
}

 이번에는 oops_again의 함수 스택에서 thread를 생성하는데, 이떄의 함수의 인자로 참조값을 받는 것에 주의하자. 이전에 말했듯이 thread로 넘어가는 인자들은 복사를 수행하여 넘기게 되고, 이때 값들은 ravlue로 처리되게 된다. 그러나 non-const 참조값들은 rvalue를 이용한 초기화가 불가능하기 떄문에 컴파일에 실패하게 된다. 이를 해결할 수 있는 방법 중 첫번째는 왼쪽 참조를 쓰지 않는 방법이고, 다른 방법으로는 std::ref로 감싸서 참조 상태를 보존하여 보내는 방법이 존재한다.

 이와 같은 복사는 std::bind와 유사하게 돌아가게 되는데, 기저에 가지고 있는 알고리즘이 같기 때문이다. 이는 멤버 함수 포인터를 함수로, 인자의 첫번째 값으로 해당 객체의 포인터를 보내게 되면 이도 잘 수행된다.

 또다른 재밌는 예시로는 복사는 되지 않고 이동만 가능한 객체에 대한 것이다. 예시로 std::unique_ptr이 존재하는데, 이러한 객체는 원본을 텅비게 만들고 새로운 객체에 기존의 값들을 넣게 된다. 이들을 옮기기 위해서는 std::move 함수를 활용해야하는데 std::thread도 이러한 객체에 해당된다.

2.3. Transferring ownership of a thread

thread를 생성하고 이를 백그라운드에서 돌아가도록 만든 뒤 이를 종료되길 기다리기보다는 소유권을 호출한 함수에 넘기고 싶다고 가정하자. 그렇다면 소유권을 다른 장소로 옮겨야만 할 것이다. 이를 위해 std::thread의 이동이 지원된다. std::thread는 복사 불가능하지만 이동은 가능하도록 구성되어 있다.

void some_function();
void some_other_function();
std::thread t1(some_function); // new thread is started
std::thread t2 = std::move(t1); // t1(now do nothing) is transferred to t2(running some_function)
std::thread t3 // default constructor
t1 = std::thread(some_other_function); // temporary std::thread object moves to t1
t3 = std::move(t2); // t2(now do nothing) is transferred to t3(running some_function)
t1 = std::move(t3); // terminate some_other_function and take some_function

위에서 유의할 점은 크게 2가지가 있다. 먼저 임시 객체인 경우는 대입 연산자를 통해 명시적인 std::move 없이도 rvalue로서 대입가능하지만, 객체의 이동의 경우는 std::move를 통해 직접적으로 수행해줘야만 한다. 다른 하나는 마지막 문장에서 나오는 것인데, 다른 함수를 수행하고 있는 thread에 이동 명령으로 할당하려는 경우, std::terminate가 호출되어 기존의 프로그램이 종료시키게 된다. 따라서 detach나 join을 이전에 해주지 않았다면 문제를 발생시킬 수 있다. 이러한 것은 std::thread를 함수 리턴 값으로 가질 때도 적용된다. 

 

Listing 2.5. Retruning a std::thread from a function

std::thread f()
{
    void some_function();
    return std::thread(some_function);
}
std::thread g()
{
    void some_other_function(int);
    std::thread t(some_other_function,42);
    return t;
}

비슷하게 함수 안으로 집어넣을 때는 std::move를 활용하면 된다.

void f(std::thread t);
void g()
{
    void some_function();
    f(std::thread(some_function));
    std::thread t(some_function);
    f(std::move(t));
}

std::thread 이동 연산은 위에서 언급한 thread_guard 클래스를 발전시키는 데에 큰 도움이 된다. 이는 thread_guard 객체가 참조하고 있는 thread보다 오래 살거나 더이상 joinable 하지 않은 경우를 방지하고자 한다. 이를 scoped_thread 클래스의 구축으로 해결하고자한다.

 

Listing 2.6. scoped_thread and example usage

class scoped_thread
{
    std::thread t;
public:
    explicit scoped_thread(std::thread t_):
        t(std::move(t_))
    {
        if(!t.joinable())
            throw std::logic_error("No thread");
    }
    ~scoped_thread()
    {
        t.join();
    }
    scoped_thread(scoped_thread const&)=delete;
    scoped_thread& operator=(scoped_thread const&)=delete;
};
struct func;
void f()
{
    int some_local_state;
    scoped_thread t(std::thread(func(some_local_state)));
    do_something_in_current_thread();
}

이는 thread_guard와 유사하지만 생성자에서 thread를 참조하는 것이 아닌 값 자체를 private 변수로서 가지고 있으며, 생성자에서 rvalue로 값을 전달받아 보관하게 된다. 위의 코드에서 f 함수가 종료될 때 scoped_thread 소멸자가 호출되고 이때 join을 통해서 thread의 종료를 기다리게 된다. C++17 코드에서는 좀 더 구조를 바꿀 수 있다. joining_thread는 std::thread와 비슷한 역할을 할 수 있도록 인자를 받는 것들이 구현되었고, 다른 메소드들도 구현되었다. 다른 점으로는 자동으로 위와 마찬가지로 소멸자 호출시 자동으로 join을 하도록 만든 것이다.

 

Listing 2.7. A joining_thread class

class joining_thread
{
	std::thread t;
public:
	joining_thread() noexcept=default;
    template<typename Callable, typename... Args>
    explicit joining_thread(Callable&& func, Args&&... args) :
    	t(std::forward<Callable>(func), std::forward<Args>(args)...) {};
    explicit joining_thread(std::thread t_) noexcept : t(std::move(t_)) {};
    explicit joining_thread(joining_thread&& other) noexcept : t(std::move(other.t)) {};
    joining_thread& oeprator=(joining_thread&& other) noexcept
    {
    	if (joinable())
        	join();
        t = std::move(other.t);
        return *this;
    }
    joining_thread& oeprator=(std::thread other) noexcept
    {
    	if (joinable())
        	join();
        t = std::move(other);
        return *this;
    }
    ~joining_thread() noexcept
    {
    	if(joinable())
        	join();
    }
    void swap(joining_thread& other) noexcept
    {
    	t.swap(other.t);
    }
    std::thread::id get_id() const noexcept {
    	return t.get_id();
    }
    bool joinable() const noexcept
    {
    	return t.joinable();
    }
    void join()
    {
    	t.join();
    }
    std::thread& as_thread() noexcept
    {
   		return t;
    }
    const std::thread& as_thread() const noexcept
    {
    	return t;
    }
};

이동 지원은 std::thread 객체들이 컨테이너에서도 활용될 수 있도록 도와준다. 다음과 같이 vector에 원하는 개수만큼의 thread를 생성하고 이들이 작업이 끝날 때까지 기다리도록 구성할 수 있다. 이를 통해 고정된 숫자보다 runtime에 유동적으로 만들어질 수 있도록 구성할 수 있다.

 

Listing 2.8. Spawns some threads and waits for them to finish

void do_work(unsigned id);

void f()
{
    std::vector<std::thread> threads;
    for(unsigned i=0;i<20;++i)
    {
        threads.push_back(std::thread(do_work,i));
    }
    std::for_each(threads.begin(),threads.end(),
    std::mem_fn(&std::thread::join));
}

 

2.4. Choosing the number of threads at runtime

 C++ Standard Library는 std::thread::hardware_concurrency()를 통해 현재 프로그램에서 동시 실행할 수 있는 thread의 수를 얻을 수 있다. 멀티코어 시스템에서는 CPU 코어의 값이 리턴될 것이다.

 LIsting 2.9에서는 std::accumulate의 병렬 버전을 구현하였다. 먼저 오버헤드를 피하기 위해서 사용가능한 thread의 개수만큼만 할당하였으며, 어떤 것도 예외를 던지지 않는다는 가정하에 구현하였다. 예외 처리는 Chapter 8에서 설명될 것이다. 

 

Listing 2.9. A naive parallel version of std::accumulate

template<typename Iterator,typename T>
struct accumulate_block
{
    void operator()(Iterator first,Iterator last,T& result)
    {
        result=std::accumulate(first,last,result);
    }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return init;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<T> results(num_threads);
    std::vector<std::thread>  threads(num_threads-1);

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        threads[i]=std::thread(
            accumulate_block<Iterator,T>(),
            block_start,block_end,std::ref(results[i]));
        block_start=block_end;
    }
    accumulate_block<Iterator,T>()(block_start,last,results[num_threads-1]);
    
    std::for_each(threads.begin(),threads.end(),
        std::mem_fn(&std::thread::join));

    return std::accumulate(results.begin(),results.end(),init);
}

위의 코드는 잘 읽어보면 해석될 것이다. 다만 유의할 점이 몇몇개 존재하는데, 먼저 type T애 따라 더하기 연산자가 연대 되지 않을 수도 있고, iterator 또한 forward iterator이어야만 하며, T 또한 results 벡터 생성을 위해 생성가능해야만 한다.

2.5. Identiftying threads

thread 식별자는 std::thread::id 형태로 주어지며, 두가지 방법으로 찾을 수 있다. 첫째는 std::thread 객체 메소드 get_id의 호출로 얻는 방법이다.만약 현재 실행 중이지 않은 thread에 대해 get_id를 호출한다면, 기본 생성된 std::thread::id를 리턴하며 이는 그 어떤 thread도 아니라는 것을 명시한다. 대신, 현재 thread의 식별자는 std::this_thread::get_id()를 호출하는 것으로 획득할 수 있다.

 std::thread::id는 복사 비교가 자유롭다. 같은 id를 가졌다는 것은 같은 thread이거나 둘다 어떤 thread도 아님을 뜻한다. C++ Standard Library는 thread 식별자들이 같은지 아닌지를 비교하는데에 제약을 두지 않는다. 이는 컨테이너 안에서 비교 연산할 떄에도 사용될 수 있으며, std::hash<std::thread::id>를 제공하여 std::thread::id가 키 값으로도 찾을 수 있게 할 수 있다.

 std::thread::id는 thread가 특정 작업 여부를 확인할 때 사용될 수 있다. 만약 thread들이 Lisitng 2.9처럼 나눠서 작업을 수행할 때 몇몇은 중간에 특정한 다른 알고리즘을 수행해야 할 수도 있다. 이 경우에는 std::this_thread__get_id()의 값을 저장하고 수행하는 도중에 필요한 thread ID와 각자의 thread ID를 비교하는 연산을 수행하도록 만든다.

std::thread::id master_thread;
void some_core_part_of_algorithm()
{
    if(std::this_thread::get_id() == master_thread)
    	do_master_thread_work();
    do_common_work();
}

 비슷하게, thread ID들은 연결 컨테이너에서 특정한 데이터가 thread에서 필요할 때 연결한 경우에도 적절하게 사용될 수 있다.