使用 XA 事务

下载 ODBC 驱动程序

概述

自版本 17.3 起,Microsoft ODBC Driver for SQL Server 开始支持在 Windows、Linux 和 macOS 上通过分布式事务处理协调器 (DTC) 使用 XA 事务。 借助驱动程序端上的 XA 实现,客户端应用程序可以将串行操作(如启动、提交、回滚事务分支等)发送到事务管理器 (TM)。 然后,TM 会根据这些操作与资源管理器 (RM) 进行通信。 若要详细了解 XA 规范和 Microsoft DTC (MS DTC) 实现,请参阅《工作原理:SQL Server DTC(MSDTC 和 XA 事务)》。

XACALLPARAM 结构

XACALLPARAM 结构定义了 XA 事务管理器请求所需的信息。 它的定义如下:

// Request block describing one XA call to the transaction manager (TM).
// A pointer to it is passed to SQLSetConnectAttr; optional data follows the structure in memory.
typedef struct XACallParam {
    unsigned int sizeParam;     // size of the XACALLPARAM structure (documented as excluding the trailing data)
    int          operation;     // XA operation to pass to the TM; values are defined in xadefs.h
    XID          xid;           // transaction branch identifier
    int          flags;         // flags associated with the TM request; values are defined in xadefs.h
    int          status;        // status returned from the TM; see xadefs.h for possible values
    unsigned int sizeData;      // size of the data buffer that follows the structure
    unsigned int sizeReturned;  // size of the data returned
} XACALLPARAM, *PXACALLPARAM; 

sizeParam
XACALLPARAM 结构的大小。 此大小不包括 XACALLPARAM 后面的数据大小。

operation
要传递给 TM 的 XA 操作。 xadefs.h 中定义了可取操作值。

xid
事务分支标识符。

flags
与 TM 请求关联的标志。 xadefs.h 中定义了可取值。

status
从 TM 返回的状态。 若要了解可能返回的状态,请参阅 xadefs.h 头文件。

sizeData
XACALLPARAM 后面的数据缓冲区大小。

sizeReturned
返回的数据的大小。

必须使用属性 SQL_COPT_SS_ENLIST_IN_XA 和指向 XACALLPARAM 对象的指针来调用 SQLSetConnectAttr 函数,才能发出 TM 请求。

// Issue a TM request on connection hdbc; param points to a filled-in XACALLPARAM.
SQLSetConnectAttr(hdbc, SQL_COPT_SS_ENLIST_IN_XA, param, SQL_IS_POINTER);  // XACALLPARAM *param

代码示例

下面的示例展示了如何就 XA 事务与 TM 通信,以及如何从客户端应用程序执行不同的操作。 如果测试是针对 Microsoft SQL Server 运行,必须将 MS DTC 正确配置为启用 XA 事务。 可以在 xadefs.h 头文件中找到 XA 定义。

// XA-DTC.cpp : Defines the entry point for the console application.
//

#include "sqlwindef.h"
#include "xplatsec.h"

#include <sql.h>
#include <sqlext.h>
#include "XaTestRunner.h"

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Selects how TestXaRunner completes the transaction branch after xa_end.
enum class TestType
{
    Commit,        // Prepare, then Commit (two-phase)
    Commit1Phase,  // Commit with the one-phase flag
    Rollback,      // Rollback
    Recover        // no completion call is issued by TestXaRunner for this value
};

// Counts the rows of `tableName` by running SELECT COUNT(*) on `hstmt`.
//
// hstmt      statement handle used to run the query.
// tableName  name of the table to count; inserted into the query text as-is.
// count      receives the row count; it is reset to 0 first, so it stays 0 on failure.
//
// Returns the ODBC return code of the first failing call, or of SQLGetData on success.
// The cursor opened by the query is closed before returning, so the statement does not
// keep a pending result set while the caller runs other statements on the connection.
RETCODE GetRowCount(HSTMT hstmt, const std::string tableName, int& count)
{
    count = 0;

    char query[256];
    sprintf_s(query, sizeof(query), "SELECT COUNT(*) FROM %s", tableName.c_str());

    RETCODE rc = SQLExecDirectA(hstmt, reinterpret_cast<SQLCHAR*>(query), SQL_NTS);
    XaTestRunner::CheckRC(rc, "GetRowCount::SQLExecDirectA", hstmt, SQL_HANDLE_STMT);
    if (!SQL_SUCCEEDED(rc))
    {
        return rc;
    }

    rc = SQLFetch(hstmt);
    XaTestRunner::CheckRC(rc, "GetRowCount::SQLFetch", hstmt, SQL_HANDLE_STMT);
    if (SQL_SUCCEEDED(rc))
    {
        rc = SQLGetData(hstmt, 1, SQL_C_LONG, &count, sizeof(count), NULL);
        XaTestRunner::CheckRC(rc, "GetRowCount::SQLGetData", hstmt, SQL_HANDLE_STMT);
    }

    // Discard whatever is left of the result set; the handle itself stays valid for reuse.
    SQLFreeStmt(hstmt, SQL_CLOSE);

    return rc;
}

