次の方法で共有


同時実行の問題

C++ プロジェクトで Concurrency Runtime を使用する 4 とおりの方法

Rick Molloy

サンプル コードのダウンロード

Visual Studio 2010 のベータ リリースに新たに含まれる並列コンピューティング ライブラリを既存の C++ プロジェクトに統合する方法について尋ねられることがよくあります。そこで、今回のコラムでは、Parallel Pattern Library (PPL)、Asynchronous Agents Library、および Concurrency Runtime に含まれる API やクラスを既存のプロジェクトで使用する方法をいくつか紹介します。マルチスレッド アプリケーションを開発する際に開発者がよく直面するシナリオを 4 つ取り上げ、PPL と Agents Library を使用してマルチスレッド プログラムの効率を高め、スケーラビリティを向上することが、いかに生産性の向上に直結するかを説明します。

シナリオ 1: 作業を UI スレッドからバックグラウンド タスクに移動する

Windows 開発者としてまず避けなければならないことは、UI スレッドをハングさせることです。たとえ、開発者が待機状態を示すマウス ポインターを用意したり、ハング状態の UI が Windows によって曇りガラスのウィンドウとして表示されたりしても、ウィンドウが応答しなければ、エンドユーザーが受ける印象は最悪です。このような場合に受けるアドバイスはいつも、「UI スレッドではブロッキング呼び出しを行わず、このような呼び出しはすべてバックグラウンド スレッドに移動してください」というものです。私の経験では、このアドバイスは不十分で、必要なコーディング作業はうんざりするほどたくさんあり、間違いを起こしやすく、不快ですらあります。

このシナリオでは、簡単な順次処理を行う例を用意し、既存のスレッド処理 API を使用して作業を移動する方法を示します。その後、PPL と Agents Library を使用して作業をバックグラウンド スレッドに移動する 2 とおりの方法について説明します。最後にまとめとして、この例を UI スレッドの仕様に立ち返って結び付けます。

長時間実行される順次操作をバックグラウンド スレッドに移動

では、作業をバックグラウンド スレッドに移動することにはどのような意味があるのでしょう。長時間実行される関数や他の操作をブロックする可能性のある関数がいくつかある場合に、これらの関数をバックグラウンド スレッドに移動するとします。このような場合に作業を実際に移動するメカニズムは、次に示すような単純な 1 つの関数であっても、大量のスケルトン コードが必要になります。

void SomeFunction(int x, int y){
        LongRunningOperation(x, y);
}

まず、使用予定のすべての状態をパッケージ化する必要があります。ここでは、1 組の整数をパッケージ化するだけです。それには、std::vector、std::pair、std::tuple のような組み込みのコンテナーを使用できますが、皆さんがよくやるように、次のような独自の構造体に値をパッケージ化します。

struct LongRunningOperationParams{
        LongRunningOperationParams(int x_, int y_):x(x_),y(y_){}
        int x;
        int y;
}

次に、グローバル関数または静的関数を 1 つ作成する必要があります。この関数では、Threadpool シグネチャまたは CreateThread シグネチャを照合し、(通常は void * ポインターを逆参照することで) 状態のパッケージ化を解除して、関数を実行し、必要に応じてデータを削除します。以下に例を示します。

DWORD WINAPI LongRunningOperationThreadFunc(void* data){
                LongRunningOperationParams* pData =
           (LongRunningOperationParams*) data;
                LongRunningOperation(pData->x,pData->y);
                //delete the data if appropriate
         delete pData;
}

これで、次に示すように、最終的には、そのデータを使ってスレッドのスケジュールを実際に設定できるようになります。

void SomeFunction(int x, int y){
        //package up our thread state
        //and store it on the heap
        LongRunningOperationParams* threadData =
        new  LongRunningOperationParams(x,y);
        //now schedule the thread with the data
        CreateThread(NULL,NULL,&LongRunningOperationThreadFunc,
          (void*) pData,NULL);
}

