使用 WCF 服务模型在 Oracle 数据库中接收基于轮询的数据更改消息

可以将适用于 Oracle 数据库的 Microsoft BizTalk 适配器配置为针对 Oracle 表或视图接收基于轮询的数据更改消息。 为了接收数据更改的消息,适配器定期针对 Oracle 表或视图执行 SQL 查询,后跟可选的 PL/SQL 代码块。 然后,在入站 POLLINGSTMT 操作中,Oracle 数据库适配器将 SQL 查询的结果作为强类型结果集返回给应用程序。 有关用于使用 Oracle 数据库适配器在 Oracle 数据库上配置和执行轮询的机制的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。 强烈建议在继续之前先阅读本主题。

若要在使用 WCF 服务模型时接收 POLLINGSTMT 操作,必须:

  • 从适配器公开的元数据 (接口) 为 POLLINGSTMT 操作生成 WCF 服务协定。 为此,请使用添加适配器服务参考 Visual Studio 插件或 ServiceModel 元数据实用工具 (svcutil.exe) 。

  • 从此接口实现 WCF 服务。

  • 使用服务主机 (System.ServiceModel.ServiceHost) 托管此 WCF 服务。

    本部分中的主题提供了有助于对 WCF 服务模型中的 Oracle 数据库表和视图执行轮询的信息和过程。

关于本主题中使用的示例

本主题中的示例使用 /SCOTT/ACCOUNTACTIVITY 表和 /SCOTT/Package/ACCOUNT_PKG/PROCESS_ACTIVITY 函数。 BizTalk 适配器包示例提供了生成这些项目的脚本。 有关示例的详细信息,请参阅 适配器示例

在 WCF 服务模型中配置轮询

通过将绑定属性和可选连接属性 (参数) ,将 Oracle 数据库适配器配置为对 Oracle 数据库表和视图执行轮询。 其中一些属性是必需的,有些属性必须同时在设计时和运行时设置才能生效。

  • 在设计时,在连接到 Oracle 数据库以生成 WCF 服务协定时设置连接参数和绑定属性。

  • 在运行时,在用于创建服务主机的 OracleDBBinding 对象上设置绑定属性。 将服务侦听器添加到服务主机时,可以设置连接参数。

    以下列表简要概述了用于配置轮询的绑定属性和连接参数:

  • PollingStatement 绑定属性。 必须在设计时和运行时设置此绑定属性。

  • 可选绑定属性。 只需在运行时设置这些设置。

  • AcceptCredentialsInUri 绑定属性。 如果要在连接 URI 中启用凭据,则必须在运行时将此绑定属性设置为 true 。 将服务终结点添加到服务主机时,连接 URI 中必须存在用户名和密码。

  • 连接 URI 中的 PollingId 查询字符串参数。 如果要更改 POLLINGSTMT 操作的命名空间,必须在设计时和运行时设置此连接属性。

    有关用于配置轮询的绑定属性和连接参数的完整说明,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息

WCF 服务协定和类

使用添加适配器服务参考 Visual Studio 插件或 ServiceModel 元数据实用工具 (svcutil.exe) 创建 WCF 服务协定, (接口) 和 POLLINGSTMT 操作的支持类。

使用以下任一工具连接到 Oracle 数据库以生成 POLLINGSTMT 操作的服务协定时:

  • 必须指定 PollingStatement 绑定属性。 适配器使用此绑定属性中的 SELECT 语句为 POLLINGSTMT 操作返回的强类型结果集生成正确的元数据。

  • 可以选择在连接 URI 中指定 PollingId 参数。 适配器使用此参数为 POLLINGSTMT 操作生成命名空间。

    在以下示例中:

  • PollingStatement 设置为“SELECT * FROM ACCOUNTACTIVITY FOR UPDATE”。

  • PollingId 设置为“AcctActivity”。

WCF 服务协定 (接口)

以下代码演示为 POLLINGSTMT 操作生成的 WCF 服务协定 (接口) 。

