When inserting a lot of data into a SQL Server table, the memory-based bulk copy exposed by the IRowsetFastLoad
interface in Microsoft’s OLE DB providers is the fastest way.
Here are the steps listed in the example above (with some comments added by me):
- Establish a connection to the data source.
- Set the SQLOLEDB provider-specific data source property SSPROP_ENABLEFASTLOAD to VARIANT_TRUE. This allows the newly created session to access the IRowsetFastLoad interface.
- Create a session requesting the IOpenRowset interface.
- Call IOpenRowset.OpenRowset to open a rowset that includes all the rows from the target table.
- Do the necessary bindings and create an accessor using IAccessor.CreateAccessor.
- Set up the memory buffer from which the data will be copied to the table.
- Call IRowsetFastLoad.InsertRow to bulk copy the data into the table.
- Call IRowsetFastLoad.Commit to commit all changes.
To establish the connection in Delphi, you can simply set up a TADOConnection component and set its Connected property to True:
// Configure and open the ADO connection that the bulk copy will run on.
// Select the SQL Server OLE DB provider by name.
Connection.Provider := 'SQLOLEDB'; // or 'SQLNCLI11'
// Integrated (SSPI) authentication against the server on the local machine.
Connection.ConnectionString := 'Integrated Security=SSPI;Data Source=localhost;';
// No credentials are requested interactively; the connection string above supplies the authentication mode.
Connection.LoginPrompt := False;
Connection.KeepConnection := True;
// Setting Connected to True opens the connection.
Connection.Connected := True;
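In fact, from my experiments, the step of setting SSPROP_ENABLEFASTLOAD seems to be unnecessary, as the bulk copy works without setting this property, too. (Or perhaps it’s already set by default.) The following function sets it anyway before creating the session and opening the rowset: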
// Opens a fast-load (bulk copy) rowset on TableName using an ADO connection.
//
// Sets the SSPROP_ENABLEFASTLOAD data source property on the connection, obtains
// the underlying OLE DB data source object from the ADO connection object and
// delegates to the IDBCreateSession overload of OpenFastLoad.
//
// Connection: the ADO connection to bulk copy through.
// TableName:  name of the target table.
// Returns the IRowsetFastLoad interface produced by the IDBCreateSession overload.
function OpenFastLoad(Connection: TADOConnection;
  const TableName: WideString): IRowsetFastLoad; overload;
var
  Construction: ADOConnectionConstruction;
  SessionFactory: IDBCreateSession;
begin
  // Must be requested on the data source before the session is created.
  SetProperty(Connection, DBPROPSET_SQLSERVERDATASOURCE, SSPROP_ENABLEFASTLOAD, True);

  // ADOConnectionConstruction exposes the raw OLE DB data source object (DSO).
  Construction := Connection.ConnectionObject as ADOConnectionConstruction;
  SessionFactory := Construction.Get_DSO as IDBCreateSession;

  Result := OpenFastLoad(SessionFactory, TableName);
end;
// Creates a session on the given data source object and opens a fast-load
// (bulk copy) rowset on the table named TableName.
//
// DBCreateSession: session factory of the data source to insert into.
// TableName:       name of the target table; handed to the provider as a
//                  DBKIND_NAME table identifier.
// Returns the IRowsetFastLoad interface of the opened rowset.
// Both HRESULTs are passed to OleDbCheck together with the interface that
// produced them (presumably OleDbCheck raises on failure - confirm).
function OpenFastLoad(const DBCreateSession: IDBCreateSession;
  const TableName: WideString): IRowsetFastLoad; overload;
var
  OpenRowSet: IOpenRowset;
  TableID: TDBID;
begin
  // OpenRowset below stores the interface through a raw pointer (@Result),
  // which bypasses reference counting. Clear Result first so that a reference
  // it may already hold is released instead of being overwritten and leaked.
  Result := nil;

  OleDbCheck(DBCreateSession.CreateSession(nil, IID_IOpenRowset, IUnknown(OpenRowSet)),
    DBCreateSession, IID_IDBCreateSession);

  // Zero the whole identifier so the parts not assigned below are not left
  // holding stack garbage.
  FillChar(TableID, SizeOf(TableID), 0);
  TableID.eKind := DBKIND_NAME;
  // The pointer refers to TableName's buffer, which stays valid for this call.
  TableID.uName.pwszName := PWideChar(TableName);

  OleDbCheck(OpenRowSet.OpenRowset(nil, @TableID, nil, IID_IRowsetFastLoad, 0, nil,
    @Result), OpenRowSet, IID_IOpenRowset);
end;
The bindings are an array of TDBBinding records (declared in Winapi.OleDb), each describing one inserted column and its offsets in the buffer:
// Fills in the OLE DB binding for one dataset field and advances Offset to the
// position where the next column's data starts in the row buffer.
//
// Field:   the dataset field to describe.
// Binding: binding record to fill in; only the members assigned here are touched.
// Offset:  on entry, the byte offset of this column's data in the row buffer
//          (0 for the first column); on exit, the offset for the next column.
//
// Each column occupies a length part, a status part and a value part, in that order.
procedure InitializeBinding(Field: TField; var Binding: TDBBinding; var Offset: Integer);
var
  OleType: DBTYPE;
begin
  // Column identity and data type.
  Binding.iOrdinal := Field.FieldNo;
  OleType := FieldTypeToOleDbType(Field.DataType);
  if Field.IsBlob then
    OleType := OleType or DBTYPE_BYREF; // blob value is a pointer to external data
  Binding.wType := OleType;

  // Not a command parameter; the client owns (and releases) the memory.
  Binding.eParamIO := DBPARAMIO_NOTPARAM;
  Binding.dwMemOwner := DBMEMOWNER_CLIENTOWNED;

  // Layout inside the row buffer: length, then status, then the value itself.
  Binding.dwPart := DBPART_LENGTH or DBPART_STATUS or DBPART_VALUE;
  Binding.obLength := Offset;
  Binding.obStatus := Binding.obLength + SizeOf(DBLENGTH);
  Binding.obValue := Binding.obStatus + SizeOf(DBSTATUS);

  // Date/time fields are transferred as OLE DB structures, so their size comes
  // from those structures; every other field uses its own data size.
  if Field.DataType = ftDate then
    Binding.cbMaxLen := SizeOf(TDBDate)
  else if Field.DataType = ftTime then
    Binding.cbMaxLen := SizeOf(TDBTime)
  else if Field.DataType in [ftDateTime, ftTimeStamp] then
    Binding.cbMaxLen := SizeOf(TDBTimeStamp)
  else
    Binding.cbMaxLen := Field.DataSize;

  // TColumnData already contains one byte of value data, hence the "- 1".
  Inc(Offset, SizeOf(TColumnData) + Binding.cbMaxLen - 1);
  Align(Offset); // per the original comment: aligned to 8 bytes
end;
...
// Obtain the IAccessor interface from the fast-load rowset.
OleDbCheck(FastLoad.QueryInterface(IID_IAccessor, Accessor), FastLoad, IID_IRowsetFastLoad);
// Create a row-data accessor from the bindings, one binding per dataset field.
// BufferSize is passed as the row size; the resulting handle is returned in
// AccessorHandle and the per-binding results in StatusCodes.
OleDbCheck(Accessor.CreateAccessor(DBACCESSOR_ROWDATA, Dataset.FieldCount, Bindings, BufferSize,
AccessorHandle, StatusCodes), Accessor, IID_IAccessor);
The record buffer is a sequence of TColumnData records (of variable size):
// Layout of one column's slot inside the row buffer: length, status, then the
// value bytes. The members appear in the same order in which InitializeBinding
// assigns obLength, obStatus and obValue.
type
// NOTE(review): declared here as a 64-bit unsigned integer regardless of target
// platform - confirm this matches the DBLENGTH the provider expects on Win32.
DBLENGTH = ULONGLONG;
PColumnData = ^TColumnData;
TColumnData = record
Length: DBLENGTH; // data length
Status: DBSTATUS; // null or has a value
// Placeholder for the first value byte; the actual value extends past the
// declared end of the record, which is why the record is of variable size.
Data: array[0..0] of Byte; // value data
end;
For each column, fill in the length, status and data fields within the buffer (code simplified):
// Copies the current value of Field into its slot of the row buffer.
//
// Field:   dataset field to read.
// Binding: the binding describing this field; Binding.obLength locates the
//          field's TColumnData slot inside Buffer.
// Buffer:  start of the row buffer.
//
// The slot's Status is set to DBSTATUS_S_ISNULL (with Length 0) for a null
// field; otherwise it is set to DBSTATUS_S_OK and the value and Length are filled in.
//
// NOTE(review): InitializeBinding marks blob fields as DBTYPE_BYREF, but blob
// fields take the generic Field.GetData branch here - confirm that this
// simplified version is not used for blob columns as is.
procedure GetFieldValue(Field: TField; const Binding: TDBBinding; Buffer: Pointer);
var
  Column: PColumnData;
  MSec: Word; // milliseconds output of DecodeTime
begin
  Column := Pointer(NativeUInt(Buffer) + Binding.obLength);
  if Field.IsNull then
  begin
    Column^.Status := DBSTATUS_S_ISNULL;
    Column^.Length := 0;
  end
  else
  begin
    Column^.Status := DBSTATUS_S_OK;
    // Date/time values are converted to the OLE DB structures; everything else
    // is copied in the field's native representation.
    case Field.DataType of
      ftDate:
        with PDBDate(@Column^.Data[0])^ do
          DecodeDate(Field.AsDateTime, Word(year), month, day);
      ftTime:
        with PDBTime(@Column^.Data[0])^ do
          // Milliseconds are decoded but not stored in this branch.
          DecodeTime(Field.AsDateTime, hour, minute, second, MSec);
      ftDateTime, ftTimeStamp:
        with PDBTimeStamp(@Column^.Data[0])^ do
        begin
          DecodeDate(Field.AsDateTime, Word(year), month, day);
          DecodeTime(Field.AsDateTime, hour, minute, second, MSec);
          // DBTIMESTAMP.fraction is in billionths of a second.
          fraction := MSec * 1000000;
        end;
    else
      Field.GetData(@Column^.Data[0], False);
    end;
    // Length is in bytes: strings use the actual length up to the terminator
    // (not counted), other types the field's data size.
    case Field.DataType of
      ftString, ftMemo:
        Column^.Length := StrLen(PAnsiChar(@Column^.Data[0]));
      ftWideString, ftWideMemo:
        Column^.Length := StrLen(PWideChar(@Column^.Data[0])) * SizeOf(WideChar);
    else
      Column^.Length := Field.DataSize;
    end;
  end;
end;
(Repeat the previous and this step for each row you want to insert.)
Since you explicitly control in your code how the buffers are allocated and populated, this approach offers some ways to optimize for performance, especially when inserting large amounts of data. It’s also possible to set up bulk insert sessions inserting into the same table in parallel from multiple threads.
As a starting point, you can find the example source code here.