方法: Context クラスを使用して協調セマフォを実装する
このトピックでは、concurrency::Context クラスを使用して協調セマフォ クラスを実装する方法について説明します。
解説
Context
クラスを使用すると、現在の実行コンテキストをブロックまたは生成できます。 現在のコンテキストをブロックまたは生成する機能は、リソースを使用できないことが原因で現在のコンテキストを続行できない場合に有用です。 "セマフォ" は、リソースが使用できるようになるまで現在の実行コンテキストが待機する必要がある状況を示す 1 つの例です。 セマフォは、クリティカル セクション オブジェクトと同様に、1 つのコンテキストのコードがリソースに対して排他的にアクセスすることを可能にする同期オブジェクトです。 ただし、クリティカル セクション オブジェクトとは異なり、セマフォは、複数のコンテキストが並列的にリソースにアクセスできるようにします。 コンテキストの数が最大数に達してセマフォがロックされた場合、追加のコンテキストは、別のコンテキストがロックを解放するのを待機する必要があります。
semaphore クラスを実装するには
semaphore
という名前のクラスを宣言します。 このクラスにpublic
セクションとprivate
セクションを追加します。
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
private:
};
semaphore
クラスのprivate
セクションで、セマフォのカウントを保持する std::atomic 変数と、セマフォを取得するために待機する必要があるコンテキストを保持する concurrency::concurrent_queue オブジェクトを宣言します。
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
public
クラスのsemaphore
セクションで、コンストラクターを実装します。 このコンストラクターは、ロックを同時に保持できるコンテキストの最大数を指定するlong long
値を受け取ります。
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
public
クラスのsemaphore
セクションで、acquire
メソッドを実装します。 このメソッドは、分割不可能な操作として、セマフォのカウントをデクリメントします。 セマフォのカウントが負になった場合は、現在のコンテキストを待機キューの末尾に追加し、concurrency::Context::Block メソッドを呼び出して現在のコンテキストをブロックします。
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
public
クラスのsemaphore
セクションで、release
メソッドを実装します。 このメソッドは、分割不可能な操作として、セマフォのカウントをインクリメントします。 インクリメント操作の前にセマフォのカウントが負になる場合は、ロックを待機しているコンテキストが 1 つ以上存在することを示します。 この場合は、待機キューの先頭にあるコンテキストのブロックを解除します。
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
例
この例の semaphore
クラスは協調的に動作します。それは、Context::Block
メソッドと Context::Yield
メソッドによって実行が生成され、ランタイムが他のタスクを実行できるらめです。
acquire
メソッドはカウンターをデクリメントしますが、別のコンテキストが release
メソッドを呼び出す前にコンテキストを待機キューに追加できない可能性があります。 つまり、release
メソッドは、concurrency::Context::Yield メソッドを呼び出すスピン ループを使用して、acquire
メソッドがコンテキストの追加処理を完了するのを待ちます。
release
メソッドは、Context::Unblock
メソッドが acquire
メソッドを呼び出す前に、Context::Block
メソッドを呼び出すことができます。 ランタイムではこれらのメソッドが任意の順序で呼び出されることが考慮されているため、この競合状態に対する対策は必要ありません。 release
メソッドが Context::Unblock
を呼び出す前に同じコンテキストに対して acquire
メソッドが Context::Block
を呼び出した場合、このコンテキストは非ブロック状態のままになります。 ランタイムでは、Context::Block
の各呼び出しが対応する Context::Unblock
の呼び出しと一致することのみが求められます。
次の例は、完全な semaphore
クラスを示しています。 wmain
関数に、このクラスの基本的な使用法が示されています。 wmain
関数では、concurrency::parallel_for アルゴリズムを使用して、セマフォへのアクセスを必要とするいくつかのタスクを作成しています。 3 つのスレッドがいつでもロックを保持できるため、いくつかのタスクは、別のタスクが完了してロックを解除するのを待機する必要があります。
// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
int wmain()
{
// Create a semaphore that allows at most three threads to
// hold the lock.
semaphore s(3);
parallel_for(0, 10, [&](int i) {
// Acquire the lock.
s.acquire();
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
// Release the lock.
s.release();
});
}
この例では、次のサンプル出力が生成されます。
In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...
concurrent_queue
クラスの詳細については、「並列コンテナーと並列オブジェクト」を参照してください。 parallel_for
アルゴリズムの詳細については、「並列アルゴリズム」を参照してください。
コードのコンパイル
コード例をコピーし、Visual Studio プロジェクトに貼り付けるか、cooperative-semaphore.cpp
という名前のファイルに貼り付けてから、Visual Studio のコマンド プロンプト ウィンドウで次のコマンドを実行します。
cl.exe /EHsc cooperative-semaphore.cpp
信頼性の高いプログラミング
Resource Acquisition Is Initialization (RAII) パターンを使用して、semaphore
オブジェクトへのアクセスを特定のスコープに制限できます。 RAII パターンでは、データ構造はスタック上に割り当てられます。 データ構造は、作成されたときにリソースを初期化または取得し、破棄されたときにそのリソースを破棄または解放します。 RAII パターンでは、外側のスコープが終了する前に、常にデストラクターが呼び出されます。 したがって、例外がスローされた場合や、関数に複数の return
ステートメントが含まれている場合でも、リソースは適切に管理されます。
次の例では、scoped_lock
クラスの public
セクションに定義されている、semaphore
という名前のクラスを定義しています。 この scoped_lock
クラスは、concurrency::critical_section::scoped_lock クラスおよび concurrency::reader_writer_lock::scoped_lock クラスに似ています。 semaphore::scoped_lock
クラスのコンストラクターは特定の semaphore
オブジェクトへのアクセスを取得し、デストラクターはこのオブジェクトへのアクセスを解放します。
// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
// Acquires access to the semaphore.
scoped_lock(semaphore& s)
: _s(s)
{
_s.acquire();
}
// Releases access to the semaphore.
~scoped_lock()
{
_s.release();
}
private:
semaphore& _s;
};
次の例では、RAII を使用して関数から制御が返される前にセマフォが確実に解放されるようにするために、parallel_for
アルゴリズムに渡される処理関数の本体に変更を加えています。 これにより、処理関数は例外セーフとなります。
parallel_for(0, 10, [&](int i) {
// Create an exception-safe scoped_lock object that holds the lock
// for the duration of the current scope.
semaphore::scoped_lock auto_lock(s);
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
});