[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]  
[System.ServiceModel.ServiceContractAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03", ConfigurationName="POLLINGSTMT_OperationGroup")]  
public interface POLLINGSTMT_OperationGroup {  
  
    // CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)  
    // of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)  
    [System.ServiceModel.OperationContractAttribute(IsOneWay=true, Action="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMT")]  
    void POLLINGSTMT(POLLINGSTMT request);  
}  

消息协定

消息协定命名空间由连接 URI 中的 PollingId 参数修改。 请求消息返回一组强类型记录。

[System.Diagnostics.DebuggerStepThroughAttribute()]  
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]  
[System.ServiceModel.MessageContractAttribute(WrapperName="POLLINGSTMT", WrapperNamespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", IsWrapped=true)]  
public partial class POLLINGSTMT {  
  
    [System.ServiceModel.MessageBodyMemberAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", Order=0)]  
    public microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD;  
  
    public POLLINGSTMT() {  
    }  
  
    public POLLINGSTMT(microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD) {  
        this.POLLINGSTMTRECORD = POLLINGSTMTRECORD;  
    }  
}  

数据协定命名空间

“数据协定”是在服务与客户端之间达成的正式协议,用于以抽象方式描述要交换的数据。 也就是说,为了进行通信,客户端和服务不必共享相同的类型,只需共享相同的数据协定。

如果发生数据更改消息,则如果在连接 URI 中指定) ,则 PollingId 参数也会修改数据协定命名空间 (。 数据协定由一个类组成,该类表示查询结果集中的强类型记录。 此示例省略了类定义的详细信息。 类包含表示结果集中的列的属性。

在以下示例中,使用 PollingId“AcctActivity”。

namespace microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity {  
    using System.Runtime.Serialization;  
  
    [System.Diagnostics.DebuggerStepThroughAttribute()]  
    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.Runtime.Serialization", "3.0.0.0")]  
    [System.Runtime.Serialization.DataContractAttribute(Name="POLLINGSTMTRECORD", Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity")]  
    public partial class POLLINGSTMTRECORD : object, System.Runtime.Serialization.IExtensibleDataObject {…}  
     }  
}  

WCF 服务类

添加适配器服务引用插件还会生成一个文件,该文件具有从服务协定 (接口) 实现的 WCF 服务类的存根。 文件的名称是 OracleDBBindingService.cs。 可以插入逻辑以直接在此类中处理 POLLINGSTMT 操作。 如果使用 svcutil.exe 生成服务协定接口,则必须自行实现此类。 以下代码显示由添加适配器服务引用插件生成的 WCF 服务类。

namespace OracleDBBindingNamespace {  
  
    public class OracleDBBindingService : POLLINGSTMT_OperationGroup {  
  
        // CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)   
        // of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)  
        public virtual void POLLINGSTMT(POLLINGSTMT request) {  
            throw new System.NotImplementedException("The method or operation is not implemented.");  
        }  
    }  
}  

接收 POLLINGSTMT 操作

从 Oracle 数据库适配器接收轮询数据

  1. 使用添加适配器服务引用插件或 svcutil.exe 为 POLLINGSTMT 操作生成 WCF 服务协定 (接口) 和帮助程序类。 有关详细信息,请参阅 为 Oracle 数据库解决方案项目生成 WCF 客户端或 WCF 服务协定。 连接到适配器时,至少必须设置 PollingStatement 绑定属性。 可以选择在连接 URI 中指定 PollingId 参数。 如果使用添加适配器服务引用插件,则应设置配置所需的所有绑定参数。 这可以保证在生成的配置文件中正确设置它们。

  2. 从步骤 1 中生成的接口和帮助程序类实现 WCF 服务。 如果处理从 POLLINGSTMT 操作接收的数据时遇到错误,此类的 POLLINGSTMT 方法可能会引发异常来中止轮询事务;否则, 方法不返回任何内容。 必须按如下所示对 WCF 服务类进行属性设置:

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
    
    1. 如果使用了添加适配器服务引用插件来生成接口,则可以直接在生成的 OracleDBBindingService 类的 POLLINGSTMT 方法中实现逻辑。 此类可在 OracleDBBindingService.cs 中找到。 此示例中的此代码子类为 OracleDBBindingService 类。

      [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
      
      public class PollingStmtService : OracleDBBindingService  
      {  
          public override void POLLINGSTMT(POLLINGSTMT request)  
          {  
              Console.WriteLine("\nNew Polling Records Received");  
              Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription");  
              for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++)  
              {  
                  Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID,  
                                      request.POLLINGSTMTRECORD[i].ACCOUNT,  
                                      request.POLLINGSTMTRECORD[i].AMOUNT,  
                                      request.POLLINGSTMTRECORD[i].TRANSDATE,  
                                      request.POLLINGSTMTRECORD[i].DESCRIPTION);  
              }  
          }  
      }  
      
    2. 如果使用 svcutil.exe 来生成接口,则必须创建实现接口的 WCF 服务,并在此类的 POLLINGSTMT 方法中实现逻辑。

  3. 创建在步骤 2 中创建的 WCF 服务的实例。

    // create service instance  
    PollingStmtService pollingInstance = new PollingStmtService();  
    
  4. 使用 WCF 服务和基本连接 URI 创建 System.ServiceModel.ServiceHost 的实例。 基本连接 URI 不能包含 userinfoparams 或query_string。

    // Enable service host  
    Uri[] baseUri = new Uri[] { new Uri("oracledb://Adapter") };  
    ServiceHost srvHost = new ServiceHost(pollingInstance, baseUri);  
    
  5. 创建 OracleDBBinding 并通过设置其绑定属性来配置轮询操作。 可以在代码中显式执行此操作,也可以在配置中以声明方式执行此操作。 至少必须指定轮询语句和轮询间隔。 在此示例中,将凭据指定为 URI 的一部分,因此还必须将 AcceptCredentialsInUri 设置为 true

    // Create and configure a binding for the service endpoint. NOTE: binding  
    // parameters are set here for clarity, but these are already set in the  
    // the generated configuration file  
    OracleDBBinding binding = new OracleDBBinding();  
    
    // The credentials are included in the connection URI, so set this property to true  
    binding.AcceptCredentialsInUri = true;  
    
    // Same as statement specified in Configure Adapter dialog box  
    binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
    binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
    
    // Be sure to set the interval long enough to complete processing before  
    // the next poll  
    binding.PollingInterval = 15;  
    // Polling is transactional; be sure to set an adequate isolation level   
    // for your environment  
    binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;  
    
  6. 将服务终结点添加到服务主机。 为此,请按以下步骤操作:

    • 使用步骤 5 中创建的绑定。

    • 指定包含凭据的连接 URI,并根据需要指定 PollingId。

    • 将协定指定为“POLLINGSTMT_OperationGroup”。

    // Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract  
    Uri serviceUri = new Uri("oracledb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity");  
    srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);  
    
  7. 若要接收轮询数据,请打开服务主机。 每当查询返回结果集时,适配器都会返回数据。

    // Open the service host to begin polling  
    srvHost.Open();  
    
  8. 若要终止轮询,请关闭服务主机。

    重要

    适配器将继续轮询,直到服务主机关闭。

    srvHost.Close();  
    

示例

以下示例演示针对 /SCOTT/ACCOUNTACTIVITY 表执行的轮询查询。 轮询后语句调用一个 Oracle 函数,该函数将已处理的记录移到另一个表 /SCOTT/ACCOUNTHISTORY。 通过在连接 URI 中将 PollingId 参数设置为“AccountActivity”来修改 POLLINGSTMT 操作的命名空间。 在此示例中,POLLINGSTMT 操作的 WCF 服务是通过对生成的 OracleDBBindingService 类进行子类创建的;但是,可以直接在生成的类中实现逻辑。

using System;  
using System.Collections.Generic;  
using System.Text;  
  
// Add these three references to use the Oracle adapter  
using System.ServiceModel;  
using Microsoft.ServiceModel.Channels;  
using Microsoft.Adapters.OracleDB;  
  
using microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity;  
using OracleDBBindingNamespace;  
  
namespace OraclePollingSM  
{  
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
  
    public class PollingStmtService : OracleDBBindingService  
    {  
        public override void POLLINGSTMT(POLLINGSTMT request)  
        {  
            Console.WriteLine("\nNew Polling Records Received");  
            Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription");  
            for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++)  
            {  
                Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID,  
                                    request.POLLINGSTMTRECORD[i].ACCOUNT,  
                                    request.POLLINGSTMTRECORD[i].AMOUNT,  
                                    request.POLLINGSTMTRECORD[i].TRANSDATE,  
                                    request.POLLINGSTMTRECORD[i].DESCRIPTION);  
  
            }  
            Console.WriteLine("\nHit <RETURN> to stop polling");  
         }  
    }  
  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            ServiceHost srvHost = null;  
  
            // This URI is used to specify the address for the ServiceEndpoint  
            // It must contain credentials and the PollingId (if any) that was used to generate  
            // the WCF service callback interface  
            Uri serviceUri = new Uri("OracleDb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity");  
  
            // This URI is used to initialize the ServiceHost. It cannot contain  
            // userinfoparms (credentials) or a query_string (PollingId); otherwise,  
            // an exception is thrown when the ServiceHost is initialized.  
            Uri[] baseUri = new Uri[] { new Uri("OracleDb://Adapter") };  
  
            Console.WriteLine("Sample started, initializing service host -- please wait");  
  
            // create an instanc of the WCF service callback class  
            PollingStmtService pollingInstance = new PollingStmtService();  
  
            try  
            {  
                // Create a ServiceHost with the service callback instance and a base URI (address)  
                srvHost = new ServiceHost(pollingInstance, baseUri);  
  
                // Create and configure a binding for the service endpoint. Note: binding  
                // parameters are set here for clarity but these are already set in the  
                // generated configuration file  
                //  
                // The following properties are set  
                //    AcceptCredentialsInUri (true) to enable credentials in the connection URI for AddServiceEndpoint  
                //    PollingStatement  
                //    PostPollStatement calls PROCESS_ACTIVITY on Oracle. This procedure moves the queried records to  
                //                      the ACCOUNTHISTORY table  
                //    PollingInterval (15 seconds)  
                //    TransactionIsolationLevel   
  
                OracleDBBinding binding = new OracleDBBinding();  
  
                // The Credentials are included in the Connection Uri so set this property true  
                binding.AcceptCredentialsInUri = true;  
  
                // Same as statement specified in Configure Adapter dialog box  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
                binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
  
                // Be sure to set the interval long enough to complete processing before  
                // the next poll  
                binding.PollingInterval = 15;  
  
                // Polling is transactional, be sure to set an adequate isolation level   
                // for your environment  
                binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;  
  
                // Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract  
                srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);  
  
                Console.WriteLine("Opening the service host");  
                // Open the service host to begin polling  
                srvHost.Open();  
  
                // Wait to receive request  
                Console.WriteLine("\nPolling started. Returned records will be written to the console.");  
                Console.WriteLine("Hit <RETURN> to stop polling");  
                Console.ReadLine();  
            }  
            catch (Exception e)  
            {  
                Console.WriteLine("Exception :" + e.Message);  
                Console.ReadLine();  
  
                /* If there is an Oracle Error it will be specified in the inner exception */  
                if (e.InnerException != null)  
                {  
                    Console.WriteLine("InnerException: " + e.InnerException.Message);  
                    Console.ReadLine();  
                }  
            }  
            finally  
            {  
                // IMPORTANT: you must close the ServiceHost to stop polling  
                if (srvHost.State == CommunicationState.Opened)  
                    srvHost.Close();  
                else  
                    srvHost.Abort();  
            }  
  
        }  
    }  
}  

另请参阅

使用 WCF 服务模型开发 Oracle 数据库应用程序