使用 XA 事务
概述
自版本 17.3 起,Microsoft ODBC Driver for SQL Server 开始支持在 Windows、Linux 和 macOS 上通过分布式事务处理协调器 (DTC) 使用 XA 事务。 借助驱动程序端上的 XA 实现,客户端应用程序可以将串行操作(如启动、提交、回滚事务分支等)发送到事务管理器 (TM)。 然后,TM 会根据这些操作与资源管理器 (RM) 进行通信。 若要详细了解 XA 规范和 Microsoft DTC (MS DTC) 实现,请参阅工作原理:SQL Server DTC(MSDTC 和 XA 事务)。
XACALLPARAM 结构
XACALLPARAM
结构定义了 XA 事务管理器请求所需的信息。 它的定义如下:
typedef struct XACallParam {
unsigned int sizeParam;
int operation;
XID xid;
int flags;
int status;
unsigned int sizeData;
unsigned int sizeReturned;
} XACALLPARAM, *PXACALLPARAM;
sizeParam
XACALLPARAM
结构的大小。 此大小不包括 XACALLPARAM
后面的数据大小。
operation
要传递给 TM 的 XA 操作。 xadefs.h 中定义了可取操作值。
xid
事务分支标识符。
flags
与 TM 请求关联的标志。 xadefs.h 中定义了可取值。
status
从 TM 返回状态。 若要了解可能返回的状态,请参阅 xadefs.h 头。
sizeData
XACALLPARAM
后面的数据缓冲区大小。
sizeReturned
返回的数据的大小。
必须使用属性 SQL_COPT_SS_ENLIST_IN_XA 和指向 XACALLPARAM
对象的指针来调用 SQLSetConnectAttr 函数,才能发出 TM 请求。
SQLSetConnectAttr(hdbc, SQL_COPT_SS_ENLIST_IN_XA, param, SQL_IS_POINTER); // XACALLPARAM *param
代码示例
下面的示例展示了如何就 XA 事务与 TM 通信,以及如何从客户端应用程序执行不同的操作。 如果测试是针对 Microsoft SQL Server 运行,必须将 MS DTC 正确配置为启用 XA 事务。 可以在 xadefs.h 头文件中找到 XA 定义。
// XA-DTC.cpp : Defines the entry point for the console application.
//
#include "sqlwindef.h"
#include "xplatsec.h"
#include <sql.h>
#include <sqlext.h>
#include "XaTestRunner.h"
#include <iostream>
#include <string>
#include <memory>
#include <thread>
#include <chrono>
enum class TestType { Commit, Commit1Phase, Rollback, Recover};
RETCODE GetRowCount(HSTMT hstmt, const std::string tableName, int& count)
{
char query[256];
count = 0;
sprintf_s(query, sizeof(query), "SELECT COUNT(*) FROM %s", tableName.c_str());
RETCODE rc = SQLExecDirectA(hstmt, (SQLCHAR*)query, SQL_NTS);
XaTestRunner::CheckRC(rc, "GetRowCount::SQLExecDirectA", hstmt, SQL_HANDLE_STMT);
if (!SQL_SUCCEEDED(rc))
{
return rc;
}
rc = SQLFetch(hstmt);
XaTestRunner::CheckRC(rc, "GetRowCount::SQLFetch", hstmt, SQL_HANDLE_STMT);
if (!SQL_SUCCEEDED(rc))
{
return rc;
}
rc = SQLGetData(hstmt, 1, SQL_C_LONG, &count, sizeof(count), NULL);
XaTestRunner::CheckRC(rc, "GetRowCount::SQLGetData", hstmt, SQL_HANDLE_STMT);
return rc;
}
bool TestXaRunner(HDBC hdbc, const char* connString, TestType testType, int timeout = 0)
{
SQLRETURN rc = SQLDriverConnect(hdbc, NULL, (SQLCHAR*)connString, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
XaTestRunner::CheckRC(rc, "TestXaRunner::Connecting", hdbc, SQL_HANDLE_DBC);
if (!SQL_SUCCEEDED(rc))
{
return false;
}
SQLHSTMT hstmt;
rc = SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
XaTestRunner::CheckRC(rc, "TestXaRunner::Alloc statement", hdbc, SQL_HANDLE_DBC);
const int ROWS_TO_TEST = 10;
int rowCount = 0;
bool result = false;
if (SQL_SUCCEEDED(rc))
{
std::string tableName;
auto testRunner = std::make_shared<XaTestRunner>(hdbc);
testRunner->GetUniqueName(tableName);
bool isTableCreated = false;
RETCODE xaStatus = SQL_ERROR;
bool isTimeoutTest = false;
XID xid;
XaTestRunner::GetUniqueXid(xid);
do
{
if (!(isTableCreated = testRunner->CreateTable(tableName)))
{
std::cout << "TestXaRunner::Failed to create table " << tableName.c_str() << std::endl;
break;
}
if (timeout > 0)
{
testRunner->SetTimeout(timeout);
isTimeoutTest = true;
}
rc = testRunner->Start(xid, TMNOFLAGS, xaStatus);
if (SQL_SUCCEEDED(xaStatus))
{
rc = testRunner->ExecuteInsertSequence(tableName, ROWS_TO_TEST, hstmt);
XaTestRunner::CheckRC(rc, "TestXaRunner::ExecuteInsertSequence", hstmt, SQL_HANDLE_STMT);
if (isTimeoutTest)
{
auto timeToSleep = timeout + 5;
std::cout << "Sleep for " << timeToSleep << " seconds" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(timeToSleep));
}
rc = testRunner->End(xid, TMSUCCESS, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA End failed status=" << xaStatus << std::endl;
break;
}
switch (testType)
{
case TestType::Commit:
rc = testRunner->Prepare(xid, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Prepare failed status=" << xaStatus << std::endl;
}
else
{
rc = testRunner->Commit(xid, false, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Commit failed status=" << xaStatus << std::endl;
}
}
break;
case TestType::Commit1Phase:
rc = testRunner->Commit(xid, true, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Commit one phase failed status=" << xaStatus << std::endl;
}
break;
case TestType::Rollback:
rc = testRunner->Rollback(xid, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Rollback failed status=" << xaStatus << std::endl;
}
break;
case TestType::Recover:
break;
default:
break;
}
}
else
{
std::cout << "TestXaRunner::XA Start failed status=" << xaStatus << std::endl;
}
} while (false);
if (isTimeoutTest)
{
result = xaStatus == XAER_NOTA;
std::cout << "TestXaRunner::TimeoutTest" " xaStatus=" << xaStatus << " test " << (result ? "Succeded" : "Failed") << std::endl;
}
else
{
auto isCommit = testType == TestType::Commit || testType == TestType::Commit1Phase;
rc = GetRowCount(hstmt, tableName, rowCount);
result = (rowCount == (isCommit ? ROWS_TO_TEST : 0)) && SQL_SUCCEEDED(xaStatus);
std::cout << "TestXaRunner::" << (isCommit ? "Commit" : "Rollback") << " rowCount=" << rowCount << " xaStatus=" << xaStatus << " test " << (result ? "Succeded" : "Failed") << std::endl;
}
if (isTableCreated)
{
testRunner->DropTable(tableName);
}
rc = SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
rc = SQLDisconnect(hdbc);
}
return result;
}
bool TestCommit(HDBC hdbc, const char* connectionString)
{
return TestXaRunner(hdbc, connectionString, TestType::Commit);
}
bool TestCommit1Phase(HDBC hdbc, const char* connectionString)
{
return TestXaRunner(hdbc, connectionString, TestType::Commit1Phase);
}
bool TestRollback(HDBC hdbc, const char* connectionString)
{
return TestXaRunner(hdbc, connectionString, TestType::Rollback);
}
bool TestSetTimeout(HDBC hdbc, const char* connectionString)
{
bool result = false;
result = TestXaRunner(hdbc, connectionString, TestType::Commit, 2);
result = TestXaRunner(hdbc, connectionString, TestType::Rollback, 5);
return result;
}
bool TestRecover(HDBC hdbc, const char* connectionString)
{
SQLRETURN rc = SQLDriverConnect(hdbc, NULL, (SQLCHAR*)connectionString, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
XaTestRunner::CheckRC(rc, "TestXaRunner::Connecting", hdbc, SQL_HANDLE_DBC);
if (!SQL_SUCCEEDED(rc))
{
return false;
}
const int ROWS_TO_TEST = 10;
const int transactionCount = 2;
int rowCount = 0;
bool result = false;
std::vector<std::string> tableNames;
auto testRunner = std::make_shared<XaTestRunner>(hdbc);
auto numCompletedTransactions = 0;
RETCODE xaStatus = SQL_ERROR;
const int sleepTime = 2;
for (auto tr = 0; tr < transactionCount; tr++)
{
std::string tbName;
testRunner->GetUniqueName(tbName);
bool isTableCreated = false;
RETCODE xaStatus = SQL_ERROR;
std::cout << "Started transaction " << tr << std::endl;
do
{
if (!(isTableCreated = testRunner->CreateTable(tbName)))
{
tableNames.emplace_back("");
std::cout << "TestRecover::Failed to create table " << tbName.c_str() << std::endl;
break;
}
tableNames.push_back(std::move(tbName));
XID xid;
XaTestRunner::GetUniqueXid(xid);
rc = testRunner->Start(xid, TMNOFLAGS, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Start failed status=" << xaStatus << std::endl;
break;
}
rc = testRunner->ExecuteInsertSequence(tableNames[tr], ROWS_TO_TEST);
rc = testRunner->End(xid, TMSUCCESS, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA End failed status=" << xaStatus << std::endl;
break;
}
std::cout << "Completed transaction " << tr << " formatId=" <<xid.formatID <<std::endl;
numCompletedTransactions++;
rc = testRunner->Prepare(xid, xaStatus);
std::cout << "Prepared transaction " << tr << std::endl;
} while (false);
}
std::cout << "Sleep for " << sleepTime << "seconds" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(sleepTime));
std::vector<unsigned char> buff(8092);
unsigned int buffSize = static_cast<unsigned int>(buff.size());
testRunner->Recover(TMSTARTRSCAN | TMENDRSCAN, &buff[0], buffSize, xaStatus);
std::cout << "TestRecover:: After Recover buffSize=" << buffSize << " xaStatus=" << xaStatus << std::endl;
if (SQL_SUCCEEDED(xaStatus))
{
auto numRecoveredTransactions = buffSize / sizeof(XID);
std::cout << "TestRecover:: After Recover numRecoveredTransactions=" << numRecoveredTransactions << std::endl;
result = numCompletedTransactions == numRecoveredTransactions;
XID* pXid = (XID*)&buff[0];
for (auto tr = 0; tr < numRecoveredTransactions; tr++, pXid++)
{
rc = testRunner->Commit(*pXid, false, xaStatus);
if (SQL_SUCCEEDED(xaStatus))
{
std::cout << "TestRecover::Successfully committed recovered transaction " << tr << " formatId=" << pXid->formatID << std::endl;
}
else
{
std::cout << "TestRecover::Attempt to commit recovered transaction " << tr << " failed status=" << xaStatus << " formatId=" << pXid->formatID << std::endl;
}
}
}
for (const auto& name : tableNames)
{
if (!name.empty())
{
testRunner->DropTable(name);
}
}
SQLDisconnect(hdbc);
return result;
}
int main(int argc, char** argv)
{
const char* pConnStr = "";
if (argc < 2)
{
std::cout << "ERROR: Connection string is not specified" << std::endl;
return 0;
}
else
{
pConnStr = argv[1];
std::cout << "Connection string: " << pConnStr << std::endl;
}
SQLHENV henv = NULL;
SQLHDBC hdbc = NULL;
std::string connString = pConnStr;
SQLRETURN rc;
rc = SQLAllocHandle(SQL_HANDLE_ENV, NULL, &henv);
XaTestRunner::CheckRC(rc, "Allocating environment", NULL, 0);
rc = SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);
rc = SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);
XaTestRunner::CheckRC(rc, "Allocating connection", henv, SQL_HANDLE_ENV);
bool result;
result = TestSetTimeout(hdbc, pConnStr);
result = TestCommit(hdbc, pConnStr);
result = TestCommit1Phase(hdbc, pConnStr);
result = TestRollback(hdbc, pConnStr);
result = TestRecover(hdbc, pConnStr);
SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
SQLFreeHandle(SQL_HANDLE_ENV, henv);
return 0;
}
XATestRunner
类在与服务器通信时实现可能的 XA 调用。
XaTestRunner.h
#pragma once
#include "xadefs.h"
#include "sqlwindef.h"
#include "xplatsec.h"
#include <sql.h>
#include <sqlext.h>
#include <random>
struct RandomProvider
{
std::random_device rd;
};
class XidMgr
{
public:
static void GetUniqueXid(XID& xid);
static void GetUniqueXid(XID& xid, int formatId, unsigned char* globalId = nullptr, unsigned int sizeGlobalId = 0);
static int GetRandomNumber(int low = 0, int high = 0xffffffff);
static void GetRandomBuffer(unsigned char* buffer, unsigned int sizeBuffer);
static RandomProvider rndPrv;
};
class XaTestRunner
{
public:
XaTestRunner(HDBC dbc);
~XaTestRunner();
RETCODE Start(const XID& xid, const int flags, RETCODE& xaStatus);
RETCODE End(const XID& xid, const int flags, RETCODE& xaStatus);
RETCODE Prepare(const XID& xid, RETCODE& xaStatus);
RETCODE Commit(const XID& xid, const bool onePhase, RETCODE& xaStatus);
RETCODE Rollback(const XID& xid, RETCODE& xaStatus);
RETCODE Forget(const XID& xid, RETCODE& xaStatus);
RETCODE Recover(const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus);
bool CreateTable(const std::string& name, SQLHSTMT stmt = NULL);
bool DropTable(const std::string& name, SQLHSTMT stmt = NULL);
void GetUniqueName(std::string& name);
bool ExecuteInsertSequence(const std::string& nameTable, int rows, SQLHSTMT stmt = NULL);
static int CheckRC(SQLRETURN rc, const char *msg, SQLHANDLE handle, SQLSMALLINT htype);
void SetTimeout(const int tmo);
int GetTimeout();
static void GetUniqueXid(XID& xid) { XidMgr::GetUniqueXid(xid); }
static void GetUniqueXid(XID& xid, int formatId, unsigned char* globalId = nullptr, unsigned int sizeGlobalId = 0)
{
XidMgr::GetUniqueXid(xid, formatId, globalId, sizeGlobalId);
}
static void XidShortToXid(const XID_SHORT& xids, XID& xid);
private:
HDBC m_hdbc;
std::string m_tableName;
std::string m_commandCreateTable;
std::string m_commandInsertRow;
static const char* COMMAND_CREATE_TABLE;
static const char* COMMAND_INSERT_ROW;
bool ExecuteQuery(const char* query, const char* msg, SQLHSTMT stmt = NULL);
RETCODE IssueXaCall(const XID* xid, int operation, const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus);
};
XaTestRunner.cpp
#include "XaTestRunner.h"
#include <chrono>
#include <thread>
#include <ctime>
#include <atomic>
const char* XaTestRunner::COMMAND_CREATE_TABLE = "CREATE TABLE %s (c1 INT, c2 VARCHAR(300))";
const char* XaTestRunner::COMMAND_INSERT_ROW = "INSERT INTO %s Values (%d, 'Varchar data for row %d')";
RandomProvider XidMgr::rndPrv;
int XidMgr::GetRandomNumber(int low, int high)
{
std::mt19937 gen(XidMgr::rndPrv.rd());
std::uniform_int_distribution<> dis(low, high);
return dis(gen);
}
void XidMgr::GetRandomBuffer(unsigned char* buffer, unsigned int sizeBuffer)
{
std::mt19937 gen(XidMgr::rndPrv.rd());
std::uniform_int_distribution<> dis(0, 0xff);
for (unsigned int i = 0; i < sizeBuffer; i++)
{
buffer[i] = dis(gen);
}
}
XaTestRunner::XaTestRunner(HDBC dbc)
: m_hdbc(dbc)
{
GetUniqueName(m_tableName);
m_commandCreateTable = COMMAND_CREATE_TABLE;
m_commandInsertRow = COMMAND_INSERT_ROW;
}
XaTestRunner::~XaTestRunner()
{
}
void XidMgr::GetUniqueXid(XID& xid)
{
long formatId = (long)XidMgr::GetRandomNumber(0, 0xffff);
GetUniqueXid(xid, formatId);
}
void XidMgr::GetUniqueXid(XID& xid, int formatId, unsigned char* globalId, unsigned int sizeGlobalId)
{
auto isGlobalIdDefined = globalId != nullptr && sizeGlobalId > 0 && sizeGlobalId <= 64;
xid.formatID = formatId;
xid.bqual_length = 64;
xid.gtrid_length = isGlobalIdDefined ? sizeGlobalId : 64;
if (!isGlobalIdDefined)
{
GetRandomBuffer(&xid.data[0], xid.gtrid_length);
}
else
{
memcpy_s(&xid.data[0], sizeof(xid.data), globalId, xid.gtrid_length);
}
GetRandomBuffer(&xid.data[xid.gtrid_length], xid.bqual_length);
}
int XaTestRunner::CheckRC(SQLRETURN rc, const char *msg, SQLHANDLE handle, SQLSMALLINT htype)
{
if (rc == SQL_ERROR)
{
printf("Error occurred upon [%s]\n", msg);
if (handle)
{
SQLSMALLINT i = 0;
SQLSMALLINT outlen = 0;
SQLCHAR errmsg[1024];
SQLCHAR sql_state[6];
SQLINTEGER native_error = 0;
while ((rc = SQLGetDiagRec(htype, handle, ++i, sql_state, &native_error, errmsg, sizeof(errmsg), &outlen)) == SQL_SUCCESS
|| rc == SQL_SUCCESS_WITH_INFO)
{
printf("Error# %d: [%s] state [%s]\n", i, errmsg, sql_state);
}
}
return 0;
}
else if (rc == SQL_SUCCESS_WITH_INFO && handle)
{
SQLSMALLINT i = 0;
SQLSMALLINT outlen = 0;
SQLCHAR errmsg[1024];
SQLCHAR sql_state[6];
SQLINTEGER native_error = 0;
printf("Success with info for [%s]:\n", msg);
while ((rc = SQLGetDiagRec(htype, handle, ++i, sql_state, &native_error, errmsg, sizeof(errmsg), &outlen)) == SQL_SUCCESS
|| rc == SQL_SUCCESS_WITH_INFO)
{
printf("Msg# %d: [%s] state [%s]\n", i, errmsg, sql_state);
}
}
return 1;
}
RETCODE XaTestRunner::IssueXaCall(const XID* pXid, int operation, const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus)
{
auto sizeLimit = sizeBuffer;
unsigned int sizeParam = sizeof(XACALLPARAM) + sizeBuffer;
std::vector<unsigned char> buff(sizeParam);
PXACALLPARAM param = (PXACALLPARAM)(void*)&buff[0];
memset(param, 0, sizeof(XACALLPARAM));
param->flags = flags;
param->operation = operation;
param->sizeParam = sizeParam;
if (pXid)
{
param->xid = *pXid;
}
if (sizeBuffer > 0)
{
param->sizeData = sizeBuffer;
memcpy_s(¶m[1], sizeBuffer, buffer, sizeBuffer);
}
RETCODE rc = SQLSetConnectAttr(m_hdbc, SQL_ATTR_ENLIST_IN_XA, param, SQL_IS_POINTER);
CheckRC(rc, " XaTestRunner::IssueXaCall", m_hdbc, SQL_HANDLE_DBC);
xaStatus = SQL_SUCCEEDED(rc) ? param->status : rc;
sizeBuffer = param->sizeReturned;
if (sizeBuffer)
{
memcpy_s(buffer, sizeLimit, ¶m[1], sizeBuffer);
}
return rc;
}
RETCODE XaTestRunner::Start(const XID& xid, const int flags, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_START, flags, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::End(const XID& xid, const int flags, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_END, flags, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Prepare(const XID& xid, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_PREPARE, TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Commit(const XID& xid, const bool onePhase, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_COMMIT, onePhase ? TMONEPHASE : TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Rollback(const XID& xid, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_ROLLBACK, TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Forget(const XID& xid, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_FORGET, TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Recover(const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus)
{
return IssueXaCall(nullptr, OP_RECOVER, flags, buffer, sizeBuffer, xaStatus);
}
void XaTestRunner::SetTimeout(const int tmo)
{
int timeout = tmo;
unsigned int sizeBuffer = sizeof(timeout);
RETCODE xaStatus;
IssueXaCall(nullptr, OP_SETTIMEOUT, TMNOFLAGS, (unsigned char*)&timeout, sizeBuffer, xaStatus);
}
int XaTestRunner::GetTimeout()
{
int timeout = 0;
unsigned int sizeBuffer = sizeof(timeout);
RETCODE xaStatus;
IssueXaCall(nullptr, OP_GETTIMEOUT, TMNOFLAGS, (unsigned char*)&timeout, sizeBuffer, xaStatus);
return timeout;
}
void XaTestRunner::XidShortToXid(const XID_SHORT& xids, XID& xid)
{
xid.formatID = xids.formatID;
xid.gtrid_length = xids.gtrid_length;
xid.bqual_length = xids.bqual_length;
memcpy_s(&xid.data[0], sizeof(xid.data), &xids.data[0], sizeof(xids.data));
}
void XaTestRunner::GetUniqueName(std::string& name)
{
static std::atomic<unsigned short> counter(0);
auto id = counter++;
auto duration = std::chrono::system_clock::now().time_since_epoch();
long long millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
char szName[64];
sprintf_s(szName, sizeof(szName), "test_%d_%lld", id, millis);
name = szName;
}
bool XaTestRunner::ExecuteQuery(const char* query, const char* msg, SQLHSTMT stmt)
{
RETCODE rc = SQL_SUCCESS;
SQLHSTMT hstmt = stmt;
bool isAllocateStatement = (stmt == NULL);
if (isAllocateStatement)
{
rc = SQLAllocHandle(SQL_HANDLE_STMT, m_hdbc, &hstmt);
}
if (SQL_SUCCEEDED(rc))
{
rc = SQLExecDirectA(hstmt, (SQLCHAR*)query, SQL_NTS);
if (!SQL_SUCCEEDED(rc))
{
CheckRC(rc, msg, hstmt, SQL_HANDLE_STMT);
}
if (isAllocateStatement)
{
SQLFreeStmt(hstmt, SQL_CLOSE);
}
}
else
{
CheckRC(rc, "Alloc Statement", m_hdbc, SQL_HANDLE_DBC);
}
return SQL_SUCCEEDED(rc);
}
bool XaTestRunner::CreateTable(const std::string& name, SQLHSTMT stmt)
{
char query[256];
sprintf_s(query, sizeof(query), m_commandCreateTable.c_str(), name.empty() ? "testRunner" : name.c_str());
return ExecuteQuery(query, "Create Table", stmt);
}
bool XaTestRunner::DropTable(const std::string& name, SQLHSTMT stmt)
{
char query[256];
const char* tableName = name.empty() ? "testRunner" : name.c_str();
sprintf_s(query, sizeof(query), " IF OBJECT_ID('%s', 'U') IS NOT NULL DROP TABLE %s", tableName, tableName);
return ExecuteQuery(query, "Drop Table", stmt);
}
bool XaTestRunner::ExecuteInsertSequence(const std::string& nameTable, int rows, SQLHSTMT stmt)
{
SQLHSTMT hstmt = stmt;
bool isAllocateStatement = (stmt == NULL);
RETCODE rc = SQL_SUCCESS;
if (isAllocateStatement)
{
rc = SQLAllocHandle(SQL_HANDLE_STMT, m_hdbc, &hstmt);
if (!SQL_SUCCEEDED(rc))
{
CheckRC(rc, "Alloc Statement", m_hdbc, SQL_HANDLE_DBC);
return false;
}
}
for (auto r = 0; r < rows; r++)
{
char query[256];
sprintf_s(query, sizeof(query), m_commandInsertRow.c_str(), nameTable.c_str(), r, r);
rc = ExecuteQuery(query, "Insert Row", hstmt);
}
if (isAllocateStatement)
{
SQLFreeStmt(hstmt, SQL_CLOSE);
}
return true;
}
附录
xadefs.h
// xadefs.h : XA specific definitions.
//
#pragma once
// from xa.h
/*
* Transaction branch identification: XID and NULLXID:
*/
#define XIDDATASIZE 128 /* size in bytes */
#define MAXGTRIDSIZE 64 /* maximum size in bytes of gtrid */
#define MAXBQUALSIZE 64 /* maximum size in bytes of bqual */
#ifndef _XID_T_DEFINED
#define _XID_T_DEFINED
struct xid_t
{
int formatID; /* format identifier */
int gtrid_length; /* value not to exceed 64 */
int bqual_length; /* value not to exceed 64 */
unsigned char data[XIDDATASIZE];
};
#endif
#pragma pack (push, 1)
struct xid_s
{
int formatID; /* format identifier */
unsigned char gtrid_length; /* value not to exceed 64 */
unsigned char bqual_length; /* value not to exceed 64 */
unsigned char data[XIDDATASIZE];
};
#pragma pack (pop)
/*
* xa_() return codes (resource manager reports to transaction manager)
*/
#define XA_RBBASE 100 /* The inclusive lower bound of the rollback codes */
#define XA_RBROLLBACK XA_RBBASE /* The rollback was caused by an unspecified reason */
#define XA_RBCOMMFAIL XA_RBBASE+1 /* The rollback was caused by a communication failure */
#define XA_RBDEADLOCK XA_RBBASE+2 /* A deadlock was detected */
#define XA_RBINTEGRITY XA_RBBASE+3 /* A condition that violates the integrity of the resources was detected */
#define XA_RBOTHER XA_RBBASE+4 /* The resource manager rolled back the transaction branch for a reason not on this list */
#define XA_RBPROTO XA_RBBASE+5 /* A protocol error occurred in the resource manager */
#define XA_RBTIMEOUT XA_RBBASE+6 /* A transaction branch took too long */
#define XA_RBTRANSIENT XA_RBBASE+7 /* May retry the transaction branch */
#define XA_RBEND XA_RBTRANSIENT /* The inclusive upper bound of the rollback codes */
#define XA_NOMIGRATE 9 /* resumption must occur where suspension occurred */
#define XA_HEURHAZ 8 /* the transaction branch may have been heuristically completed */
#define XA_HEURCOM 7 /* the transaction branch has been heuristically committed */
#define XA_HEURRB 6 /* the transaction branch has been heuristically rolled back */
#define XA_HEURMIX 5 /* the transaction branch has been heuristically committed and rolled back */
#define XA_RETRY 4 /* routine returned with no effect and may be re-issued */
#define XA_RDONLY 3 /* the transaction branch was read-only and has been committed */
#define XA_OK 0 /* normal execution */
#define XAER_ASYNC (-2) /* asynchronous operation already outstanding */
#define XAER_RMERR (-3) /* a resource manager error occurred in the transaction branch */
#define XAER_NOTA (-4) /* the XID is not valid */
#define XAER_INVAL (-5) /* invalid arguments were given */
#define XAER_PROTO (-6) /* routine invoked in an improper context */
#define XAER_RMFAIL (-7) /* resource manager unavailable */
#define XAER_DUPID (-8) /* the XID already exists */
#define XAER_OUTSIDE (-9) /* resource manager doing work outside */
/* global transaction */
#define TMNOFLAGS 0x00000000L /* no resource manager features selected */
#define TMREGISTER 0x00000001L /* resource manager dynamically registers */
#define TMNOMIGRATE 0x00000002L /* resource manager does not support association migration */
#define TMUSEASYNC 0x00000004L /* resource manager supports asynchronous operations */
/*
* Flag definitions for xa_ and ax_ routines
*/
/* use TMNOFLAGS, defined above, when not specifying other flags */
#define TMASYNC 0x80000000L /* perform routine asynchronously */
#define TMONEPHASE 0x40000000L /* caller is using one-phase commit optimisation */
#define TMFAIL 0x20000000L /* dissociates caller and marks transaction branch rollback-only */
#define TMNOWAIT 0x10000000L /* return if blocking condition exists */
#define TMRESUME 0x08000000L /* caller is resuming association with suspended transaction branch */
#define TMSUCCESS 0x04000000L /* dissociate caller from transaction branch */
#define TMSUSPEND 0x02000000L /* caller is suspending, not ending, association */
#define TMSTARTRSCAN 0x01000000L /* start a recovery scan */
#define TMENDRSCAN 0x00800000L /* end a recovery scan */
#define TMMULTIPLE 0x00400000L /* wait for any asynchronous operation */
#define TMJOIN 0x00200000L /* caller is joining existing transaction branch */
#define TMMIGRATE 0x00100000L /* caller intends to perform migration */
typedef struct xid_t XID;
typedef struct xid_s XID_SHORT;
enum class XaOperation { start, end, prepare, commit, rollback, forget, recover, getTimeout, setTimeout, prepareEx, rollbackEx, forgetEx };
const int OP_START = 0;
const int OP_END = 1;
const int OP_PREPARE = 2;
const int OP_COMMIT = 3;
const int OP_ROLLBACK = 4;
const int OP_FORGET = 5;
const int OP_RECOVER = 6;
const int OP_GETTIMEOUT = 7;
const int OP_SETTIMEOUT = 8;
// extended operations, not called directly by client
const int OP_PREPAREEX = 9;
const int OP_ROLLBACKEX = 10;
const int OP_FORGETEX = 11;
typedef struct XACallParam {
unsigned int sizeParam;
int operation;
XID xid;
int flags;
int status;
unsigned int sizeData;
unsigned int sizeReturned;
} XACALLPARAM, *PXACALLPARAM;
#define FLAG_TIGHTLYCOUPLED 0x8000