Mysql
 sql >> Teknologi Basis Data >  >> RDS >> Mysql

Menggunakan Python dan MySQL dalam Proses ETL

Python sangat populer akhir-akhir ini. Karena Python adalah bahasa pemrograman tujuan umum, itu juga dapat digunakan untuk melakukan proses Ekstrak, Transformasi, Muat (ETL). Modul ETL yang berbeda tersedia, tetapi hari ini kami akan tetap menggunakan kombinasi Python dan MySQL. Kami akan menggunakan Python untuk menjalankan prosedur tersimpan dan menyiapkan serta menjalankan pernyataan SQL.

Kami akan menggunakan dua pendekatan yang serupa tetapi berbeda. Pertama, kami akan memanggil prosedur tersimpan yang akan melakukan seluruh pekerjaan, dan setelah itu kami akan menganalisis bagaimana kami dapat melakukan proses yang sama tanpa prosedur tersimpan dengan menggunakan kode MySQL dengan Python.

Siap? Sebelum kita mendalami, mari kita lihat model data – atau model data, karena ada dua di antaranya dalam artikel ini.

Model Data

Kami memerlukan dua model data, satu untuk menyimpan data operasional kami dan yang lainnya untuk menyimpan data pelaporan kami.




Model pertama ditunjukkan pada gambar di atas. Model ini digunakan untuk menyimpan data operasional (langsung) untuk bisnis berbasis langganan. Untuk wawasan lebih lanjut tentang model ini, silakan lihat artikel kami sebelumnya, Membuat DWH, Bagian Satu:Model Data Bisnis Berlangganan.




Memisahkan data operasional dan pelaporan biasanya merupakan keputusan yang sangat bijaksana. Untuk mencapai pemisahan itu, kita perlu membuat gudang data (DWH). Kami sudah melakukannya; modelnya bisa dilihat pada gambar diatas. Model ini juga dijelaskan secara rinci dalam postingan Membuat DWH, Bagian Kedua:Model Data Bisnis Berlangganan.

Terakhir, kita perlu mengekstrak data dari database langsung, mengubahnya, dan memuatnya ke DWH kita. Kami telah melakukan ini menggunakan prosedur tersimpan SQL. Anda dapat menemukan deskripsi tentang apa yang ingin kami capai bersama dengan beberapa contoh kode di Membuat Gudang Data, Bagian 3:Model Data Bisnis Berlangganan.

Jika Anda memerlukan informasi tambahan mengenai DWH, kami sarankan untuk membaca artikel berikut:

  • Skema Bintang
  • Skema Kepingan Salju
  • Skema Bintang vs. Skema Kepingan Salju.

Tugas kita hari ini adalah mengganti prosedur tersimpan SQL dengan kode Python. Kami siap membuat keajaiban Python. Mari kita mulai dengan hanya menggunakan prosedur tersimpan di Python.

Metode 1:ETL Menggunakan Prosedur Tersimpan

Sebelum kita mulai menjelaskan prosesnya, penting untuk disebutkan bahwa kita memiliki dua database di server kita.

subscription_live database digunakan untuk menyimpan data transaksional/live, sedangkan subscription_dwh adalah database pelaporan (DWH) kami.

Kami telah menjelaskan prosedur tersimpan yang digunakan untuk memperbarui tabel dimensi dan fakta. Mereka akan membaca data dari subscription_live database, gabungkan dengan data di subscription_dwh database, dan masukkan data baru ke dalam subscription_dwh basis data. Kedua prosedur ini adalah:

  • p_update_dimensions – Memperbarui tabel dimensi dim_time dan dim_city .
  • p_update_facts – Memperbarui dua tabel fakta, fact_customer_subscribed dan fact_subscription_status .

Jika Anda ingin melihat kode lengkap untuk prosedur ini, baca Membuat Gudang Data, Bagian 3:Model Data Bisnis Berlangganan.

Sekarang kita siap untuk menulis skrip Python sederhana yang akan terhubung ke server dan melakukan proses ETL. Pertama-tama mari kita lihat seluruh skrip (etl_procedures.py ). Kemudian kami akan menjelaskan bagian yang paling penting.