それほど多くのコードが必要なようには思えません。技術的には、SomeFunction に 2 行、クラスに 4 行、スレッド関数に 3 行追加しただけです。しかし、実際のコード量はこの 4 倍になります。スケジュールを設定する 1 つの関数が受け取るパラメーターの数を 2 つにするだけで、3 行のコードが 12 行に増加します。私が以前にこうした作業を行ったときは、確か、8 つの変数をキャプチャする必要がありました。この 8 つの変数の状態をすべてキャプチャおよび設定するだけで、コードは実に複雑になり、間違いが起こりやすくなりました。正確に思い出してみると、状態をキャプチャし、コンストラクターをビルドする処理だけでも少なくとも 2 つのバグを見つけて解決しました。

そのときはスレッドの完了を待機する作業は行いませんでしたが、こうした作業では通常、イベントを作成し、ハンドルを追跡するために WaitForSingleObject を呼び出す必要があります。もちろん、使い終わったハンドルのクリーンアップも必要です。これだけでもさらに 3 行のコードが必要で、例外やリターン コードの処理もまだ残っています。

CreateThread の代わり: task_group クラス

まず、PPL の task_group クラスを使用する方法から説明します。task_group クラスについてあまり詳しくない方のために簡単に説明すると、このクラスには、task_group::run を使用してタスクを非同期に起動し、task_group::wait を使用してそのタスクの完了を待機するメソッドが用意されています。また、まだ開始されていないタスクを取り消す機能や、例外を std::exception_ptr にパッケージ化し、再スローする機能もあります。

ここで示すコードは CreateThread を使用する方法に比べてはるかにコード量が少なく、読みやすさの点でも、順次処理の例に即したコードになっていることがわかるでしょう。最初に task_group オブジェクトを作成します。このオブジェクトは、ヒープやクラスのメンバー変数など、オブジェクトの有効期間を管理できる場所に保存する必要があります。次に、task_group::run を使用して、作業を行うタスク (スレッドではありません) のスケジュールを設定します。task_group::run はパラメーターとして関数記号を受け取り、その関数記号の有効期間を管理します。状態のパッケージ化に C++0x ラムダを使用すれば、事実上、プログラムを 2 行変更するだけで済みます。コードは次のようになります。

//a task_group member variable
task_group backgroundTasks;
void SomeFunction(int x, int y){
        backgroundTasks.run([x,y](){LongRunningOperation(x, y);});
}

Agents Library を使って作業を非同期にする

もう 1 つは、Agents Library を使用する方法です。これには、メッセージの受け渡しに基づくアプローチが必要です。変更するコード量はほぼ同じですが、主なセマンティクスに違いがあり、アプローチがエージェントベースであることは指摘しておく価値があるでしょう。タスクのスケジュールを設定するのではなく、メッセージを受け渡すパイプラインを構築し、データだけを含むメッセージを非同期に送信します。送信後はパイプライン自体にそのメッセージの処理を委託します。上記の例では、x と y を含むメッセージを送信します。作業は依然として別のスレッドで行われますが、同じパイプラインへの後続の呼び出しはキューに格納され、メッセージが順番に処理されます (処理の順序が保証されない task_group とは対照的です)。

まず、メッセージを保持する構造体が必要です。実際には、前述と同じ構造体を使用できますが、ここでは次のように名前を変更します。

struct LongRunningOperationMsg{
        LongRunningOperationMsg (int x, int y):m_x(x),m_y(y){}
        int m_x;
        int m_y;
}

次に、メッセージの送信先を宣言します。Agents Library では、"ターゲット" となる任意のメッセージ インターフェイスにメッセージを送信できますが、この場合に最も適しているのは call<T> でしょう。call<T> はメッセージを受け取り、そのメッセージをパラメーターとして受け取る関数記号を使用して構築されます。call の宣言と構造は、(ラムダを使用し) 次のようになります。

call<LongRunningOperationMsg>* LongRunningOperationCall = new
   call<LongRunningOperationMsg>([]( LongRunningOperationMsg msg)
{
LongRunningOperation(msg.x, msg.y);
})

この時点で SomeFunction に加える変更はほんの少しです。メッセージを構築し、そのメッセージを call オブジェクトに非同期に送信することが目標です。call は、メッセージが着信したときに、別のスレッドで呼び出されることになります。

void SomeFunction(int x, int y){
        asend(LongRunningOperationCall, LongRunningOperationMsg(x,y));
}

作業を UI スレッドに戻す

