C++공부/Concurrency in Action

4. Synchronizing concurrent operations

아헿헿헿 2022. 7. 15. 15:12

 이번 장에는 스레드들 간의 공유되는 데이터를 다양한 방법으로 보호하는 방법과, 이로는 부족한 경우 각각의 스레드에서의 행동을 동기화시키는 방법까지 알아보겠습니다.

 일반적으로 한 스레드가 특정 이벤트가 발생하거나 조건을 만족하기를 기다라는 것은 흔합니다. 플래그 및 데이터 확인으로도 가능하지만 이는 이상적이지 않습니다. 이와 같은 스레드간의 연산을 동기화할 필요성은 매우 흔한 경우로서 C++ 표준 라이브러리는 이를 다루는 기능을 제공하는데, 이것이 바로 조건 변수(condition variable)와 퓨쳐(future)입니다.

 

4.1 Waiting for an event or other condition

 한 스레드가 다른 스레드의 작업을 기다리는 것을 기다리는 방법에는 여러가지가 존재합니다. 먼저 mutex를 활용한 flag를 사용하는 것인데, 이는 지속적으로 flag를 확인해야고 mutex에 의해 막힐 수 있기 때문에 비효율적입니다. usleep을 사용할 수도 있지만,  이는 적절한 시간을 선택하기가 어렵고, 적절한 시간을 선택하지 못한다면 과도한 자원 낭비 혹은 반응 속도 저하를 야기합니다.

 가장 선호되는 방법은 C++ 표준 라이브러리의 조건 변수(condition variable)를 활용하는 방법입니다. 개념적으로, 조건변수는 어떤 이벤트 혹은 다른 조건과 연관되어 있고, 하나 이상의 스레드가 그 조건이 만족되기를 기다릴 수 있으며, 어떤 스레드가 그 조건이 만족되었음을 결정하면, 그 조건변수를 기다리는 하나 이상의 스레드에게 통보하여 그들을 깨우고 계속 작업을 진행하도록 합니다.

 

4.1.1 Waiting for a condition with condition variables

C++ 표준 라이브러리는 2개의 조건변수, std::condition_variable과 std::condition_variable_any을 제공합니다. 양쪽 모두 적절한 동기화를 제공하기 위해 뮤텍스가 필요한데, 전자는 std::mutex와만 함께 동작하는 반면, 후자는 최소한의 조건을 충족하여 mutex와 유사한 것으로 취급될 수 있는 것들과 함께 동작할 수 있어 접미사 _any가 붙었습니다. std::condition_variable_any가 좀 더 일반적이기에 잠재적인 추가 비용이 소요됩니다. 따라서 추가적인 유연성이 필요한 경우가 아니라면 std::condition_variable을 사용해야 합니다.

 

Lisitng 4.1 Waiting for data to prcoess with std::condition_variable

std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

void data_preparation_thread()
{
  while (more_data_to_prepare()) {
    data_chunk const data = prepare_data();
    {
      std::lock_guard<std::mutex> lk(mut);
      data_queue.push(data);
    }
    data_cond.notify_one();
  }
}

void data_processing_thread()
{
  while (true) {
    std::unique_lock<std::mutex> lk(mut);
      data_cond.wait(lk, []{ return !data_queue.empty(); });
    data_chunk data = data_queue.front();
    data_queue.pop();
    lk.unlock();
    process(data);
    if (is_last_chunk(data)) break;
  }
}

 먼저 queue는 두 스레드 사이에 데이터를 넘겨주기 위해서 사용되며, 데이터가 준비된다면, 잠금을 걸고 데이터를 집어넣게 됩니다. 그리고 나서, condition_variable의 notify_one 멤버 메서드를 통해 condition_variable의 wait함수를 사용하여 기다리고 있는 스레드(수면 상태에 들어갑니다)에 이를 공지하여, 작업을 수행하도록 합니다. wait 함수로 대기 상태에 들어갈 때는, 잠금을 해제하고 blocked 상태로 들어가게 됩니다. 이 때, 스레드는 수면 상태에서 깨어나면서, mutex에 대한 잠금을 재획득하고, 주어진 조건을 확인하는데, 만약 이 때 조건을 만족하면 잠금을 획득한 채로 이후의 작업을 수행하게 되고, 아니라면 잠금을 풀고 다시 기다리게 됩니다. 기다리는 스레드는 잠금을 획득하거나 해제해야 하기에, std::lock_guard 대신에, std::unique_lock을 사용해야만 합니다. ​기다리는 스레드가 mutex를 재획득하고 조건을 검사할 때, 만약 다른 스레드로부터의 통보로 인한 것이 아닌 경우를 spurious wake라 부릅니다.

4.1.2 Building a thread-safe queue with condition variables