# import MySQL connectorimport mysql.connector# connect to serverconnection =mysql.connector.connect(user='', password='', host='127.0.0.1')print('Connected to database.')cursor =connection.cursor()# Saya memperbarui dimensioncursor.callproc('subscription_dwh.p_update_dimensions')print('Tabel dimensi diperbarui.')# II memperbarui faktacursor.callproc('subscription_dwh.p_update_facts')print('Fakta tabel diperbarui.')# komit &tutup connectioncursor.close()connection.commit()connection.close()print('Terputus dari database.')

etl_procedures.py

Mengimpor Modul dan Menghubungkan ke Database

Python menggunakan modul untuk menyimpan definisi dan pernyataan. Anda dapat menggunakan modul yang ada atau menulis sendiri. Menggunakan modul yang ada akan menyederhanakan hidup Anda karena Anda menggunakan kode yang telah ditulis sebelumnya, tetapi menulis modul Anda sendiri juga sangat berguna. Saat Anda keluar dari interpreter Python dan menjalankannya lagi, Anda akan kehilangan fungsi dan variabel yang telah Anda tentukan sebelumnya. Tentu saja, Anda tidak ingin mengetik kode yang sama berulang-ulang. Untuk menghindarinya, Anda dapat menyimpan definisi Anda dalam modul dan mengimpornya ke Python.

Kembali ke etl_procedures.py . Dalam program kami, kami mulai dengan mengimpor Konektor MySQL:

# impor konektor MySQLimport mysql.connector

Konektor MySQL untuk Python digunakan sebagai driver standar yang terhubung ke server/database MySQL. Anda harus mengunduhnya dan menginstalnya jika sebelumnya Anda belum pernah melakukannya. Selain menghubungkan ke database, ia menawarkan sejumlah metode dan properti untuk bekerja dengan database. Kami akan menggunakan beberapa di antaranya, tetapi Anda dapat memeriksa dokumentasi lengkapnya di sini.

Selanjutnya, kita harus terhubung ke database kita:

# connect to serverconnection =mysql.connector.connect(user='', password='', host='127.0.0.1')print('Connected to database.')cursor =connection .kursor()

Baris pertama akan terhubung ke server (dalam hal ini, saya menghubungkan ke mesin lokal saya) menggunakan kredensial Anda (ganti dan dengan nilai sebenarnya). Saat membuat koneksi, Anda juga dapat menentukan database yang ingin Anda hubungkan, seperti yang ditunjukkan di bawah ini:

koneksi =mysql.connector.connect(user='', password='', host='127.0.0.1', database='')

Saya sengaja menghubungkan hanya ke server dan bukan ke database tertentu karena saya akan menggunakan dua database yang terletak di server yang sama.

Perintah selanjutnya – print – di sini hanya pemberitahuan bahwa kami berhasil terhubung. Meskipun tidak memiliki signifikansi pemrograman, ini dapat digunakan untuk men-debug kode jika terjadi kesalahan dalam skrip.

Baris terakhir di bagian ini adalah:

kursor =connection.cursor()

Cursors adalah struktur handler yang digunakan untuk bekerja dengan data. Kami akan menggunakannya untuk mengambil data dari database (SELECT), tetapi juga untuk memodifikasi data (INSERT, UPDATE, DELETE). Sebelum menggunakan kursor, kita perlu membuatnya. Dan itulah yang dilakukan baris ini.

Prosedur Panggilan

Bagian sebelumnya bersifat umum dan dapat digunakan untuk tugas-tugas terkait database lainnya. Bagian kode berikut ini khusus untuk ETL:memanggil prosedur tersimpan kami dengan cursor.callproc memerintah. Tampilannya seperti ini:

# 1. perbarui dimensioncursor.callproc('subscription_dwh.p_update_dimensions')print('Tabel dimensi diperbarui.')# 2. perbarui factcursor.callproc('subscription_dwh.p_update_facts')print('Tabel fakta diperbarui.') 

Prosedur panggilan sudah cukup jelas. Setelah setiap panggilan, perintah cetak ditambahkan. Sekali lagi, ini hanya memberi kami pemberitahuan bahwa semuanya baik-baik saja.

Berkomitmen dan Tutup

Bagian terakhir dari skrip melakukan perubahan basis data dan menutup semua objek yang digunakan:

# commit &tutup connectioncursor.close()connection.commit()connection.close()print('Terputus dari database.')