UI スレッドから作業を移動するだけでは、問題は半分しか解決しません。おそらく、LongRunningOperation の終了時にはなんらかの意味ある結果を得ようと考えているでしょうから、多くの場合、次の手順は作業を UI スレッドに戻すことです。採用するアプローチはアプリケーションごとに異なりますが、Visual Studio 2010 で提供されるライブラリでこれを実現する最も簡単な方法は、Agents Library に含まれるもう 1 つの API とメッセージ ブロックのペア、つまり try_receive と unbounded_buffer<T> を使用することです。

unbounded_buffer<T> を使用して、データと、潜在的に UI スレッドで実行する必要のあるコードを含むメッセージを格納できます。try_receive はブロック不可の API 呼び出しで、表示するデータが存在するかどうかの照会に使用できます。

たとえば、UI スレッドで画像を描画していたとすると、次のようなコードを使用して、InvalidateRect への呼び出しを行ってから、UI スレッドにデータを戻します。

unbounded_buffer<ImageClass>* ImageBuffer;
LONG APIENTRY MainWndProc(HWND hwnd, UINT uMsg,
  WPARAM wParam, LPARAM lParam)
{
    RECT rcClient;
    int i;
    ...
   ImageClass image;
   //check the buffer for images and if there is one there, display it.
   if (try_receive(ImageBuffer,image))
       DisplayImage(image);
   ...
}

ここでは、メッセージ ループの実装のような細かい処理を一部省略していますが、ここで紹介する技法を十分例示できていればさいわいです。このコラムのサンプル コードをチェックすることをお勧めします。サンプル コードには、ここで紹介する各手法が完全に機能する例として含まれています。

図 1 スレッドセーフではないクラス

samclass
Widget{
size_t m_width;
size_t m_height;
public:
Widget(size_t w, size_t h):m_width(w),m_height(h){};
size_t GetWidth(){
return m_width;
}
size_t GetHeight(){
return m_height;
}
void SetWidth(size_t width){
m_width = width;
}
void SetHeight(size_t height){
m_height = height;
}
};

シナリオ 2: メッセージ ブロックとエージェントを使って共有状態を管理する

マルチスレッド アプリケーションを開発する際によくあるもう 1 つの状況は、共有状態の管理です。具体的には、複数のスレッド間で通信を試みたり、複数のスレッド間でデータの共有を試みたりすると、即座に、共有状態の管理が処理すべき問題として持ち上がります。よく見受けられるアプローチは、単純にクリティカル セクションをオブジェクトに追加して、オブジェクトのデータ メンバーやパブリック インターフェイスを保護する方法ですが、すぐにメンテナンスが問題になり、場合によっては、パフォーマンスの問題にもつながることがあります。このシナリオでは、ロックを使用する単純な順次処理の例を取り上げ、次に Agents Library に含まれるメッセージ ブロックを使った代替方法を示します。

簡単な Widget クラスをロックする

図 1 は、幅と高さのデータ メンバーと、状態を変化させる簡単なメソッドを備えたスレッドセーフではない Widget クラスを示しています。

Widget クラスをスレッドセーフにする簡単な方法は、クリティカル セクションまたは読み取り/書き込みロックを使って、クラスのメソッドを保護することです。PPL には reader_writer_lock が含まれています。まず、図 2 に、PPL の reader_writer_lock を使用する簡単なアプローチへの明白な解決策を示します。

図 2 Parallel Pattern Library に含まれる reader_writer_lock の使用

class LockedWidget{
size_t m_width;
size_t m_height;
reader_writer_lock lock;
public:
LockedWidget (size_t w, size_t h):m_width(w),m_height(h){};
size_t GetWidth(){
auto lockGuard = reader_writer::scoped_lock_read(lock);
return m_width;
}
size_t GetHeight(){
auto lockGuard = reader_writer::scoped_lock_read(lock);
return m_height;
}
void SetWidth(size_t width){
auto lockGuard = reader_writer::scoped_lock(lock);
m_width = width;
}
void SetHeight(size_t height){
auto lockGuard = reader_writer::scoped_lock(lock)
m_height = height;
}
};