// Runs one XA scenario end to end on `hdbc`: connect, create a uniquely named table,
// start a transaction branch, insert ROWS_TO_TEST rows, end the branch, complete it
// according to `testType`, then verify the outcome and clean up.
//
// hdbc        allocated connection handle; it is connected here and disconnected again
//             before returning whenever the connection was established.
// connString  ODBC connection string.
// testType    how the branch is completed after xa_end (see TestType).
// timeout     when > 0 the value is handed to XaTestRunner::SetTimeout and the test sleeps
//             for timeout + 5 seconds before xa_end; the test then passes only if the last
//             XA status is XAER_NOTA.
//
// Returns true when the scenario produced the expected result.
bool TestXaRunner(HDBC hdbc, const char* connString, TestType testType, int timeout = 0)
{
    SQLRETURN rc = SQLDriverConnect(hdbc, NULL, (SQLCHAR*)connString, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
    XaTestRunner::CheckRC(rc, "TestXaRunner::Connecting", hdbc, SQL_HANDLE_DBC);
    if (!SQL_SUCCEEDED(rc))
    {
        return false;
    }

    SQLHSTMT hstmt = NULL;
    rc = SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
    XaTestRunner::CheckRC(rc, "TestXaRunner::Alloc statement", hdbc, SQL_HANDLE_DBC);
    if (!SQL_SUCCEEDED(rc))
    {
        // The connection is open at this point; leaving it connected would make the next
        // test's SQLDriverConnect on the same handle fail.
        SQLDisconnect(hdbc);
        return false;
    }

    const int ROWS_TO_TEST = 10;
    int rowCount = 0;
    bool result = false;

    std::string tableName;
    auto testRunner = std::make_shared<XaTestRunner>(hdbc);
    testRunner->GetUniqueName(tableName);
    bool isTableCreated = false;
    RETCODE xaStatus = SQL_ERROR;
    bool isTimeoutTest = false;

    XID xid;
    XaTestRunner::GetUniqueXid(xid);

    // Single-pass loop so that any step can abandon the scenario with `break`
    // and still fall through to verification and cleanup.
    do
    {
        isTableCreated = testRunner->CreateTable(tableName);
        if (!isTableCreated)
        {
            std::cout << "TestXaRunner::Failed to create table " << tableName.c_str() << std::endl;
            break;
        }

        if (timeout > 0)
        {
            testRunner->SetTimeout(timeout);
            isTimeoutTest = true;
        }

        testRunner->Start(xid, TMNOFLAGS, xaStatus);
        if (!SQL_SUCCEEDED(xaStatus))
        {
            std::cout << "TestXaRunner::XA Start failed status=" << xaStatus << std::endl;
            break;
        }

        // ExecuteInsertSequence reports success as a bool, not as an ODBC return code,
        // so it must not be routed through CheckRC.
        const bool inserted = testRunner->ExecuteInsertSequence(tableName, ROWS_TO_TEST, hstmt);
        if (!inserted)
        {
            std::cout << "TestXaRunner::ExecuteInsertSequence failed" << std::endl;
        }

        if (isTimeoutTest)
        {
            // Outlive the configured timeout so that the following xa_end is issued too late.
            auto timeToSleep = timeout + 5;
            std::cout << "Sleep for " << timeToSleep << " seconds" << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(timeToSleep));
        }

        testRunner->End(xid, TMSUCCESS, xaStatus);
        if (xaStatus < 0)
        {
            std::cout << "TestXaRunner::XA End failed status=" << xaStatus << std::endl;
            break;
        }

        switch (testType)
        {
        case TestType::Commit:
            testRunner->Prepare(xid, xaStatus);
            if (xaStatus < 0)
            {
                std::cout << "TestXaRunner::XA Prepare failed status=" << xaStatus << std::endl;
            }
            else
            {
                testRunner->Commit(xid, false, xaStatus);
                if (xaStatus < 0)
                {
                    std::cout << "TestXaRunner::XA Commit failed status=" << xaStatus << std::endl;
                }
            }
            break;
        case TestType::Commit1Phase:
            testRunner->Commit(xid, true, xaStatus);
            if (xaStatus < 0)
            {
                std::cout << "TestXaRunner::XA Commit one phase failed status=" << xaStatus << std::endl;
            }
            break;
        case TestType::Rollback:
            testRunner->Rollback(xid, xaStatus);
            if (xaStatus < 0)
            {
                std::cout << "TestXaRunner::XA Rollback failed status=" << xaStatus << std::endl;
            }
            break;
        case TestType::Recover:
            break;
        default:
            break;
        }

    } while (false);

    if (isTimeoutTest)
    {
        // A timed-out branch is expected to be unknown by the time xa_end is issued.
        result = xaStatus == XAER_NOTA;
        std::cout << "TestXaRunner::TimeoutTest" " xaStatus=" << xaStatus << " test " << (result ? "Succeded" : "Failed") << std::endl;
    }
    else
    {
        auto isCommit = testType == TestType::Commit || testType == TestType::Commit1Phase;

        // Committed scenarios must leave every inserted row behind, all others none.
        GetRowCount(hstmt, tableName, rowCount);
        result = (rowCount == (isCommit ? ROWS_TO_TEST : 0)) && SQL_SUCCEEDED(xaStatus);

        std::cout << "TestXaRunner::" << (isCommit ? "Commit" : "Rollback") << " rowCount=" << rowCount << " xaStatus=" << xaStatus << " test " << (result ? "Succeded" : "Failed") << std::endl;
    }

    if (isTableCreated)
    {
        testRunner->DropTable(tableName);
    }

    SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
    SQLDisconnect(hdbc);

    return result;
}

// Two-phase commit scenario: forwards to TestXaRunner with TestType::Commit.
bool TestCommit(HDBC hdbc, const char* connectionString)
{
    const bool passed = TestXaRunner(hdbc, connectionString, TestType::Commit);
    return passed;
}


// One-phase commit scenario: forwards to TestXaRunner with TestType::Commit1Phase.
bool TestCommit1Phase(HDBC hdbc, const char* connectionString)
{
    const bool passed = TestXaRunner(hdbc, connectionString, TestType::Commit1Phase);
    return passed;
}


// Rollback scenario: forwards to TestXaRunner with TestType::Rollback.
bool TestRollback(HDBC hdbc, const char* connectionString)
{
    const bool passed = TestXaRunner(hdbc, connectionString, TestType::Rollback);
    return passed;
}