Prosedur panggilan sudah cukup jelas. Setelah setiap panggilan, perintah cetak ditambahkan. Sekali lagi, ini hanya memberi kami pemberitahuan bahwa semuanya baik-baik saja.

Komitmen sangat penting di sini; tanpa itu, tidak akan ada perubahan pada database, bahkan jika Anda memanggil prosedur atau mengeksekusi pernyataan SQL.

Menjalankan Skrip

Hal terakhir yang perlu kita lakukan adalah menjalankan skrip kita. Kami akan menggunakan perintah berikut di Python Shell untuk mencapainya:

import osfile_path ='D://python_scripts'os.chdir(file_path)exec(open("etl_procedures.py").read())

Script dijalankan dan semua perubahan dibuat dalam database yang sesuai. Hasilnya dapat dilihat pada gambar di bawah ini.

Metode 2:ETL Menggunakan Python dan MySQL

Pendekatan yang disajikan di atas tidak jauh berbeda dengan pendekatan memanggil prosedur tersimpan secara langsung di MySQL. Satu-satunya perbedaan adalah bahwa sekarang kami memiliki skrip yang akan melakukan seluruh pekerjaan untuk kami.

Kita bisa menggunakan pendekatan lain:meletakkan semuanya di dalam skrip Python. Kami akan menyertakan pernyataan Python, tetapi kami juga akan menyiapkan kueri SQL dan menjalankannya di database. Basis data sumber (langsung) dan basis data tujuan (DWH) sama seperti pada contoh dengan prosedur tersimpan.

Sebelum kita mempelajarinya, mari kita lihat skrip lengkapnya (etl_queries.py ):

from datetime import date# import MySQL connectorimport mysql.connector# connect to serverconnection =mysql.connector.connect(user='', password='', host='127.0.0.1')print ('Tersambung ke database.')# 1. perbarui dimensi# 1.1 perbarui dim_time# tanggal - kemarin kemarin =date.fromordinal(date.today().toordinal()-1)yesterday_str ='"' + str(kemarin) + ' "'# tes apakah tanggal sudah ada di tablecursor =connection.cursor()query =( "SELECT COUNT(*) " "FROM subscription_dwh.dim_time " "WHERE time_date =" + kemarin_str)cursor.execute(query)result =kursor .fetchall()yesterday_subscription_count =int(result[0][0])if kemarin_subscription_count ==0:kemarin_tahun ='YEAR("' + str(kemarin) + '")' kemarin_bulan ='MONTH("' + str(kemarin) ) + '")' kemarin_minggu ='WEEK("' + str(kemarin) + '")' kemarin_mingguan ='WEEKDAY("' + str(kemarin) + '")' query =( "INSERT INTO subscription_dwh.`dim_time `(`tanggal_waktu`, `tahun_waktu`, `bulan_waktu`, `minggu_waktu` , `time_weekday`, `ts`) " " VALUES (" + kemarin_str + ", " + kemarin_tahun + ", " + kemarin_bulan + ", " + kemarin_minggu + ", " + kemarin_hari kerja + ", Sekarang())") kursor .execute(query)# 1.2 perbarui dim_cityquery =( "INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) " "PILIH city_live.city_name, city_live.postal_code, country_live.country_name , Sekarang() " "DARI subscription_live.city city_live " "INNER GABUNG subscription_live.country country_live ON city_live.country_id =country_live.id " "KIRI GABUNG subscription_dwh.dim_city city_dwh PADA city_live.city_name =city_dwh.city_name DAN city_live.dwh_code =city_dwh.code postal_code AND country_live.country_name =city_dwh.country_name " "WHERE city_dwh.id IS NULL")cursor.execute(query)print('Dimension tables updated.')# 2. perbarui fakta# 2.1 perbarui pelanggan yang berlangganan# hapus data lama untuk same datequery =( "HAPUS subscription_dwh.`fact_customer_subscribed`.* " "FROM subscription_dwh.`fa ct_customer_subscribed` " "INNER JOIN subscription_dwh.`dim_time` ON subscription_dwh.`fact_customer_subscribed`.`dim_time_id` =subscription_dwh.`dim_time`.`id` " "WHERE subscription_dwh.`dim_time`.`time_str` =" + kemarin._str` =" + kemarin._str` execute(query)# insert new dataquery =( "INSERT INTO subscription_dwh.`fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) " " SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN customer_live.active =1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active =0 THEN 1 ELSE 0 END) AS total_inactive, SUM( CASE WHEN customer_live.active =1 AND DATE(customer_live.time_updated) =@time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN customer_live.active =0 AND DATE(customer_live.time_updated) =@time_date THEN 1 ELSE 0 END ) AS daily_canceled, MIN(NOW()) AS ts " "FROM subscription_live.`customer` customer_live " "INNER JOIN subscri ption_live.`city` city_live ON customer_live.city_id =city_live.id " "INNER JOIN subscription_live.`country` country_live ON city_live.country_id =country_live.id " "INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name =city_dwh.live_city .postal_code =city_dwh.postal_code DAN country_live.country_name =city_dwh.country_name " "INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date =" + kemarin_str + " " "GROUP BY city_dwh.id, time_dwh.execut")(query.execut") )# 2.2 perbarui status langganan# hapus data lama untuk kueri tanggal yang sama =( "HAPUS subscription_dwh.`fact_subscription_status`.* " "FROM subscription_dwh.`fact_subscription_status` " "INNER JOIN subscription_dwh.`dim_time` ON subscription_dwh.`fact_subscription_status`.` dim_time_id` =subscription_dwh.`dim_time`.`id` " "WHERE subscription_dwh.`dim_time`.`time_date` =" + kemarin_str)cursor.execute(query)# insert new dataquery =( "INSERT INTO subscription_dwh.`fact _subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) " "PILIH city_dwh.id AS dim_ctiy_id, time_dwh.id AS redup subscription_live.active =1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN subscription_live.active =0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN subscription_live.active =1 AND DATE(subscription_live.time_updated) =@ time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN WHEN subscription_live.active =0 AND DATE(subscription_live.time_updated) =@time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts " "FROM subscription_live .`customer` customer_live " "INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id =customer_live.id " "INNER JOIN subscription_live.`city` city_live ON customer_live.city_id =city_live.id " "INNER JOIN subscription_live.`country` country_live ON city_live.country_id =country_live.id " "INNER GABUNG subscription_dwh.dim_city city_dwh PADA city_live.city_name =city_dwh.city_name DAN city_live.postal_code =city_dwh.postal_code DAN country_live.country_name =city_dwh.country_name " "INNER GABUNG subscription_dwh.dim_time time_dwh. "time_strUP +dwh kemarin ON time_ROUP " " OLEH city_dwh.id, time_dwh.id")cursor.execute(query)print('Tabel fakta diperbarui.')# komit &tutup connectioncursor.close()connection.commit()connection.close()print('Terputus dari database .')

