Mengirim Data BLOB ke SQL SERVER Menggunakan IROWSETFASTLOAD dan ISEQUENTIALSTREAM di (Native Client OLE DB)
Berlaku untuk: SQL Server Azure SQL Database Azure SQL Managed Instance Azure Synapse Analytics Analytics Platform System (PDW)
Sampel ini menunjukkan cara menggunakan IRowsetFastLoad untuk mengalirkan data BLOB dengan panjang yang bervariasi per baris.
Secara default, sampel ini menunjukkan cara menggunakan IRowsetFastLoad untuk mengirim data BLOB panjang variabel per baris dengan menggunakan pengikatan sebaris. Data BLOB in-line harus pas dalam memori yang tersedia. Metode ini berkinerja terbaik ketika data BLOB kurang dari beberapa megabyte, karena tidak ada overhead aliran tambahan. Untuk data yang lebih besar dari beberapa megabyte, terutama data yang tidak tersedia dalam blok, streaming memberikan performa yang lebih baik.
Dalam kode sumber, saat Anda membatalkan komentar #define USE_ISEQSTREAM , sampel akan menggunakan ISequentialStream. Implementasi aliran didefinisikan dalam sampel, dan dapat mengirim data BLOB ukuran apa pun hanya dengan mengubah MAX_BLOB . Data streaming tidak harus pas dalam memori atau tersedia dalam satu blok. Anda memanggil penyedia ini dengan menggunakan IRowsetFastLoad::InsertRow. Teruskan pointer menggunakan IRowsetFastLoad::InsertRow ke implementasi aliran di buffer data (offset rgBinding.obValue) bersama dengan jumlah data yang tersedia untuk dibaca dari aliran. Beberapa penyedia mungkin tidak perlu mengetahui panjang data saat pengikatan terjadi. Dalam hal ini, panjangnya dapat dihilangkan dari pengikatan.
Sampel tidak menggunakan antarmuka aliran penyedia untuk menulis data ke penyedia. Sebagai gantinya, sampel meneruskan penunjuk ke objek aliran yang akan digunakan penyedia untuk membaca data. Biasanya, penyedia Microsoft (SQLOLEDB dan SQLNCLI) akan membaca data dalam gugus 1024 byte dari objek sampai semua data telah diproses. Baik SQLOLEDB maupun SQLNCLI tidak memiliki implementasi penuh untuk memungkinkan konsumen menulis data ke objek aliran penyedia. Hanya data panjang nol yang dapat dikirim melalui objek streaming penyedia.
Objek ISequentialStream yang diterapkan konsumen dapat digunakan dengan data set baris (IRowsetChange::InsertRow, IRowsetChange::SetData) dan dengan parameter dengan mengikat parameter sebagai DBTYPE_IUNKNOWN.
Karena DBTYPE_IUNKNOWN ditentukan sebagai jenis data dalam pengikatan, jenis kolom atau parameter target harus cocok. Konversi tidak dimungkinkan saat mengirim data melalui ISequentialStream dari antarmuka set baris. Untuk parameter, Anda harus menghindari penggunaan ICommandWithParameters::SetParameterInfo dan menentukan jenis yang berbeda untuk memaksa konversi; ini akan mengharuskan penyedia untuk menyimpan semua data BLOB secara lokal, untuk mengonversinya sebelum mengirimnya ke SQL Server. Penembolokan BLOB besar dan mengonversinya secara lokal tidak memberikan performa yang baik.
Untuk informasi selengkapnya, lihat Blob dan Objek OLE.
Penting
Jika memungkinkan, gunakan Autentikasi Windows. Jika Autentikasi Windows tidak tersedia, minta pengguna untuk memasukkan kredensial mereka pada waktu proses. Hindari menyimpan kredensial dalam file. Jika Anda harus mempertahankan kredensial, Anda harus mengenkripsinya dengan API kripto Win32.
Jalankan daftar kode pertama (Transact-SQL) untuk membuat tabel yang digunakan oleh aplikasi.
Kompilasi dengan ole32.lib oleaut32.lib dan jalankan daftar kode C++ berikut. Aplikasi ini tersambung ke instans SQL Server default komputer Anda. Pada beberapa sistem operasi Windows, Anda harus mengubah (localhost) atau (lokal) ke nama instans SQL Server Anda. Untuk menyambungkan ke instans bernama, ubah string koneksi dari L"(local)" ke L"(local)\\name" , di mana nama adalah instans bernama. Secara default, SQL Server Express menginstal ke instans bernama. Pastikan variabel lingkungan INCLUDE Anda menyertakan direktori yang berisi sqlncli.h.
Jalankan daftar kode ketiga (Transact-SQL) untuk menghapus tabel yang digunakan oleh aplikasi.
use master
create table fltest(col1 int, col2 int, col3 image)
// compile with: ole32.lib oleaut32.lib
#include <windows.h>
#define DBINITCONSTANTS // Must be defined to initialize constants in oledb.h
#define INITGUID
#include <sqloledb.h>
#include <oledb.h>
#include <msdasc.h>
#include <stdio.h>
#include <stdlib.h>
#include <conio.h>
#define MAX_BLOB 200 // For stream binding this can be any size, but for inline it must fit in memory
#define MAX_ROWS 100
#define SAFE_RELEASE(p) { \
if (p) { \
(p)->Release(); \
(p)=NULL; \
} \
}
#ifdef USE_ISEQSTREAM
// ISequentialStream implementation for streaming data
class MySequentialStream : public ISequentialStream {
private:
ULONG m_ulRefCount;
ULONG m_ulBufSize;
ULONG m_ulReadSize;
ULONG m_ulBytesLeft;
ULONG m_ulReadPos;
BYTE * m_pSrcData;
BYTE * m_pReadPtr;
BOOL m_fWasRead;
public:
MySequentialStream() {
m_ulRefCount = 1;
m_ulBufSize = 0;
m_ulReadSize = 0;
m_ulBytesLeft = 0;
m_ulReadPos = 0;
m_pSrcData = NULL;
m_pReadPtr = NULL;
m_fWasRead = FALSE;
}
~MySequentialStream() {}
virtual ULONG STDMETHODCALLTYPE AddRef() {
return ++m_ulRefCount;
}
virtual ULONG STDMETHODCALLTYPE Release() {
--m_ulRefCount;
if (m_ulRefCount == 0) {
delete this;
return 0;
}
return m_ulRefCount;
}
virtual HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void ** ppvObj) {
if (!ppvObj)
return E_INVALIDARG;
else
*ppvObj = NULL;
if (riid != IID_ISequentialStream && riid != IID_IUnknown)
return E_NOINTERFACE;
AddRef();
*ppvObj = this;
return S_OK;
}
HRESULT Init(const void * pSrcData, const ULONG ulBufSize, const ULONG ulReadSize) {
if (NULL == pSrcData)
return E_INVALIDARG;
// Data length must be non-zero
if (0 == ulBufSize)
return E_INVALIDARG;
m_ulBufSize = ulBufSize;
m_ulReadSize = ulReadSize;
m_pSrcData = (BYTE *)pSrcData;
m_pReadPtr = m_pSrcData;
m_ulBytesLeft = m_ulReadSize;
m_ulReadPos = 0;
m_fWasRead = FALSE;
return S_OK;
}
// Can't write data to SQL Server providers (SQLOLEDB/SQLNCLI). Instead, they read from our object.
virtual HRESULT STDMETHODCALLTYPE Write(const void *, ULONG, ULONG * ) {
return E_NOTIMPL;
}
// This implementation simply copies data from the source buffer in whatever size requested.
// But you can do anything here such as reading from a file, reading from a different rowset, stream, etc.
virtual HRESULT STDMETHODCALLTYPE Read(void * pv, ULONG cb, ULONG * pcbRead) {
ULONG ulBytesWritten = 0;
ULONG ulCBToWrite = cb;
ULONG ulCBToCopy;
BYTE * pvb = (BYTE *)pv;
m_fWasRead = TRUE;
if (NULL == m_pSrcData)
return E_FAIL;
if (NULL == pv)
return STG_E_INVALIDPOINTER;
while (ulBytesWritten < ulCBToWrite && m_ulBytesLeft) {
// Make sure we don't write more than our max read size or the size they asked for
ulCBToCopy = min(m_ulBytesLeft, cb);
// Make sure we don't read past the end of the internal buffer
ulCBToCopy = min(m_ulBufSize - m_ulReadPos, ulCBToCopy);
memcpy(pvb, m_pReadPtr + m_ulReadPos, ulCBToCopy);
pvb += ulCBToCopy;
ulBytesWritten += ulCBToCopy;
m_ulBytesLeft -= ulCBToCopy;
cb -= ulCBToCopy;
// Wrap reads around the src buffer
m_ulReadPos += ulCBToCopy;
if (m_ulReadPos >= m_ulBufSize)
m_ulReadPos = 0;
}
if (pcbRead)
*pcbRead = ulBytesWritten;
return S_OK;
}
};
#endif // USE_ISEQSTREAM
HRESULT SetFastLoadProperty(IDBInitialize * pIDBInitialize) {
HRESULT hr = S_OK;
IDBProperties * pIDBProps = NULL;
DBPROP rgProps[1];
DBPROPSET PropSet;
VariantInit(&rgProps[0].vValue);
rgProps[0].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProps[0].colid = DB_NULLID;
rgProps[0].vValue.vt = VT_BOOL;
rgProps[0].dwPropertyID = SSPROP_ENABLEFASTLOAD;
rgProps[0].vValue.boolVal = VARIANT_TRUE;
PropSet.rgProperties = rgProps;
PropSet.cProperties = 1;
PropSet.guidPropertySet = DBPROPSET_SQLSERVERDATASOURCE;
if (SUCCEEDED(hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (LPVOID *)&pIDBProps))) {
hr = pIDBProps->SetProperties(1, &PropSet);
}
VariantClear(&rgProps[0].vValue);
if (pIDBProps)
pIDBProps->Release();
return hr;
}
void wmain() {
// Setup the initialization options
ULONG cProperties = 0;
DBPROP rgProperties[10];
ULONG cPropSets = 0;
DBPROPSET rgPropSets[1];
LPWSTR pwszProgID = L"SQLOLEDB";
LPWSTR pwszDataSource = NULL;
LPWSTR pwszUserID = NULL;
LPWSTR pwszPassword = NULL;
LPWSTR pwszProviderString = L"server=(local);trusted_connection=yes;";
IDBInitialize * pIDBInitialize = NULL;
IDBCreateSession * pIDBCrtSess = NULL;
IOpenRowset * pIOpenRowset = NULL;
IDBCreateCommand * pIDBCrtCmd = NULL;
ICommandText * pICmdText = NULL;
IAccessor * pIAccessor = NULL;
IRowsetFastLoad * pIRowsetFastLoad = NULL;
IDBProperties * pIDBProperties = NULL;
DBBINDING rgBinding[3];
DBBINDSTATUS rgStatus[3];
ULONG ulOffset = 0;
HACCESSOR hAcc = DB_NULL_HACCESSOR;
BYTE * pData = NULL;
ULONG iRow = 0;
LPWSTR pwszTableName = L"fltest";
DBID TableID;
HRESULT hr;
#ifdef USE_ISEQSTREAM
BYTE bSrcBuf[1024]; // A buffer to hold our data for streaming
memset((void *)&bSrcBuf, 0xAB, sizeof(bSrcBuf)); // Stream data value 0xAB
MySequentialStream * pMySeqStream = new MySequentialStream();
DBOBJECT MyObject = {STGM_READ, IID_ISequentialStream}; // NULL pObject implies STGM_READ and IID_IUnknown, but not recommended
#endif
memset(rgBinding, 0, ( sizeof(rgBinding) / sizeof(rgBinding[0])) * sizeof(DBBINDING) );
TableID.eKind = DBKIND_NAME;
TableID.uName.pwszName = pwszTableName;
// Col1
rgBinding[0].iOrdinal = 1;
rgBinding[0].wType = DBTYPE_I4;
rgBinding[0].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[0].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[0].obValue = ulOffset;
ulOffset += sizeof(LONG);
rgBinding[0].cbMaxLen = sizeof(LONG);
rgBinding[0].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
rgBinding[0].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[0].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
//Col2
rgBinding[1].iOrdinal = 2;
rgBinding[1].wType = DBTYPE_I4;
rgBinding[1].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[1].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[1].obValue = ulOffset;
ulOffset += sizeof(LONG);
rgBinding[1].cbMaxLen = sizeof(LONG);
rgBinding[1].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
rgBinding[1].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[1].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
//Col3
rgBinding[2].iOrdinal = 3;
rgBinding[2].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[2].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[2].obValue = ulOffset;
rgBinding[2].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH; // DBPART_LENGTH not needed for providers that don't require length
rgBinding[2].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[2].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
#ifdef USE_ISEQSTREAM
rgBinding[2].wType = DBTYPE_IUNKNOWN;
ulOffset += sizeof(ISequentialStream *); // Technically should be sizeof(MySequentialStream *), but who's counting?
rgBinding[2].cbMaxLen = sizeof(ISequentialStream *);
rgBinding[2].pObject = &MyObject;
#else
rgBinding[2].wType = DBTYPE_BYTES;
ulOffset += MAX_BLOB;
rgBinding[2].cbMaxLen = MAX_BLOB;
#endif
// Set init props
for ( ULONG i = 0 ; i < sizeof(rgProperties) / sizeof(rgProperties[0]) ; i++ )
VariantInit(&rgProperties[i].vValue);
// Obtain the provider's clsid
CLSID clsidProv;
hr = CLSIDFromProgID(pwszProgID, &clsidProv);
// Get our initial connection
CoInitialize(NULL);
if (SUCCEEDED(hr))
hr = CoCreateInstance(clsidProv, NULL, CLSCTX_ALL, IID_IDBInitialize,(void **)&pIDBInitialize);
if (SUCCEEDED(hr))
hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (void **)&pIDBProperties);
// DBPROP_INIT_DATASOURCE
if (pwszDataSource) {
rgProperties[cProperties].dwPropertyID = DBPROP_INIT_DATASOURCE;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszDataSource);
cProperties++;
}
// DBPROP_AUTH_USERID
if (pwszUserID) {
rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_USERID;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszUserID);
cProperties++;
}
// DBPROP_AUTH_PASSWORD
if (pwszPassword) {
rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_PASSWORD;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszPassword);
cProperties++;
}
// DBPROP_INIT_PROVIDERSTRING
if (pwszProviderString) {
rgProperties[cProperties].dwPropertyID = DBPROP_INIT_PROVIDERSTRING;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszProviderString);
cProperties++;
}
if (cProperties) {
rgPropSets[cPropSets].cProperties = cProperties;
rgPropSets[cPropSets].rgProperties = rgProperties;
rgPropSets[cPropSets].guidPropertySet = DBPROPSET_DBINIT;
cPropSets++;
}
// Initialize
if (SUCCEEDED(hr))
hr = pIDBProperties->SetProperties(cPropSets, rgPropSets);
if (SUCCEEDED(hr))
hr = pIDBInitialize->Initialize();
if (SUCCEEDED(hr)) {
printf("\tConnected!\r\n");
}
else
printf("Unable to connect\r\n");
// Set fastload prop
if (SUCCEEDED(hr))
hr = SetFastLoadProperty(pIDBInitialize);
if (SUCCEEDED(hr))
hr = pIDBInitialize->QueryInterface(IID_IDBCreateSession, (void **)&pIDBCrtSess);
if (SUCCEEDED(hr))
hr = pIDBCrtSess->CreateSession(NULL, IID_IOpenRowset, (IUnknown **)&pIOpenRowset);
if (SUCCEEDED(hr))
hr = pIOpenRowset->OpenRowset(NULL, &TableID, NULL, IID_IRowsetFastLoad, 0, NULL, (IUnknown **)&pIRowsetFastLoad);
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->QueryInterface(IID_IAccessor, (void **)&pIAccessor);
if (SUCCEEDED(hr))
hr = pIAccessor->CreateAccessor(DBACCESSOR_ROWDATA, 3, rgBinding, ulOffset, &hAcc, (DBBINDSTATUS *)&rgStatus);
if (SUCCEEDED(hr)) {
pData = (BYTE *)malloc(ulOffset);
for (iRow = 0 ; iRow < MAX_ROWS ; iRow++) {
// Column 1 data
*(DBSTATUS *)(pData + rgBinding[0].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[0].obLength) = 1234567; // Ignored for I4 data
*(LONG *)(pData + rgBinding[0].obValue) = iRow;
// Column 2 data
*(DBSTATUS *)(pData + rgBinding[1].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[1].obLength) = 1234567; // Ignored for I4 data
*(LONG *)(pData + rgBinding[1].obValue) = iRow + 1;
// Column 3 data
*(DBSTATUS *)(pData + rgBinding[2].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[2].obLength) = MAX_BLOB/(iRow + 1); // Not needed for providers that don't require length
#ifdef USE_ISEQSTREAM
// DBLENGTH is used to tell the provider how much BLOB data to expect from the stream, not required
// if provider supports sending data without length
*(ISequentialStream **)(pData+rgBinding[2].obValue) = (ISequentialStream *)pMySeqStream;
pMySeqStream->Init((void *)&bSrcBuf, sizeof(bSrcBuf), MAX_BLOB / (iRow + 1)); // Here we set the size we will let the provider read
pMySeqStream->AddRef(); // The provider releases the object, so we addref it so it doesn't get destructed
#else
memset(pData + rgBinding[2].obValue, 0, MAX_BLOB); // Not strictly necessary
memset(pData + rgBinding[2].obValue, 0x23, MAX_BLOB / (iRow + 1));
#endif
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->InsertRow(hAcc, pData);
}
}
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->Commit(TRUE);
if (hAcc)
pIAccessor->ReleaseAccessor(hAcc, NULL);
SAFE_RELEASE(pIDBInitialize);
SAFE_RELEASE(pIDBCrtSess);
SAFE_RELEASE(pIOpenRowset);
SAFE_RELEASE(pIDBCrtCmd);
SAFE_RELEASE(pICmdText);
SAFE_RELEASE(pIAccessor);
SAFE_RELEASE(pIRowsetFastLoad);
SAFE_RELEASE(pIDBProperties);
#ifdef USE_ISEQSTREAM
SAFE_RELEASE(pMySeqStream);
#endif
if (pData)
free(pData);
CoUninitialize();
}
use master
drop table fltest