// Runs the timeout scenario twice: a commit-type run with timeout 2 and a
// rollback-type run with timeout 5.
//
// Both runs are always executed; the function returns true only if both passed.
// (Previously the result of the first run was overwritten by the second.)
bool TestSetTimeout(HDBC hdbc, const char* connectionString)
{
    const bool commitPassed = TestXaRunner(hdbc, connectionString, TestType::Commit, 2);
    const bool rollbackPassed = TestXaRunner(hdbc, connectionString, TestType::Rollback, 5);

    return commitPassed && rollbackPassed;
}

// Recovery scenario: prepares `transactionCount` transaction branches without completing
// them, asks the TM for the list of recoverable XIDs, and commits every XID returned.
//
// hdbc              allocated connection handle; connected here and disconnected before returning.
// connectionString  ODBC connection string.
//
// Returns true when the number of XIDs reported by Recover equals the number of branches
// that reached xa_end successfully in this run.
bool TestRecover(HDBC hdbc, const char* connectionString)
{
    SQLRETURN rc = SQLDriverConnect(hdbc, NULL, (SQLCHAR*)connectionString, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
    XaTestRunner::CheckRC(rc, "TestRecover::Connecting", hdbc, SQL_HANDLE_DBC);
    if (!SQL_SUCCEEDED(rc))
    {
        return false;
    }

    const int ROWS_TO_TEST = 10;
    const int transactionCount = 2;
    const int sleepTime = 2;
    bool result = false;
    std::vector<std::string> tableNames;   // one entry per transaction; "" when table creation failed
    auto testRunner = std::make_shared<XaTestRunner>(hdbc);
    std::size_t numCompletedTransactions = 0;
    RETCODE xaStatus = SQL_ERROR;

    for (int tr = 0; tr < transactionCount; tr++)
    {
        std::string tbName;
        testRunner->GetUniqueName(tbName);

        std::cout << "Started transaction " << tr << std::endl;

        if (!testRunner->CreateTable(tbName))
        {
            std::cout << "TestRecover::Failed to create table " << tbName.c_str() << std::endl;
            tableNames.emplace_back("");
            continue;
        }
        tableNames.push_back(std::move(tbName));

        XID xid;
        XaTestRunner::GetUniqueXid(xid);
        testRunner->Start(xid, TMNOFLAGS, xaStatus);
        if (xaStatus < 0)
        {
            std::cout << "TestRecover::XA Start failed status=" << xaStatus << std::endl;
            continue;
        }

        if (!testRunner->ExecuteInsertSequence(tableNames.back(), ROWS_TO_TEST))
        {
            std::cout << "TestRecover::ExecuteInsertSequence failed" << std::endl;
        }

        testRunner->End(xid, TMSUCCESS, xaStatus);
        if (xaStatus < 0)
        {
            std::cout << "TestRecover::XA End failed status=" << xaStatus << std::endl;
            continue;
        }

        std::cout << "Completed transaction " << tr << " formatId=" << xid.formatID << std::endl;
        numCompletedTransactions++;

        // Leave the branch prepared (neither committed nor rolled back) so that it shows up
        // in the recovery scan below. A failed prepare makes the counts differ and thus
        // fails the test, so it is only reported here.
        testRunner->Prepare(xid, xaStatus);
        if (xaStatus < 0)
        {
            std::cout << "TestRecover::XA Prepare failed status=" << xaStatus << std::endl;
            continue;
        }
        std::cout << "Prepared transaction " << tr << std::endl;
    }

    std::cout << "Sleep for " << sleepTime << " seconds" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(sleepTime));

    // Recover fills the buffer with XID structures and updates buffSize to the bytes returned.
    std::vector<unsigned char> buff(8092);
    unsigned int buffSize = static_cast<unsigned int>(buff.size());
    testRunner->Recover(TMSTARTRSCAN | TMENDRSCAN, buff.data(), buffSize, xaStatus);
    std::cout << "TestRecover:: After Recover buffSize=" << buffSize << " xaStatus=" << xaStatus << std::endl;
    if (SQL_SUCCEEDED(xaStatus))
    {
        const std::size_t numRecoveredTransactions = buffSize / sizeof(XID);
        std::cout << "TestRecover:: After Recover numRecoveredTransactions=" << numRecoveredTransactions << std::endl;
        result = numCompletedTransactions == numRecoveredTransactions;

        const XID* pXid = reinterpret_cast<const XID*>(buff.data());
        for (std::size_t tr = 0; tr < numRecoveredTransactions; tr++, pXid++)
        {
            testRunner->Commit(*pXid, false, xaStatus);
            if (SQL_SUCCEEDED(xaStatus))
            {
                std::cout << "TestRecover::Successfully committed recovered transaction " << tr << " formatId=" << pXid->formatID << std::endl;
            }
            else
            {
                std::cout << "TestRecover::Attempt to commit recovered transaction " << tr << " failed status=" << xaStatus << " formatId=" << pXid->formatID  << std::endl;
            }
        }
    }

    for (const auto& name : tableNames)
    {
        if (!name.empty())
        {
            testRunner->DropTable(name);
        }
    }

    SQLDisconnect(hdbc);
    return result;
}

int main(int argc, char** argv)
{
    const char* pConnStr = "";
    if (argc < 2)
    {
        std::cout << "ERROR: Connection string is not specified" << std::endl;
        return 0;
    }
    else
    {
        pConnStr = argv[1];
        std::cout << "Connection string: " << pConnStr << std::endl;
    }

    SQLHENV henv = NULL;
    SQLHDBC hdbc = NULL;

    std::string connString = pConnStr;
    SQLRETURN rc;

    rc = SQLAllocHandle(SQL_HANDLE_ENV, NULL, &henv);
    XaTestRunner::CheckRC(rc, "Allocating environment", NULL, 0);

    rc = SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);

    rc = SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);
    XaTestRunner::CheckRC(rc, "Allocating connection", henv, SQL_HANDLE_ENV);
    bool result;

    result = TestSetTimeout(hdbc, pConnStr);
    result = TestCommit(hdbc, pConnStr);
    result = TestCommit1Phase(hdbc, pConnStr);
    result = TestRollback(hdbc, pConnStr);
    result = TestRecover(hdbc, pConnStr);

    SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
    SQLFreeHandle(SQL_HANDLE_ENV, henv);

    return 0;
}

