Menggunakan Transaksi XA
Gambaran Umum
Driver Microsoft ODBC untuk SQL Server, mulai dari versi 17.3, menyediakan dukungan untuk transaksi XA dengan Koordinator Transaksi Terdistribusi (DTC) di Windows, Linux, dan macOS. Implementasi XA di sisi driver memungkinkan aplikasi klien untuk mengirim operasi serial (seperti start, commit, rollback cabang transaksi, dan sebagainya) ke Transaction Manager (TM). Dan kemudian TM akan berkomunikasi dengan Resource Manager (RM) sesuai dengan operasi ini. Untuk informasi selengkapnya tentang Spesifikasi XA dan implementasi Microsoft untuk DTC (MS DTC), lihat Cara Kerjanya: SQL Server DTC(Transaksi MSDTC dan XA).
Struktur XACALLPARAM
Struktur XACALLPARAM
mendefinisikan informasi yang diperlukan untuk permintaan manajer transaksi XA. Ini didefinisikan sebagai berikut:
typedef struct XACallParam {
unsigned int sizeParam;
int operation;
XID xid;
int flags;
int status;
unsigned int sizeData;
unsigned int sizeReturned;
} XACALLPARAM, *PXACALLPARAM;
sizeParam
XACALLPARAM
Ukuran struktur. Ukuran ini mengecualikan ukuran data berikut XACALLPARAM
.
operation
Operasi XA yang akan diteruskan ke TM. Kemungkinan operasi didefinisikan dalam xadefs.h.
xid
Pengidentifikasi cabang transaksi.
flags
Bendera yang terkait dengan permintaan TM. Nilai yang mungkin ditentukan dalam xadefs.h.
status
Mengembalikan status dari TM. Lihat header xadefs.h untuk kemungkinan status pengembalian.
sizeData
Ukuran buffer data berikut XACALLPARAM
.
sizeReturned
Ukuran data yang dikembalikan.
Untuk membuat permintaan TM, fungsi SQLSetConnectAttr perlu dipanggil dengan atribut SQL_COPT_SS_ENLIST_IN_XA dan penunjuk ke XACALLPARAM
objek.
SQLSetConnectAttr(hdbc, SQL_COPT_SS_ENLIST_IN_XA, param, SQL_IS_POINTER); // XACALLPARAM *param
Sampel Kode
Contoh berikut menunjukkan cara berkomunikasi dengan TM untuk transaksi XA dan menjalankan operasi yang berbeda dari aplikasi klien. Jika pengujian dijalankan terhadap Microsoft SQL Server, MS DTC perlu dikonfigurasi dengan benar untuk mengaktifkan transaksi XA. Definisi XA dapat ditemukan dalam file header xadefs.h .
// XA-DTC.cpp : Defines the entry point for the console application.
//
#include "sqlwindef.h"
#include "xplatsec.h"
#include <sql.h>
#include <sqlext.h>
#include "XaTestRunner.h"
#include <iostream>
#include <string>
#include <memory>
#include <thread>
#include <chrono>
enum class TestType { Commit, Commit1Phase, Rollback, Recover};
RETCODE GetRowCount(HSTMT hstmt, const std::string tableName, int& count)
{
char query[256];
count = 0;
sprintf_s(query, sizeof(query), "SELECT COUNT(*) FROM %s", tableName.c_str());
RETCODE rc = SQLExecDirectA(hstmt, (SQLCHAR*)query, SQL_NTS);
XaTestRunner::CheckRC(rc, "GetRowCount::SQLExecDirectA", hstmt, SQL_HANDLE_STMT);
if (!SQL_SUCCEEDED(rc))
{
return rc;
}
rc = SQLFetch(hstmt);
XaTestRunner::CheckRC(rc, "GetRowCount::SQLFetch", hstmt, SQL_HANDLE_STMT);
if (!SQL_SUCCEEDED(rc))
{
return rc;
}
rc = SQLGetData(hstmt, 1, SQL_C_LONG, &count, sizeof(count), NULL);
XaTestRunner::CheckRC(rc, "GetRowCount::SQLGetData", hstmt, SQL_HANDLE_STMT);
return rc;
}
bool TestXaRunner(HDBC hdbc, const char* connString, TestType testType, int timeout = 0)
{
SQLRETURN rc = SQLDriverConnect(hdbc, NULL, (SQLCHAR*)connString, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
XaTestRunner::CheckRC(rc, "TestXaRunner::Connecting", hdbc, SQL_HANDLE_DBC);
if (!SQL_SUCCEEDED(rc))
{
return false;
}
SQLHSTMT hstmt;
rc = SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
XaTestRunner::CheckRC(rc, "TestXaRunner::Alloc statement", hdbc, SQL_HANDLE_DBC);
const int ROWS_TO_TEST = 10;
int rowCount = 0;
bool result = false;
if (SQL_SUCCEEDED(rc))
{
std::string tableName;
auto testRunner = std::make_shared<XaTestRunner>(hdbc);
testRunner->GetUniqueName(tableName);
bool isTableCreated = false;
RETCODE xaStatus = SQL_ERROR;
bool isTimeoutTest = false;
XID xid;
XaTestRunner::GetUniqueXid(xid);
do
{
if (!(isTableCreated = testRunner->CreateTable(tableName)))
{
std::cout << "TestXaRunner::Failed to create table " << tableName.c_str() << std::endl;
break;
}
if (timeout > 0)
{
testRunner->SetTimeout(timeout);
isTimeoutTest = true;
}
rc = testRunner->Start(xid, TMNOFLAGS, xaStatus);
if (SQL_SUCCEEDED(xaStatus))
{
rc = testRunner->ExecuteInsertSequence(tableName, ROWS_TO_TEST, hstmt);
XaTestRunner::CheckRC(rc, "TestXaRunner::ExecuteInsertSequence", hstmt, SQL_HANDLE_STMT);
if (isTimeoutTest)
{
auto timeToSleep = timeout + 5;
std::cout << "Sleep for " << timeToSleep << " seconds" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(timeToSleep));
}
rc = testRunner->End(xid, TMSUCCESS, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA End failed status=" << xaStatus << std::endl;
break;
}
switch (testType)
{
case TestType::Commit:
rc = testRunner->Prepare(xid, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Prepare failed status=" << xaStatus << std::endl;
}
else
{
rc = testRunner->Commit(xid, false, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Commit failed status=" << xaStatus << std::endl;
}
}
break;
case TestType::Commit1Phase:
rc = testRunner->Commit(xid, true, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Commit one phase failed status=" << xaStatus << std::endl;
}
break;
case TestType::Rollback:
rc = testRunner->Rollback(xid, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Rollback failed status=" << xaStatus << std::endl;
}
break;
case TestType::Recover:
break;
default:
break;
}
}
else
{
std::cout << "TestXaRunner::XA Start failed status=" << xaStatus << std::endl;
}
} while (false);
if (isTimeoutTest)
{
result = xaStatus == XAER_NOTA;
std::cout << "TestXaRunner::TimeoutTest" " xaStatus=" << xaStatus << " test " << (result ? "Succeded" : "Failed") << std::endl;
}
else
{
auto isCommit = testType == TestType::Commit || testType == TestType::Commit1Phase;
rc = GetRowCount(hstmt, tableName, rowCount);
result = (rowCount == (isCommit ? ROWS_TO_TEST : 0)) && SQL_SUCCEEDED(xaStatus);
std::cout << "TestXaRunner::" << (isCommit ? "Commit" : "Rollback") << " rowCount=" << rowCount << " xaStatus=" << xaStatus << " test " << (result ? "Succeded" : "Failed") << std::endl;
}
if (isTableCreated)
{
testRunner->DropTable(tableName);
}
rc = SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
rc = SQLDisconnect(hdbc);
}
return result;
}
bool TestCommit(HDBC hdbc, const char* connectionString)
{
return TestXaRunner(hdbc, connectionString, TestType::Commit);
}
bool TestCommit1Phase(HDBC hdbc, const char* connectionString)
{
return TestXaRunner(hdbc, connectionString, TestType::Commit1Phase);
}
bool TestRollback(HDBC hdbc, const char* connectionString)
{
return TestXaRunner(hdbc, connectionString, TestType::Rollback);
}
bool TestSetTimeout(HDBC hdbc, const char* connectionString)
{
bool result = false;
result = TestXaRunner(hdbc, connectionString, TestType::Commit, 2);
result = TestXaRunner(hdbc, connectionString, TestType::Rollback, 5);
return result;
}
bool TestRecover(HDBC hdbc, const char* connectionString)
{
SQLRETURN rc = SQLDriverConnect(hdbc, NULL, (SQLCHAR*)connectionString, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
XaTestRunner::CheckRC(rc, "TestXaRunner::Connecting", hdbc, SQL_HANDLE_DBC);
if (!SQL_SUCCEEDED(rc))
{
return false;
}
const int ROWS_TO_TEST = 10;
const int transactionCount = 2;
int rowCount = 0;
bool result = false;
std::vector<std::string> tableNames;
auto testRunner = std::make_shared<XaTestRunner>(hdbc);
auto numCompletedTransactions = 0;
RETCODE xaStatus = SQL_ERROR;
const int sleepTime = 2;
for (auto tr = 0; tr < transactionCount; tr++)
{
std::string tbName;
testRunner->GetUniqueName(tbName);
bool isTableCreated = false;
RETCODE xaStatus = SQL_ERROR;
std::cout << "Started transaction " << tr << std::endl;
do
{
if (!(isTableCreated = testRunner->CreateTable(tbName)))
{
tableNames.emplace_back("");
std::cout << "TestRecover::Failed to create table " << tbName.c_str() << std::endl;
break;
}
tableNames.push_back(std::move(tbName));
XID xid;
XaTestRunner::GetUniqueXid(xid);
rc = testRunner->Start(xid, TMNOFLAGS, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA Start failed status=" << xaStatus << std::endl;
break;
}
rc = testRunner->ExecuteInsertSequence(tableNames[tr], ROWS_TO_TEST);
rc = testRunner->End(xid, TMSUCCESS, xaStatus);
if (xaStatus < 0)
{
std::cout << "TestXaRunner::XA End failed status=" << xaStatus << std::endl;
break;
}
std::cout << "Completed transaction " << tr << " formatId=" <<xid.formatID <<std::endl;
numCompletedTransactions++;
rc = testRunner->Prepare(xid, xaStatus);
std::cout << "Prepared transaction " << tr << std::endl;
} while (false);
}
std::cout << "Sleep for " << sleepTime << "seconds" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(sleepTime));
std::vector<unsigned char> buff(8092);
unsigned int buffSize = static_cast<unsigned int>(buff.size());
testRunner->Recover(TMSTARTRSCAN | TMENDRSCAN, &buff[0], buffSize, xaStatus);
std::cout << "TestRecover:: After Recover buffSize=" << buffSize << " xaStatus=" << xaStatus << std::endl;
if (SQL_SUCCEEDED(xaStatus))
{
auto numRecoveredTransactions = buffSize / sizeof(XID);
std::cout << "TestRecover:: After Recover numRecoveredTransactions=" << numRecoveredTransactions << std::endl;
result = numCompletedTransactions == numRecoveredTransactions;
XID* pXid = (XID*)&buff[0];
for (auto tr = 0; tr < numRecoveredTransactions; tr++, pXid++)
{
rc = testRunner->Commit(*pXid, false, xaStatus);
if (SQL_SUCCEEDED(xaStatus))
{
std::cout << "TestRecover::Successfully committed recovered transaction " << tr << " formatId=" << pXid->formatID << std::endl;
}
else
{
std::cout << "TestRecover::Attempt to commit recovered transaction " << tr << " failed status=" << xaStatus << " formatId=" << pXid->formatID << std::endl;
}
}
}
for (const auto& name : tableNames)
{
if (!name.empty())
{
testRunner->DropTable(name);
}
}
SQLDisconnect(hdbc);
return result;
}
int main(int argc, char** argv)
{
const char* pConnStr = "";
if (argc < 2)
{
std::cout << "ERROR: Connection string is not specified" << std::endl;
return 0;
}
else
{
pConnStr = argv[1];
std::cout << "Connection string: " << pConnStr << std::endl;
}
SQLHENV henv = NULL;
SQLHDBC hdbc = NULL;
std::string connString = pConnStr;
SQLRETURN rc;
rc = SQLAllocHandle(SQL_HANDLE_ENV, NULL, &henv);
XaTestRunner::CheckRC(rc, "Allocating environment", NULL, 0);
rc = SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);
rc = SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);
XaTestRunner::CheckRC(rc, "Allocating connection", henv, SQL_HANDLE_ENV);
bool result;
result = TestSetTimeout(hdbc, pConnStr);
result = TestCommit(hdbc, pConnStr);
result = TestCommit1Phase(hdbc, pConnStr);
result = TestRollback(hdbc, pConnStr);
result = TestRecover(hdbc, pConnStr);
SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
SQLFreeHandle(SQL_HANDLE_ENV, henv);
return 0;
}
Kelas XATestRunner
mengimplementasikan kemungkinan panggilan XA saat berkomunikasi dengan server.
XaTestRunner.h
#pragma once
#include "xadefs.h"
#include "sqlwindef.h"
#include "xplatsec.h"
#include <sql.h>
#include <sqlext.h>
#include <random>
struct RandomProvider
{
std::random_device rd;
};
class XidMgr
{
public:
static void GetUniqueXid(XID& xid);
static void GetUniqueXid(XID& xid, int formatId, unsigned char* globalId = nullptr, unsigned int sizeGlobalId = 0);
static int GetRandomNumber(int low = 0, int high = 0xffffffff);
static void GetRandomBuffer(unsigned char* buffer, unsigned int sizeBuffer);
static RandomProvider rndPrv;
};
class XaTestRunner
{
public:
XaTestRunner(HDBC dbc);
~XaTestRunner();
RETCODE Start(const XID& xid, const int flags, RETCODE& xaStatus);
RETCODE End(const XID& xid, const int flags, RETCODE& xaStatus);
RETCODE Prepare(const XID& xid, RETCODE& xaStatus);
RETCODE Commit(const XID& xid, const bool onePhase, RETCODE& xaStatus);
RETCODE Rollback(const XID& xid, RETCODE& xaStatus);
RETCODE Forget(const XID& xid, RETCODE& xaStatus);
RETCODE Recover(const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus);
bool CreateTable(const std::string& name, SQLHSTMT stmt = NULL);
bool DropTable(const std::string& name, SQLHSTMT stmt = NULL);
void GetUniqueName(std::string& name);
bool ExecuteInsertSequence(const std::string& nameTable, int rows, SQLHSTMT stmt = NULL);
static int CheckRC(SQLRETURN rc, const char *msg, SQLHANDLE handle, SQLSMALLINT htype);
void SetTimeout(const int tmo);
int GetTimeout();
static void GetUniqueXid(XID& xid) { XidMgr::GetUniqueXid(xid); }
static void GetUniqueXid(XID& xid, int formatId, unsigned char* globalId = nullptr, unsigned int sizeGlobalId = 0)
{
XidMgr::GetUniqueXid(xid, formatId, globalId, sizeGlobalId);
}
static void XidShortToXid(const XID_SHORT& xids, XID& xid);
private:
HDBC m_hdbc;
std::string m_tableName;
std::string m_commandCreateTable;
std::string m_commandInsertRow;
static const char* COMMAND_CREATE_TABLE;
static const char* COMMAND_INSERT_ROW;
bool ExecuteQuery(const char* query, const char* msg, SQLHSTMT stmt = NULL);
RETCODE IssueXaCall(const XID* xid, int operation, const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus);
};
XaTestRunner.cpp
#include "XaTestRunner.h"
#include <chrono>
#include <thread>
#include <ctime>
#include <atomic>
const char* XaTestRunner::COMMAND_CREATE_TABLE = "CREATE TABLE %s (c1 INT, c2 VARCHAR(300))";
const char* XaTestRunner::COMMAND_INSERT_ROW = "INSERT INTO %s Values (%d, 'Varchar data for row %d')";
RandomProvider XidMgr::rndPrv;
int XidMgr::GetRandomNumber(int low, int high)
{
std::mt19937 gen(XidMgr::rndPrv.rd());
std::uniform_int_distribution<> dis(low, high);
return dis(gen);
}
void XidMgr::GetRandomBuffer(unsigned char* buffer, unsigned int sizeBuffer)
{
std::mt19937 gen(XidMgr::rndPrv.rd());
std::uniform_int_distribution<> dis(0, 0xff);
for (unsigned int i = 0; i < sizeBuffer; i++)
{
buffer[i] = dis(gen);
}
}
XaTestRunner::XaTestRunner(HDBC dbc)
: m_hdbc(dbc)
{
GetUniqueName(m_tableName);
m_commandCreateTable = COMMAND_CREATE_TABLE;
m_commandInsertRow = COMMAND_INSERT_ROW;
}
XaTestRunner::~XaTestRunner()
{
}
void XidMgr::GetUniqueXid(XID& xid)
{
long formatId = (long)XidMgr::GetRandomNumber(0, 0xffff);
GetUniqueXid(xid, formatId);
}
void XidMgr::GetUniqueXid(XID& xid, int formatId, unsigned char* globalId, unsigned int sizeGlobalId)
{
auto isGlobalIdDefined = globalId != nullptr && sizeGlobalId > 0 && sizeGlobalId <= 64;
xid.formatID = formatId;
xid.bqual_length = 64;
xid.gtrid_length = isGlobalIdDefined ? sizeGlobalId : 64;
if (!isGlobalIdDefined)
{
GetRandomBuffer(&xid.data[0], xid.gtrid_length);
}
else
{
memcpy_s(&xid.data[0], sizeof(xid.data), globalId, xid.gtrid_length);
}
GetRandomBuffer(&xid.data[xid.gtrid_length], xid.bqual_length);
}
int XaTestRunner::CheckRC(SQLRETURN rc, const char *msg, SQLHANDLE handle, SQLSMALLINT htype)
{
if (rc == SQL_ERROR)
{
printf("Error occurred upon [%s]\n", msg);
if (handle)
{
SQLSMALLINT i = 0;
SQLSMALLINT outlen = 0;
SQLCHAR errmsg[1024];
SQLCHAR sql_state[6];
SQLINTEGER native_error = 0;
while ((rc = SQLGetDiagRec(htype, handle, ++i, sql_state, &native_error, errmsg, sizeof(errmsg), &outlen)) == SQL_SUCCESS
|| rc == SQL_SUCCESS_WITH_INFO)
{
printf("Error# %d: [%s] state [%s]\n", i, errmsg, sql_state);
}
}
return 0;
}
else if (rc == SQL_SUCCESS_WITH_INFO && handle)
{
SQLSMALLINT i = 0;
SQLSMALLINT outlen = 0;
SQLCHAR errmsg[1024];
SQLCHAR sql_state[6];
SQLINTEGER native_error = 0;
printf("Success with info for [%s]:\n", msg);
while ((rc = SQLGetDiagRec(htype, handle, ++i, sql_state, &native_error, errmsg, sizeof(errmsg), &outlen)) == SQL_SUCCESS
|| rc == SQL_SUCCESS_WITH_INFO)
{
printf("Msg# %d: [%s] state [%s]\n", i, errmsg, sql_state);
}
}
return 1;
}
RETCODE XaTestRunner::IssueXaCall(const XID* pXid, int operation, const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus)
{
auto sizeLimit = sizeBuffer;
unsigned int sizeParam = sizeof(XACALLPARAM) + sizeBuffer;
std::vector<unsigned char> buff(sizeParam);
PXACALLPARAM param = (PXACALLPARAM)(void*)&buff[0];
memset(param, 0, sizeof(XACALLPARAM));
param->flags = flags;
param->operation = operation;
param->sizeParam = sizeParam;
if (pXid)
{
param->xid = *pXid;
}
if (sizeBuffer > 0)
{
param->sizeData = sizeBuffer;
memcpy_s(¶m[1], sizeBuffer, buffer, sizeBuffer);
}
RETCODE rc = SQLSetConnectAttr(m_hdbc, SQL_ATTR_ENLIST_IN_XA, param, SQL_IS_POINTER);
CheckRC(rc, " XaTestRunner::IssueXaCall", m_hdbc, SQL_HANDLE_DBC);
xaStatus = SQL_SUCCEEDED(rc) ? param->status : rc;
sizeBuffer = param->sizeReturned;
if (sizeBuffer)
{
memcpy_s(buffer, sizeLimit, ¶m[1], sizeBuffer);
}
return rc;
}
RETCODE XaTestRunner::Start(const XID& xid, const int flags, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_START, flags, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::End(const XID& xid, const int flags, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_END, flags, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Prepare(const XID& xid, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_PREPARE, TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Commit(const XID& xid, const bool onePhase, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_COMMIT, onePhase ? TMONEPHASE : TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Rollback(const XID& xid, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_ROLLBACK, TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Forget(const XID& xid, RETCODE& xaStatus)
{
unsigned int sizeBuffer = 0;
return IssueXaCall(&xid, OP_FORGET, TMNOFLAGS, nullptr, sizeBuffer, xaStatus);
}
RETCODE XaTestRunner::Recover(const int flags, unsigned char* buffer, unsigned int& sizeBuffer, RETCODE& xaStatus)
{
return IssueXaCall(nullptr, OP_RECOVER, flags, buffer, sizeBuffer, xaStatus);
}
void XaTestRunner::SetTimeout(const int tmo)
{
int timeout = tmo;
unsigned int sizeBuffer = sizeof(timeout);
RETCODE xaStatus;
IssueXaCall(nullptr, OP_SETTIMEOUT, TMNOFLAGS, (unsigned char*)&timeout, sizeBuffer, xaStatus);
}
int XaTestRunner::GetTimeout()
{
int timeout = 0;
unsigned int sizeBuffer = sizeof(timeout);
RETCODE xaStatus;
IssueXaCall(nullptr, OP_GETTIMEOUT, TMNOFLAGS, (unsigned char*)&timeout, sizeBuffer, xaStatus);
return timeout;
}
void XaTestRunner::XidShortToXid(const XID_SHORT& xids, XID& xid)
{
xid.formatID = xids.formatID;
xid.gtrid_length = xids.gtrid_length;
xid.bqual_length = xids.bqual_length;
memcpy_s(&xid.data[0], sizeof(xid.data), &xids.data[0], sizeof(xids.data));
}
void XaTestRunner::GetUniqueName(std::string& name)
{
static std::atomic<unsigned short> counter(0);
auto id = counter++;
auto duration = std::chrono::system_clock::now().time_since_epoch();
long long millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
char szName[64];
sprintf_s(szName, sizeof(szName), "test_%d_%lld", id, millis);
name = szName;
}
bool XaTestRunner::ExecuteQuery(const char* query, const char* msg, SQLHSTMT stmt)
{
RETCODE rc = SQL_SUCCESS;
SQLHSTMT hstmt = stmt;
bool isAllocateStatement = (stmt == NULL);
if (isAllocateStatement)
{
rc = SQLAllocHandle(SQL_HANDLE_STMT, m_hdbc, &hstmt);
}
if (SQL_SUCCEEDED(rc))
{
rc = SQLExecDirectA(hstmt, (SQLCHAR*)query, SQL_NTS);
if (!SQL_SUCCEEDED(rc))
{
CheckRC(rc, msg, hstmt, SQL_HANDLE_STMT);
}
if (isAllocateStatement)
{
SQLFreeStmt(hstmt, SQL_CLOSE);
}
}
else
{
CheckRC(rc, "Alloc Statement", m_hdbc, SQL_HANDLE_DBC);
}
return SQL_SUCCEEDED(rc);
}
bool XaTestRunner::CreateTable(const std::string& name, SQLHSTMT stmt)
{
char query[256];
sprintf_s(query, sizeof(query), m_commandCreateTable.c_str(), name.empty() ? "testRunner" : name.c_str());
return ExecuteQuery(query, "Create Table", stmt);
}
bool XaTestRunner::DropTable(const std::string& name, SQLHSTMT stmt)
{
char query[256];
const char* tableName = name.empty() ? "testRunner" : name.c_str();
sprintf_s(query, sizeof(query), " IF OBJECT_ID('%s', 'U') IS NOT NULL DROP TABLE %s", tableName, tableName);
return ExecuteQuery(query, "Drop Table", stmt);
}
bool XaTestRunner::ExecuteInsertSequence(const std::string& nameTable, int rows, SQLHSTMT stmt)
{
SQLHSTMT hstmt = stmt;
bool isAllocateStatement = (stmt == NULL);
RETCODE rc = SQL_SUCCESS;
if (isAllocateStatement)
{
rc = SQLAllocHandle(SQL_HANDLE_STMT, m_hdbc, &hstmt);
if (!SQL_SUCCEEDED(rc))
{
CheckRC(rc, "Alloc Statement", m_hdbc, SQL_HANDLE_DBC);
return false;
}
}
for (auto r = 0; r < rows; r++)
{
char query[256];
sprintf_s(query, sizeof(query), m_commandInsertRow.c_str(), nameTable.c_str(), r, r);
rc = ExecuteQuery(query, "Insert Row", hstmt);
}
if (isAllocateStatement)
{
SQLFreeStmt(hstmt, SQL_CLOSE);
}
return true;
}
Lampiran
xadefs.h
// xadefs.h : XA specific definitions.
//
#pragma once
// from xa.h
/*
* Transaction branch identification: XID and NULLXID:
*/
#define XIDDATASIZE 128 /* size in bytes */
#define MAXGTRIDSIZE 64 /* maximum size in bytes of gtrid */
#define MAXBQUALSIZE 64 /* maximum size in bytes of bqual */
#ifndef _XID_T_DEFINED
#define _XID_T_DEFINED
struct xid_t
{
int formatID; /* format identifier */
int gtrid_length; /* value not to exceed 64 */
int bqual_length; /* value not to exceed 64 */
unsigned char data[XIDDATASIZE];
};
#endif
#pragma pack (push, 1)
struct xid_s
{
int formatID; /* format identifier */
unsigned char gtrid_length; /* value not to exceed 64 */
unsigned char bqual_length; /* value not to exceed 64 */
unsigned char data[XIDDATASIZE];
};
#pragma pack (pop)
/*
* xa_() return codes (resource manager reports to transaction manager)
*/
#define XA_RBBASE 100 /* The inclusive lower bound of the rollback codes */
#define XA_RBROLLBACK XA_RBBASE /* The rollback was caused by an unspecified reason */
#define XA_RBCOMMFAIL XA_RBBASE+1 /* The rollback was caused by a communication failure */
#define XA_RBDEADLOCK XA_RBBASE+2 /* A deadlock was detected */
#define XA_RBINTEGRITY XA_RBBASE+3 /* A condition that violates the integrity of the resources was detected */
#define XA_RBOTHER XA_RBBASE+4 /* The resource manager rolled back the transaction branch for a reason not on this list */
#define XA_RBPROTO XA_RBBASE+5 /* A protocol error occurred in the resource manager */
#define XA_RBTIMEOUT XA_RBBASE+6 /* A transaction branch took too long */
#define XA_RBTRANSIENT XA_RBBASE+7 /* May retry the transaction branch */
#define XA_RBEND XA_RBTRANSIENT /* The inclusive upper bound of the rollback codes */
#define XA_NOMIGRATE 9 /* resumption must occur where suspension occurred */
#define XA_HEURHAZ 8 /* the transaction branch may have been heuristically completed */
#define XA_HEURCOM 7 /* the transaction branch has been heuristically committed */
#define XA_HEURRB 6 /* the transaction branch has been heuristically rolled back */
#define XA_HEURMIX 5 /* the transaction branch has been heuristically committed and rolled back */
#define XA_RETRY 4 /* routine returned with no effect and may be re-issued */
#define XA_RDONLY 3 /* the transaction branch was read-only and has been committed */
#define XA_OK 0 /* normal execution */
#define XAER_ASYNC (-2) /* asynchronous operation already outstanding */
#define XAER_RMERR (-3) /* a resource manager error occurred in the transaction branch */
#define XAER_NOTA (-4) /* the XID is not valid */
#define XAER_INVAL (-5) /* invalid arguments were given */
#define XAER_PROTO (-6) /* routine invoked in an improper context */
#define XAER_RMFAIL (-7) /* resource manager unavailable */
#define XAER_DUPID (-8) /* the XID already exists */
#define XAER_OUTSIDE (-9) /* resource manager doing work outside */
/* global transaction */
#define TMNOFLAGS 0x00000000L /* no resource manager features selected */
#define TMREGISTER 0x00000001L /* resource manager dynamically registers */
#define TMNOMIGRATE 0x00000002L /* resource manager does not support association migration */
#define TMUSEASYNC 0x00000004L /* resource manager supports asynchronous operations */
/*
* Flag definitions for xa_ and ax_ routines
*/
/* use TMNOFLAGS, defined above, when not specifying other flags */
#define TMASYNC 0x80000000L /* perform routine asynchronously */
#define TMONEPHASE 0x40000000L /* caller is using one-phase commit optimisation */
#define TMFAIL 0x20000000L /* dissociates caller and marks transaction branch rollback-only */
#define TMNOWAIT 0x10000000L /* return if blocking condition exists */
#define TMRESUME 0x08000000L /* caller is resuming association with suspended transaction branch */
#define TMSUCCESS 0x04000000L /* dissociate caller from transaction branch */
#define TMSUSPEND 0x02000000L /* caller is suspending, not ending, association */
#define TMSTARTRSCAN 0x01000000L /* start a recovery scan */
#define TMENDRSCAN 0x00800000L /* end a recovery scan */
#define TMMULTIPLE 0x00400000L /* wait for any asynchronous operation */
#define TMJOIN 0x00200000L /* caller is joining existing transaction branch */
#define TMMIGRATE 0x00100000L /* caller intends to perform migration */
typedef struct xid_t XID;
typedef struct xid_s XID_SHORT;
enum class XaOperation { start, end, prepare, commit, rollback, forget, recover, getTimeout, setTimeout, prepareEx, rollbackEx, forgetEx };
const int OP_START = 0;
const int OP_END = 1;
const int OP_PREPARE = 2;
const int OP_COMMIT = 3;
const int OP_ROLLBACK = 4;
const int OP_FORGET = 5;
const int OP_RECOVER = 6;
const int OP_GETTIMEOUT = 7;
const int OP_SETTIMEOUT = 8;
// extended operations, not called directly by client
const int OP_PREPAREEX = 9;
const int OP_ROLLBACKEX = 10;
const int OP_FORGETEX = 11;
typedef struct XACallParam {
unsigned int sizeParam;
int operation;
XID xid;
int flags;
int status;
unsigned int sizeData;
unsigned int sizeReturned;
} XACALLPARAM, *PXACALLPARAM;
#define FLAG_TIGHTLYCOUPLED 0x8000