Skip to content

oneTBBの使い方

C++ Advent Calendar 2024 22日目の記事です。 今回は意外と少なかったoneTBB(旧intel TBB)の使い方について書きます。

oneTBBとは

oneTBBとはintelが中心となって開発している、お手軽マルチスレッディングライブラリです。 C++20ではセマフォやバリアといった多数の同期処理に関する機能が追加されましたが、これらを使ってマルチスレッドなプログラムを書くのは慣れてないとなかなか大変だと思います。 oneTBBではfor文やreducesortといった処理を並列処理する関数やスレッドセーフなコンテナを用意しており、並列処理に詳しくなくても書けるように設計されています。

名前空間はoneapi::tbbで、oneapi/tbb.hをインクルードすれば全機能が使えます。

では、実際にどんな関数があるのか見ていきましょう。

単純な並列処理

ここでは、比較的単純な並列処理を行う関数について見ていきます。

前提知識

以下で紹介する関数は引数にRange型という、何らかの範囲を表すクラスのインスタンスを指定します。

Note

parallel_forには指定しない関数オーバーロードもありますが、並列実行されるとは限らないため省略します1

これは例えば

\[ \sum_{n = 1}^{\infty} \frac{1}{n^2} \]

の計算のように、各要素の計算は時間がかからないが要素数が多い場合、スレッド間の同期処理によるオーバーヘッドを減らすために導入されています。

いくつか種類がありますが3blocked_rangeを知っていれば大体のケースは問題ないでしょう。

oneapi/tbb/blocked_range.h
namespace oneapi::tbb {
template<typename Value>
class blocked_range {
    //! Type of a value
    /** Called a const_iterator for sake of algorithms that need to treat a blocked_range as an STL container. */
    using const_iterator = Value;

    //! Type for size of a range
    using size_type = std::size_t;

    //! Construct range over half-open interval [begin,end), with the given grainsize.
    blocked_range( Value begin_, Value end_, size_type grainsize_=1 );

    //! Beginning of range.
    const_iterator begin() const;

    //! One past last value in range.
    const_iterator end() const;

    //! Size of the range
    size_type size() const;

    //! True if range is empty.
    bool empty() const;
};
}

ここでは、ユーザーが知っていればいいものだけを抜き出しました。 このクラス2は0以上の整数の範囲 (正確には右半開区間) を表すために使われます。 テンプレートパラメーターのValueは普通はstd::size_tを指定すれば良いです4。 コンストラクタで範囲を指定しましょう。

Note

beginの戻り値はイテレータではなくValue型の値です。 間違えやすい上によく使うので、注意しましょう。

以下の関数では、渡されたRange型のインスタンスを分割して、各スレッドで分割後の範囲を実行しています。

parallel_for

oneapi/tbb/parallel_for.h
namespace oneapi::tbb {
    template<typename Range, typename Body>
    void parallel_for(const Range& range, const Body& body);
}

for文のように、各要素に対してbodyを並列に実行する関数です。 Body型にも要件がありますが5[...](const Range& r) {...}で十分です。

以下はparallel_forを使った簡単な例です。

src/how_to_use_onetbb/parallel_for.cpp
#include <vector>

#include <oneapi/tbb/parallel_for.h>

int main() {
    using Range = oneapi::tbb::blocked_range<std::size_t>;

    std::vector<double> v;
    v.resize(1e6);

    oneapi::tbb::parallel_for(Range(1, v.size() + 1), [&v](const Range& r){
        for (auto i = r.begin(); i != r.end(); ++i) {
            v[i - 1] = 1.0 / (i * i);
        }
    });

    for (std::size_t i = 1; i < v.size() + 1; ++i) {
        if (v[i - 1] != 1.0 / (i * i)) {
            return 1;
        }
    }
    return 0;
}

Note

parallel_forはループ全体で少なくとも100万命令以上ある処理に対して使いましょう。

parallel_reduce

