使用 WCF 服务模型在 Oracle Database接收基于轮询的数据更改消息

可以将 Microsoft BizTalk 适配器配置为Oracle Database Oracle 表或视图接收基于轮询的数据更改消息。 为了接收数据更改的消息,适配器会定期对 Oracle 表或视图执行SQL查询,后跟可选的 PL/SQL代码块。 然后,SQL查询的结果作为入站 POLLINGSTMT 操作中的Oracle Database强类型结果集返回到应用程序。 有关使用 Oracle Database 适配器在 Oracle 数据库上配置和执行轮询的机制详细信息,请参阅在 Oracle Database 适配器中接收基于轮询 的数据更改消息。 强烈建议在继续操作之前阅读本主题。

若要在使用 WCF 服务模型时接收 POLLINGSTMT 操作,必须:

  • 从适配器公开的元数据 (为 POLLINGSTMT) 生成 WCF 服务协定。 为此,请使用"添加适配器服务引用Visual Studio插件或 ServiceModel 元数据实用工具工具 (svcutil.exe) 。

  • 从此接口实现 WCF 服务。

  • 使用 System.ServiceModel.ServiceHost (服务主机托管 此 WCF) 。

    本节中的主题提供相关信息和过程,以帮助你在 WCF 服务模型中对 Oracle 数据库表和视图执行轮询。

关于本主题中使用的示例

本主题中的示例使用 /SCOTT/ACCOUNTACTIVITY 表和 /SCOTT/Package/ACCOUNT_PKG/PROCESS_ACTIVITY 函数。 BizTalk 适配器包示例提供了用于生成这些项目的脚本。 有关示例详细信息,请参阅 适配器示例

在 WCF 服务模型中配置轮询

通过设置绑定属性Oracle Database和可选的连接属性,将 Oracle Database 适配器配置为对 Oracle 数据库表和视图执行轮询 (参数) 。 其中一些属性是必需的,有些属性若要生效,必须在设计时和运行时进行设置。

  • 在设计时,在连接到客户端时设置连接参数和绑定Oracle Database生成 WCF 服务协定。

  • 在运行时,在用于创建服务主机的 OracleDBBinding 对象上设置绑定属性。 将服务侦听器添加到服务主机时设置连接参数。

    以下列表简要概述了用于配置轮询的绑定属性和连接参数:

  • PollingStatement 绑定属性。 必须在设计时和运行时设置此绑定属性。

  • 可选的绑定属性。 这些仅必须运行时设置。

  • AcceptCredentialsInUri 绑定属性。 如果要在连接 URI 中启用凭据,则必须在运行期间将此绑定属性设置为 true。 将服务终结点添加到服务主机时,用户名和密码必须存在于连接 URI 中。

  • 连接 URI 中的 PollingId 查询字符串参数。 如果要更改 POLLINGSTMT 操作命名空间,则必须在设计时和运行时设置此连接属性。

    有关用于配置轮询的绑定属性和连接参数的完整说明,请参阅在适配器中接收基于轮询的数据 更改Oracle Database消息

WCF 服务协定和类

可以使用添加适配器服务引用 Visual Studio 插件或 ServiceModel 元数据实用工具工具 (svcutil.exe) 为 POLLINGSTMT 操作创建 WCF 服务协定 (接口) 和支持类。

使用以下任一工具连接到 Oracle 数据库时,为 POLLINGSTMT 操作生成服务协定:

  • 必须指定 PollingStatement 绑定属性。 适配器使用此绑定属性中的 SELECT 语句为 POLLINGSTMT 操作返回的强类型结果集生成正确的元数据。

  • 可以选择在连接 URI 中指定 PollingId 参数。 适配器使用此参数为 POLLINGSTMT 操作生成命名空间。

    在以下示例中:

  • PollingStatement 设置为"SELECT * FROM ACCOUNTACTIVITY FOR UPDATE"。

  • PollingId 设置为"AcctActivity"。

WCF 服务协定 (接口)