ここでは、reader_writer_lock をメンバー変数として追加してから、すべての該当メソッドを読み取りロックまたは書き込みロックのいずれかで修飾しています。また、例外処理の最中にロックがかかったままにならないよう scoped_lock オブジェクトも使用しています。これで、すべての Get メソッドに読み取りロックがかかり、すべての Set メソッドに書き込みロックがかかるようになります。このアプローチは技術上は適切に思えますが、実際の設計上は不適切で、複数のインターフェイスが組み合わさると各インターフェイスがスレッドセーフではなくなるため、全体的には脆弱です。具体的には、次のようなコードがあると、破綻状態に陥る可能性があります。

Thread1{
                SharedWidget.GetWidth();
                SharedWidget.GetHeight();
}
Thread2{
                SharedWidget.SetWidth();
                SharedWidget.SetHeight();
}

Thread1 と Thread2 での呼び出しはインターリーブされる可能性があるため、Thread1 が GetWidth 用に読み取りロックをかけた後、GetHeight が呼び出される前に、SetWidth と SetHeight が両方実行される可能性があります。そのため、データを保護するだけでなく、そのデータへのインターフェイスも適切に保つ必要があります。このような状態では、コードは正しく見え、エラーを追跡するのが非常に困難なため、競合状態としては最も油断のならない状況の 1 つです。このような状況でよく行われてきた簡単な解決策は、オブジェクト自体にロック メソッドを導入する方法です。あるいは、最悪でも、その Widget にアクセスするときに、開発者がロックをかけることを忘れないような場所にロックをしまっておきます。場合によっては、両方のアプローチを使用します。

簡単なのは、複数の呼び出しがインターリーブされる間にオブジェクトの状態を破損するような機能を公開しないで、複数のインターフェイスを安全にインターリーブできるようにするアプローチです。図 3 に示すように、GetDimensions メソッドと UpdateDimensions メソッドを用意して、インターフェイスを進化させることができます。このインターフェイスでは、安全ではないインターリーブをメソッドから公開することを許可していないため、意外な動作につながる可能性はほとんどなくなります。

図 3 GetDimensions メソッドと UpdateDimensions メソッドを備えたインターフェイス

struct WidgetDimensions
{
size_t width;
size_t height;
};
class LockedWidgetEx{
WidgetDimensions m_dimensions;
reader_writer_lock lock;
public:
LockedWidgetEx(size_t w, size_t h):
m_dimensions.width(w),m_dimensions.height(h){};
WidgetDimensions GetDimensions(){
auto lockGuard = reader_writer::scoped_lock_read(lock);
return m_dimensions;
}
void UpdateDimensions(size_t width, size_t height){
auto lockGuard = reader_writer::scoped_lock(lock);
m_dimensions.width = width;
m_dimensions.height = height;
}
};

メッセージ ブロックを使った共有状態の管理

ここで、Agents Library を使用すると共有状態の管理が容易になり、コードが少し堅牢になるところを見ていくことにしましょう。Agents Library に含まれるクラスのうち共有変数の管理に役立つ主なクラスは次のとおりです。overwrite_buffer<T> は、呼び出しを受けたときに、更新可能な値を 1 つ格納し、最新値のコピーを返します。single_assignment<T> は、呼び出しを受けたときに、1 つの値のコピーを格納して返します。ただし、定数と同様、代入できるのは 1 回のみです。unbounded_buffer<T> はメモリが許す限りアイテムを個数に制限なく格納します。呼び出しを受けたときは、FIFO キューのように、最も古いアイテムをキューから取り出して返します。

では、overwrite_buffer<T> から使ってみましょう。まず、Widget クラスの m_dimensions メンバー変数を overwrite_buffer<WidgetDimensions> に置き換えた後、メソッドから明示的なロックを取り除いて、対応する送受信呼び出しに置き換えます。まだ、インターフェイスを安全にすることについての検討は必要ですが、データをロックすることを覚えておく必要はなくなりました。以下に、ここでの説明を実現するコードを示します。実際、コードの行数はロックを使用するバージョンよりもやや少なくなり、順次処理のバージョンと同じです。

