使用 WCF 通道模型在 Oracle 数据库中接收基于轮询的数据更改消息

可以将适用于 Oracle 数据库的 Microsoft BizTalk 适配器配置为轮询 Oracle 数据库表或视图以查找任何数据更改。 若要执行此类轮询操作,适配器会定期对 Oracle 表或视图执行 SQL 查询,后跟可选的 PL/SQL 代码块。 然后,Oracle 数据库适配器会将 SQL 查询的结果作为入站 POLLINGSTMT 操作中的强类型结果集返回给代码。 有关用于使用 Oracle 数据库适配器在 Oracle 数据库上配置和执行轮询的机制的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。 强烈建议在继续之前先阅读本主题。

通过在 OracleDBBinding 实例上设置绑定属性,将 Oracle 数据库适配器配置为轮询和 Oracle 数据库表或视图。 在 WCF 通道模型中,使用此绑定生成一个通道侦听器,从中可以获取 IInputChannel 通道以从适配器接收 POLLINGSTMT 操作。

有关如何在 WCF 中使用 IInputChannel 接收操作的概述,请参阅 服务 Channel-Level 编程

本主题中的各节提供的信息有助于使用 WCF 通道模型对 Oracle 数据库表和视图执行轮询。

使用 POLLINGSTMT 请求消息

适配器对代码调用 POLLINGSTMT 操作以轮询 Oracle 数据库。 也就是说,适配器发送通过 IInputChannel 通道形状收到的 POLLINGSTMT 请求消息。 POLLINGSTMT 请求消息包含 PollingStatement 绑定属性指定的查询的结果集。 可以通过以下两种方式之一使用 POLLINGSTMT 消息:

  • 若要使用节点值流式处理消息,必须对响应消息调用 WriteBodyContents 方法,并向其传递实现节点值流式处理功能的 XmlDictionaryWriter

  • 若要使用节点流式处理消息,可以对响应消息调用 GetReaderAtBodyContents 以获取 XmlReader

    通常使用节点值流式处理来使用包含 Oracle LOB 数据列的结果集。

    有关 POLLINGSTMT 操作的消息结构的详细信息,请参阅 轮询操作的消息架构

    有关 Oracle 数据库适配器如何支持对 LOB 数据进行流式处理的详细信息,请参阅 在 Oracle 数据库适配器中流式处理大型对象数据类型

    有关在代码中实现节点值流式处理以支持 LOB 数据的端到端流式处理的详细信息,请参阅 使用 WCF 通道模型流式处理 Oracle Database LOB 数据类型

关于本主题中使用的示例

本主题中的示例使用 SCOTT。ACCOUNTACTIVITY 表和 SCOTT。ACCOUNT_PKG。PROCESS_ACTIVITY 函数。 示例提供了生成这些项目的脚本。 该示例执行以下操作:

  • 作为轮询语句的一部分,从 ACCOUNTACTIVITY 表中选择所有记录,并在控制台上显示。

  • 作为投票后语句的一部分,该示例调用 PROCESS_ACTIVITY 函数,该函数将所有记录从 ACCOUNTACTIVITY 表移动到 ACTIVITYHISTORY 表。

  • 对 ACCOUNTACTIVITY 表的后续轮询不会返回任何记录。 但是,如果希望示例在轮询操作过程中返回更多记录,则必须在 ACCOUNTACTIVITY 表中插入一些记录。 为此,可以运行示例随附的 more_activity_data.sql 脚本。

    有关示例的详细信息,请参阅 适配器示例

如何使用 IInputChannel 轮询 Oracle 数据库?

若要使用 WCF 通道模型轮询 Oracle 数据库表或视图以接收数据更改消息,请执行以下步骤。

