Sqlserver
 sql >> Teknologi Basis Data >  >> RDS >> Sqlserver

Menerapkan Beban Tambahan menggunakan Ubah Pengambilan Data di SQL Server

Artikel ini akan menarik bagi mereka yang sering harus berurusan dengan integrasi data.

Pengantar

Asumsikan bahwa ada database di mana pengguna selalu mengubah data (memperbarui atau menghapus). Mungkin, database ini digunakan oleh aplikasi besar yang tidak memungkinkan untuk mengubah struktur tabel. Tugasnya adalah memuat data dari database ini ke database lain di server yang berbeda dari waktu ke waktu. Cara paling sederhana untuk mengatasi masalah ini adalah dengan memuat data baru dari database sumber ke database target dengan pembersihan awal database target. Anda dapat menggunakan metode ini selama waktu untuk memuat data dapat diterima dan tidak melebihi tenggat waktu yang telah ditentukan. Bagaimana jika dibutuhkan beberapa hari untuk memuat data? Selain itu, saluran komunikasi yang tidak stabil menyebabkan situasi ketika beban data berhenti dan dimulai ulang. Jika Anda menghadapi kendala ini, saya sarankan untuk mempertimbangkan salah satu algoritme 'pemuatan ulang data'. Artinya, hanya modifikasi data yang terjadi sejak pemuatan terakhir.

CDC

Di SQL Server 2008, Microsoft memperkenalkan mekanisme pelacakan data yang disebut Change Data Capture (CDC). Secara garis besar, tujuan dari mekanisme ini adalah memungkinkan CDC untuk tabel database apa pun akan membuat tabel sistem dalam database yang sama dengan nama yang mirip dengan tabel aslinya (skemanya adalah sebagai berikut:'cdc' sebagai awalan ditambah dengan nama skema lama ditambah ”_” dan akhiran “_CT”. Misalnya tabel aslinya adalah dbo.Contoh, maka tabel sistem akan disebut cdc.dbo_Example_CT). Ini akan menyimpan semua data yang telah dimodifikasi.

Sebenarnya, untuk menggali lebih dalam di CDC, perhatikan contohnya. Tetapi pertama-tama, pastikan bahwa Agen SQL yang menggunakan CDC berfungsi pada contoh pengujian SQL Server.

Selain itu, kita akan mempertimbangkan skrip yang membuat database dan tabel pengujian, mengisi tabel ini dengan data, dan mengaktifkan CDC untuk tabel ini.

Untuk memahami dan menyederhanakan tugas, kita akan menggunakan satu instance SQL Server tanpa mendistribusikan database sumber dan target ke server yang berbeda.