class AgentsWidget{
    overwrite_buffer<WidgetDimensions> m_dimensionBuf;
public:
    AgentsWidget(size_t w, size_t h){
        send(&m_dimensionBuf,WidgetDimensions(w,h));
    };
    WidgetDimensions GetDimensions(){
        return receive(&m_dimensionBuf);
    }
    void UpdateDimensions(size_t width, size_t height){
        send(&m_dimensionBuf,WidgetDimensions(w,h));
    }
};

この実装と reader_writer_lock の実装とでは、その意味合いがやや異なります。overwrite_buffer では、Dimensions の呼び出し中に、UpdateDimensions の呼び出しを行うことができます。これにより、これらの呼び出しは事実上ブロックされませんが、GetDimensions の呼び出し結果が最新の結果とわずかに異なることがあります。GetDimensions を実行した直後に結果が最新状態ではなくなる可能性があるため、ロックを使用するバージョンと同じ問題が存在していることは指摘しておかなければなりません。ここで行ったのは、他の操作をブロックする呼び出しを取り除いただけです。

Widget クラスにとっては unbounded_buffer も有効です。前述の意味合いのわずかな違いがきわめて重要だった場合を考えてください。たとえば、あるオブジェクトのインスタンスがあり、このインスタンスには必ず一度に 1 つのスレッドからしかアクセスしないようにする場合、そのオブジェクトへのアクセスを管理する、オブジェクトの保持役として unbounded_buffer を使用できます。このことを Widget クラスに当てはめるには、m_dimensions を削除して unbounded_buffer<WidgetDimension> に置き換えます。このバッファを使用するには、GetDimensions と UpdateDimensions への呼び出しを使います。ここでの課題は、Widget の値を更新中に Widget から値を取得できないようにすることです。これを実現するには unbounded_buffer を空にします。その結果、更新を待機している間 GetDimension への呼び出しがブロックされます。この例を 図 4 に示します。GetDimensions と UpdateDimensions はどちらもブロックされ、寸法の変数への排他アクセスを待機します。

図 4 Unbounded_Buffer を空にする

class AgentsWidget2{
unbounded_buffer<WidgetDimensions> m_dimensionBuf;
public:
AgentsWidget2(size_t w, size_t h){
send(&m_dimensionBuf,WidgetDimensions(w,h));
};
WidgetDimensions GetDimensions(){
//get the current value
WidgetDimensions value = receive(&m_dimensionBuf);
//and put a copy of it right back in the unbounded_buffer
send(&m_dimensionBuf,value);
//return a copy of the value
return WidgetDimensions(value);
}
void UpdateDimensions (size_t width, size_t height){
WidgetDimensions oldValue = receive(&m_dimensionBuf);
send(&m_dimensionBuf,WidgetDimensions(width,height));
}
};

データへのアクセスの調整

Widget クラスについて、もう 1 つ重視していることがあります。つまり、同時にアクセスできるメソッドやデータが、"安全に" 連携できるようにすることが重要です。多くの場合、こうした連携を実現するには、メソッドやオブジェクトをロックするのではなく、状態へのアクセスを調整します。純粋に "コード行数" の点では、ロックを使用する例と比べてもそれほど大きなメリットはなく、特に 2 番目の例ではわずかに行数が増えています。しかし、安全性の高い設計というメリットが得られます。また、少し考えるだけで、オブジェクトの内部状態を "破損" しないように、順次処理のインターフェイスを変更できます。Widget の例では、メッセージ ブロックを使用してこれを実現し、安全性の高い方法で状態を保護できました。将来 Widget クラスにメソッドや機能を追加しても、ここで設定した内部同期を破損する可能性はあまりありません。メンバーをロックする場合は、クラスにメソッドを追加する際に、ロックをかけるのを忘れがちです。しかし、操作をメッセージ受け渡しモデルに移行し、overwrite_buffer などのメッセージ ブロックを自然な方法で使用すると、データやクラスの同期を維持するのが容易になります。

シナリオ 3: スレッド ローカルな累積と初期化への Combinable の使用