oneapi/tbb/parallel_reduce.h
namespace oneapi::tbb {
    template<typename Range, typename Value, typename Func, typename Reduction>
    Value parallel_reduce(const Range& range, const Value& identity, const Func& func, const Reduction& reduction);
}

parallel_reduce6std::reduceのように、与えられた範囲rangeを集計する関数です。集計順は

range以外の仮引数の要件は、単純化すると以下の通りです。

  • identity: コピー構築可かつコピー代入可な型の左単位元(x == reduction(identity, x)を満たす値)。funcの仮引数xに渡されます。
  • func: [...](const Range& r, const Value& x) -> Value {...}を渡せば十分。この関数は初期値xrの範囲の値を集計するために使われます。
  • reduction: [...](const Value& x, const Value& y) -> Value {...}を渡せば十分。この関数は2つの値をまとめるために使われます。

以下はparallel_reduceを使った簡単な例です。

src/how_to_use_onetbb/parallel_reduce.cpp
#include <cmath>
#include <iostream>
#include <vector>

#include <oneapi/tbb/blocked_range.h>
#include <oneapi/tbb/parallel_reduce.h>

int main() {
    using Range = oneapi::tbb::blocked_range<std::size_t>;

    std::vector<double> v;
    v.resize(1e6);

    for (std::size_t i = 1; i < v.size() + 1; ++i) {
      v[i] = 1.0 / (i * i);
    }

    double sum = oneapi::tbb::parallel_reduce(
      Range(0, v.size()),
      0.0,
      [&v](const Range& r, double local_sum) {
        for (auto i = r.begin(); i != r.end(); ++i) {
            local_sum += v[i];
        }
        return local_sum;
      },
      [](double x, double y) {
        return x + y;
      }
    );

    std::cout << std::sqrt(6 * sum) << '\n';

    return 0;
}

多数のstd::setをマージすることも出来ます。7

src/how_to_use_onetbb/parallel_reduce_rvalue.cpp
#include <cmath>
#include <iostream>
#include <set>
#include <vector>
#include <stdexcept>

#include <oneapi/tbb/blocked_range.h>
#include <oneapi/tbb/parallel_reduce.h>

int main() {
    using Range = oneapi::tbb::blocked_range<std::size_t>;
    using Set = std::set<double>;

    std::vector<Set> v;
    v.resize(1e4);

    for (std::size_t i = 1; i < v.size() + 1; ++i) {
      Set s;
      for (std::size_t j = 1; j < 1e2 + 1; ++j) {
        auto k = (i - 1) * 1e2 + j;
        s.insert(1.0 / (k + k));
      }
      v[i - 1] = std::move(s);
    }

    Set merged = oneapi::tbb::parallel_reduce(
      Range(0, v.size()),
      Set{},
      [&v](const Range& r, Set&& local_sum) {
        for (auto i = r.begin(); i != r.end(); ++i) {
            if (local_sum.size() < v[i].size()) {
              local_sum.swap(v[i]);
            }
            local_sum.merge(v[i]);
            if (not v[i].empty()) {
              throw std::runtime_error("something is wrong!\n");
            }
        }
        return std::move(local_sum);
      },
      [](Set&& x, Set&& y) {
        if (x.size() < y.size()) {
          x.swap(y);
        }
        x.merge(y);
        if (not y.empty()) {
          throw std::runtime_error("something is wrong!\n");
        }
        return std::move(x);
      }
    );

    if (merged.size() != 1e6) {
      return 1;
    }
    return 0;
}
Note

parallel_reduceには実はもう一つ

template<typename Range, typename Body>
void parallel_reduce(const Range& range, Body& body);
というオーバーロードがあります。このオーバーロードは2つ目の例のような右辺値を集計したい時に使う機会がありましたが、1つ目の方が対応したため使う機会がないと思い省略しました。

複雑な並列処理

この節で登場する関数は一部シングルスレッドで動作します。 紹介しておいてなんですが、なるべく使わない方が良いと思います。

parallel_for_each

