次の方法で共有


Azure Queue Storage を F# で始める方法

Azure Queue Storage を使用すると、アプリケーション コンポーネント間のクラウド メッセージングが提供されます。 拡張性を重視してアプリケーションを設計する場合、通常、アプリケーション コンポーネントを個別に拡張できるように分離します。 Queue Storage は、アプリケーション コンポーネントがクラウド、デスクトップ、オンプレミスのサーバー、モバイル デバイスのいずれで実行されている場合でも、アプリケーション コンポーネント間の通信に非同期メッセージングを提供します。 Queue Storage ではまた、非同期タスクの管理とプロセス ワークフローの構築もサポートします。

このチュートリアルについて

このチュートリアルでは、Azure Queue Storage を使用していくつかの一般的なタスクを実行するための F# コードを記述する方法を示します。 紹介するタスクは、キューの作成と削除、キュー メッセージの追加、読み取り、削除です。

キュー ストレージの概念の概要については、キュー ストレージの .NET ガイドをご覧ください。 これらのチュートリアルでは、簡単にするために接続文字列を使用して Azure で認証しています。 最適なセキュリティを確保するには、Microsoft Entra ID とマネージド ID を使用する必要があります。

前提条件

このガイドを使用するには、まず Azure ストレージ アカウントを作成する必要があります。 また、このアカウントのストレージ アクセス キーも必要になります。

F# スクリプトを作成して F# インタラクティブを開始する

この記事のサンプルは、F# アプリケーションと F# スクリプトのどちらでも使用できます。 F# スクリプトを作成するには、.fsx 拡張子のファイルを作成します。たとえば、F# 開発環境では queues.fsx です。

スクリプトを実行する方法

F# インタラクティブ (dotnet fsi) は、対話形式で起動することも、コマンド ラインから起動してスクリプトを実行することもできます。 コマンド ラインの構文は次のとおりです

> dotnet fsi [options] [ script-file [arguments] ]

スクリプトにパッケージを追加する

次に、#rnuget:package name を使用して Azure.Storage.Queues パッケージと open 名前空間をインストールします。たとえば、次のようになります。

> #r "nuget: Azure.Storage.Queues"
open Azure.Storage.Queues

名前空間宣言の追加

次の open ステートメントを queues.fsx ファイルの先頭に追加します。

open Azure.Storage.Queues // Namespace for Queue storage types
open System
open System.Text

接続文字列を取得する

このチュートリアルでは、Azure Storage 接続文字列が必要になります。 接続文字列の詳細については、ストレージ接続文字列の構成に関する記事をご覧ください。

このチュートリアルでは、次のようにスクリプトに接続文字列を入力します。

let storageConnString = "..." // fill this in from your storage account

キュー サービス クライアントを作成する

QueueClient クラスを使用すると、Queue Storage に格納されているキューを取得できます。 クライアントを作成する方法の 1 つを次に示します。

let queueClient = QueueClient(storageConnString, "myqueue")

これで、Queue Storage に対してデータの読み取りと書き込みを実行するコードを記述する準備が整いました。

キューを作成する

この例では、まだキューがない場合にこれを作成する方法を示します。

queueClient.CreateIfNotExists()

メッセージをキューに挿入する

既存のキューにメッセージを挿入するには、最初に新しい Message を作成します。 次に、SendMessage メソッドを呼び出します。 Message は、次のように文字列 (UTF-8 形式) または byte 配列から作成できます。

queueClient.SendMessage("Hello, World") // Insert a String message into a queue
queueClient.SendMessage(BinaryData.FromBytes(Encoding.UTF8.GetBytes("Hello, World"))) // Insert a BinaryData message into a queue

次のメッセージを覗く

PeekMessage メソッドを呼び出すと、キューの先頭にあるメッセージを、キューから削除せずにピークできます。

let peekedMessage = queueClient.PeekMessage()
let messageContents = peekedMessage.Value.Body.ToString()

処理する次のメッセージを取得する

キューの先頭にあるメッセージを取得して処理するには、ReceiveMessage メソッドを呼び出します。

let updateMessage = queueClient.ReceiveMessage().Value

後で、DeleteMessage を使用してメッセージの処理が成功したことを示します。

キューに配置されたメッセージの内容を変更する

