SQL Server Bulk Copy using Delphi

When inserting a lot of data into a SQL Server table, the memory-based bulk copy exposed by the IRowsetFastLoad interface in Microsoft’s OLE DB providers is the fastest way.

Here are the steps listed in the example above (with some comments added by me):

  1. Establish a connection to the data source.
  2. In Delphi, you can simply set up a TADOConnection component and set its Connected property to True:

        Connection.LoginPrompt := False;
    Connection.KeepConnection := True;
    // Provider first, then the provider-less connection string.
    Connection.Provider := 'SQLOLEDB'; // or 'SQLNCLI11'
    Connection.ConnectionString := 'Integrated Security=SSPI;Data Source=localhost;';
    // Open the connection last, once everything above has been configured.
    Connection.Connected := True;
  3. Set the SQLOLEDB provider-specific data source property SSPROP_ENABLEFASTLOAD to VARIANT_TRUE. This allows the newly created session to access the IRowsetFastLoad interface.
  4. In fact, from my experiments, this step seems to be unnecessary, as the bulk copy works without setting this property, too. (Or perhaps it’s set already by default.)

  5. Create a session requesting the IOpenRowset interface, and
  6. Call IOpenRowset.OpenRowset to open a rowset that includes all the rows from the target table:
  7. function OpenFastLoad(Connection: TADOConnection;
    const TableName: WideString): IRowsetFastLoad; overload;
    var
      DataSource: IDBCreateSession;
    begin
      // Enable fast load on the data source before a session is created from it.
      SetProperty(Connection, DBPROPSET_SQLSERVERDATASOURCE, SSPROP_ENABLEFASTLOAD, True);
      // Reach the underlying OLE DB data source object behind the ADO connection
      // and hand it to the IDBCreateSession overload.
      DataSource :=
        (Connection.ConnectionObject as ADOConnectionConstruction).Get_DSO as IDBCreateSession;
      Result := OpenFastLoad(DataSource, TableName);
    end;

    { Creates a session on DBCreateSession and opens the table named TableName
      as a rowset exposing IRowsetFastLoad.
      Any failing OLE DB call is reported through OleDbCheck. }
    function OpenFastLoad(const DBCreateSession: IDBCreateSession;
      const TableName: WideString): IRowsetFastLoad; overload;
    var
      OpenRowSet: IOpenRowset;
      TableID: TDBID;
    begin
      // Result is filled in through a raw pointer below, which bypasses Delphi's
      // interface reference counting. Clear it first so that a reference already
      // held in the (hidden var) result slot is released instead of overwritten.
      Result := nil;

      OleDbCheck(DBCreateSession.CreateSession(nil, IID_IOpenRowset, IUnknown(OpenRowSet)),
        DBCreateSession, IID_IDBCreateSession);

      // Identify the table by name only; zero the record so the members that
      // are not used for DBKIND_NAME do not contain stack garbage.
      FillChar(TableID, SizeOf(TableID), 0);
      TableID.eKind := DBKIND_NAME;
      TableID.uName.pwszName := PWideChar(TableName);

      OleDbCheck(OpenRowSet.OpenRowset(nil, @TableID, nil, IID_IRowsetFastLoad, 0, nil,
        @Result), OpenRowSet, IID_IOpenRowset);
    end;
  8. Do the necessary bindings and create an accessor using IAccessor.CreateAccessor:
  9. The bindings are an array of TDBBinding records (declared in Winapi.OleDb), each describing one inserted column and its offsets in the buffer:

    { Fills Binding so that it describes Field as one column of the row buffer,
      then moves Offset to where the next column's slot starts. }
    procedure InitializeBinding(Field: TField; var Binding: TDBBinding; var Offset: Integer);
    begin
      // Slot layout inside the row buffer: length, then status, then the value.
      // Offset is 0 for the first column.
      Binding.obLength := Offset;
      Binding.obStatus := Binding.obLength + SizeOf(DBLENGTH);
      Binding.obValue := Binding.obStatus + SizeOf(DBSTATUS);
      Binding.dwPart := DBPART_LENGTH or DBPART_STATUS or DBPART_VALUE;

      // Column identity and ownership.
      Binding.iOrdinal := Field.FieldNo;
      Binding.eParamIO := DBPARAMIO_NOTPARAM;
      Binding.dwMemOwner := DBMEMOWNER_CLIENTOWNED; // the client releases the memory

      // Column data type; blob values are bound as a pointer to external data.
      Binding.wType := FieldTypeToOleDbType(Field.DataType);
      if Field.IsBlob then
        Binding.wType := Binding.wType or DBTYPE_BYREF;

      // Date/time columns use the OLE DB structures; everything else uses the
      // field's own data size.
      if Field.DataType = ftDate then
        Binding.cbMaxLen := SizeOf(TDBDate)
      else if Field.DataType = ftTime then
        Binding.cbMaxLen := SizeOf(TDBTime)
      else if Field.DataType in [ftDateTime, ftTimeStamp] then
        Binding.cbMaxLen := SizeOf(TDBTimeStamp)
      else
        Binding.cbMaxLen := Field.DataSize;

      // Advance to the next column's offset and align it (to 8 bytes).
      Inc(Offset, SizeOf(TColumnData) + Binding.cbMaxLen - 1);
      Align(Offset);
    end;

    ...
    // Ask the fast-load rowset for its IAccessor interface.
    OleDbCheck(FastLoad.QueryInterface(IID_IAccessor, Accessor), FastLoad, IID_IRowsetFastLoad);
    // Create a row-data accessor over the bindings, one binding per dataset field.
    // NOTE(review): BufferSize is presumably the total row buffer size produced by
    // the InitializeBinding calls - confirm against the elided code.
    OleDbCheck(Accessor.CreateAccessor(DBACCESSOR_ROWDATA, Dataset.FieldCount, Bindings, BufferSize,
    AccessorHandle, StatusCodes), Accessor, IID_IAccessor);

  10. Set up the memory buffer from which the data will be copied to the table.
  11. The record buffer is a sequence of TColumnData records (of variable size):

    // Layout of one column slot inside the row buffer: length, status, then
    // the value bytes. The slot is of variable size; Data only marks where the
    // value starts.
    type
    DBLENGTH = ULONGLONG;
    PColumnData = ^TColumnData;
    TColumnData = record
    Length: DBLENGTH; // data length
    Status: DBSTATUS; // null or has a value
    Data: array[0..0] of Byte; // value data
    end;

    For each column, fill in the length, status and data fields within the buffer (code simplified):

    { Writes the current value of Field into its column slot of the row buffer.

      Field   - dataset field to read.
      Binding - binding describing the column; obLength locates the slot.
      Buffer  - start of the row buffer.

      The slot receives a status (null / ok), a length and the value itself.
      Date and time values are converted to the OLE DB TDBDate / TDBTime /
      TDBTimeStamp structures. }
    procedure GetFieldValue(Field: TField; const Binding: TDBBinding; Buffer: Pointer);
    var
      Column: PColumnData;
      DateValue: PDBDate;
      TimeValue: PDBTime;
      StampValue: PDBTimeStamp;
      Y, M, D: Word;
      H, N, S, MSec: Word;
    begin
      Column := Pointer(NativeUInt(Buffer) + Binding.obLength);

      if Field.IsNull then
      begin
        Column^.Status := DBSTATUS_S_ISNULL;
        Column^.Length := 0;
        Exit;
      end;

      Column^.Status := DBSTATUS_S_OK;
      case Field.DataType of
        ftDate:
          begin
            // Decode into typed locals instead of hard-casting the record
            // fields to var parameters.
            DecodeDate(Field.AsDateTime, Y, M, D);
            DateValue := PDBDate(@Column^.Data[0]);
            DateValue^.year := Y;
            DateValue^.month := M;
            DateValue^.day := D;
            // Report the size of what was written, matching the binding's cbMaxLen.
            Column^.Length := SizeOf(TDBDate);
          end;
        ftTime:
          begin
            DecodeTime(Field.AsDateTime, H, N, S, MSec);
            TimeValue := PDBTime(@Column^.Data[0]);
            TimeValue^.hour := H;
            TimeValue^.minute := N;
            TimeValue^.second := S;
            Column^.Length := SizeOf(TDBTime);
          end;
        ftDateTime, ftTimeStamp:
          begin
            DecodeDate(Field.AsDateTime, Y, M, D);
            DecodeTime(Field.AsDateTime, H, N, S, MSec);
            StampValue := PDBTimeStamp(@Column^.Data[0]);
            StampValue^.year := Y;
            StampValue^.month := M;
            StampValue^.day := D;
            StampValue^.hour := H;
            StampValue^.minute := N;
            StampValue^.second := S;
            // Milliseconds scaled by 10^6 (fraction is in billionths of a second).
            StampValue^.fraction := MSec * 1000000;
            Column^.Length := SizeOf(TDBTimeStamp);
          end;
      else
        // All other types: copy the field's native data as is.
        // NOTE(review): blob columns are bound with DBTYPE_BYREF in
        // InitializeBinding; this simplified code does not store a pointer
        // for them - confirm against the full example source.
        Field.GetData(@Column^.Data[0], False);
        case Field.DataType of
          ftString, ftMemo:
            Column^.Length := StrLen(PAnsiChar(@Column^.Data[0]));
          ftWideString, ftWideMemo:
            Column^.Length := StrLen(PWideChar(@Column^.Data[0])) * SizeOf(WideChar);
        else
          Column^.Length := Field.DataSize;
        end;
      end;
    end;
  12. Call IRowsetFastLoad.InsertRow to bulk copy the data into the table.
  13. (Repeat the previous and this step for each row you want to insert.)

  14. Call IRowsetFastLoad.Commit to commit all changes.

Since you explicitly control in your code how the buffers are allocated and populated, this approach offers several ways to optimize performance, especially when inserting large amounts of data. It’s also possible to set up several bulk insert sessions that insert into the same table in parallel from multiple threads.
As a starting point, you can find the example source code here.

Leave a Reply

Your email address will not be published.