在 (Native Client OLE DB) 中使用 IROWSETFASTLOAD 和 ISEQUENTIALSTREAM 将 BLOB 数据发送到 SQL SERVER
适用于:SQL Server Azure SQL 数据库 Azure SQL 托管实例 Azure Synapse Analytics Analytics Platform System (PDW)
此示例说明如何使用 IRowsetFastLoad 以流的形式发送每行的变长 BLOB 数据。
默认情况下,此示例说明如何使用 IRowsetFastLoad 通过内联绑定发送每行的变长 BLOB 数据。 内联 BLOB 数据必须能够容纳于可用内存中。 当 BLOB 数据小于几兆字节时,此方法的性能最佳,因为没有额外的流开销。 对于大于几兆字节的数据,尤其是对于不以块的形式提供的数据,以流的形式发送数据具有更佳的性能。
在源代码中,当您取消注释 #define USE_ISEQSTREAM 时,示例将使用 ISequentialStream。 流实现在示例中定义,只需更改 MAX_BLOB 即可发送任意大小的 BLOB 数据。 流数据不必容纳于内存中或以一个块的形式提供。 可以使用 IRowsetFastLoad::InsertRow 来调用此访问接口。 使用 IRowsetFastLoad::InsertRow 将一个指针连同可从流中读取的数据量传递到数据缓冲区(rgBinding.obValue 偏移量)中的流实现。 一些访问接口可能不必知道绑定时的数据长度。 在这种情况下,可以从绑定中忽略此长度。
此示例不使用访问接口的流接口将数据写入访问接口。 而是将一个指针传递到访问接口将用来读取数据的流对象。 通常,Microsoft 访问接口(SQLOLEDB 和 SQLNCLI)将从该对象读取数据(以 1024 字节为一个块),直到所有数据都得到处理。 SQLOLEDB 和 SQLNCLI 都没有完整的实现来允许使用者将数据写入访问接口的流对象。 只有零长度的数据可以通过访问接口的流对象发送。
通过将一个参数绑定为 DBTYPE_IUNKNOWN,可以将使用者实现的 ISequentialStream 对象与行集数据(IRowsetChange::InsertRow、IRowsetChange::SetData)以及参数一起使用。
因为 DBTYPE_IUNKNOWN 在绑定中指定为数据类型,所以它必须与列或目标参数的类型匹配。 当通过 ISequentialStream 从行集接口发送数据时,无法进行转换。 对于参数,应避免使用 ICommandWithParameters::SetParameterInfo,而需要指定一个不同的类型来强制转换;这要求访问接口在本地缓存所有 BLOB 数据,以便在发送到 SQL Server 之前对其进行转换。 在本地缓存并转换大型 BLOB 不能提供良好的性能。
有关详细信息,请参阅 BLOB 对象和 OLE 对象。
重要
请尽可能使用 Windows 身份验证。 如果 Windows 身份验证不可用,请在运行时提示用户输入其凭据。 不要将凭据存储在一个文件中。 如果必须保存凭据,应当用 Win32 crypto API(Win32 加密 API)加密它们。
示例
执行第一个 (Transact-SQL) 代码列表,以创建该应用程序要使用的表。
使用 ole32.lib 和 oleaut32.lib 编译并执行以下 C++ 代码列表。 此应用程序连接到您的计算机上默认的 SQL Server 实例。 在某些 Windows 操作系统上,您需要将 (localhost) 或 (local) 更改为您的 SQL Server 实例的名称。 若要连接到命名实例,请将连接字符串从 L"(local)" 更改为 L"(local)\\name",其中 name 是命名实例。 默认情况下,SQL Server Express 安装在命名实例中。 请确保 INCLUDE 环境变量包含包含 sqlncli.h 的目录。
执行第三个 (Transact-SQL) 代码列表,以删除该应用程序使用的表。
use master
create table fltest(col1 int, col2 int, col3 image)
// compile with: ole32.lib oleaut32.lib
#include <windows.h>
#define DBINITCONSTANTS // Must be defined to initialize constants in oledb.h
#define INITGUID
#include <sqloledb.h>
#include <oledb.h>
#include <msdasc.h>
#include <stdio.h>
#include <stdlib.h>
#include <conio.h>
#define MAX_BLOB 200 // For stream binding this can be any size, but for inline it must fit in memory
#define MAX_ROWS 100
#define SAFE_RELEASE(p) { \
if (p) { \
(p)->Release(); \
(p)=NULL; \
} \
}
#ifdef USE_ISEQSTREAM
// ISequentialStream implementation for streaming data
class MySequentialStream : public ISequentialStream {
private:
ULONG m_ulRefCount;
ULONG m_ulBufSize;
ULONG m_ulReadSize;
ULONG m_ulBytesLeft;
ULONG m_ulReadPos;
BYTE * m_pSrcData;
BYTE * m_pReadPtr;
BOOL m_fWasRead;
public:
MySequentialStream() {
m_ulRefCount = 1;
m_ulBufSize = 0;
m_ulReadSize = 0;
m_ulBytesLeft = 0;
m_ulReadPos = 0;
m_pSrcData = NULL;
m_pReadPtr = NULL;
m_fWasRead = FALSE;
}
~MySequentialStream() {}
virtual ULONG STDMETHODCALLTYPE AddRef() {
return ++m_ulRefCount;
}
virtual ULONG STDMETHODCALLTYPE Release() {
--m_ulRefCount;
if (m_ulRefCount == 0) {
delete this;
return 0;
}
return m_ulRefCount;
}
virtual HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void ** ppvObj) {
if (!ppvObj)
return E_INVALIDARG;
else
*ppvObj = NULL;
if (riid != IID_ISequentialStream && riid != IID_IUnknown)
return E_NOINTERFACE;
AddRef();
*ppvObj = this;
return S_OK;
}
HRESULT Init(const void * pSrcData, const ULONG ulBufSize, const ULONG ulReadSize) {
if (NULL == pSrcData)
return E_INVALIDARG;
// Data length must be non-zero
if (0 == ulBufSize)
return E_INVALIDARG;
m_ulBufSize = ulBufSize;
m_ulReadSize = ulReadSize;
m_pSrcData = (BYTE *)pSrcData;
m_pReadPtr = m_pSrcData;
m_ulBytesLeft = m_ulReadSize;
m_ulReadPos = 0;
m_fWasRead = FALSE;
return S_OK;
}
// Can't write data to SQL Server providers (SQLOLEDB/SQLNCLI). Instead, they read from our object.
virtual HRESULT STDMETHODCALLTYPE Write(const void *, ULONG, ULONG * ) {
return E_NOTIMPL;
}
// This implementation simply copies data from the source buffer in whatever size requested.
// But you can do anything here such as reading from a file, reading from a different rowset, stream, etc.
virtual HRESULT STDMETHODCALLTYPE Read(void * pv, ULONG cb, ULONG * pcbRead) {
ULONG ulBytesWritten = 0;
ULONG ulCBToWrite = cb;
ULONG ulCBToCopy;
BYTE * pvb = (BYTE *)pv;
m_fWasRead = TRUE;
if (NULL == m_pSrcData)
return E_FAIL;
if (NULL == pv)
return STG_E_INVALIDPOINTER;
while (ulBytesWritten < ulCBToWrite && m_ulBytesLeft) {
// Make sure we don't write more than our max read size or the size they asked for
ulCBToCopy = min(m_ulBytesLeft, cb);
// Make sure we don't read past the end of the internal buffer
ulCBToCopy = min(m_ulBufSize - m_ulReadPos, ulCBToCopy);
memcpy(pvb, m_pReadPtr + m_ulReadPos, ulCBToCopy);
pvb += ulCBToCopy;
ulBytesWritten += ulCBToCopy;
m_ulBytesLeft -= ulCBToCopy;
cb -= ulCBToCopy;
// Wrap reads around the src buffer
m_ulReadPos += ulCBToCopy;
if (m_ulReadPos >= m_ulBufSize)
m_ulReadPos = 0;
}
if (pcbRead)
*pcbRead = ulBytesWritten;
return S_OK;
}
};
#endif // USE_ISEQSTREAM
HRESULT SetFastLoadProperty(IDBInitialize * pIDBInitialize) {
HRESULT hr = S_OK;
IDBProperties * pIDBProps = NULL;
DBPROP rgProps[1];
DBPROPSET PropSet;
VariantInit(&rgProps[0].vValue);
rgProps[0].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProps[0].colid = DB_NULLID;
rgProps[0].vValue.vt = VT_BOOL;
rgProps[0].dwPropertyID = SSPROP_ENABLEFASTLOAD;
rgProps[0].vValue.boolVal = VARIANT_TRUE;
PropSet.rgProperties = rgProps;
PropSet.cProperties = 1;
PropSet.guidPropertySet = DBPROPSET_SQLSERVERDATASOURCE;
if (SUCCEEDED(hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (LPVOID *)&pIDBProps))) {
hr = pIDBProps->SetProperties(1, &PropSet);
}
VariantClear(&rgProps[0].vValue);
if (pIDBProps)
pIDBProps->Release();
return hr;
}
void wmain() {
// Setup the initialization options
ULONG cProperties = 0;
DBPROP rgProperties[10];
ULONG cPropSets = 0;
DBPROPSET rgPropSets[1];
LPWSTR pwszProgID = L"SQLOLEDB";
LPWSTR pwszDataSource = NULL;
LPWSTR pwszUserID = NULL;
LPWSTR pwszPassword = NULL;
LPWSTR pwszProviderString = L"server=(local);trusted_connection=yes;";
IDBInitialize * pIDBInitialize = NULL;
IDBCreateSession * pIDBCrtSess = NULL;
IOpenRowset * pIOpenRowset = NULL;
IDBCreateCommand * pIDBCrtCmd = NULL;
ICommandText * pICmdText = NULL;
IAccessor * pIAccessor = NULL;
IRowsetFastLoad * pIRowsetFastLoad = NULL;
IDBProperties * pIDBProperties = NULL;
DBBINDING rgBinding[3];
DBBINDSTATUS rgStatus[3];
ULONG ulOffset = 0;
HACCESSOR hAcc = DB_NULL_HACCESSOR;
BYTE * pData = NULL;
ULONG iRow = 0;
LPWSTR pwszTableName = L"fltest";
DBID TableID;
HRESULT hr;
#ifdef USE_ISEQSTREAM
BYTE bSrcBuf[1024]; // A buffer to hold our data for streaming
memset((void *)&bSrcBuf, 0xAB, sizeof(bSrcBuf)); // Stream data value 0xAB
MySequentialStream * pMySeqStream = new MySequentialStream();
DBOBJECT MyObject = {STGM_READ, IID_ISequentialStream}; // NULL pObject implies STGM_READ and IID_IUnknown, but not recommended
#endif
memset(rgBinding, 0, ( sizeof(rgBinding) / sizeof(rgBinding[0])) * sizeof(DBBINDING) );
TableID.eKind = DBKIND_NAME;
TableID.uName.pwszName = pwszTableName;
// Col1
rgBinding[0].iOrdinal = 1;
rgBinding[0].wType = DBTYPE_I4;
rgBinding[0].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[0].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[0].obValue = ulOffset;
ulOffset += sizeof(LONG);
rgBinding[0].cbMaxLen = sizeof(LONG);
rgBinding[0].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
rgBinding[0].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[0].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
//Col2
rgBinding[1].iOrdinal = 2;
rgBinding[1].wType = DBTYPE_I4;
rgBinding[1].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[1].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[1].obValue = ulOffset;
ulOffset += sizeof(LONG);
rgBinding[1].cbMaxLen = sizeof(LONG);
rgBinding[1].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
rgBinding[1].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[1].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
//Col3
rgBinding[2].iOrdinal = 3;
rgBinding[2].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[2].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[2].obValue = ulOffset;
rgBinding[2].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH; // DBPART_LENGTH not needed for providers that don't require length
rgBinding[2].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[2].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
#ifdef USE_ISEQSTREAM
rgBinding[2].wType = DBTYPE_IUNKNOWN;
ulOffset += sizeof(ISequentialStream *); // Technically should be sizeof(MySequentialStream *), but who's counting?
rgBinding[2].cbMaxLen = sizeof(ISequentialStream *);
rgBinding[2].pObject = &MyObject;
#else
rgBinding[2].wType = DBTYPE_BYTES;
ulOffset += MAX_BLOB;
rgBinding[2].cbMaxLen = MAX_BLOB;
#endif
// Set init props
for ( ULONG i = 0 ; i < sizeof(rgProperties) / sizeof(rgProperties[0]) ; i++ )
VariantInit(&rgProperties[i].vValue);
// Obtain the provider's clsid
CLSID clsidProv;
hr = CLSIDFromProgID(pwszProgID, &clsidProv);
// Get our initial connection
CoInitialize(NULL);
if (SUCCEEDED(hr))
hr = CoCreateInstance(clsidProv, NULL, CLSCTX_ALL, IID_IDBInitialize,(void **)&pIDBInitialize);
if (SUCCEEDED(hr))
hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (void **)&pIDBProperties);
// DBPROP_INIT_DATASOURCE
if (pwszDataSource) {
rgProperties[cProperties].dwPropertyID = DBPROP_INIT_DATASOURCE;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszDataSource);
cProperties++;
}
// DBPROP_AUTH_USERID
if (pwszUserID) {
rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_USERID;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszUserID);
cProperties++;
}
// DBPROP_AUTH_PASSWORD
if (pwszPassword) {
rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_PASSWORD;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszPassword);
cProperties++;
}
// DBPROP_INIT_PROVIDERSTRING
if (pwszProviderString) {
rgProperties[cProperties].dwPropertyID = DBPROP_INIT_PROVIDERSTRING;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszProviderString);
cProperties++;
}
if (cProperties) {
rgPropSets[cPropSets].cProperties = cProperties;
rgPropSets[cPropSets].rgProperties = rgProperties;
rgPropSets[cPropSets].guidPropertySet = DBPROPSET_DBINIT;
cPropSets++;
}
// Initialize
if (SUCCEEDED(hr))
hr = pIDBProperties->SetProperties(cPropSets, rgPropSets);
if (SUCCEEDED(hr))
hr = pIDBInitialize->Initialize();
if (SUCCEEDED(hr)) {
printf("\tConnected!\r\n");
}
else
printf("Unable to connect\r\n");
// Set fastload prop
if (SUCCEEDED(hr))
hr = SetFastLoadProperty(pIDBInitialize);
if (SUCCEEDED(hr))
hr = pIDBInitialize->QueryInterface(IID_IDBCreateSession, (void **)&pIDBCrtSess);
if (SUCCEEDED(hr))
hr = pIDBCrtSess->CreateSession(NULL, IID_IOpenRowset, (IUnknown **)&pIOpenRowset);
if (SUCCEEDED(hr))
hr = pIOpenRowset->OpenRowset(NULL, &TableID, NULL, IID_IRowsetFastLoad, 0, NULL, (IUnknown **)&pIRowsetFastLoad);
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->QueryInterface(IID_IAccessor, (void **)&pIAccessor);
if (SUCCEEDED(hr))
hr = pIAccessor->CreateAccessor(DBACCESSOR_ROWDATA, 3, rgBinding, ulOffset, &hAcc, (DBBINDSTATUS *)&rgStatus);
if (SUCCEEDED(hr)) {
pData = (BYTE *)malloc(ulOffset);
for (iRow = 0 ; iRow < MAX_ROWS ; iRow++) {
// Column 1 data
*(DBSTATUS *)(pData + rgBinding[0].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[0].obLength) = 1234567; // Ignored for I4 data
*(LONG *)(pData + rgBinding[0].obValue) = iRow;
// Column 2 data
*(DBSTATUS *)(pData + rgBinding[1].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[1].obLength) = 1234567; // Ignored for I4 data
*(LONG *)(pData + rgBinding[1].obValue) = iRow + 1;
// Column 3 data
*(DBSTATUS *)(pData + rgBinding[2].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[2].obLength) = MAX_BLOB/(iRow + 1); // Not needed for providers that don't require length
#ifdef USE_ISEQSTREAM
// DBLENGTH is used to tell the provider how much BLOB data to expect from the stream, not required
// if provider supports sending data without length
*(ISequentialStream **)(pData+rgBinding[2].obValue) = (ISequentialStream *)pMySeqStream;
pMySeqStream->Init((void *)&bSrcBuf, sizeof(bSrcBuf), MAX_BLOB / (iRow + 1)); // Here we set the size we will let the provider read
pMySeqStream->AddRef(); // The provider releases the object, so we addref it so it doesn't get destructed
#else
memset(pData + rgBinding[2].obValue, 0, MAX_BLOB); // Not strictly necessary
memset(pData + rgBinding[2].obValue, 0x23, MAX_BLOB / (iRow + 1));
#endif
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->InsertRow(hAcc, pData);
}
}
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->Commit(TRUE);
if (hAcc)
pIAccessor->ReleaseAccessor(hAcc, NULL);
SAFE_RELEASE(pIDBInitialize);
SAFE_RELEASE(pIDBCrtSess);
SAFE_RELEASE(pIOpenRowset);
SAFE_RELEASE(pIDBCrtCmd);
SAFE_RELEASE(pICmdText);
SAFE_RELEASE(pIAccessor);
SAFE_RELEASE(pIRowsetFastLoad);
SAFE_RELEASE(pIDBProperties);
#ifdef USE_ISEQSTREAM
SAFE_RELEASE(pMySeqStream);
#endif
if (pData)
free(pData);
CoUninitialize();
}
use master
drop table fltest