XaTestRunner 类实现了与服务器通信时可能用到的 XA 调用。

XaTestRunner.h

#pragma once
#include "xadefs.h"
#include "sqlwindef.h"
#include "xplatsec.h"

#include <sql.h>
#include <sqlext.h>
#include <random>

// Holder for the std::random_device that XidMgr uses to seed its generators.
struct RandomProvider
{
    // Source of seed values; invoked once per generator constructed in XidMgr.
    std::random_device rd;
};

// Static helpers that build randomized XIDs for the tests.
class XidMgr
{
public:
    // Fills `xid` with a random format id and random gtrid/bqual data.
    static void GetUniqueXid(XID& xid);

    // Fills `xid` using `formatId`. When `globalId` is non-null and `sizeGlobalId` is in
    // 1..64 those bytes become the gtrid; otherwise a random 64-byte gtrid is generated.
    // The bqual part is always random.
    static void GetUniqueXid(XID& xid, int formatId, unsigned char* globalId = nullptr, unsigned int sizeGlobalId = 0);

    // Returns a uniformly distributed value in [low, high].
    // The default upper bound is the largest 32-bit signed value: the former default,
    // 0xffffffff, does not fit in an int and converted to -1, i.e. below `low`, which
    // violates the low <= high requirement of std::uniform_int_distribution.
    static int  GetRandomNumber(int low = 0, int high = 0x7fffffff);

    // Fills `buffer` with `sizeBuffer` random bytes.
    static void GetRandomBuffer(unsigned char* buffer, unsigned int sizeBuffer);

    // Seed source shared by all helpers above.
    static RandomProvider rndPrv;

};

// Test helper bound to one ODBC connection. It wraps each XA operation in a call to
// IssueXaCall and offers small SQL helpers (create/drop table, insert rows) for the tests.
class XaTestRunner
{
public:
    // Stores the connection handle; the handle is not freed by this class.
    XaTestRunner(HDBC dbc);
    ~XaTestRunner();

    // XA operations. Each returns the ODBC return code of the underlying call and stores
    // the XA status in `xaStatus` (the ODBC return code itself when the call failed).
    RETCODE Start(const XID& xid, const int flags, RETCODE& xaStatus);
    RETCODE End(const XID& xid, const int flags, RETCODE& xaStatus);
    RETCODE Prepare(const XID& xid, RETCODE& xaStatus);
    // onePhase selects TMONEPHASE instead of TMNOFLAGS.
    RETCODE Commit(const XID& xid, const bool onePhase, RETCODE& xaStatus);
    RETCODE Rollback(const XID& xid, RETCODE& xaStatus);
    RETCODE Forget(const XID& xid, RETCODE& xaStatus);
    // `buffer`/`sizeBuffer` describe the caller's buffer; on return `sizeBuffer` holds the
    // size reported back by the call.
    RETCODE Recover(const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus);

    // Table helpers. An empty `name` selects the table name "testRunner". When `stmt` is
    // NULL a statement handle is allocated internally. Return true on success.
    bool CreateTable(const std::string& name, SQLHSTMT stmt = NULL);
    bool DropTable(const std::string& name, SQLHSTMT stmt = NULL);

    // Produces a name of the form test_<counter>_<milliseconds since epoch>.
    void GetUniqueName(std::string& name);
    // Inserts rows 0..rows-1 into `nameTable`. Returns false if the statement handle
    // could not be allocated.
    bool ExecuteInsertSequence(const std::string& nameTable, int rows, SQLHSTMT stmt = NULL);

    // Prints diagnostics for SQL_ERROR and SQL_SUCCESS_WITH_INFO.
    // Returns 0 when rc is SQL_ERROR, 1 otherwise.
    static int CheckRC(SQLRETURN rc, const char *msg, SQLHANDLE handle, SQLSMALLINT htype);

    // Issue OP_SETTIMEOUT / OP_GETTIMEOUT with an int payload; the XA status is not reported.
    void SetTimeout(const int tmo);
    int GetTimeout();

    // Convenience forwarders to XidMgr.
    static void GetUniqueXid(XID& xid) { XidMgr::GetUniqueXid(xid); }
    static void GetUniqueXid(XID& xid, int formatId, unsigned char* globalId = nullptr, unsigned int sizeGlobalId = 0)
    {
        XidMgr::GetUniqueXid(xid, formatId, globalId, sizeGlobalId);
    }

    // Copies the fields of a packed XID_SHORT into a regular XID.
    static void XidShortToXid(const XID_SHORT& xids, XID& xid);

private:
    HDBC m_hdbc;                        // connection all calls are issued on
    std::string m_tableName;            // unique name generated in the constructor
    std::string m_commandCreateTable;   // printf-style CREATE TABLE template
    std::string m_commandInsertRow;     // printf-style INSERT template

    static const char* COMMAND_CREATE_TABLE;
    static const char* COMMAND_INSERT_ROW;

    // Runs `query` with SQLExecDirectA; `msg` labels the diagnostics printed on failure.
    bool ExecuteQuery(const char* query, const char* msg, SQLHSTMT stmt = NULL);
    // Builds an XACALLPARAM (plus optional trailing data) and sends it through
    // SQLSetConnectAttr; shared implementation of all XA operations above.
    RETCODE IssueXaCall(const XID* xid, int operation, const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus);

};

XaTestRunner.cpp

#include "XaTestRunner.h"
#include <chrono>
#include <thread>
#include <ctime>
#include <atomic>

// printf-style SQL templates: %s is the table name; for the insert, both %d receive the row number.
const char* XaTestRunner::COMMAND_CREATE_TABLE = "CREATE TABLE %s (c1 INT, c2 VARCHAR(300))";
const char* XaTestRunner::COMMAND_INSERT_ROW = "INSERT INTO %s Values (%d, 'Varchar data for row %d')";