ロックやメッセージ ブロックを使ってオブジェクトへのアクセスを保護する 2 番目のシナリオは、あまり頻繁にはアクセスされない、処理負荷のきわめて高いオブジェクトでは実に適切に機能します。この例を読んでいるときに、同期が取られる Widget を緊密なループ (または並列ループ) の内部で使用すると、パフォーマンスに問題が生じるのではないかと考えたとしたら、おそらくそれは間違いではありません。それは、共有状態を保護することで問題が生じやすくなるためです。本当の意味で状態を共有する、完全に汎用化されたアルゴリズムやオブジェクトには、残念ながら、アクセスの調整かロックの導入以外に多くの選択肢はありません。しかし、共有状態への依存度を下げるようにコードやアルゴリズムをリファクタリングする方法はほぼ確実に見つかります。いったんこれを行ったら、オブジェクトから Parallel Pattern Library に含まれる combinable<T> を呼び出すという、明確かつ共通のパターンのいくつかが実に役立ちます。

combinable<T> は、次の 3 つの幅広いユース ケースをサポートする同時実行コンテナーです。1 つは、スレッドローカルな変数を保持するか、スレッドローカルな初期化を行う場合、1 つは、スレッドローカルな変数に対して関連するバイナリ演算 (sum、min、mix など) を単独または組み合わせて実行する場合、もう 1 つは、(リストを相互に結合するような) 操作によって、それぞれスレッドローカルなコピーにアクセスする場合です。ここでは、それぞれのユース ケースについて説明し、その使用方法を示す例を示します。

スレッドローカルな変数の保持またはスレッドローカルな初期化の実行

combinable<T> の最初のユース ケースとして、スレッドローカルな変数を保持することを挙げました。グローバルな状態のスレッドローカルなコピーを格納することは比較的よく行われます。たとえば、サンプル パック (code.msdn.microsoft.com/concrtextras) に含まれるカラー対応レイ トレーサー アプリケーションや、.NET 4.0 による並列開発のサンプル (code.msdn.microsoft.com/ParExtSamples) には、並列性を目に見えるようにするため、スレッドによって各行に色付けするオプションがあります。このデモのネイティブ版では、スレッドローカルな色を保持する combinable オブジェクトを使用して、この色付けを行っています。

当然、スレッドローカル記憶域 (TLS) を使用すればスレッドローカルな変数を保持できますが、これにはいくつかデメリットがあります。最も顕著なデメリットは有効期間の管理と可視性で、これらは連動します。TLS を使用するには、まず、TlsAlloc を使ってインデックスを割り当てる必要があります。次に、オブジェクトを割り当ててから、TlsSetValue を使って、インデックスにオブジェクトへのポインターを格納します。その後、スレッドの終了時に、必ず、オブジェクトの割り当てを解除する必要があります (TlsFree が自動的に呼び出されます)。スレッドごとにこれを 1 回か 2 回行い、早期終了や例外によるリークが生じていないことを確認することはそれほど大変なことではありませんが、アプリケーションでこのような作業が大量に必要になる場合は、別のアプローチを検討する方がよいかもしれません。

combinable<T> を使用して、スレッドローカルな変数を保持できます。ただし、個別のオブジェクトの有効期間は combinable<T> アイテムの有効期間に結び付けられます。また、多くの初期化が自動的に行われます。スレッドローカルな変数にアクセスする場合は combinable::local メソッドを呼び出すだけです。このメソッドは、ローカル オブジェクトへの参照を返します。以下に task_group の使用例を示しますが、これは Win32 スレッドを使用して実行できます。

combinable<int> values;

auto task = [&](){
                values.local() = GetCurrentThreadId();
                printf("hello from thread: %d\n",values.local());
};

task_group tasks;

tasks.run(task);

//run a copy of the task on the main thread
task();

tasks.wait();

スレッドローカルな初期化も combinable を使用して実現できます。たとえば、各スレッドで使用するライブラリ呼び出しを初期化する必要があれば、コンストラクター内で初期化を実行するクラスを作成できます。その後、各スレッドでの初回使用時にライブラリ呼び出しが行われますが、2 回目以降の使用時には呼び出しがスキップされます。以下に例を示します。

class ThreadInitializationClass
{
public:
      ThreadInitializationClass(){
            ThreadInitializationRoutine();
      };
};

...
//a combinable object will initialize these
combinable<ThreadInitializationClass> libraryInitializationToken;
            
...
//initialize the library if it hasn't been already on this thread
ThreadInitializationClass& threadInit = libraryInitalizationToken.local();

並列ループ内での減算の実行