キュー内で取得済みメッセージの内容をその場で変更できます。 メッセージが作業タスクを表している場合は、この機能を使用して、作業タスクの状態を更新できます。 次のコードでは、キュー メッセージを新しい内容に更新し、表示タイムアウトを設定して、60 秒延長します。 これにより、メッセージに関連付けられている作業の状態が保存され、クライアントにメッセージの操作を続行する時間が 1 分与えられます。 この方法を使用すると、キュー メッセージに対する複数の手順から成るワークフローを追跡でき、ハードウェアまたはソフトウェアの問題が原因で処理手順が失敗した場合に最初からやり直す必要がなくなります。 通常は、さらに再試行回数を保持し、メッセージの再試行回数が特定の回数を超えた場合はこれを削除するようにします。 こうすることで、処理するたびにアプリケーション エラーをトリガーするメッセージから保護されます。

queueClient.UpdateMessage(
    updateMessage.MessageId,
    updateMessage.PopReceipt,
    "Updated contents.",
    TimeSpan.FromSeconds(60.0))

次のメッセージをキューから取り出す

コードでは、2 つの手順でキューからメッセージをデキューします。 ReceiveMessageを呼び出すと、キュー内の次のメッセージを取得します。 ReceiveMessage から返されたメッセージは、このキューからメッセージを読み取る他のコードから参照できなくなります。 既定では、このメッセージを参照できない状態は 30 秒間続きます。 また、キューからのメッセージの削除を完了するには、DeleteMessage を呼び出す必要があります。 このようにメッセージを 2 つの手順で削除することで、ハードウェアまたはソフトウェアの問題が原因でコードによるメッセージの処理が失敗した場合に、コードの別のインスタンスで同じメッセージを取得し、もう一度処理することができます。 ご自分のコードで、メッセージが処理された直後に DeleteMessage を呼び出します。 これまでに説明したすべての Queue メソッドには、Async の代替メソッドがあります。

let deleteMessage = queueClient.ReceiveMessage().Value
queueClient.DeleteMessage(deleteMessage.MessageId, deleteMessage.PopReceipt)

一般的な Queue Storage API を使った非同期ワークフローの使用

この例では、一般的な Queue Storage API を使って非同期ワークフローを使用する方法を示します。

async {
    let! exists = queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask

    let! delAsyncMessage = queueClient.ReceiveMessageAsync() |> Async.AwaitTask

    // ... process the message here ...

    // Now indicate successful processing:
    queueClient.DeleteMessageAsync(delAsyncMessage.Value.MessageId, delAsyncMessage.Value.PopReceipt) |> Async.AwaitTask
}

メッセージをデキューするための追加オプション

キューからのメッセージの取得をカスタマイズする方法は 2 つあります。 1 つ目の方法では、(最大 32 個の) メッセージのバッチを取得できます。 2 つ目の方法では、コードで各メッセージを完全に処理できるように、非表示タイムアウトの設定を長くまたは短くすることができます。 次のコード例では、ReceiveMessages を使用して 1 回の呼び出しで 20 件のメッセージを取得してから、各メッセージを処理します。 また、各メッセージの非表示タイムアウトを 5 分に設定します。 この 5 分間は、すべてのメッセージに対して同時に開始されます。そのため、ReceiveMessages への呼び出しから 5 分が経過すると、削除されていないすべてのメッセージが再び表示されます。

for dequeueMessage in queueClient.ReceiveMessages(20, Nullable(TimeSpan.FromMinutes(5.))).Value do
        // Process the message here.
        queueClient.DeleteMessage(dequeueMessage.MessageId, dequeueMessage.PopReceipt)

キューの長さを取得する

キュー内のメッセージの概数を取得できます。 GetProperties メソッドによって、メッセージ数などのキューの属性を取得するように Queue サービスに要求します。 ApproximateMessagesCount プロパティを使用すると、GetProperties メソッドによって取得された最後の値を返すことができます。

let properties = queueClient.GetProperties().Value
let count = properties.ApproximateMessagesCount

キューを削除する

キューおよびキューに格納されているすべてのメッセージを削除するには、キュー オブジェクトの Delete メソッドを呼び出します。

queueClient.DeleteIfExists()

古いライブラリから移行している場合は、既定で Base64 でエンコードされたメッセージになりますが、新しいライブラリの場合はそうではありません。その方がパフォーマンスが向上します。 エンコードを設定する方法については、「MessageEncoding」を参照してください。

こちらもご覧ください