만약 통상적인 queue를 짜는 경우는 어떤 일을 해야하는 지에 대충 알 수 있지만, thread-safe하지 않습니다. 동시에 데이터를 읽거나 변경하는 경우에는 내재된 경쟁 상태에 돌입하게 되고, 이를 방지하기 위해서는 listing 4.1에서 본 것처럼 변경하는 함수들이 작동하는 동안은 이를 읽어오는 함수들은 wait을 통해 기다리도록 구성을 해야 합니다.

 이때 pop의 구현을 값이 존재할 때까지 기다렸다가 pop하는 wait_and_pop과 바로 시도하는 try_pop으로 나누어 구현하는 것이 좋습니다. 이 경우 바로 pop을 수행하는 try_pop에 대해서도, pop하는 값의 리턴받을 인자를 받는 try_pop(T& value) 형태는 수행에 성공했는지에 해당하는 bool을 리턴하게 하고, 아무것도 받지 않는 try_pop()은 pop한 해당 값을 리턴하게 하며, 실패한 경우는 nullptr이 들어가도록 합니다. 바로 수행되는 것들과는 반대로 wait_and_pop은 condition_variable을 사용하여 조건에 만족할 때까지 기다리도록 합니다. 그리고 이 condition_variable은 push함수에서 notify_one을 통해 기다리는 상태에 대해서 처리를 해줍니다. 여기서 mutex는 계속 값이 바뀌어야하므로 mutable을 이용해 const 객체이거나 const 함수에 대해서도 작동하도록 합니다. 이에 대한 실제 구현은 다음과 같습니다.

 

Lisitng 4.5 Full class definition of a thread-safe queue using condition variables

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}
    threadsafe_queue(threadsafe_queue const& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue=other.data_queue;
    }

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=data_queue.front();
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty)
            return false;
        value=data_queue.front();
        data_queue.pop();
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

 만약 기다리는 스레드가 한번만 기다릴 것이고, 조건이 true가 되고나면 다시 조건을 기다리지 않아도 된다면, 조건 변수는 최고의 선택은 아닐 것입니다. 만약 일부 데이터의 이용 여부를 기다리고 있다면 특히 더욱 그렇습니다. 이러한 상황에서는 future 객체가 더 나은 방향일 것입니다.

 

4.2 Waiting for one-off events with futures

 C++ 표준 라이브러리는 일회성 이벤트를 future를 통해 구현합니다. 만약 하나의 스레드가 특정 일회성 이벤트를 기다릴 필요가 있다면, 그 스레드는 이 이벤트를 대표하는 future를 획득합니다. 이때 그 스레드는 주기적으로 짧은 시간동안 해당 이벤트가 발생했는지를 확인하면서 사이 시간마다 다른 작업을 수행할 수도 있고, 해당 이벤트가 발생할 때까지 다른 작업을 하고나서 future가 ready 상태가 되기를 기다릴 수 있습니다. future는 자신과 연관된 데이터를 가질수도 있고, 가지지 않을 수도 있는데, 해당 이벤트가 한번 발생하면, 퓨쳐는 다시 초기화될 수 없습니다.

 C++ 표준 라이브러리에는 unique future(std::future<>)와 shared future(std::shared_future<>)로 2종류의 future가 존재합니다. std::future 인스턴스는 하나로 연관된 이벤트를 가리키는 유일한 인스턴스인 반면, std::shared_future의 여러 인스턴스들은 공유하는 하나의 이벤트를 가리킵니다. 만약 준비 상태가 된다면 다수의 shared_future 객체가 정보에 한꺼번에 접근하려 들 것입니다.

 std::future<void> 및 std::shared_future<void> 템플릿은 공유 데이터가 없기에 특수화가 필요하며, future는 스레드들간의 통신을 위해 사용되지만, 동시적 접근은 제공하지 않습니다. 따라서 mutex나 다른 방법을 필요로 합니다.

 

4.2.1 Returning values from background tasks

 바로 결과가 필요없는 비동기 작업을 시작시키기 위해서 std::async()를 사용할 수 있습니다. 이 함수는 std::future 객체를 리턴하는데, 이 객체는 궁극적으로는 함수의 리턴값을 가지게 됩니다. 만약 리턴값이 필요하게 된다면, future에서 get()을 호출하기만 하면 얻을 수 있습니다. 그러면 스레드는 future가 ready 상태가 될 때까지 기다렸다가 ready 상태가 되면 리턴값을 돌려받게 됩니다.

 

Lisitng 4.6 Using std::future to get the return value of an asynchronous task

 int find_the_answer_to_ltuae();
 void do_other_stuff();
 int main()
 {
     std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
     do_other_stuff();
     std::cout << "The answer is " << the_answer.get() << std::endl;
 }

 std::async은 std::thread와 유사하게 추가적인 인자를 받아 전달할 수 있습니다. 첫번째 인자는 함수의 포인터, 뒤에 오는 인자들은 함수의 인자로 사용이 됩니다. 이에 대한 규칙은 thread와 동일합니다.(복제 되어서 넘어가거나, ref에 대한 처리 혹은 멤버 함수의 경우...)

 기본적으로, std::async()가 새로운 스레드를 시작시킬지 아니면 future가 기다릴 때 동기적으로 작업을 할지는 구현 세부 사항입니다. 대부분의 경우, std::async()에 추가 파라미터를 사용하여 선택을 할 수 있습니다. 파라미터는 std::launch 타입으로서, future의 wait() 혹은 get()이 호출될 때까지 함수 호출을 연기하려면 std::launch::deferred를, 함수가 자체 스레드에서 동작하도록 하려면 std::launch::async를 사용하면 됩니다. 혹은 std::launch::deferred | std::launch::async 도 사용할 수 있는데, 이것은 라이브러리 구현자가 선택하도록 하며, 기본값으로 수행됩니다.