// Definition of the seed source shared by the XidMgr helpers.
RandomProvider XidMgr::rndPrv;

// Returns a uniformly distributed integer in the closed range spanned by `low` and `high`.
//
// std::uniform_int_distribution requires its lower bound not to exceed its upper bound;
// constructing it otherwise is undefined behaviour. That can happen by accident, e.g. an
// upper bound written as 0xffffffff becomes -1 once converted to int. The bounds are
// therefore put in order before use.
int XidMgr::GetRandomNumber(int low, int high)
{
    if (high < low)
    {
        const int tmp = low;
        low = high;
        high = tmp;
    }

    // A fresh engine is seeded from the shared random device on every call.
    std::mt19937 gen(XidMgr::rndPrv.rd());
    std::uniform_int_distribution<> dis(low, high);
    return dis(gen);
}

// Writes `sizeBuffer` random byte values (0..0xff) to `buffer`.
void XidMgr::GetRandomBuffer(unsigned char* buffer, unsigned int sizeBuffer)
{
    std::mt19937 engine(XidMgr::rndPrv.rd());
    std::uniform_int_distribution<> byteValue(0, 0xff);

    unsigned char* const end = buffer + sizeBuffer;
    for (unsigned char* out = buffer; out != end; ++out)
    {
        *out = static_cast<unsigned char>(byteValue(engine));
    }
}

// Binds the runner to `dbc`, copies the SQL templates and generates a unique table name.
XaTestRunner::XaTestRunner(HDBC dbc)
    : m_hdbc(dbc),
      m_commandCreateTable(COMMAND_CREATE_TABLE),
      m_commandInsertRow(COMMAND_INSERT_ROW)
{
    GetUniqueName(m_tableName);
}

// Nothing to release: the connection handle is owned by the caller.
XaTestRunner::~XaTestRunner() = default;

// Builds an XID with a random format id in [0, 0xffff] and random gtrid/bqual data.
void XidMgr::GetUniqueXid(XID& xid)
{
    const int randomFormatId = XidMgr::GetRandomNumber(0, 0xffff);
    XidMgr::GetUniqueXid(xid, randomFormatId);
}

// Fills `xid` for the given format id.
//
// The gtrid is taken from `globalId` when it is non-null and `sizeGlobalId` is within
// 1..64; otherwise 64 random bytes are used. The bqual is always 64 random bytes and is
// stored in xid.data directly after the gtrid.
void XidMgr::GetUniqueXid(XID& xid, int formatId, unsigned char* globalId, unsigned int sizeGlobalId)
{
    const bool callerSuppliedGtrid = (globalId != nullptr) && (sizeGlobalId > 0) && (sizeGlobalId <= 64);

    xid.formatID = formatId;
    xid.bqual_length = 64;

    if (callerSuppliedGtrid)
    {
        xid.gtrid_length = sizeGlobalId;
        memcpy_s(&xid.data[0], sizeof(xid.data), globalId, xid.gtrid_length);
    }
    else
    {
        xid.gtrid_length = 64;
        GetRandomBuffer(&xid.data[0], xid.gtrid_length);
    }

    GetRandomBuffer(&xid.data[xid.gtrid_length], xid.bqual_length);
}

// Reports the outcome of an ODBC call.
//
// rc      return code to inspect.
// msg     label of the operation, used in the printed messages.
// handle  handle whose diagnostic records are printed; may be null to skip them.
// htype   handle type matching `handle`.
//
// Prints an error banner plus diagnostics for SQL_ERROR and returns 0; prints the
// diagnostics for SQL_SUCCESS_WITH_INFO (only when a handle is given) and returns 1;
// returns 1 silently for every other code.
int XaTestRunner::CheckRC(SQLRETURN rc, const char *msg, SQLHANDLE handle, SQLSMALLINT htype)
{
    // Walks the diagnostic records of `handle` starting at record 1 until
    // SQLGetDiagRec stops returning a record.
    auto dumpDiagRecords = [handle, htype](bool asError)
    {
        SQLCHAR text[1024];
        SQLCHAR state[6];
        SQLINTEGER nativeError = 0;
        SQLSMALLINT textLength = 0;
        SQLSMALLINT recordNumber = 0;

        for (;;)
        {
            const SQLRETURN diagRc = SQLGetDiagRec(htype, handle, ++recordNumber, state, &nativeError, text, sizeof(text), &textLength);
            if (diagRc != SQL_SUCCESS && diagRc != SQL_SUCCESS_WITH_INFO)
            {
                break;
            }

            if (asError)
            {
                printf("Error# %d: [%s] state [%s]\n", recordNumber, text, state);
            }
            else
            {
                printf("Msg# %d: [%s] state [%s]\n", recordNumber, text, state);
            }
        }
    };

    if (rc == SQL_ERROR)
    {
        printf("Error occurred upon [%s]\n", msg);
        if (handle)
        {
            dumpDiagRecords(true);
        }
        return 0;
    }

    if (rc == SQL_SUCCESS_WITH_INFO && handle)
    {
        printf("Success with info for [%s]:\n", msg);
        dumpDiagRecords(false);
    }

    return 1;
}