etl_queries.py

Mengimpor Modul dan Menghubungkan ke Database

Sekali lagi, kita perlu mengimpor MySQL menggunakan kode berikut:

import mysql.connector

Kami juga akan mengimpor modul datetime, seperti yang ditunjukkan di bawah ini. Kami membutuhkan ini untuk operasi terkait tanggal dengan Python:

dari tanggal impor datetime

Proses untuk menghubungkan ke database sama seperti pada contoh sebelumnya.

Memperbarui Dimensi dim_time

Untuk memperbarui dim_time tabel, kita perlu memeriksa apakah nilainya (untuk kemarin) sudah ada di tabel. Kita harus menggunakan fungsi tanggal Python (bukan SQL) untuk melakukan ini:

# tanggal - kemarin kemarin =date.fromordinal(date.today().toordinal()-1)yesterday_str ='"' + str(kemarin) + '"'

Baris kode pertama akan mengembalikan tanggal kemarin dalam variabel tanggal, sedangkan baris kedua akan menyimpan nilai ini sebagai string. Kami akan membutuhkan ini sebagai string karena kami akan menggabungkannya dengan string lain saat kami membuat kueri SQL.

Selanjutnya, kita perlu menguji apakah tanggal ini sudah dalam dim_time meja. Setelah mendeklarasikan kursor, kami akan menyiapkan kueri SQL. Untuk mengeksekusi kueri, kami akan menggunakan cursor.execute perintah:

# tes apakah tanggal sudah ada di tablecursor =connection.cursor()query =( "SELECT COUNT(*) " "FROM subscription_dwh.dim_time " "WHERE time_date =" + kemarin_str)cursor.execute(query)'" '