4.2.2 Associating a task with a future

std::packaged_task<> 은 future를 함수 혹은 호출가능 객체에 묶어, std::packaged_task<> 객체가 불려지면, 그 객체는 연관된 함수 혹은 호출가능 객체를 호출하고 future를 ready 상태로 만드는데, 이때 리턴값이 연관된 데이터로서 저장합니다. 이는 스레드 풀이나 다른 작업 관리를 위해 사용될 수 있습니다. 만약 큰 작업을 나눌 수 있다면, 이를 작업 관리자나 스레드 풀로 넘겨서 std::packaged_task를 통해 작업을 수행하도록 할 수 있습니다.

 std::packaged_task 클래스 템플릿의 템플릿 인자는 함수 인자로서, 만약 std::packaged_task의 객체를 만들려고 할 때, 함수나 호출가능 객체들은 모두 동일한 형태를 띄고 있거나 변환가능 해야합니다. 이때 반환형은 std::future<>가 반환하는 값의 형태가 됩니다. 이때 std::packaged_task에서 작업이 어떻게 수행되는 지를 밑의 예시를 통해 알아보겠습니다.  먼저 스레드 간에서 작업을 넘기는 예시를 보겠습니다.

 

Lisitng 4.9 Running code on a GUI thread using std::packaged_task

#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread()
{
    while(!gui_shutdown_message_received())
    {
        get_and_process_gui_message();
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lk(m);
            if(tasks.empty())
                continue;
            task=std::move(tasks.front());
            tasks.pop_front();
        }
        task();
    }
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
    std::packaged_task<void()> task(f);
    std::future<void> res=task.get_future();
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task));
    return res;
}

 위의 작업에서 gui_thread는 어떠한 외부 message가 들어오면 tasks에 작업이 있는지 확인하고, 이를 std::packaged_task에 std::move로 넘기는 작업을 수행합니다. 그 이후, task()를 통해 작업을 수행합니다.

 tasks로 작업을 넘겨주는 것도, 간단합니다. 먼저 어느 한 작업을 std::packaged_task로 받은 후에, 이에 대한 결과를 받아오는 창구로는 get_future로 future 객체를 받아오고, 이에 대한 작업을 수행하는 곳으로 보내기 위해서는 tasks 덱에 push_back을 통해 넘겨줍니다. 

 만약 이번 예시와는 달리 단순한 함수 호출로 표현될 수 없는 작업이거나 결과가 여러 군데에서 생기는 작업의 경우 처리는 packaged_task를 통해서는 수행하기 어렵습니다. 따라서 future를 생성하는 3번째 방법으로, 값을 명시적으로 설정하는 std::promise를 사용하여 해결합니다.

 

4.2.3 Making (std::)promise

​ std::promise<T>는 타입 T의 값을 설정하는 수단을 제공하는데, 나중에 이와 연관된 std::future<T>객체를 통해 그 값을 읽을 수 있습니다. std::promise/std::future 쌍은 값을 설정하여 넘겨주는 기능을 위한 방법을 제공하는데, 기다리는 스레드가 future를 기다리는 동안, 데이터를 제공하는 스레드는 값을 설정하기 위해 promise를 사용하여 future를 ready 상태로 만들어 받을 수 있게 합니다.

 std::promise 객체에서 get_future() 멤버 함수를 호출하여 이와 연관된 std::future 객체를 얻을 수 있으며, set_value를 통해 promise의 값이 설정되면, future는 ready 상태가 되면서 저장된 값을 돌려받을 수 있게 됩니다. 이를 Listing 4.10을 통해 알아보도록 하겠습니다.

 

Lisitng 4.10 Handling multiple connections from a single thread using promises

#include <future>
void process_connections(connection_set& connections)
{
    while(!done(connections))
    {
        for(connection_iterator connection=connections.begin(),end=connections.end();
            connection!=end; ++connection)
        {
            if(connection->has_incoming_data())
            {
                data_packet data=connection->incoming();
                std::promise<payload_type>& p= connection->get_promise(data.id);
                p.set_value(data.payload);
            }
            if(connection->has_outgoing_data())
            {
                outgoing_packet data = connection->top_of_outgoing_queue();
                connection->send(data.payload);
                data.promise.set_value(true);
            }
        }
    }
}

 연결이 지속되는 한, 루프가 지속되며, 모든 연결을 확인하게 되는데, has_incoming_data가 존재한다면, 데이터를 받습니다. 이때 데이터에 ID와 payload가 존재한다고 하면, 이를 ID에 해당하는 promise를 가져오고 여기에 payload를 넘겨줌으로서 작업을 시킵니다.

 나가는 패킷에 대해서는 나가는 패킷을 가져온 다음 이를 전송하고서 promise 객체에 잘 전송되었다고 보고를 하도록 합니다. 이를 통해  들어오고 나가는 연결에 대해서 작업을 분산하여 작동하도록 짤 수 있습니다.

 하지만 모든 것이 잘 돌아가지 않을 때는 예외를 던질 수도 있는데 이를 처리하는 방법에 대해서 알아보겠습니다.

 