combinable オブジェクトのもう 1 つの主なシナリオは、スレッドローカルな減算 (スレッドローカルな累積) を行うことです。具体的には、combinable を使ってループを並列処理する際、または再帰的に並列操作を行う際に、特定の種類の競合状態を回避できます。以下に、実に簡単な例を示します。この例は、スピードアップを目的としたものではありません。以下のコードは単純なループを示していますが、sum 変数にアクセスする場合を除けば、parallel_for_each を使って並列に処理できるように思えます。

 

int sum = 0;
for (vector<int>::iterator it = myVec.begin(); it != myVec.end(); ++it) {
    int element = *it;
    SomeFineGrainComputation(element);
    sum += element;
}

ここで、parallel_for_each 内でロックをかけるのではなく、combinable オブジェクトを使用して、スレッドローカルに合計を計算できます。ループ内でロックをかけると、スピードアップのチャンスが失われます。

combinable<int> localSums;
 
parallel_for_each(myVec.begin(),  myVec.end(), [&localSums] (int element) {
   SomeFineGrainComputation(element);
   localSums.local() += element;
});

競合状態は正しく回避できるようになりましたが、スレッドローカルな合計のコレクションは localSums オブジェクトに格納されているため、最終値を取り出す必要があります。これを実現するには、以下のようにバイナリ関数記号を受け取る combine メソッドを使用します。

int sum = localSums.combine(std::plus<int>);

combinable<T> の 3 つ目のユース ケースは、スレッドローカルなコピーにアクセスし、そのコピーに対してなんらかの操作 (クリーンアップやエラー チェックなど) を実行する必要がある場合です。このユースケースでは、combine_each メソッドを使用します。もう 1 つ、さらに興味深い例は、combinable オブジェクトが combinable<list<T>> の場合です。この場合、スレッド内で std::lists または std::sets を構築します。std::lists の場合は、list::splice を使ってリストを容易に相互連結できます。std::sets の場合は、set::insert を使ってリストを挿入できます。

シナリオ 4: 既存のバックグラウンド スレッドからエージェントまたはタスクへの変換

アプリケーション内にはバックグラウンド スレッドまたはワーカー スレッドが既に含まれているものとします。バックグラウンド タスクを PPL のタスクまたはエージェントに変換する理由には非常に適切なものがいくつかあり、このような変換は比較的簡単です。変換を行う大きなメリットには、以下のようなものがあります。

構成の可能性とパフォーマンス: ワーカー スレッドでコンピューティング処理が集中的に行われるため、PPL や Agents Library 内で別のスレッドを使用することを検討している場合は、バックグラウンド スレッドをワーカー タスクに変換すれば、そのタスクを他のタスクと共にランタイムに組み込むことができ、システムが煩雑になるのを防ぐことができます。

取り消しと例外処理: スレッド上での作業を容易に取り消せるようにする場合、または例外を処理するメカニズムをしっかりと作成する場合は、こうした機能が task_group に組み込まれています。

制御フローと状態管理: スレッドの状態 (開始、完了など) を管理する必要がある場合、または状態をワーカー スレッドから効果的に分離できないオブジェクトがある場合。エージェントを実装すると役立つことがあります。

取り消しと例外処理をサポートする task_group

最初のシナリオでは、task_group を使って作業のスケジュールを設定するために必要なことを説明しました。基本的には、(ラムダ、std::bind、またはカスタム関数オブジェクト) を使用して作業を関数記号にパッケージ化し、task_group::run メソッドを使ってスケジュールを設定する必要があります。このとき、取り消しと例外処理のセマンティクスについては説明しませんでしたが、実際には関連性があります。

図 5 MyAgentClass の実装

class MyAgentClass : public agent{
public:
MyAgentClass (){
}
AgentsWidget widget;
void run(){
//run is started asynchronously when agent::start is called
//...
//set status to complete
agent::done();
}
};

まず、簡単なセマンティクスから説明します。コードから task_group::cancel を呼び出すか、タスクからキャッチされない例外がスローされると、事実上その task_group の取り消しが行われます。取り消しが行われると、その task_group でまだ開始されていないタスクは開始されなくなり、task_group でスケジュールが設定された作業を容易かつ迅速にキャンセルできます。この取り消しは、実行中のタスクやブロックされているタスクに割り込むことはありません。そのため、実行中のタスクでは task_group::is_canceling メソッドまたはヘルパー関数 is_current_task_group_canceling を使用して、取り消しの状態を照会できます。