Kami akan menyimpan hasil kueri di hasil variabel. Hasilnya akan memiliki 0 atau 1 baris, jadi kita bisa menguji kolom pertama dari baris pertama. Ini akan berisi 0 atau 1. (Ingat, kita dapat memiliki tanggal yang sama hanya sekali dalam tabel dimensi.)

Jika tanggal belum ada di tabel, kami akan menyiapkan string yang akan menjadi bagian dari kueri SQL:

result =cursor.fetchall()yesterday_subscription_count =int(result[0][0])if kemarin_subscription_count ==0:kemarin_year ='YEAR("' + str(kemarin) + '")' kemarin_month ='MONTH( "' + str(kemarin) + '")' kemarin_minggu ='WEEK("' + str(kemarin) + '")' kemarin_mingguan ='WEEKDAY("' + str(kemarin) + '")'

Terakhir, kita akan membuat kueri dan menjalankannya. Ini akan memperbarui dim_time tabel setelah itu dilakukan. Harap perhatikan bahwa saya telah menggunakan jalur lengkap ke tabel, termasuk nama database (subscription_dwh ).

 query =( "INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) " " VALUES (" + kemarin_str + ", " + kemarin_tahun + ", " + kemarin_bulan + ", " + kemarin_minggu + ", " + kemarin_hari kerja + ", Sekarang())") cursor.execute(query)

Perbarui Dimensi dim_city

Memperbarui dim_city tabel bahkan lebih sederhana karena kita tidak perlu menguji apa pun sebelum menyisipkan. Kami sebenarnya akan menyertakan pengujian itu dalam kueri SQL.

# 1.2 perbarui dim_cityquery =( "INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) " "PILIH city_live.city_name, city_live.postal_code, country_live.country_name, Sekarang () " "DARI subscription_live.city city_live " "INNER GABUNG subscription_live.country country_live DI city_live.country_id =country_live.id " "KIRI GABUNG subscription_dwh.dim_city city_dwh PADA city_live.city_name =city_dwh.city_name DAN city_live.postal.codeDAN.postal_kode =city__d country_live.country_name =city_dwh.country_name " "DIMANA city_dwh.id NULL")cursor.execute(query)

Di sini kita menyiapkan eksekusi query SQL. Perhatikan bahwa saya kembali menggunakan jalur lengkap ke tabel, termasuk nama kedua database (subscription_live dan subscription_dwh ).

Memperbarui Tabel Fakta

Hal terakhir yang perlu kita lakukan adalah memperbarui tabel fakta kita. Prosesnya hampir sama dengan memperbarui tabel dimensi:kami menyiapkan kueri dan menjalankannya. Kueri ini jauh lebih kompleks, tetapi sama dengan yang digunakan dalam prosedur tersimpan.

Kami telah menambahkan satu peningkatan dibandingkan dengan prosedur tersimpan:menghapus data yang ada untuk tanggal yang sama di tabel fakta. Ini akan memungkinkan kita untuk menjalankan skrip beberapa kali untuk tanggal yang sama. Pada akhirnya, kita harus melakukan transaksi dan menutup semua objek dan koneksi.

Menjalankan Skrip

Kami memiliki perubahan kecil di bagian ini, yang memanggil skrip yang berbeda:

- import os- file_path ='D://python_scripts'- os.chdir(file_path)- exec(open("etl_queries.py").read())

Karena kami telah menggunakan pesan yang sama dan skrip berhasil diselesaikan, hasilnya sama:

Bagaimana Anda Menggunakan Python di ETL?

Hari ini kita melihat satu contoh melakukan proses ETL dengan skrip Python. Ada cara lain untuk melakukan ini, mis. sejumlah solusi sumber terbuka yang memanfaatkan pustaka Python untuk bekerja dengan database dan melakukan proses ETL. Di artikel berikutnya, kita akan bermain dengan salah satunya. Sementara itu, jangan ragu untuk berbagi pengalaman Anda dengan Python dan ETL.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Bagaimana Fungsi OCTET_LENGTH() Bekerja di MySQL

  2. MySQL JATUHKAN KENDALA UNIK

  3. Cara mengosongkan database MySQL

  4. JSON_VALUE() di MySQL

  5. Bagaimana cara menyimpan beberapa opsi dalam satu tabel?