4.2.4 Saving an exception for future

 ​만약 std::async()에서 호출된 함수가 예외를 던진다면, 그 예외는 future에 저장되면서 future는 ready 상태로 변합니다. 해당 future에서의 get() 호출은 저장된 예외를 다시 던지게 되는데, 이것은 std::packaged_task에서도 동일하게 작동합니다. 작업이 시작되었을 때 해당 함수가 예외를 던진다면, 예외는 future에 결과 대신에 저장되고 future는 get() 호출시 예외를 던지기 위해 ready 상태로 변합니다.  std::promise 또한 명시적인 함수 호출로 같은 기능을 제공합니다. 값 대신에 예외를 저장하기를 원한다면, set_value() 대신에 set_exception() 멤버 함수를 호출하면 됩니다.

double squre_root(double x)
{
  if (x < 0) throw std::out_of_range("x<0");
  return sqrt(x);
};

void worker(std::promise<string>* p) {
  try {
    throw std::runtime_error("Some Error!");
  } catch (...) {
    // set_exception 에는 exception_ptr 를 전달해야 한다.
    p->set_exception(std::current_exception());
  }
}

int main() {
  double y = squre_root(-1);
  std::future<double> f = std::async(square_root, -1);
  try {
    double y = f.get();
  catch {const std:out_of_range& e) {
    std::cout << e.what() << std::endl;;
  };

  std::promise<string> p;
  std::future<string> data = p.get_future();
  std::thread t(worker, &p);
  data.wait();

  try {
    data.get();
  } catch (const std::exception& e) {
    std::cout << "예외 : " << e.what() << std::endl;
  }
  t.join();
}

 exception을 future에 저장하는 다른 방법은 객체를 부르거나 저장하지 않은 채로 std::promise 및 std::packaged_task를 파괴하는 방법입니다. 소멸자가 future가 준비되지 않은 상태에서 파괴된다면 std::future_error 예외 처리를 하는데, 이는 future 객체에 저장됩니다. 만약 문제가 생겨 아무것도 저장하지 않고 파괴되면 future 객체가 계속 기다린다는 문제가 발생하기 때문입니다.

 std::future는 오직 한 스레드의 결과만 기다린다는 단점이 있는데, 이를 위해 std::shared_future이 사용됩니다.

4.2.5 Waiting from multiple threads

std::future가 오직 이동될 수만 있으므로 소유권이 인스턴스들 사이에서 이동되고 그래서 오직 하나의 인스턴스만이 비동기 결과를 가리킬 수 있어 다수의 스레드에서 접근하면 문제가 생길 수 있는 반면, std::shared_future 인스턴스는 복사될 수 있으므로 같은 결과를 가리키는 여러 인스턴스들을 가질 수 있습니다.

std::promise<int> p;
std::future<int> f(p.get_future());          // implicit transfer of ownership
assert(f.valid());                           // f is valid
std::shared_future<int> sf(std::move(f));
assert(!f.valid());                          // f is no longer valid
assert(sf.valid());                          // sf is valid

 위의 코드에서, get_future를 통해 future 객체를 암시적으로 넘겨주었다가 이를 sf로 넘겨주게 되었는데, 여기서 서로 공유할 수 없기에 f에서는 소유권을 가지지 않게 되었습니다. 소유권 이전은 우측값에서는 암시적인 것이므로, std::promise 객체의 get_future() 멤버 함수의 반환값에서 직접 std::shared_future를 생성할 수 있습니다.

std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future());

 또한, std::future는 share() 멤버함수를 가지고 있는데, 이것은 std::shared_future를 생성하고 그것에 직접 소유권을 이전할 수 있습니다. 이는 auto를 통해서 손쉽게 받을 수 있습니다.

std::promise<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p;
auto sf=p.get_future().share();

4.3  Waiting with a time limit

 두 가지 종류의 timeout이 존재하는데, 하나는 지속 시간 기반으로 얼마나 기다릴 지를 정하는 것이고, 다른 하나는 특정한 시간을 기다리는 것으로 특정 시간까지를 기다리는 것입니다. 전자는 _for, 후자는 _until을 뒤에 붙이며, std::condition_variable은 wait_for함수에 대해 위의 둘에 해당하는 오버로딩되어 있습니다.

4.3.1 Clocks

C++ 표준 라이브러리에 관한 한, clock은 시간 정보의 원천으로 이는 4개의 구별되는 정보를 제공하는 클래스입니다.

  • 시간 now
  • clock으로부터 얻은 시간을 표현하는데 사용되는 값의 타입
  • clock의 tick 기간
  • clock tick이 일정한 비율인지의 여부.

 현재 시간은 std::chrono::system_clock::now()와 같이 clock 클래스의 now()를 통해 불러올 수 있습니다.

 steady clock으로 간주될 수 있는지의 여부 clock의 tick 기간은 초의 분수값으로 표시되는데, 초당 25번의 tick은 std::ratio<1, 25>이고, 2.5초당 tick은 std::ratio<5, 2>입니다. 만약 clock이 균일한 비율로 흐르고 조정될 수 없다면, 이는 steady clock이라 불립니다. 일반적으로 system_clock은 steady가 아닌데, 임의로 시간이 조정될 수 있기 때문입니다. steady clock은 타임아웃 계산시에 중요한데, 그래서 C++ 표준 라이브러리는 std::chrono::steady_clock의 형태로 이를 제공합니다.

 