// Sends one XA request to the driver through SQLSetConnectAttr.
//
// pXid        branch identifier to copy into the request, or nullptr when the operation has none.
// operation   one of the OP_* values.
// flags       TM* flags for the request.
// buffer      caller's data area: its content is sent after the XACALLPARAM structure and
//             it receives any data returned by the call. May be nullptr when sizeBuffer is 0.
// sizeBuffer  in: size of `buffer` in bytes; out: number of bytes copied back into `buffer`
//             (never more than the size passed in).
// xaStatus    receives param->status when the ODBC call succeeded, otherwise the ODBC return code.
//
// Returns the return code of SQLSetConnectAttr.
RETCODE XaTestRunner::IssueXaCall(const XID* pXid, int operation, const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus)
{
    const unsigned int sizeLimit = sizeBuffer;

    // The request is the fixed structure immediately followed by sizeLimit bytes of data.
    const unsigned int sizeParam = sizeof(XACALLPARAM) + sizeLimit;
    std::vector<unsigned char> buff(sizeParam);
    PXACALLPARAM param = (PXACALLPARAM)(void*)&buff[0];
    memset(param, 0, sizeof(XACALLPARAM));
    param->flags = flags;
    param->operation = operation;
    // NOTE(review): the structure documentation describes sizeParam as excluding the
    // trailing data, whereas the value set here includes it - confirm against the driver.
    param->sizeParam = sizeParam;
    if (pXid)
    {
        param->xid = *pXid;
    }
    if (sizeLimit > 0)
    {
        param->sizeData = sizeLimit;
        if (buffer != nullptr)
        {
            memcpy_s(&param[1], sizeLimit, buffer, sizeLimit);
        }
    }

    RETCODE rc = SQLSetConnectAttr(m_hdbc, SQL_ATTR_ENLIST_IN_XA, param, SQL_IS_POINTER);
    CheckRC(rc, " XaTestRunner::IssueXaCall", m_hdbc, SQL_HANDLE_DBC);
    xaStatus = SQL_SUCCEEDED(rc) ? param->status : rc;

    // Never trust the reported size beyond what was actually provided: a larger value
    // would read past the request buffer and write past the caller's buffer.
    unsigned int sizeReturned = param->sizeReturned;
    if (sizeReturned > sizeLimit)
    {
        sizeReturned = sizeLimit;
    }
    if (sizeReturned > 0 && buffer != nullptr)
    {
        memcpy_s(buffer, sizeLimit, &param[1], sizeReturned);
    }
    sizeBuffer = sizeReturned;

    return rc;
}

// Issues OP_START for `xid` with the given flags; no data accompanies the request.
RETCODE XaTestRunner::Start(const XID& xid, const int flags, RETCODE& xaStatus)
{
    unsigned int noPayload = 0;
    const RETCODE rc = IssueXaCall(&xid, OP_START, flags, nullptr, noPayload, xaStatus);
    return rc;
}

// Issues OP_END for `xid` with the given flags; no data accompanies the request.
RETCODE XaTestRunner::End(const XID& xid, const int flags, RETCODE& xaStatus)
{
    unsigned int noPayload = 0;
    const RETCODE rc = IssueXaCall(&xid, OP_END, flags, nullptr, noPayload, xaStatus);
    return rc;
}

// Issues OP_PREPARE for `xid` with TMNOFLAGS; no data accompanies the request.
RETCODE XaTestRunner::Prepare(const XID& xid, RETCODE& xaStatus)
{
    unsigned int noPayload = 0;
    const RETCODE rc = IssueXaCall(&xid, OP_PREPARE, TMNOFLAGS, nullptr, noPayload, xaStatus);
    return rc;
}

// Issues OP_COMMIT for `xid`; `onePhase` selects TMONEPHASE, otherwise TMNOFLAGS is sent.
RETCODE XaTestRunner::Commit(const XID& xid, const bool onePhase, RETCODE& xaStatus)
{
    unsigned int noPayload = 0;
    if (onePhase)
    {
        return IssueXaCall(&xid, OP_COMMIT, TMONEPHASE, nullptr, noPayload, xaStatus);
    }
    return IssueXaCall(&xid, OP_COMMIT, TMNOFLAGS, nullptr, noPayload, xaStatus);
}

// Issues OP_ROLLBACK for `xid` with TMNOFLAGS; no data accompanies the request.
RETCODE XaTestRunner::Rollback(const XID& xid, RETCODE& xaStatus)
{
    unsigned int noPayload = 0;
    const RETCODE rc = IssueXaCall(&xid, OP_ROLLBACK, TMNOFLAGS, nullptr, noPayload, xaStatus);
    return rc;
}

// Issues OP_FORGET for `xid` with TMNOFLAGS; no data accompanies the request.
RETCODE XaTestRunner::Forget(const XID& xid, RETCODE& xaStatus)
{
    unsigned int noPayload = 0;
    const RETCODE rc = IssueXaCall(&xid, OP_FORGET, TMNOFLAGS, nullptr, noPayload, xaStatus);
    return rc;
}

// Issues OP_RECOVER with the given flags. No XID is sent; `buffer`/`sizeBuffer` are handed
// to IssueXaCall unchanged, which updates `sizeBuffer` with the size reported back.
RETCODE XaTestRunner::Recover(const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus)
{
    const XID* const noXid = nullptr;
    const RETCODE rc = IssueXaCall(noXid, OP_RECOVER, flags, buffer, sizeBuffer, xaStatus);
    return rc;
}

void XaTestRunner::SetTimeout(const int tmo)
{
    int timeout = tmo;
    unsigned int sizeBuffer = sizeof(timeout);
    RETCODE xaStatus;
    IssueXaCall(nullptr, OP_SETTIMEOUT, TMNOFLAGS, (unsigned char*)&timeout, sizeBuffer, xaStatus);
}

int XaTestRunner::GetTimeout()
{
    int timeout = 0;
    unsigned int sizeBuffer = sizeof(timeout);
    RETCODE xaStatus;
    IssueXaCall(nullptr, OP_GETTIMEOUT, TMNOFLAGS, (unsigned char*)&timeout, sizeBuffer, xaStatus);
    return timeout;
}

// Expands a packed XID_SHORT into a regular XID: the data bytes are copied verbatim and
// the one-byte lengths are widened to the int fields of XID.
void XaTestRunner::XidShortToXid(const XID_SHORT& xids, XID& xid)
{
    memcpy_s(xid.data, sizeof(xid.data), xids.data, sizeof(xids.data));

    xid.bqual_length = xids.bqual_length;
    xid.gtrid_length = xids.gtrid_length;
    xid.formatID = xids.formatID;
}