gunakan mastergo-- buat database sumberjika tidak ada (pilih * dari sys.databases di mana name ='db_src_cdc') buat database db_src_cdcgouse db_src_cdcgo-- aktifkan CDC jika dinonaktifkanjika tidak ada (pilih * dari sys.databases di mana nama =db_name() dan is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- buat peran untuk tabel dengan CDCif tidak ada(pilih * dari sys.sysusers di mana name ='CDC_Reader' dan issqlrole=1) buat peran CDC_Readergo-- buat tableif object_id('dbo.Example','U') is null create table dbo.Example ( ID int identity constraint PK_Example primary key, Title varchar(200) not null )go-- isi tableinsert dbo.Contoh (Title) values( 'One'),('Two'),('Three'),('Four'),('Five');go-- aktifkan CDC untuk tabeljika tidak ada (pilih * dari sys.tables di mana is_tracked_by_cdc =1 dan nama ='Contoh') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Contoh', @role_name ='CDC_Reader'go-- isi tabel dengan beberapa data. Kami akan mengubah atau menghapus somethingupdate dbo.Contohset Judul =reverse(Judul)di mana ID di (2,3,4);hapus dari dbo.Contoh di mana ID di (1,2);set identity_insert dbo.Contoh di;masukkan dbo. Contoh (ID, Judul) nilai(1,'One'),(6,'Six');set identity_insert dbo.Contoh off;go

Sekarang, mari kita lihat apa yang kita miliki setelah menjalankan skrip ini di tabel dbo.Example dan cdc.dbo_Example_CT (perlu dicatat bahwa CDC tidak sinkron. Data diisi ke dalam tabel tempat pelacakan perubahan disimpan setelah jangka waktu tertentu ).

pilih * dari dbo.Contoh;
Judul ID-------------------------- 1 Satu 3 eerhT 4 ruoF 5 Lima 6 Enam
pilih row_number() over ( partisi menurut ID diurutkan berdasarkan __$start_lsn desc, __$seqval desc ) sebagai __$rn, *from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Judul------ --------------------- - ----------- ---------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 Satu 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 Satu 1 0x0000003A000000560006 NULL 0x0000003A 000000560005 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Empat 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03

Pertimbangkan secara rinci struktur tabel di mana pelacakan perubahan disimpan. Field __ $start_lsn dan __ $seqval masing-masing adalah LSN (nomor urut log dalam database) dan nomor transaksi dalam transaksi. Ada properti penting di bidang ini, yaitu, kami dapat memastikan bahwa rekaman dengan LSN lebih tinggi akan dilakukan nanti. Karena properti ini, kita dapat dengan mudah mendapatkan status terbaru dari setiap record dalam kueri, memfilter pilihan kita berdasarkan kondisi – di mana __ $ rn =1.

Kolom __$operation berisi kode transaksi:

  • 1 – catatan dihapus
  • 2 – rekaman dimasukkan
  • 3, 4 – catatan diperbarui. Data lama sebelum update adalah 3, data baru adalah 4.

Selain bidang layanan dengan awalan «__$», bidang tabel asli sepenuhnya diduplikasi. Informasi ini cukup bagi kami untuk melanjutkan ke beban tambahan.

Menyiapkan database untuk memuat data

Buat tabel di database target pengujian kami, tempat data akan dimuat, serta tabel tambahan untuk menyimpan data tentang log pemuatan.

gunakan mastergo-- buat database target jika tidak ada (pilih * dari sys.databases where name ='db_dst_cdc') create database db_dst_cdcgouse db_dst_cdcgo-- buat tabelif object_id('dbo.Example','U') adalah null create table dbo.Example ( ID int constraint PK_Example primary key, Title varchar(200) not null )go-- buat tabel untuk menyimpan beban logif object_id('dbo.log_cdc','U') is null create table dbo .log_cdc ( table_name nvarchar(512) bukan null, dt datetime bukan null default getdate(), lsn binary(10) bukan null default(0x0), kendala pk_log_cdc primary key (table_name,dt desc) )go

Saya ingin menarik perhatian Anda ke bidang tabel LOG_CDC:

  • TABLE_NAME menyimpan informasi tentang tabel apa yang dimuat (mungkin untuk memuat beberapa tabel di masa mendatang, dari database yang berbeda atau bahkan dari server yang berbeda; format tabelnya adalah 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
  • DT adalah bidang tanggal dan waktu pemuatan, yang opsional untuk beban tambahan. Namun, ini akan berguna untuk mengaudit pemuatan.
  • LSN – setelah tabel dimuat, kita perlu menyimpan informasi tentang tempat untuk memulai pemuatan berikutnya, jika diperlukan. Oleh karena itu, setelah setiap pemuatan, kami menambahkan yang terbaru (maksimum) __ $ start_lsn ke dalam kolom ini.

Algoritma untuk memuat data

Seperti dijelaskan di atas, dengan menggunakan kueri, kita bisa mendapatkan status tabel terbaru dengan bantuan fungsi jendela. Jika kita mengetahui LSN dari beban terakhir, saat berikutnya kita memuat, kita dapat memfilter dari sumber semua data, yang perubahannya lebih tinggi dari LSN yang disimpan, jika ada setidaknya satu pemuatan sebelumnya yang lengkap:

dengan incr_Example as( pilih row_number() over ( partisi menurut ID diurutkan berdasarkan __$start_lsn desc, __$seqval desc ) sebagai __$rn, * from db_src_cdc.cdc.dbo_Example_CT di mana __$operasi <> 3 dan __$ start_lsn> @lsn)pilih * dari incr_Example

Kemudian, kita bisa mendapatkan semua catatan untuk beban lengkap, jika LSN beban tidak disimpan:

dengan incr_Example as( pilih row_number() over ( partisi menurut ID diurutkan berdasarkan __$start_lsn desc, __$seqval desc ) sebagai __$rn, * from db_src_cdc.cdc.dbo_Example_CT di mana __$operasi <> 3 dan __$ start_lsn> @lsn), full_Example as( pilih * dari db_src_cdc.dbo.Contoh di mana @lsn adalah null)pilih ID, Judul, __$operasidari incr_Examplewhere __$rn =1union allselect ID, Title, 2 as __$operationfrom full_Example 

Jadi, tergantung pada nilai @LSN, kueri ini akan menampilkan semua perubahan terbaru (melewati yang sementara) dengan status Dihapus atau tidak, atau semua data dari tabel asli, menambahkan status 2 (catatan baru) – bidang ini hanya digunakan untuk menyatukan dua pilihan. Dengan query ini, kita dapat dengan mudah mengimplementasikan full load atau reload menggunakan perintah MERGE (dimulai dengan versi SQL 2008).

Untuk menghindari kemacetan yang dapat membuat proses alternatif dan memuat data yang cocok dari tabel yang berbeda (di masa depan, kami akan memuat beberapa tabel dan, mungkin, mungkin ada hubungan relasional di antara mereka), saya sarankan menggunakan snapshot DB pada database sumber ( fitur SQL 2008 lainnya).

Teks lengkap dari pemuatan adalah sebagai berikut:

[expand title="Kode"]

/* Algoritma pemuatan data*/-- buat snapshot database jika ada (pilih * dari sys.databases where name ='db_src_cdc_ss' ) drop database db_src_cdc_ss;deklarasikan @query nvarchar(max);pilih @query =N' buat database db_src_cdc_ss pada ( name =N'''+name+ ''', filename =N'''+[filename]+'.ss'' ) sebagai snapshot dari db_src_cdc'from db_src_cdc.sys.sysfiles di mana groupid =1; exec ( @query );-- baca LSN dari loaddeclare sebelumnya @lsn binary(10) =(pilih max(lsn) dari db_dst_cdc.dbo.log_cdc dimana table_name ='localhost.db_src_cdc.dbo.Example');-- clear sebuah tabel sebelum loadif lengkap @lsn is null truncate table db_dst_cdc.dbo.Example;-- load processwith incr_Example as( select row_number() over ( partisi menurut ID diurutkan berdasarkan __$start_lsn desc, __$seqval desc ) sebagai __$rn , * from db_src_cdc_ss.cdc.dbo_Example_CT di mana __$operation <> 3 dan __$start_lsn> @lsn), full_Example as( pilih * dari db_src_cdc_ss.dbo.Contoh di mana @lsn adalah null), cte_Example as Judul, __$operasi dari incr_Contoh di mana __$rn =1 gabungan semua ID pilih, Judul, 2 sebagai __$operasi dari contoh_penuh)gabungkan db_dst_cdc.dbo.Contoh sebagai trg menggunakan cte_Contoh sebagai src pada trg.ID=src.IDketika cocok dan __$operasi =1 lalu hapus bila cocok dan __$operasi <> 1 lalu perbarui set trg.Title =src.Titlebila tidak cocok dengan target dan __$operasi <> 1 lalu masukkan nilai (ID, Judul) (src.ID, src .Title);-- tandai akhir dari proses pemuatan dan LSNinsert terbaru db_dst_cdc.dbo.log_cdc (nama_tabel, lsn)nilai ('localhost.db_src_cdc.dbo.Contoh', isnull((pilih maks(__$start_lsn) dari db_src_cdc_ss.cdc.dbo_Example_CT),0))-- hapus snapshot database jika ada (pilih * dari sys.databases where name ='db_src_cdc_ss' ) drop database db_src_cdc_ss

[/expand]


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Jalankan prosedur tersimpan menggunakan kerangka entitas

  2. SQL Server loop - bagaimana cara mengulang serangkaian catatan

  3. Ubah 'waktu' menjadi 'datetime2' di SQL Server (Contoh T-SQL)

  4. INSERT INTO @TABLE EXEC @query dengan SQL Server 2000

  5. Perilaku Rencana Kueri Tabel Temporal SQL Server 2016