4.3.2 Durations

​ duration은 시간 지원에 있어서 가장 간단한 부분인데, std::chrono::duration<> 클래스 템플릿으로 처리됩니다. 첫번째 템플릿 파라미터는 표현타입(int, long, double)이고, 두번째 템플릿 파라미터는 표현단위가 얼마나 많은 초를 명시하는지를 나타내는 분수입니다. 예를 들어 short형으로 저장된 1분은 std::chrono::duration<short, std::ratio<60,1>>입니다. 만약 ms를 표기하고 싶다면 0.001s 이기 때문에 std::chrono::duration<double, std::ratio<1, 1000>>로 표기되어야 합니다. 이와 같은 기본 타입에 대해서 표준 라이브러리에서 typedef를 통해 nanosecond, microsecond 등 다양한 이름을 저장해두었습니다.

using namespace std::chrono_literals;
auto one_day = 24h;
auto half_an_hour = 30min;

  duration간의 변환은 값의 잘림이 없을 때는 암시적으로 일어나는데, 명시적으로 변환하기 위해서는 std::chrono::duration_cast<>가 사용되어야 합니다.

std::chrono::milliseconds ms(54802);
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);

 

 또한 duration에 대한 사칙 연산도 정상적으로 작동하며 (5s = 5 * seconds(1) = minutes(1) - seconds(55)) .count를 사용하면 안의 숫자를 빼낼 수 있습니다.(seconds(55).count() = 1234). 이를 통해  future 객체엇 시간을 측정하여 기다리게 됩니다.

std::future<int> f = std::async(some_task);
if (f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready)
  do_something_with(f.get());

wait_for의 결과가 시간을 지났을 때 유효하지 않으면 std::future_status::timeout을 아니면 std::future_status::ready 혹은 std::future_status::deferred를 리턴합니다.

4.3.3 Time points

 clock을 위한 ​time point는 std::chrono::time_point<> 클래스 템플릿의 인스턴스로 표현되는데, 이것은 첫번째 템플릿 파라미터는 clock의 종류를, 두번째 템플릿 파라미터는 측정 단위를 나타냅니다. time point의 값은 clock의 기준시점(epoch) 이후부터의 시간의 길이입니다. 일반적인 기준시점은 1970년 1월 1일 0시0분0초와 애플리케이션이 동작하는 컴퓨터의 부팅 시점입니다.

 이는 duration과의 연산이 가능합니다.

 

Lisitng 4.10 Handling multiple connections from a single thread using promises

#include <condition_variable>
#include <mutex>
#include <chrono>
std::condition_variable cv;
bool done;
std::mutex m;
bool wait_loop()
{
    auto const timeout= std::chrono::steady_clock::now()+
        std::chrono::milliseconds(500);
    std::unique_lock<std::mutex> lk(m);
    while(!done)
    {
        if(cv.wait_until(lk,timeout)==std::cv_status::timeout)
            break;
    }
    return done;
}

 

 

4.3.4 Functions that accept timeouts

std::this_thread::sleep_for 혹은 std::this_thread::sleep_until 함수는 받아 움직이는데 이는 wait_for과 유사합니다. 이와 같이 timeout을 받는 함수들은 여럿 존재합니다.

 

4.4  Using synchronization of operations to simplify code

이번 장에서 소개된 동시성 기능들은 동시성을 요구하는 작업들의 구조를 더 짤 수 있도록 돕습니다. 이를 좀 더 수용하기 위해서는 함수적으로 접근하는 것이 좋습니다. 스레드들 간에 데이터를 공유하기 보다는 각각의 작업이 필요한 데이터를 내재하고 다른 스레드에서 필요한 데이터들은 future 객체를 얻도록 하는 것이 좋습니다.

 

4.4.1 Functional Programming with futures

​ 함수형 프로그래밍(functional programming, FP)이라는 용어는 함수 호출의 결과가 오직 그 함수에 전달된 파라미터에만 의존하고 외부 상태에 의존하지 않는 프로그래밍 스타일을 일컫습니다. 이것은 수학적 함수 개념과 유사한데, 같은 인자로 2번 함수를 호출해도 그 결과는 동일하다는 것을 의미합니다. 순수 함수(pure function)는 어떠한 외부 상태도 변경하지 않으며, 함수의 효과는 오직 리턴값으로만 제한됩니다. 이는 공유 데이터나 경쟁 조건에 대해 데이터 보호에 신경을 쓰지 않아도 되기에 더 쉽게 멀티스레딩을 짤 수 있도록 도와줍니다.

 C++은 멀티패러다임 언어이어서, 프로그램을 FP 스타일로 작성하는 것이 가능합니다. C++11에서는 이것이 더 쉬워졌는데, 람다 함수의 출현과 std::bind(), 그리고 타입을 추론하는 자동타입 때문입니다. future는 C++에서 FP 스타일의 동시성을 만드는 마지막 퍼즐 조각으로, 계산 결과를 다른 스레드의 결과에 의존하게 해주기 위해 tuture를 스레드들간에 넘겨줄 수 있는데, 어떠한 공유 데이터도 명시적으로 접근할 필요가 없어졌기 때문입니다. 다음은 FP 스타일로 동시 퀵소트를 수행한 경우의 예시 코드입니다.

 