oneapi/tbb/parallel_for_each.h
namespace oneapi::tbb {
    // (1)
    template<typename InputIterator, typename Body>
    void parallel_for_each( InputIterator first, InputIterator last, Body body );

    // (2)
    template<typename Container, typename Body>
    void parallel_for_each( Container& c, Body body );

    // (3)
    template<typename Container, typename Body>
    void parallel_for_each( const Container& c, Body body );
}

この関数はループ回数がわからないけど、各要素を並列処理したいという時に使います。 一見便利に見えますが、要素アクセスがシングルスレッドで動作するので注意しましょう。

テンプレートパラメーターInputIterator入力イテレータでなければいけません。 bodyInputIterator前方向イテレータかどうかでwell-definedな引数が決まります。 説明のため

using value_type = typename std::iterator_traits<InputIterator>::value_type;

とすると、

  1. InputIteratorが前方向イテレータでない場合、以下のラムダ式をbodyに渡せます。
    [...](const value_type& item) {...}
    [...](value_type&& item) {...}
    
  2. InputIteratorが前方向イテレータの場合、1に加えて以下のラムダ式もbodyに渡せます。
    [...](value_type& item) {...}
    

オーバーロードの2と3はparallel_for_each(std::begin(c), std::end(c), body)と同じです。

src/how_to_use_onetbb/parallel_for_each.cpp
#include <vector>

#include <oneapi/tbb/parallel_for_each.h>

int main() {
    using Range = oneapi::tbb::blocked_range<std::size_t>;

    std::vector<double> v;
    v.resize(1e6);
    for (std::size_t i = 0; i < v.size(); ++i) {
      v[i] = i;
    }

    oneapi::tbb::parallel_for_each(v.begin(), v.end(), [&v](double& d){
      d = d * d;
    });

    for (std::size_t i = 0; i < v.size(); ++i) {
        if (v[i] != i * i) {
            return 1;
        }
    }
    return 0;
}

parallel_pipeline

oneapi/tbb/parallel_pipeline.h
namespace oneapi::tbb {
    void parallel_pipeline(size_t max_number_of_live_tokens, const filter<void,void>& filter_chain);
}

この関数は工場の流れ作業のように、filter_chainに与えられた関数を実行します。 詳しい説明の前に具体例を見た方が早いでしょう。

src/how_to_use_onetbb/parallel_pipeline.cpp
#include <algorithm>
#include <iostream>
#include <random>

#include <oneapi/tbb/parallel_pipeline.h>

int main() {
  using Seed  = std::random_device::result_type;
  using Seeds = std::array<Seed, 2>;
  using Coord = std::array<double, 2>;

  std::random_device seed_gen;

  std::size_t counter = 0;

  std::size_t i = 0;
  constexpr std::size_t N = 1e6;

  oneapi::tbb::parallel_pipeline(
    N,
    oneapi::tbb::make_filter<void, Seeds>(
      oneapi::tbb::filter_mode::serial_out_of_order,
      [&](oneapi::tbb::flow_control& fc) -> Seeds {
        if (i < N) {
          ++i;
          return {seed_gen(), seed_gen()};
        } else {
          fc.stop();
          return {0, 0};
        }
      }
    ) &
    oneapi::tbb::make_filter<Seeds, std::size_t>(
      oneapi::tbb::filter_mode::parallel,
      [&](const Seeds& seeds) -> std::size_t {
        std::uniform_real_distribution<double> dist(0, 1);
        Coord coord;
        for (std::size_t i = 0; i < 2; ++i) {
          std::ranlux48 engine(seeds[i]);
          coord[i] = dist(engine);
        }
        if (coord[0] * coord[0] + coord[1] * coord[1] < 1) {
          return 1;
        }
        return 0;
      }
    ) &
    oneapi::tbb::make_filter<std::size_t, void>(
      oneapi::tbb::filter_mode::serial_out_of_order,
      [&](std::size_t i) -> void {
        counter += i;
      }
    )
  );

  std::cout << static_cast<double>(4 * counter) / N << std::endl;

  return 0;
}