使用 IInputChannel 接收数据更改的消息

  1. 在 Visual Studio 中创建 Visual C# 项目。 对于本主题,请创建控制台应用程序。

  2. 在解决方案资源管理器中添加对 、Microsoft.ServiceModel.ChannelsSystem.ServiceModel和 的Microsoft.Adapters.OracleDBSystem.Runtime.Serialization引用。

  3. 打开 Program.cs 文件并添加以下命名空间:

    • Microsoft.Adapters.OracleDB

    • Microsoft.ServiceModel.Channels

    • System.ServiceModel

    • System.ServiceModel.Description

    • System.ServiceModel.Channels

    • System.Xml

    • System.Runtime.Serialization

    • System.IO

    • Microsoft.ServiceModel.Channels.Common

  4. 创建 OracleDBBinding 实例并设置配置轮询所需的绑定属性。 至少必须设置 InboundOperationTypePollingStatementPollingInterval 绑定属性。 在此示例中,还设置了 PostPollStatement 绑定属性。 有关用于配置轮询的绑定属性的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息

    OracleDBBinding binding = new OracleDBBinding();  
    binding.InboundOperationType = InboundOperation.Polling;  
    binding.PollingInterval = 30;  
    binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
    binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"  
    
  5. 创建绑定参数集合并设置凭据。

    ClientCredentials credentials = new ClientCredentials();  
    credentials.UserName.UserName = "SCOTT";  
    credentials.UserName.Password = "TIGER";  
    
    BindingParameterCollection bindingParams = new BindingParameterCollection();  
    bindingParams.Add(credentials);  
    
  6. 创建并打开通道侦听器。 通过在 OracleDBBinding 上调用 BuildChannelListener<IInputChannel> 方法来创建侦听器。 可以通过在连接 URI 中设置 PollingId 属性来修改 POLLINGSTMT 操作的目标命名空间。 有关适配器连接 URI 的详细信息,请参阅 创建 Oracle 数据库连接 URI

    IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);  
    listener.Open();  
    
  7. 通过在侦听器上调用 AcceptChannel 方法获取 IInputChannel 通道,并将其打开。

    IInputChannel channel = listener.AcceptChannel();  
    channel.Open();  
    
  8. 在通道上调用 Receive ,从适配器获取下一条 POLLINGSTMT 消息。

    Message message = channel.Receive();  
    
  9. 使用 POLLINGSTMT 操作返回的结果集。 可以使用 XmlReaderXmlDictionaryWriter 来使用消息。

    XmlReader reader = message.GetReaderAtBodyContents();  
    
  10. 完成处理请求后关闭通道。

    channel.Close()  
    

    重要

    处理完 POLLINGSTMT 操作后,必须关闭通道。 关闭通道失败可能会影响代码的行为。

  11. 接收完数据更改的消息后,关闭侦听器。

    listener.Close()  
    

    重要

    关闭侦听器不会关闭使用侦听器创建的通道。 必须显式关闭使用侦听器创建的每个通道。

示例

以下示例演示如何配置 Oracle 数据库适配器以使用 WCF 通道模型轮询 Oracle 数据库表和视图以及接收 POLLLINGSTMT 操作。 使用 XmlReader 将 POLLINGSTMT 操作中返回的结果集写入控制台。

using System;  
using System.Collections.Generic;  
using System.Text;  
  
// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces  
using System.ServiceModel;  
using System.ServiceModel.Description;  
using Microsoft.ServiceModel.Channels;  
using Microsoft.Adapters.OracleDB;  
  
// Add this namespace for channel model  
using System.ServiceModel.Channels;  
  
using System.Xml;  
using System.Runtime.Serialization;  
using System.IO;  
  
// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions  
using Microsoft.ServiceModel.Channels.Common;  
  