Lisitng 4.12 A sequential implements of Quicksort

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if(input.empty()) {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(),input,input.begin());
    T const& pivot = *result.begin();
    auto divide_point = 
        std::partition(input.begin(), input.end(), [&](T const& t){return t<pivot;});
    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input,input.begin(), divide_point);
    std::future<std::list<T>> new_lower(std::async(&parallel_quick_sort<T>, 
                                                          std::move(lower_part)));
    auto new_higher(parallel_quick_sort(std::move(input)));
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}

 보통 사용하는 함수형과의 가장 큰 차이점은 future 객체를 사용했다는 점입니다. 동시성을 활용하지 않는다면, lower과 higher에 대해서 순서대로 수행하였겠지만, lower에 대해서는 다른 스레드에서 수행하되, higher에 대해서는 현재의 스레드에서 처리하도록 만들었습니다. 

아무 플래그도 주지 않았기에 스레드를 생성할 지 아니면 현재 스레드에서 진행할 지 자동적으로 처리해주고 있습니다. 만약 스레드풀을 활용하여 멀티스레딩 형식으로 전부 사용하고 싶다면, async을 사용하기 보다는 spawn_task 함수를 직접 만들어 사용할 수도 있습니다.

 

Lisitng 4.14 A sample implementation of spawn_task

#include <future>
template<typename F,typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f,A&& a)
{
    typedef std::result_of<F(A&&)>::type result_type;
    std::packaged_task<result_type(A&&)> task(std::move(f));
    std::future<result_type> res(task.get_future());
    std::thread t(std::move(task),std::move(a));
    t.detach();
    return res;
}

 이는 스레드 std::packaged_task를 만들어 함수를 부르고, future 객체를 받아 값을 받아옵니다. 작업을 큐에 넣어 스레드 풀에서 돌아가게 하는 초석을 다질 수는 있습니다. 만약 프로그램에 멀티스레딩의 운영을 맡기기 보다는 자체적으로 수행하고 싶다면 이러한 방법도 괜찮습니다.

 Fp와 비슷한 패러다임으로는 CSP가 존재합니다. 이 또한 스레드들을 공유 데이터가 존재하지 않도록 개념적으로 독립시켜놓고 스레드 사이에 메세지만 전달할 수 있도록 하였습니다.

4.4.2 Synchronizing operations with message passing

​CSP(Communicating Sequential Processes)의 개념은 간단합니다. 공유 데이터가 없다면, 각각의 스레드들은 완전히 독립적으로 동작할 수 있으며 오직 그 스레드가 받은 메시지에 반응하기만 하면 된다는 것입니다. 그러므로 각각의 스레드는 효과적으로 상태 기계(state machine)가 될 수 있는데, 메시지를 받으면 현재 상태에 따른 동작을 수행하고 일정한 방법으로 그 상태를 업데이트하고 다른 스레드에 하나 이상의 메시지를 전달할 수도 있습니다.

4.4.3 Continuation-style concurrency with the Concurrency TS

Concurrency TS는 std::experimental에서 새로운 버전의 std::promise와 std::packaged_task를 제공합니다. 이들은 std::experimental::future을 반환하는데, 이의 continuation이라는 새로운 특성을 활용할 수 있습니다. 이때까지의 future 객체는 결과를 기다리거나, 정해진 시간 동안 기다렸습니다. 하지만 이는 불편하며 직관적이지 않습니다. 이를 좀 더 직관적으로 구현해주는 메서드가 .then()입니다. then()의 안쪽에 그 다음 할일을 적으면 됩니다.

std::experimental::future<int> find_the_answer;
auto fut = find_the_answer();
auto fut2 = fut.then(find_the_question);
assert(!fut.valid());
assert(fut2.valid());

std::string find_the_question(std::experiemtnal::future<int> the_answer);

위 처럼 fut가 끝나면 이에 대한 결과를 바로 .then을 통해 find_the_question을 수행하는데, 이때 함수의 인자는 future 객체를 인자로 가집니다. 또한 fut은 다음 함수를 수행하면서 future 객체를 이동시켜 함수에서 사용하고 파괴되었기에 더 이상 유효하지 않게됩니다. Concurrency TS에 std::async과 같은 함수는 존재하지 않지만, 다음과 같이 유사하게 구현하여 .then도 활용이 가능합니다.

Lisitng 4.17 A simple equivalent to std::async for Concurency TS futures