見ての通り、filter_chainにはoneapi::tbb::make_filter8で作成したfilterを渡します。 別のフィルターを後ろにつけるには&で繋げます。

フィルター間で受け渡しされる値をトークンとすると、max_number_of_live_tokensは並列実行可能なトークンの最大数です11

make_filter

oneapi/tbb/parallel_pipeline.h
namespace oneapi::tbb {
    template<typename InputType, typename OutputType, typename Body>
    filter<InputType, OutputType> make_filter(filter_mode mode, const Body& body);
}

InputTypeには前のフィルターから受け取る値の型を、OutputTypeには後ろのフィルターに渡す値の型を書きます。 ない場合はvoidです。

modeには以下の表の値を指定します。9 すべてoneapi::tbb名前空間の下にあります。

mode 意味
parallel 並列実行
serial_in_order 順番通りに逐次実行
serial_out_of_order 順不同に逐次実行

bodyは最初のフィルターでない限り、[...](InputType&&) -> OutputType {...}を渡せば良いです(bodyには右辺値が渡されます10)。 最初のフィルターの場合、[...](oneapi::tbb::flow_control& fc) -> OutputType {...}を渡せば良いです。 ただし、関数内でfc.stop()を呼んでパイプラインを終わらせる必要があります。

その他

parallel_sort

oneapi/tbb/parallel_sort.h
namespace oneapi::tbb {
    // (1)
    template<typename RandomAccessIterator>
    void parallel_sort(RandomAccessIterator begin, RandomAccessIterator end);

    // (2)
    template<typename RandomAccessIterator, typename Compare>
    void parallel_sort(RandomAccessIterator begin, RandomAccessIterator end, const Compare& comp);

    // (3)
    template <typename Container>
    void parallel_sort(Container&& c) {
        parallel_sort(std::begin(c), std::end(c));
    }

    // (4)
    template <typename Container, typename Compare>
    void parallel_sort(Container&& c, const Compare& comp) {
        parallel_sort(std::begin(c), std::end(c), comp);
    }
}

与えられた半開区間[begin, end)またはコンテナcを、std::lessまたはcompを用いて並列にソートする関数。 イテレータRandomAccessIteratorはランダムアクセスイテレータで、値はムーブ可能でなければならない。 また、比較オブジェクトcompbool operator()(const T&, const T&)(Tは値型)を実装していなければならない。

FAQ

Parallel STLがあるからTBB要らないんじゃない?

簡単な並列処理ならPSTLを使ったほうが楽だと思いますが、TBBを直に使う方がより複雑な並列処理ができます。 実装の詳細までは知りませんが、TBBではちゃんと設定しないとparallel_forの中で他の並列処理を呼ぶと正しく処理してくれません。なので、そういう処理を行う際は必要になるかなと思います。

あと説明していませんでしたが、TBBにはFlow Graphという命令並列な並列処理を行う機能があります。 また、タスクの中断時の挙動の設定やエラー処理もTBBにはあるので、こういう機能で差別化できているかなと思います。

余談

parallel_reduceの大量のstd::setをマージするという例は実は私がコードを書いていた中で出てきた問題でした。 oneTBBにイシューを立てたところ、Bodyを使った方法を親切に教えてもらいました。 intelは最近あまり良い話題がありませんが、何らかの形で恩返し出来たらなと思います。

PSTLの実装について

実はGNU、LLVMどちらも内部でTBBを使っています。 具体的には、GNUはLLVMの実装を使っていて、フラグでTBBを使うようにしており12、 LLVMの方もデフォルトでは逐次実行ですが、TBBを使うフラグを用意しています13

LLVMの実装の理由は環境依存の依存関係を作りたくないからのようですが、使う際は逐次実行の可能性もあるため、どのようにビルドしたかまで調べる必要があります。 気持ちはわかるんですが、こんなことされるとPSTL使いたくないなと思ってしまいました。

参考