以下代码显示为 POLLINGSTMT (生成的) WCF 服务协定。

[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]  
[System.ServiceModel.ServiceContractAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03", ConfigurationName="POLLINGSTMT_OperationGroup")]  
public interface POLLINGSTMT_OperationGroup {  
  
    // CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)  
    // of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)  
    [System.ServiceModel.OperationContractAttribute(IsOneWay=true, Action="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMT")]  
    void POLLINGSTMT(POLLINGSTMT request);  
}  

消息协定

消息协定命名空间由连接 URI 中的 PollingId 参数修改。 请求消息返回一组强类型记录。

[System.Diagnostics.DebuggerStepThroughAttribute()]  
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]  
[System.ServiceModel.MessageContractAttribute(WrapperName="POLLINGSTMT", WrapperNamespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", IsWrapped=true)]  
public partial class POLLINGSTMT {  
  
    [System.ServiceModel.MessageBodyMemberAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", Order=0)]  
    public microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD;  
  
    public POLLINGSTMT() {  
    }  
  
    public POLLINGSTMT(microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD) {  
        this.POLLINGSTMTRECORD = POLLINGSTMTRECORD;  
    }  
}  

数据协定命名空间

“数据协定”是在服务与客户端之间达成的正式协议,用于以抽象方式描述要交换的数据。 也就是说,为了进行通信,客户端和服务不需要共享相同的类型,只需共享相同的数据协定。

对于数据更改消息,如果连接 URI (,则数据协定命名空间) 由 PollingId 参数修改。 数据协定由表示查询结果集内强类型记录的类组成。 此示例中省略了类定义的详细信息。 类包含表示结果集内列的属性。

在下面的示例中,使用 PollingId"AcctActivity"。

namespace microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity {  
    using System.Runtime.Serialization;  
  
    [System.Diagnostics.DebuggerStepThroughAttribute()]  
    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.Runtime.Serialization", "3.0.0.0")]  
    [System.Runtime.Serialization.DataContractAttribute(Name="POLLINGSTMTRECORD", Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity")]  
    public partial class POLLINGSTMTRECORD : object, System.Runtime.Serialization.IExtensibleDataObject {…}  
     }  
}  

WCF 服务类

添加适配器服务引用插件还会生成一个文件,该文件具有从服务协定和接口接口实现的 WCF 服务 (存) 。 该文件的名称为 OracleDBBindingService.cs。 可以将逻辑插入到此类中,以直接处理 POLLINGSTMT 操作。 如果使用 svcutil.exe生成服务协定接口,则必须自己实现此类。 以下代码显示添加适配器服务引用插件生成的 WCF 服务类。

namespace OracleDBBindingNamespace {  
  
    public class OracleDBBindingService : POLLINGSTMT_OperationGroup {  
  
        // CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)   
        // of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)  
        public virtual void POLLINGSTMT(POLLINGSTMT request) {  
            throw new System.NotImplementedException("The method or operation is not implemented.");  
        }  
    }  
}  

接收 POLLINGSTMT 操作

从客户端适配器接收Oracle Database数据

  1. 使用添加适配器服务引用插件或svcutil.exe为 POLLINGSTMT 操作 (接口) 生成 WCF 服务协定。 有关详细信息,请参阅 为解决方案项目生成 WCF 客户端或 WCF Oracle Database协定。 连接到适配器时,至少必须设置 PollingStatement 绑定属性。 可以选择在连接 URI 中指定 PollingId 参数。 如果使用"添加适配器服务引用插件",应设置配置所需的所有绑定参数。 这可保证在生成的配置文件中正确设置它们。

  2. 通过步骤 1 中生成的接口和帮助程序类实现 WCF 服务。 如果在处理从 POLLINGSTMT 操作接收的数据时遇到错误,此类的 POLLINGSTMT 方法可能会引发异常以中止轮询事务;否则, 方法不返回任何内容。 必须按如下方式对 WCF 服务类进行属性处理:

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
    
    1. 如果使用了"添加适配器服务引用插件"生成接口,可以直接在生成的 OracleDBBindingService 类中的 POLLINGSTMT 方法中实现逻辑。 此类可在 OracleDBBindingService.cs 中找到。 此示例中的此代码对 OracleDBBindingService 类进行子 类化。

      [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
      
      public class PollingStmtService : OracleDBBindingService  
      {  
          public override void POLLINGSTMT(POLLINGSTMT request)  
          {  
              Console.WriteLine("\nNew Polling Records Received");  
              Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription");  
              for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++)  
              {  
                  Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID,  
                                      request.POLLINGSTMTRECORD[i].ACCOUNT,  
                                      request.POLLINGSTMTRECORD[i].AMOUNT,  
                                      request.POLLINGSTMTRECORD[i].TRANSDATE,  
                                      request.POLLINGSTMTRECORD[i].DESCRIPTION);  
              }  
          }  
      }  
      
    2. 如果使用 svcutil.exe 生成接口,则必须创建一个 WCF 服务,该服务实现 接口,在此类的 POLLINGSTMT 方法中实现逻辑。

  3. 创建在步骤 2 中创建的 WCF 服务的实例。

    // create service instance  
    PollingStmtService pollingInstance = new PollingStmtService();  
    
  4. 使用 WCF 服务和基本连接 URI 创建 System.ServiceModel.ServiceHost 的实例。 基连接 URI 不能包含 userinfoparams 或 query_string。

    // Enable service host  
    Uri[] baseUri = new Uri[] { new Uri("oracledb://Adapter") };  
    ServiceHost srvHost = new ServiceHost(pollingInstance, baseUri);  
    
  5. 创建 OracleDBBinding ,并设置其绑定属性来配置轮询操作。 可以在代码中显式执行此操作,或在配置中以声明性进行此操作。 至少必须指定轮询语句和轮询间隔。 本示例将凭据指定为 URI 的一部分,因此还必须将 AcceptCredentialsInUri 设置为 true

    // Create and configure a binding for the service endpoint. NOTE: binding  
    // parameters are set here for clarity, but these are already set in the  
    // the generated configuration file  
    OracleDBBinding binding = new OracleDBBinding();  
    
    // The credentials are included in the connection URI, so set this property to true  
    binding.AcceptCredentialsInUri = true;  
    
    // Same as statement specified in Configure Adapter dialog box  
    binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
    binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
    
    // Be sure to set the interval long enough to complete processing before  
    // the next poll  
    binding.PollingInterval = 15;  
    // Polling is transactional; be sure to set an adequate isolation level   
    // for your environment  
    binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;  
    
  6. 将服务终结点添加到服务主机。 要执行此操作:

    • 使用在步骤 5 中创建的绑定。

    • 指定包含凭据的连接 URI,并根据需要指定 PollingId。

    • 将协定指定为"POLLINGSTMT_OperationGroup"。

    // Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract  
    Uri serviceUri = new Uri("oracledb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity");  
    srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);  
    
  7. 若要接收轮询数据,请打开服务主机。 只要查询返回结果集,适配器就会返回数据。

    // Open the service host to begin polling  
    srvHost.Open();  
    
  8. 若要终止轮询,请关闭服务主机。

    重要

    适配器将继续轮询,直到服务主机关闭。

    srvHost.Close();  
    

示例

以下示例演示对 /SCOTT/ACCOUNTACTIVITY 表执行的轮询查询。 轮询后语句调用 Oracle 函数,该函数将处理的记录移动到另一个表 /SCOTT/ACCOUNTHISTORY。 通过连接 URI 中将 PollingId 参数设置为"AccountActivity"来修改 POLLINGSTMT 操作命名空间。 在此示例中,POLLINGSTMT 操作的 WCF 服务由分类生成的 OracleDBBindingService 类创建;但是,您可以直接在生成的类中实现逻辑。

using System;  
using System.Collections.Generic;  
using System.Text;  
  
// Add these three references to use the Oracle adapter  
using System.ServiceModel;  
using Microsoft.ServiceModel.Channels;  
using Microsoft.Adapters.OracleDB;  
  
using microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity;  
using OracleDBBindingNamespace;  
  
namespace OraclePollingSM  
{  
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
  
    public class PollingStmtService : OracleDBBindingService  
    {  
        public override void POLLINGSTMT(POLLINGSTMT request)  
        {  
            Console.WriteLine("\nNew Polling Records Received");  
            Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription");  
            for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++)  
            {  
                Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID,  
                                    request.POLLINGSTMTRECORD[i].ACCOUNT,  
                                    request.POLLINGSTMTRECORD[i].AMOUNT,  
                                    request.POLLINGSTMTRECORD[i].TRANSDATE,  
                                    request.POLLINGSTMTRECORD[i].DESCRIPTION);  
  
            }  
            Console.WriteLine("\nHit <RETURN> to stop polling");  
         }  
    }  
  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            ServiceHost srvHost = null;  
  
            // This URI is used to specify the address for the ServiceEndpoint  
            // It must contain credentials and the PollingId (if any) that was used to generate  
            // the WCF service callback interface  
            Uri serviceUri = new Uri("OracleDb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity");  
  
            // This URI is used to initialize the ServiceHost. It cannot contain  
            // userinfoparms (credentials) or a query_string (PollingId); otherwise,  
            // an exception is thrown when the ServiceHost is initialized.  
            Uri[] baseUri = new Uri[] { new Uri("OracleDb://Adapter") };  
  
            Console.WriteLine("Sample started, initializing service host -- please wait");  
  
            // create an instanc of the WCF service callback class  
            PollingStmtService pollingInstance = new PollingStmtService();  
  
            try  
            {  
                // Create a ServiceHost with the service callback instance and a base URI (address)  
                srvHost = new ServiceHost(pollingInstance, baseUri);  
  
                // Create and configure a binding for the service endpoint. Note: binding  
                // parameters are set here for clarity but these are already set in the  
                // generated configuration file  
                //  
                // The following properties are set  
                //    AcceptCredentialsInUri (true) to enable credentials in the connection URI for AddServiceEndpoint  
                //    PollingStatement  
                //    PostPollStatement calls PROCESS_ACTIVITY on Oracle. This procedure moves the queried records to  
                //                      the ACCOUNTHISTORY table  
                //    PollingInterval (15 seconds)  
                //    TransactionIsolationLevel   
  
                OracleDBBinding binding = new OracleDBBinding();  
  
                // The Credentials are included in the Connection Uri so set this property true  
                binding.AcceptCredentialsInUri = true;  
  
                // Same as statement specified in Configure Adapter dialog box  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
                binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
  
                // Be sure to set the interval long enough to complete processing before  
                // the next poll  
                binding.PollingInterval = 15;  
  
                // Polling is transactional, be sure to set an adequate isolation level   
                // for your environment  
                binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;  
  
                // Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract  
                srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);  
  
                Console.WriteLine("Opening the service host");  
                // Open the service host to begin polling  
                srvHost.Open();  
  
                // Wait to receive request  
                Console.WriteLine("\nPolling started. Returned records will be written to the console.");  
                Console.WriteLine("Hit <RETURN> to stop polling");  
                Console.ReadLine();  
            }  
            catch (Exception e)  
            {  
                Console.WriteLine("Exception :" + e.Message);  
                Console.ReadLine();  
  
                /* If there is an Oracle Error it will be specified in the inner exception */  
                if (e.InnerException != null)  
                {  
                    Console.WriteLine("InnerException: " + e.InnerException.Message);  
                    Console.ReadLine();  
                }  
            }  
            finally  
            {  
                // IMPORTANT: you must close the ServiceHost to stop polling  
                if (srvHost.State == CommunicationState.Opened)  
                    srvHost.Close();  
                else  
                    srvHost.Abort();  
            }  
  
        }  
    }  
}  

另请参阅

使用 WCF 服务模型开发 Oracle Database 应用程序