#include <experimental/future>
template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())> spawn_async(Func&& func)
{
    std::experimental::promise<decltype(std::declval<Func>()())> p;
    auto res=p.get_future();
    std::thread t(
        [p=std::move(p),f=std::decay_t<Func>(func)]()
            mutable{
            try{
                p.set_value_at_thread_exit(f());
            } catch(...){
                p.set_exception_at_thread_exit(std::current_exception());
            }
    });
    t.detach();
    return res;
}

4.4.4 Chaining continuations

 시간을 소모하는 일들이 많고 이들을 비동기적으로 수행해야할 때, .then을 연속해서 사용할 수 있습니다.

Lisitng 4.20 A function to process user login with continuations

#include <experimental/future>
std::experimental::future<void> process_login(
    std::string const& username,std::string const& password)
{
    return spawn_async([=](){
        return backend.authenticate_user(username,password);
    }).then([](std::experimental::future<user_id> id){
        return backend.request_current_info(id.get());
    }).then([](std::experimental::future<user_data> info_to_display){
        try{
            update_display(info_to_display.get());
        } catch(std::exception& e){
            display_error(e);
        }
    });
}

 std::experimental::shared_future도 continuation이 가능합니다.

4.4.5 Waiting for more than one future 

 프로세스에 있어 많은 데이터가 존재하고 Future 객체로 받을 때, 이들을 하나하나 받는 것은 상당히 불편합니다. 따라서 다른 비동기적 작업을 통해 이러한 future 객체들을 받아와 결과를 얻도록 하는 것이 좋습니다. 만약 이를 선형적이며 개별적으로 수행하고 결과를 받아온다면 이는 아직 끝나지 않은 작업을 기다려야하고, 이는 기다리는 시간을 잡아먹을 뿐만 아니라 future를 ready 시키기 위해 작업을 수행하는데 필요한 컨텍스트 스위칭을 빈번하게 발생시킬 수 있습니다. std::experimental::when_all을 사용하면, 이는 기다리고 스위치하는 것을 피할 수 있습니다. when_all에서 future들이 기다리게 한다면, 이들이 모두 준비가 되었을 때 ready로 상태가 변하여 값들을 받아올 수 있게 됩니다. 이는 다음과 같이 코드에 활용할 수 있습니다.

std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec)
{
    size_t const chunk_size=whatever;
    std::vector<std::experimental::future<ChunkResult>> results;
    for(auto begin=vec.begin(),end=vec.end();beg!=end;){
        size_t const remaining_size=end-begin;
        size_t const this_chunk_size=std::min(remaining_size,chunk_size);
        results.push_back(
            spawn_async(
            process_chunk,begin,begin+this_chunk_size));
        begin+=this_chunk_size;
    }
    return std::experimental::when_all(
        results.begin(),results.end()).then(
        [](std::future<std::vector<
             std::experimental::future<ChunkResult>>> ready_results)
        {
            std::vector<std::experimental::future<ChunkResult>>
                all_results=ready_results .get();
            std::vector<ChunkResult> v;
            v.reserve(all_results.size());
            for(auto& f: all_results)
            {
                v.push_back(f.get());
            }
            return gather_results(v);
        });
}

 4.4.6 Waiting for the first future in a set with when_any

when_all은 모든 데이터가 준비될 때까지 기다렸지만, when_any는 어떤 하나라도 준비가 된다면 실행하도록 짜는 코드 입니다.

std::experimental::future<FinalResult>
find_and_process_value(std::vector<MyData> &data) {
    unsigned const concurrency = std::thread::hardware_concurrency();
    unsigned const num_tasks = (concurrency > 0) ? concurrency : 2;
    std::vector<std::experimental::future<MyData *>> results;
    auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
    auto chunk_begin = data.begin();
    std::shared_ptr<std::atomic<bool>> done_flag =
        std::make_shared<std::atomic<bool>>(false);
    for (unsigned i = 0; i < num_tasks; ++i) {
        auto chunk_end =
            (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
        results.push_back(std::experimental::async([=] {
            for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end);
                 ++entry) {
                if (matches_find_criteria(*entry)) {
                    *done_flag = true;
                    return &*entry;
                }
            }
            return (MyData *)nullptr;
        }));
        chunk_begin = chunk_end;
    }
    std::shared_ptr<std::experimental::promise<FinalResult>> final_result =
        std::make_shared<std::experimental::promise<FinalResult>>();
    struct DoneCheck {
        std::shared_ptr<std::experimental::promise<FinalResult>> final_result;

        DoneCheck(
            std::shared_ptr<std::experimental::promise<FinalResult>>
                final_result_)
            : final_result(std::move(final_result_)) {}

        void operator()(
            std::experimental::future<std::experimental::when_any_result<
                std::vector<std::experimental::future<MyData *>>>>
                results_param) {
            auto results = results_param.get();
            MyData *const ready_result = results.futures[results.index].get();
            if (ready_result)
                final_result->set_value(process_found_value(*ready_result));
            else {
                results.futures.erase(results.futures.begin() + results.index);
                if (!results.futures.empty()) {
                    std::experimental::when_any(
                        results.futures.begin(), results.futures.end())
                        .then(std::move(*this));
                } else {
                    final_result->set_exception(
                        std::make_exception_ptr(                    
                            std::runtime_error(“Not found”)));
                }
            }
        }
    };

    std::experimental::when_any(results.begin(), results.end())
        .then(DoneCheck(final_result));
    return final_result->get_future();
}

 