// Stores a name of the form "test_<sequence>_<milliseconds since epoch>" in `name`.
// The sequence number comes from a process-wide atomic counter.
void XaTestRunner::GetUniqueName(std::string& name)
{
    static std::atomic<unsigned short> counter(0);
    const unsigned short sequence = counter.fetch_add(1);

    const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
    const long long millis = std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch).count();

    char formatted[64];
    sprintf_s(formatted, sizeof(formatted), "test_%d_%lld", sequence, millis);
    name.assign(formatted);
}

bool XaTestRunner::ExecuteQuery(const char* query, const char* msg, SQLHSTMT stmt)
{
    RETCODE rc = SQL_SUCCESS;
    SQLHSTMT hstmt = stmt;
    bool isAllocateStatement = (stmt == NULL);

    if (isAllocateStatement)
    {
        rc = SQLAllocHandle(SQL_HANDLE_STMT, m_hdbc, &hstmt);
    }

    if (SQL_SUCCEEDED(rc))
    {
        rc = SQLExecDirectA(hstmt, (SQLCHAR*)query, SQL_NTS);
        if (!SQL_SUCCEEDED(rc))
        {
            CheckRC(rc, msg, hstmt, SQL_HANDLE_STMT);
        }

        if (isAllocateStatement)
        {
            SQLFreeStmt(hstmt, SQL_CLOSE);
        }
    }
    else
    {
        CheckRC(rc, "Alloc Statement", m_hdbc, SQL_HANDLE_DBC);
    }

    return SQL_SUCCEEDED(rc);

}

// Creates the table `name` (or "testRunner" when `name` is empty) from the CREATE TABLE
// template. Returns the result of ExecuteQuery.
bool XaTestRunner::CreateTable(const std::string& name, SQLHSTMT stmt)
{
    const char* effectiveName = "testRunner";
    if (!name.empty())
    {
        effectiveName = name.c_str();
    }

    char statement[256];
    sprintf_s(statement, sizeof(statement), m_commandCreateTable.c_str(), effectiveName);

    const bool created = ExecuteQuery(statement, "Create Table", stmt);
    return created;
}

// Drops the table `name` (or "testRunner" when `name` is empty) if it exists as a user
// table. Returns the result of ExecuteQuery.
bool XaTestRunner::DropTable(const std::string& name, SQLHSTMT stmt)
{
    const char* effectiveName = "testRunner";
    if (!name.empty())
    {
        effectiveName = name.c_str();
    }

    char statement[256];
    sprintf_s(statement, sizeof(statement), " IF OBJECT_ID('%s', 'U') IS NOT NULL DROP TABLE %s", effectiveName, effectiveName);

    const bool dropped = ExecuteQuery(statement, "Drop Table", stmt);
    return dropped;
}

bool XaTestRunner::ExecuteInsertSequence(const std::string& nameTable, int rows, SQLHSTMT stmt)
{
    SQLHSTMT hstmt = stmt;
    bool isAllocateStatement = (stmt == NULL);
    RETCODE rc = SQL_SUCCESS;

    if (isAllocateStatement)
    {
        rc = SQLAllocHandle(SQL_HANDLE_STMT, m_hdbc, &hstmt);
        if (!SQL_SUCCEEDED(rc))
        {
            CheckRC(rc, "Alloc Statement", m_hdbc, SQL_HANDLE_DBC);
            return false;
        }
    }

    for (auto r = 0; r < rows; r++)
    {
        char query[256];
        sprintf_s(query, sizeof(query), m_commandInsertRow.c_str(), nameTable.c_str(), r, r);
        rc = ExecuteQuery(query, "Insert Row", hstmt);
    }

    if (isAllocateStatement)
    {
        SQLFreeStmt(hstmt, SQL_CLOSE);
    }

    return true;
}

附录

xadefs.h

// xadefs.h : XA specific definitions.
//

#pragma once

// from xa.h
/*
 * Transaction branch identification: XID and NULLXID:
 */
/* Size limits of the XID data area and of its two parts. */
#define XIDDATASIZE     128         /* size in bytes */
#define MAXGTRIDSIZE    64          /* maximum size in bytes of gtrid */
#define MAXBQUALSIZE    64          /* maximum size in bytes of bqual */

/* Transaction branch identifier. The guard lets another header supply the same definition. */
#ifndef _XID_T_DEFINED
#define _XID_T_DEFINED
struct xid_t
{
    int formatID;                   /* format identifier */
    int gtrid_length;               /* value not to exceed 64 */
    int bqual_length;               /* value not to exceed 64 */
    unsigned char data[XIDDATASIZE]; /* gtrid bytes first, bqual bytes start at data[gtrid_length] */
};
#endif

/* Compact XID variant: byte-packed, with the two lengths stored in one byte each. */
/* XaTestRunner::XidShortToXid converts it to a regular xid_t. */
#pragma pack (push, 1)
struct xid_s
{
    int formatID;                           /* format identifier */
    unsigned char gtrid_length;             /* value not to exceed 64 */
    unsigned char bqual_length;             /* value not to exceed 64 */
    unsigned char data[XIDDATASIZE];
};
#pragma pack (pop)

/*
 * xa_() return codes (resource manager reports to transaction manager)
 */
/*
 * Rollback codes. The derived values are parenthesized so that each macro expands to a
 * single operand: unparenthesized, an expression such as XA_RBCOMMFAIL * 2 would expand
 * to XA_RBBASE+1 * 2 and evaluate to 102 instead of 202.
 */