以下に、簡単な例を示します。

task_group tasks;   
tasks.run([](){
    ...
    if(is_current_task_group_canceling())
    {
        //cleanup then return
        ...
        return;
    }
});
tasks.cancel();
tasks.wait();

task_group でのキャッチされない例外はその task_group での取り消しのトリガーとなるため、例外処理も取り消しに影響します。キャッチされない例外が存在する場合、task_group は実際には std::exception_ptr を使用して、その例外をスローしたスレッドでパッケージ化します。その後、task_group::wait が呼び出されたときに、wait を呼び出したスレッドで例外が再スローされます。

非同期エージェントの実装

Agents Library では、task_group を使用する代わりに、スレッドをエージェント ベースのクラスに置き換える代替策があります。スレッドにたくさんのスレッド固有の状態やオブジェクトがある場合、このようなシナリオにはエージェントが適しています。抽象エージェント クラスは、アクター パターンの実装です。このクラスの用途は、エージェントから派生した独自のクラスを実装し、アクター (つまり、スレッド) のすべての状態をそのエージェントにカプセル化することです。パブリックにアクセス可能にするフィールドが複数存在する場合は、そのフィールドをメッセージ ブロック、またはソースとターゲットとして公開し、メッセージの受け渡しを使用してエージェントと通信することをお勧めします。

エージェントを実装するには、クラスを agent 基本クラスから派生し、仮想メソッド run をオーバーライドします。エージェントは agent::start を呼び出して起動します。その結果、スレッドによく似たタスクとして run メソッドが呼び出されます。この場合のメリットは、スレッドローカルな状態をクラスに格納できるようになることです。その結果として、特に状態がメッセージ ブロックに格納されている場合に、複数のスレッド間での状態の同期が容易になります。図 5 に、AgentsWidget 型のメンバー変数をパブリックに公開する実装の例を示します。

run メソッドの終了時に処理を完了するため、エージェントのステータスを設定していることに注意してください。これにより、エージェントを開始できるだけでなく、待機することもできます。さらに、agent::status を呼び出して、エージェントの現在ステータスを照会することもできます。以下のコードに示すように、エージェント クラスの開始と待機は実に簡単です。

 

MyAgentClass MyAgent;

//start the agent
MyAgent.start();

//do something else
...

//wait for the agent to finish
MyAgent.wait(&MyAgent);

おまけ: parallel_sort を使った並列並べ替え

最後に、並列処理を容易にする別の可能性を紹介しますが、現時点では PPL または Agents Library からではなく、サンプル パック (code.msdn.microsoft.com/concrtextras) から入手できます。並列クイックソートは、タスクを使った分割統治アルゴリズムを再帰的に並列処理する方法を説明するための一例です。並列クイックソートの実装は、サンプル パックに含まれています。並列ソートは、比較演算が文字列どうしのようにやや負荷が高い場合に大量の項目を並べ替える処理をスピードアップすることができます。少数の項目を並べ替える場合、または integer や double のような組み込みの型を並べ替える場合は、おそらくスピードアップは望めません。以下に使用例を示します。

//from the sample pack
#include "parallel_algorithms.h"
int main()

using namespace concurrency_extras;
{
                vector<string> strings;
 
                //populate the strings
                ...
                parallel_sort(strings.begin(),strings.end());
}

まとめ

このコラムによって、Visual Studio 2010 に含まれる並列処理ライブラリをプロジェクトに適用する際に、単にコンピューター処理を集中的に行うループをスピードアップするために parallel_for やタスクを使用することにとどまらず、皆さんの視野が広がることを願っています。MSDN (msdn.microsoft.com/library/dd504870(VS.100).aspx) のドキュメントやサンプル パック (code.msdn.microsoft.com/concrtextras) には、並列ライブラリについて紹介し、その使い方を説明する役立つ例がたくさんあります。これらをチェックすることをお勧めします。

Rick Molloy は、マイクロソフトの並列コンピューティング プラットフォーム チームのプログラム マネージャーです。**