4.4.7 Latches and barriers in the Concurrency TS

latch는 카운터가 0이 되었을 때 ready 상태에 돌입하며, 이는 쭉 지속 됩니다. 반면에 barrier은 재사용가능한 구성요소로서 latch는 어떤 스레드가 카운터를 줄였는지 알 수 없는데에 반해, barrier은 같은 스레드가 카운터를 연속해서 내리지 못하도록 막습니다. 즉 각각의 스레드는 한 사이클에 한번만 barrier에 도착할 수 있습니다. 모든 스레드들이 도착한 후에야 풀리게 되며, 이는 다시 사이클을 돌게 됩니다.

4.4.8 A basic latch type: std::experimental::latch

std::experimental::latch를 사용할 때, 초기값을 넣어주어 생성해야 합니다. 이후 특정 이벤트가 작동하면 count_down을 통해 카운터를 줄이고, 0이되는 순간 ready 상태가 됩니다.

void foo(){
    unsigned const thread_count=...;
    latch done(thread_count);                     
    my_data data[thread_count];
    std::vector<std::future<void> > threads;
    for(unsigned i=0;i<thread_count;++i)
        threads.push_back(std::async(std::launch::async,[&,i]{         
            data[i]=make_data(i);
            done.count_down();                     
            do_more_stuff();                       
        }));
    done.wait();                                   
    process_data(data,thread_count);               
}

4.4.9 std::experimental::barrier : a basic barrier

barrier에는 두가지 종류가 존재하는데 std::experiemtnal::barrier와 std::experiemtnal::flex_barrier이 존재합니다. 전자는 기본적이며 적은 오버헤드를 가지고, 후자는 유연하지만 더 큰 오버헤드를 가집니다.

 모든 작업이 같이 수행될 필요는 없지만 어느 시점에서 기다려야만 할 때 std::experimental::barrier이 사용될 수 있습니다. arrive_and_wait을 통해 barrier에 도착하여 대기하도록 할 수 있습니다.

result_chunk process(data_chunk);
std::vector<data_chunk>
divide_into_chunks(data_block data, unsigned num_threads);

void process_data(data_source &source, data_sink &sink) {
    unsigned const concurrency = std::thread::hardware_concurrency();
    unsigned const num_threads = (concurrency > 0) ? concurrency : 2;

    std::experimental::barrier sync(num_threads);
    std::vector<joining_thread> threads(num_threads);

    std::vector<data_chunk> chunks;
    result_block result;

    for (unsigned i = 0; i < num_threads; ++i) {
        threads[i] = joining_thread([&, i] {
            while (!source.done()) {
                if (!i) {
                    data_block current_block = source.get_next_data_block();
                    chunks = divide_into_chunks(current_block, num_threads);
                }
                sync.arrive_and_wait();
                result.set_chunk(i, num_threads, process(chunks[i]));
                sync.arrive_and_wait();
                if (!i) {
                    sink.write_data(std::move(result));
                }
            }
        });
    }
}

 위의 코드에서 if(!i)의 부분은 하나 스레드가 데이터를 가져오고 이를 쓰기 위함이며, sync.arrive_and_wait을 통해 처음에 데이터를 가져오는 것을 기다리고, 두번째는 각자 데이터를 처리하는 것을 기다리게 됩니다. 

4.4.10 std::experimental::flex_barrier— std::experimental::barrier’s flexible friend

std::experiemtnal::flex_barrier은 std::experimental::barier와 인터페이스에서 하나가 다른데, 완성된 함수를 생성자 인자로 받는다는 것입니다. 이 함수는 모든 스레드들이 barrier에 도착하면, 하나의 스레드에서만 작동하는 함수입니다. 이는 다음 사이클에 작동하는 스레드의 수를 줄이거나 연속적으로 수행되어야하는 일을 수행하게 만들 수 있습니다. 예시는 다음과 같습니다.

result_chunk process(data_chunk);
std::vector<data_chunk>
divide_into_chunks(data_block data, unsigned num_threads);

void process_data(data_source &source, data_sink &sink) {
    unsigned const concurrency = std::thread::hardware_concurrency();
    unsigned const num_threads = (concurrency > 0) ? concurrency : 2;

    std::vector<data_chunk> chunks;

    auto split_source = [&] {
        if (!source.done()) {
            data_block current_block = source.get_next_data_block();
            chunks = divide_into_chunks(current_block, num_threads);
        }
    };

    split_source();

    result_block result;
    
    std::experimental::flex_barrier sync(num_threads, [&] {
        sink.write_data(std::move(result));
        split_source();
        return -1;
    });
    std::vector<joining_thread> threads(num_threads);


    for (unsigned i = 0; i < num_threads; ++i) {
        threads[i] = joining_thread([&, i] {
            while (!source.done()) {
                result.set_chunk(i, num_threads, process(chunks[i]));
                sync.arrive_and_wait();
            }
        });
    }
}

위의 코드에서 하나의 코드가 데이터를 쓰고, 소스를 나눈 다음에 Return -1을 하면서 빠져나가게 됩니다.