namespace OraclePollingCM  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            Uri connectionUri = new Uri("oracleDB://ADAPTER/");  
  
            IChannelListener<IInputChannel> listener = null;  
            IInputChannel channel = null;  
  
            // set timeout to receive POLLINGSTMT message  
            TimeSpan messageTimeout = new TimeSpan(0, 0, 30);  
  
            Console.WriteLine("Sample Started");  
  
            try  
            {  
                // Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the           
                // PollingStatement,and the PostPollStatement.  
                OracleDBBinding binding = new OracleDBBinding();  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PollingInterval = 30;  
                binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
                binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
  
                // Create a binding parameter collection and set the credentials  
                ClientCredentials credentials = new ClientCredentials();  
                credentials.UserName.UserName = "SCOTT";  
                credentials.UserName.Password = "TIGER";  
  
                BindingParameterCollection bindingParams = new BindingParameterCollection();  
                bindingParams.Add(credentials);  
  
                Console.WriteLine("Opening listener");  
                // get a listener  from the binding  
                listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);  
                listener.Open();  
  
                Console.WriteLine("Opening channel");  
                // get a channel from the listener  
                channel = listener.AcceptChannel();  
                channel.Open();  
  
                Console.WriteLine("Channel opened -- waiting for polled data");  
                Console.WriteLine("Receive request timeout is {0}", messageTimeout);  
  
                // Poll five times with the specified message timeout   
                // If a timeout occurs polling will be aborted  
                for (int i = 0; i < 5; i++)  
                {  
                    Console.WriteLine("Polling: " + i);  
                    Message message = null;  
                    XmlReader reader = null;  
                    try  
                    {  
                        //Message is received so process the results  
                        message = channel.Receive(messageTimeout);  
                    }  
                    catch (System.TimeoutException toEx)  
                    {  
                        Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);  
                        continue;  
                    }  
  
                    // Get the query results using an XML reader  
                    try  
                    {  
                        reader = message.GetReaderAtBodyContents();  
                    }  
                    catch (Exception ex)  
                    {  
                        Console.WriteLine("Exception :" + ex);  
                        throw;  
                    }  
  
                    // Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console  
                    Console.WriteLine("\nPolling data received for request number {0}", i+1);  
                    Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");  
  
                    while (reader.Read())  
                    {  
                        if (reader.IsStartElement())  
                        {  
                            switch (reader.Name)  
                            {  
                                case "POLLINGSTMTRECORD":  
                                    Console.Write("\n");  
                                    break;  
  
                                case "TID":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
  
                                case "ACCOUNT":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
                                case "AMOUNT":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
  
                                case "TRANSDATE":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
  
                                default:  
                                    break;  
                            }  
                        }  
                    }  
  
                    // return the cursor  
                    Console.WriteLine();  
  
                    // close the reader  
                    reader.Close();  
  
                    //            To save the polling data to a file you can REPLACE the code above with the following  
                    //  
                    //            XmlDocument doc = new XmlDocument();  
                    //            doc.Load(reader);  
                    //            using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))  
                    //            {  
                    //                doc.WriteTo(writer);  
                    //            }  
                    message.Close();  
                }  
  
                Console.WriteLine("\nPolling done -- hit <RETURN> to finish");  
                Console.ReadLine();  
            }  
            catch (TargetSystemException tex)  
            {  
                Console.WriteLine("Exception occurred on the Oracle Database");  
                Console.WriteLine(tex.InnerException.Message);  
            }  
            catch (ConnectionException cex)  
            {  
                Console.WriteLine("Exception occurred connecting to the Oracle Database");  
                Console.WriteLine(cex.InnerException.Message);  
            }  
            catch (Exception ex)  
            {  
                Console.WriteLine("Exception is: " + ex.Message);  
                if (ex.InnerException != null)  
                {  
                    Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);  
                }  
            }  
            finally  
            {  
                // IMPORTANT: close the channel and listener to stop polling  
                if (channel != null)  
                {  
                    if (channel.State == CommunicationState.Opened)  
                        channel.Close();  
                    else  
                        channel.Abort();  
                }  
  
                if (listener != null)  
                {  
                    if (listener.State == CommunicationState.Opened)  
                        listener.Close();  
                    else  
                        listener.Abort();  
                }  
            }  
        }  
    }  
}  

另请参阅

使用 WCF 通道模型开发 Oracle Database 应用程序