IROWSETFASTLOAD 및 ISEQUENTIALSTREAM을 사용하여 SQL SERVER에 BLOB 데이터 보내기(Native Client OLE DB)
적용 대상: SQL Server Azure SQL 데이터베이스 Azure SQL Managed Instance Azure Synapse Analytics Analytics Platform System(PDW)
이 예제는 IRowsetFastLoad를 사용하여 다양한 길이의 BLOB 데이터를 행별로 스트리밍하는 방법을 보여 줍니다.
기본적으로 이 예제는 인라인 바인딩을 사용하여 다양한 길이의 BLOB 데이터를 행별로 보내기 위해 IRowsetFastLoad를 사용하는 방법을 보여 줍니다. 인라인 BLOB 데이터는 가용 메모리를 초과해서는 안 됩니다. 이 메서드는 추가 스트림 오버헤드가 없으므로 BLOB 데이터가 몇 메가바이트 미만인 경우 가장 잘 수행됩니다. 수 MB 이상의 데이터인 경우, 특히 하나의 블록으로 사용할 수 없는 데이터인 경우 스트리밍이 더 나은 성능을 제공합니다.
소스 코드에서 #define USE_ISEQSTREAM 주석 처리를 제거하면 샘플에서 ISequentialStream을 사용합니다. 스트림 구현은 예제에 정의되어 있으며 MAX_BLOB만 변경하면 크기에 관계없이 BLOB 데이터를 보낼 수 있습니다. 따라서 스트림 데이터는 메모리 크기에 맞거나 하나의 블록 크기일 필요가 없습니다. IRowsetFastLoad::InsertRow를 사용하여 이 공급자를 호출합니다. IRowsetFastLoad::InsertRow를 사용하는 포인터를 스트림에서 읽을 수 있는 데이터의 양과 함께 데이터 버퍼(rgBinding.obValue 오프셋)에 있는 스트림 구현으로 전달합니다. 일부 공급자는 바인딩이 발생할 때 데이터의 길이를 반드시 알 필요가 없습니다. 이런 경우 바인딩에서 길이가 생략될 수 있습니다.
샘플은 공급자의 스트림 인터페이스를 사용하여 공급자에 데이터를 쓰지 않습니다. 대신 공급자가 데이터를 읽는 데 소모하는 스트림 개체로 포인터를 전달합니다. 일반적으로 Microsoft 공급자(SQLOLEDB 및 SQLNCLI)는 모든 데이터가 처리될 때까지 개체에서 1024바이트 청크로 데이터를 읽습니다. SQLOLEDB나 SQLNCLI 모두 소비자가 공급자의 스트림 개체에 데이터를 쓰도록 허용하기 위한 일체의 구현을 갖고 있지 않습니다. 공급자의 스트림 개체를 통해서 보낼 수 있는 데이터는 길이가 0인 데이터뿐입니다.
소비자가 구현한 ISequentialStream 개체는 행 집합 데이터(IRowsetChange::InsertRow, IRowsetChange::SetData)와 함께 사용하거나 매개 변수를 DBTYPE_IUNKNOWN으로 바인딩하여 매개 변수와 함께 사용할 수 있습니다.
DBTYPE_IUNKNOWN은 바인딩에 데이터 형식으로 지정되었으므로 열이나 대상 매개 변수의 유형과 일치해야 합니다. 행 집합 인터페이스에서 ISequentialStream을 통해 데이터를 보낼 때는 변환할 수 없습니다. 매개 변수의 경우 ICommandWithParameters::SetParameterInfo를 사용하지 말고 다른 형식을 지정하여 강제로 변환해야 합니다. 이렇게 하려면 공급자가 SQL Server로 보내기 전에 모든 BLOB 데이터를 로컬로 캐시하여 변환해야 합니다. 큰 BLOB을 캐싱하고 로컬로 변환해도 성능이 좋지 않습니다.
자세한 내용은 Blob 및 OLE 개체를 참조하세요.
Important
가능하면 Windows 인증을 사용하세요. Windows 인증을 사용할 수 없으면 런타임에 사용자에게 자격 증명을 입력하라는 메시지를 표시합니다. 자격 증명은 파일에 저장하지 않는 것이 좋습니다. 자격 증명을 유지하려면 Win32 crypto API를 사용하여 자격 증명을 암호화해야 합니다.
예제
첫 번째(Transact-SQL) 코드 목록을 실행하여 애플리케이션에서 사용하는 테이블을 만듭니다.
ole32.lib oleaut32.lib를 사용하여 컴파일하고 다음 C++ 코드 목록을 실행합니다. 이 애플리케이션은 컴퓨터의 기본 SQL Server 인스턴스에 연결됩니다. 일부 Windows 운영 체제에서는 (localhost) 또는 (local)을 해당 SQL Server 인스턴스의 이름으로 변경해야 합니다. 명명된 인스턴스에 연결하려면 연결 문자열을 L"(local)"에서 L"(local)\\name"으로 변경합니다. 여기서 name은 명명된 인스턴스입니다. 기본적으로 SQL Server Express는 명명된 인스턴스에 설치됩니다. INCLUDE 환경 변수에 sqlncli.h가 포함된 디렉터리가 포함되어 있는지 확인합니다.
세 번째(Transact-SQL) 코드 목록을 실행하여 애플리케이션에서 사용하는 테이블을 삭제합니다.
use master
create table fltest(col1 int, col2 int, col3 image)
// compile with: ole32.lib oleaut32.lib
#include <windows.h>
#define DBINITCONSTANTS // Must be defined to initialize constants in oledb.h
#define INITGUID
#include <sqloledb.h>
#include <oledb.h>
#include <msdasc.h>
#include <stdio.h>
#include <stdlib.h>
#include <conio.h>
#define MAX_BLOB 200 // For stream binding this can be any size, but for inline it must fit in memory
#define MAX_ROWS 100
#define SAFE_RELEASE(p) { \
if (p) { \
(p)->Release(); \
(p)=NULL; \
} \
}
#ifdef USE_ISEQSTREAM
// ISequentialStream implementation for streaming data
class MySequentialStream : public ISequentialStream {
private:
ULONG m_ulRefCount;
ULONG m_ulBufSize;
ULONG m_ulReadSize;
ULONG m_ulBytesLeft;
ULONG m_ulReadPos;
BYTE * m_pSrcData;
BYTE * m_pReadPtr;
BOOL m_fWasRead;
public:
MySequentialStream() {
m_ulRefCount = 1;
m_ulBufSize = 0;
m_ulReadSize = 0;
m_ulBytesLeft = 0;
m_ulReadPos = 0;
m_pSrcData = NULL;
m_pReadPtr = NULL;
m_fWasRead = FALSE;
}
~MySequentialStream() {}
virtual ULONG STDMETHODCALLTYPE AddRef() {
return ++m_ulRefCount;
}
virtual ULONG STDMETHODCALLTYPE Release() {
--m_ulRefCount;
if (m_ulRefCount == 0) {
delete this;
return 0;
}
return m_ulRefCount;
}
virtual HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void ** ppvObj) {
if (!ppvObj)
return E_INVALIDARG;
else
*ppvObj = NULL;
if (riid != IID_ISequentialStream && riid != IID_IUnknown)
return E_NOINTERFACE;
AddRef();
*ppvObj = this;
return S_OK;
}
HRESULT Init(const void * pSrcData, const ULONG ulBufSize, const ULONG ulReadSize) {
if (NULL == pSrcData)
return E_INVALIDARG;
// Data length must be non-zero
if (0 == ulBufSize)
return E_INVALIDARG;
m_ulBufSize = ulBufSize;
m_ulReadSize = ulReadSize;
m_pSrcData = (BYTE *)pSrcData;
m_pReadPtr = m_pSrcData;
m_ulBytesLeft = m_ulReadSize;
m_ulReadPos = 0;
m_fWasRead = FALSE;
return S_OK;
}
// Can't write data to SQL Server providers (SQLOLEDB/SQLNCLI). Instead, they read from our object.
virtual HRESULT STDMETHODCALLTYPE Write(const void *, ULONG, ULONG * ) {
return E_NOTIMPL;
}
// This implementation simply copies data from the source buffer in whatever size requested.
// But you can do anything here such as reading from a file, reading from a different rowset, stream, etc.
virtual HRESULT STDMETHODCALLTYPE Read(void * pv, ULONG cb, ULONG * pcbRead) {
ULONG ulBytesWritten = 0;
ULONG ulCBToWrite = cb;
ULONG ulCBToCopy;
BYTE * pvb = (BYTE *)pv;
m_fWasRead = TRUE;
if (NULL == m_pSrcData)
return E_FAIL;
if (NULL == pv)
return STG_E_INVALIDPOINTER;
while (ulBytesWritten < ulCBToWrite && m_ulBytesLeft) {
// Make sure we don't write more than our max read size or the size they asked for
ulCBToCopy = min(m_ulBytesLeft, cb);
// Make sure we don't read past the end of the internal buffer
ulCBToCopy = min(m_ulBufSize - m_ulReadPos, ulCBToCopy);
memcpy(pvb, m_pReadPtr + m_ulReadPos, ulCBToCopy);
pvb += ulCBToCopy;
ulBytesWritten += ulCBToCopy;
m_ulBytesLeft -= ulCBToCopy;
cb -= ulCBToCopy;
// Wrap reads around the src buffer
m_ulReadPos += ulCBToCopy;
if (m_ulReadPos >= m_ulBufSize)
m_ulReadPos = 0;
}
if (pcbRead)
*pcbRead = ulBytesWritten;
return S_OK;
}
};
#endif // USE_ISEQSTREAM
HRESULT SetFastLoadProperty(IDBInitialize * pIDBInitialize) {
HRESULT hr = S_OK;
IDBProperties * pIDBProps = NULL;
DBPROP rgProps[1];
DBPROPSET PropSet;
VariantInit(&rgProps[0].vValue);
rgProps[0].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProps[0].colid = DB_NULLID;
rgProps[0].vValue.vt = VT_BOOL;
rgProps[0].dwPropertyID = SSPROP_ENABLEFASTLOAD;
rgProps[0].vValue.boolVal = VARIANT_TRUE;
PropSet.rgProperties = rgProps;
PropSet.cProperties = 1;
PropSet.guidPropertySet = DBPROPSET_SQLSERVERDATASOURCE;
if (SUCCEEDED(hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (LPVOID *)&pIDBProps))) {
hr = pIDBProps->SetProperties(1, &PropSet);
}
VariantClear(&rgProps[0].vValue);
if (pIDBProps)
pIDBProps->Release();
return hr;
}
void wmain() {
// Setup the initialization options
ULONG cProperties = 0;
DBPROP rgProperties[10];
ULONG cPropSets = 0;
DBPROPSET rgPropSets[1];
LPWSTR pwszProgID = L"SQLOLEDB";
LPWSTR pwszDataSource = NULL;
LPWSTR pwszUserID = NULL;
LPWSTR pwszPassword = NULL;
LPWSTR pwszProviderString = L"server=(local);trusted_connection=yes;";
IDBInitialize * pIDBInitialize = NULL;
IDBCreateSession * pIDBCrtSess = NULL;
IOpenRowset * pIOpenRowset = NULL;
IDBCreateCommand * pIDBCrtCmd = NULL;
ICommandText * pICmdText = NULL;
IAccessor * pIAccessor = NULL;
IRowsetFastLoad * pIRowsetFastLoad = NULL;
IDBProperties * pIDBProperties = NULL;
DBBINDING rgBinding[3];
DBBINDSTATUS rgStatus[3];
ULONG ulOffset = 0;
HACCESSOR hAcc = DB_NULL_HACCESSOR;
BYTE * pData = NULL;
ULONG iRow = 0;
LPWSTR pwszTableName = L"fltest";
DBID TableID;
HRESULT hr;
#ifdef USE_ISEQSTREAM
BYTE bSrcBuf[1024]; // A buffer to hold our data for streaming
memset((void *)&bSrcBuf, 0xAB, sizeof(bSrcBuf)); // Stream data value 0xAB
MySequentialStream * pMySeqStream = new MySequentialStream();
DBOBJECT MyObject = {STGM_READ, IID_ISequentialStream}; // NULL pObject implies STGM_READ and IID_IUnknown, but not recommended
#endif
memset(rgBinding, 0, ( sizeof(rgBinding) / sizeof(rgBinding[0])) * sizeof(DBBINDING) );
TableID.eKind = DBKIND_NAME;
TableID.uName.pwszName = pwszTableName;
// Col1
rgBinding[0].iOrdinal = 1;
rgBinding[0].wType = DBTYPE_I4;
rgBinding[0].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[0].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[0].obValue = ulOffset;
ulOffset += sizeof(LONG);
rgBinding[0].cbMaxLen = sizeof(LONG);
rgBinding[0].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
rgBinding[0].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[0].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
//Col2
rgBinding[1].iOrdinal = 2;
rgBinding[1].wType = DBTYPE_I4;
rgBinding[1].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[1].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[1].obValue = ulOffset;
ulOffset += sizeof(LONG);
rgBinding[1].cbMaxLen = sizeof(LONG);
rgBinding[1].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
rgBinding[1].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[1].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
//Col3
rgBinding[2].iOrdinal = 3;
rgBinding[2].obStatus = ulOffset;
ulOffset+=sizeof(DBSTATUS);
rgBinding[2].obLength = ulOffset;
ulOffset+=sizeof(DBLENGTH);
rgBinding[2].obValue = ulOffset;
rgBinding[2].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH; // DBPART_LENGTH not needed for providers that don't require length
rgBinding[2].eParamIO = DBPARAMIO_NOTPARAM;
rgBinding[2].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
#ifdef USE_ISEQSTREAM
rgBinding[2].wType = DBTYPE_IUNKNOWN;
ulOffset += sizeof(ISequentialStream *); // Technically should be sizeof(MySequentialStream *), but who's counting?
rgBinding[2].cbMaxLen = sizeof(ISequentialStream *);
rgBinding[2].pObject = &MyObject;
#else
rgBinding[2].wType = DBTYPE_BYTES;
ulOffset += MAX_BLOB;
rgBinding[2].cbMaxLen = MAX_BLOB;
#endif
// Set init props
for ( ULONG i = 0 ; i < sizeof(rgProperties) / sizeof(rgProperties[0]) ; i++ )
VariantInit(&rgProperties[i].vValue);
// Obtain the provider's clsid
CLSID clsidProv;
hr = CLSIDFromProgID(pwszProgID, &clsidProv);
// Get our initial connection
CoInitialize(NULL);
if (SUCCEEDED(hr))
hr = CoCreateInstance(clsidProv, NULL, CLSCTX_ALL, IID_IDBInitialize,(void **)&pIDBInitialize);
if (SUCCEEDED(hr))
hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (void **)&pIDBProperties);
// DBPROP_INIT_DATASOURCE
if (pwszDataSource) {
rgProperties[cProperties].dwPropertyID = DBPROP_INIT_DATASOURCE;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszDataSource);
cProperties++;
}
// DBPROP_AUTH_USERID
if (pwszUserID) {
rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_USERID;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszUserID);
cProperties++;
}
// DBPROP_AUTH_PASSWORD
if (pwszPassword) {
rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_PASSWORD;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszPassword);
cProperties++;
}
// DBPROP_INIT_PROVIDERSTRING
if (pwszProviderString) {
rgProperties[cProperties].dwPropertyID = DBPROP_INIT_PROVIDERSTRING;
rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
rgProperties[cProperties].colid = DB_NULLID;
rgProperties[cProperties].vValue.vt = VT_BSTR;
V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszProviderString);
cProperties++;
}
if (cProperties) {
rgPropSets[cPropSets].cProperties = cProperties;
rgPropSets[cPropSets].rgProperties = rgProperties;
rgPropSets[cPropSets].guidPropertySet = DBPROPSET_DBINIT;
cPropSets++;
}
// Initialize
if (SUCCEEDED(hr))
hr = pIDBProperties->SetProperties(cPropSets, rgPropSets);
if (SUCCEEDED(hr))
hr = pIDBInitialize->Initialize();
if (SUCCEEDED(hr)) {
printf("\tConnected!\r\n");
}
else
printf("Unable to connect\r\n");
// Set fastload prop
if (SUCCEEDED(hr))
hr = SetFastLoadProperty(pIDBInitialize);
if (SUCCEEDED(hr))
hr = pIDBInitialize->QueryInterface(IID_IDBCreateSession, (void **)&pIDBCrtSess);
if (SUCCEEDED(hr))
hr = pIDBCrtSess->CreateSession(NULL, IID_IOpenRowset, (IUnknown **)&pIOpenRowset);
if (SUCCEEDED(hr))
hr = pIOpenRowset->OpenRowset(NULL, &TableID, NULL, IID_IRowsetFastLoad, 0, NULL, (IUnknown **)&pIRowsetFastLoad);
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->QueryInterface(IID_IAccessor, (void **)&pIAccessor);
if (SUCCEEDED(hr))
hr = pIAccessor->CreateAccessor(DBACCESSOR_ROWDATA, 3, rgBinding, ulOffset, &hAcc, (DBBINDSTATUS *)&rgStatus);
if (SUCCEEDED(hr)) {
pData = (BYTE *)malloc(ulOffset);
for (iRow = 0 ; iRow < MAX_ROWS ; iRow++) {
// Column 1 data
*(DBSTATUS *)(pData + rgBinding[0].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[0].obLength) = 1234567; // Ignored for I4 data
*(LONG *)(pData + rgBinding[0].obValue) = iRow;
// Column 2 data
*(DBSTATUS *)(pData + rgBinding[1].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[1].obLength) = 1234567; // Ignored for I4 data
*(LONG *)(pData + rgBinding[1].obValue) = iRow + 1;
// Column 3 data
*(DBSTATUS *)(pData + rgBinding[2].obStatus) = DBSTATUS_S_OK;
*(DBLENGTH *)(pData + rgBinding[2].obLength) = MAX_BLOB/(iRow + 1); // Not needed for providers that don't require length
#ifdef USE_ISEQSTREAM
// DBLENGTH is used to tell the provider how much BLOB data to expect from the stream, not required
// if provider supports sending data without length
*(ISequentialStream **)(pData+rgBinding[2].obValue) = (ISequentialStream *)pMySeqStream;
pMySeqStream->Init((void *)&bSrcBuf, sizeof(bSrcBuf), MAX_BLOB / (iRow + 1)); // Here we set the size we will let the provider read
pMySeqStream->AddRef(); // The provider releases the object, so we addref it so it doesn't get destructed
#else
memset(pData + rgBinding[2].obValue, 0, MAX_BLOB); // Not strictly necessary
memset(pData + rgBinding[2].obValue, 0x23, MAX_BLOB / (iRow + 1));
#endif
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->InsertRow(hAcc, pData);
}
}
if (SUCCEEDED(hr))
hr = pIRowsetFastLoad->Commit(TRUE);
if (hAcc)
pIAccessor->ReleaseAccessor(hAcc, NULL);
SAFE_RELEASE(pIDBInitialize);
SAFE_RELEASE(pIDBCrtSess);
SAFE_RELEASE(pIOpenRowset);
SAFE_RELEASE(pIDBCrtCmd);
SAFE_RELEASE(pICmdText);
SAFE_RELEASE(pIAccessor);
SAFE_RELEASE(pIRowsetFastLoad);
SAFE_RELEASE(pIDBProperties);
#ifdef USE_ISEQSTREAM
SAFE_RELEASE(pMySeqStream);
#endif
if (pData)
free(pData);
CoUninitialize();
}
use master
drop table fltest