#define XA_RBBASE       100                 /* The inclusive lower bound of the rollback codes */
#define XA_RBROLLBACK   XA_RBBASE           /* The rollback was caused by an unspecified reason */
#define XA_RBCOMMFAIL   (XA_RBBASE+1)       /* The rollback was caused by a communication failure */
#define XA_RBDEADLOCK   (XA_RBBASE+2)       /* A deadlock was detected */
#define XA_RBINTEGRITY  (XA_RBBASE+3)       /* A condition that violates the integrity of the resources was detected */
#define XA_RBOTHER      (XA_RBBASE+4)       /* The resource manager rolled back the transaction branch for a reason not on this list */
#define XA_RBPROTO      (XA_RBBASE+5)       /* A protocol error occurred in the resource manager */
#define XA_RBTIMEOUT    (XA_RBBASE+6)       /* A transaction branch took too long */
#define XA_RBTRANSIENT  (XA_RBBASE+7)       /* May retry the transaction branch */
#define XA_RBEND        XA_RBTRANSIENT      /* The inclusive upper bound of the rollback codes */

/* Non-error status codes (positive) and error codes (negative). */
#define XA_NOMIGRATE    9                   /* resumption must occur where suspension occurred */
#define XA_HEURHAZ      8                   /* the transaction branch may have been heuristically completed */
#define XA_HEURCOM      7                   /* the transaction branch has been heuristically committed */
#define XA_HEURRB       6                   /* the transaction branch has been heuristically rolled back */
#define XA_HEURMIX      5                   /* the transaction branch has been heuristically committed and rolled back */
#define XA_RETRY        4                   /* routine returned with no effect and may be re-issued */
#define XA_RDONLY       3                   /* the transaction branch was read-only and has been committed */
#define XA_OK           0                   /* normal execution */
#define XAER_ASYNC      (-2)                /* asynchronous operation already outstanding */
#define XAER_RMERR      (-3)                /* a resource manager error occurred in the transaction branch */
#define XAER_NOTA       (-4)                /* the XID is not valid */
#define XAER_INVAL      (-5)                /* invalid arguments were given */
#define XAER_PROTO      (-6)                /* routine invoked in an improper context */
#define XAER_RMFAIL     (-7)                /* resource manager unavailable */
#define XAER_DUPID      (-8)                /* the XID already exists */
#define XAER_OUTSIDE    (-9)                /* resource manager doing work outside */
                                            /* global transaction */


/*
 * Flags describing resource manager features.
 */
#define TMNOFLAGS       0x00000000L         /* no resource manager features selected */
#define TMREGISTER      0x00000001L         /* resource manager dynamically registers */
#define TMNOMIGRATE     0x00000002L         /* resource manager does not support association migration */
#define TMUSEASYNC      0x00000004L         /* resource manager supports asynchronous operations */
/*
 * Flag definitions for xa_ and ax_ routines
 */
/* use TMNOFLAGS, defined above, when not specifying other flags */
/* NOTE(review): these are long constants while XACALLPARAM::flags is an int; TMASYNC */
/* (0x80000000) is above the largest value of a 32-bit int - confirm before using it. */
#define TMASYNC         0x80000000L         /* perform routine asynchronously */
#define TMONEPHASE      0x40000000L         /* caller is using one-phase commit optimisation */
#define TMFAIL          0x20000000L         /* dissociates caller and marks transaction branch rollback-only */
#define TMNOWAIT        0x10000000L         /* return if blocking condition exists */
#define TMRESUME        0x08000000L         /* caller is resuming association with suspended transaction branch */
#define TMSUCCESS       0x04000000L         /* dissociate caller from transaction branch */
#define TMSUSPEND       0x02000000L         /* caller is suspending, not ending, association */
#define TMSTARTRSCAN    0x01000000L         /* start a recovery scan */
#define TMENDRSCAN      0x00800000L         /* end a recovery scan */
#define TMMULTIPLE      0x00400000L         /* wait for any asynchronous operation */
#define TMJOIN          0x00200000L         /* caller is joining existing transaction branch */
#define TMMIGRATE       0x00100000L         /* caller intends to perform migration */

// Short names used throughout the sample for the two XID layouts.
using XID = struct xid_t;
using XID_SHORT = struct xid_s;

// XA operations as a scoped enumeration; the enumerators are listed in the same order as
// the OP_* constants below, so their underlying values match.
enum class XaOperation
{
    start,
    end,
    prepare,
    commit,
    rollback,
    forget,
    recover,
    getTimeout,
    setTimeout,
    prepareEx,
    rollbackEx,
    forgetEx
};

// Operation codes placed in XACALLPARAM::operation.
constexpr int OP_START      = 0;
constexpr int OP_END        = 1;
constexpr int OP_PREPARE    = 2;
constexpr int OP_COMMIT     = 3;
constexpr int OP_ROLLBACK   = 4;
constexpr int OP_FORGET     = 5;
constexpr int OP_RECOVER    = 6;
constexpr int OP_GETTIMEOUT = 7;
constexpr int OP_SETTIMEOUT = 8;

// extended operations, not called directly by client
constexpr int OP_PREPAREEX  = 9;
constexpr int OP_ROLLBACKEX = 10;
constexpr int OP_FORGETEX   = 11;

// Request block for one XA call; a pointer to it is passed to SQLSetConnectAttr and
// optional data follows the structure in memory (see XaTestRunner::IssueXaCall).
typedef struct XACallParam {
    unsigned int sizeParam;     // size of the request (IssueXaCall sets structure size plus trailing data)
    int          operation;     // one of the OP_* operation codes
    XID          xid;           // transaction branch identifier
    int          flags;         // TM* flags for the request
    int          status;        // XA status written back by the call
    unsigned int sizeData;      // size of the data that follows the structure
    unsigned int sizeReturned;  // size of the data returned in the trailing area
} XACALLPARAM, *PXACALLPARAM; 

// Not referenced by the sample code in this document.
// NOTE(review): presumably a request flag asking for tightly coupled branches - confirm against the driver documentation.
#define FLAG_TIGHTLYCOUPLED  0x8000