Mysql
 sql >> Teknologi Basis Data >  >> RDS >> Mysql

Menggunakan Python dan MySQL dalam Proses ETL:Menggunakan Python dan SQLAlchemy

Dalam dua artikel sebelumnya dari seri ini, kami membahas cara menggunakan Python dan SQLAlchemy untuk melakukan proses ETL. Hari ini kita akan melakukan hal yang sama, tapi kali ini menggunakan Python dan SQL Alchemy tanpa perintah SQL dalam format tekstual. Ini akan memungkinkan kita untuk menggunakan SQLAlchemy terlepas dari mesin database yang terhubung dengan kita. Jadi, mari kita mulai.

Hari ini kita akan membahas bagaimana melakukan proses ETL menggunakan Python dan SQLAlchemy. Kami akan membuat skrip untuk mengekstrak data harian dari database operasional kami, mengubahnya, dan kemudian memuatnya ke gudang data kami.

Ini adalah artikel ketiga dalam seri ini. Jika Anda belum membaca dua artikel pertama (Menggunakan Python dan MySQL dalam Proses ETL dan SQLAlchemy), saya sangat menganjurkan Anda untuk melakukannya sebelum melanjutkan.

Seluruh seri ini merupakan kelanjutan dari seri gudang data kami:

  • Membuat DWH, Bagian Satu:Model Data Bisnis Berlangganan
  • Membuat DWH, Bagian Kedua:Model Data Bisnis Berlangganan
  • Membuat Gudang Data, Bagian 3:Model Data Bisnis Berlangganan

Oke, sekarang mari kita mulai topik hari ini. Pertama, mari kita lihat model datanya.

Model Data



Model data database operasional (langsung)




Model data DWH


Ini adalah dua model data yang akan kita gunakan. Untuk info lebih lanjut tentang gudang data (DWH), lihat artikel berikut:

  • Skema Bintang
  • Skema Kepingan Salju
  • Skema Bintang vs. Skema Kepingan Salju

Mengapa SQLAlchemy?

Seluruh ide di balik SQLAlchemy adalah bahwa setelah kita mengimpor database, kita tidak memerlukan kode SQL yang khusus untuk mesin database terkait. Sebagai gantinya, kita dapat mengimpor objek ke SQLAlchemy dan menggunakan sintaks SQLAlchemy untuk pernyataan. Itu akan memungkinkan kita untuk menggunakan bahasa yang sama, apa pun mesin database yang terhubung dengan kita. Keuntungan utama di sini adalah bahwa pengembang tidak perlu mengurus perbedaan antara mesin database yang berbeda. Program SQLAlchemy Anda akan bekerja sama persis (dengan sedikit perubahan) jika Anda bermigrasi ke mesin database yang berbeda.

Saya telah memutuskan untuk hanya menggunakan perintah SQLAlchemy dan daftar Python untuk berkomunikasi ke penyimpanan sementara dan antara database yang berbeda. Alasan utama di balik keputusan ini adalah 1) daftar Python terkenal, dan 2) kodenya dapat dibaca oleh mereka yang tidak memiliki keterampilan Python.

Ini bukan untuk mengatakan bahwa SQLAlchemy sempurna. Ini memiliki batasan tertentu, yang akan kita bahas nanti. Untuk saat ini, mari kita lihat kode di bawah ini:

Menjalankan skrip dan hasilnya

Ini adalah perintah Python yang digunakan untuk memanggil skrip kita. Script memeriksa data dalam database operasional, membandingkan nilai dengan DWH, dan mengimpor nilai baru. Dalam contoh ini, kami memperbarui nilai dalam dua tabel dimensi dan satu tabel fakta; skrip mengembalikan output yang sesuai. Seluruh skrip ditulis sehingga Anda dapat menjalankannya beberapa kali sehari. Ini akan menghapus data "lama" untuk hari itu dan menggantinya dengan yang baru.

Mari kita analisis seluruh skrip, mulai dari atas.

Mengimpor SQLAlchemy

Hal pertama yang perlu kita lakukan adalah mengimpor modul yang akan kita gunakan dalam skrip. Biasanya, Anda akan mengimpor modul saat menulis skrip. Dalam kebanyakan kasus, Anda tidak akan tahu persis modul mana yang Anda perlukan di awal.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

Kami telah mengimpor datetime Python modul, yang memberi kita kelas yang bekerja dengan tanggal.

Selanjutnya, kita memiliki sqlalchemy modul. Kami tidak akan mengimpor seluruh modul, hanya hal-hal yang kami butuhkan – beberapa khusus untuk SQLAlchemy (create_engine , MetaData , Table ), beberapa bagian pernyataan SQL (select , and_ , case ), dan func , yang memungkinkan kita menggunakan fungsi seperti count() dan jumlah() .

Menghubungkan ke Basis Data

Kita harus terhubung ke dua database di server kita. Kami dapat terhubung ke lebih banyak database (MySQL, SQL Server, atau lainnya) dari server yang berbeda jika diperlukan. Dalam hal ini, kedua database tersebut adalah database MySQL dan disimpan di mesin lokal saya.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

Kami telah membuat dua mesin dan dua koneksi. Saya tidak akan membahas detailnya di sini karena kami sudah menjelaskan bagian ini di artikel sebelumnya.

Memperbarui dim_time Dimensi

Tujuan:Masukkan tanggal kemarin jika belum dimasukkan ke dalam tabel.

Dalam skrip kami, kami akan memperbarui tabel dua dimensi dengan nilai baru. Sisanya mengikuti pola yang sama, jadi kita hanya akan membahas ini sekali; kita tidak perlu menuliskan kode yang hampir sama beberapa kali lagi.

Idenya sangat sederhana. Kami akan selalu menjalankan skrip untuk memasukkan data baru untuk kemarin. Oleh karena itu, kita perlu memeriksa apakah tanggal tersebut dimasukkan ke dalam tabel dimensi. Jika sudah ada, kami tidak akan melakukan apa-apa; jika tidak, kami akan menambahkannya. Mari kita lihat kode untuk memperbarui dim_time tabel.

Pertama, kami akan memeriksa apakah tanggalnya ada. Jika tidak ada, kami akan menambahkannya. Kita mulai dengan menyimpan tanggal kemarin dalam sebuah variabel. Dengan Python, Anda melakukannya dengan cara ini:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

Baris pertama mengambil tanggal saat ini, mengubahnya menjadi nilai numerik, mengurangi 1 dari nilai itu, dan mengubah nilai numerik itu kembali menjadi tanggal (kemarin =hari ini – 1 ). Baris kedua menyimpan tanggal dalam format tekstual.

Selanjutnya, kita akan menguji apakah tanggal sudah ada di database:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

Setelah memuat tabel, kami akan menjalankan kueri yang akan menampilkan semua baris dari tabel dimensi dengan nilai waktu/tanggal sama dengan kemarin. Hasilnya bisa memiliki 0 (tidak ada tanggal seperti itu di tabel) atau 1 baris (tanggal sudah ada di tabel).

Jika tanggal belum ada di tabel, kita akan menggunakan perintah insert() untuk menambahkannya:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Satu hal baru di sini yang ingin saya tunjukkan adalah penggunaan. .year , .month , .isocalendar()[1] , dan .weekday untuk mendapatkan bagian tanggal.

Memperbarui dim_city Dimensi

Tujuan:Memasukkan kota baru jika ada (yaitu membandingkan daftar kota di database langsung dengan daftar kota di DWH dan menambahkan yang belum ada).

Memperbarui dim_time dimensinya cukup sederhana. Kami hanya menguji apakah tanggal ada di tabel dan memasukkannya jika belum ada di sana. Untuk menguji nilai dalam database DWH, kami menggunakan variabel Python (kemarin ). Kami akan menggunakan proses itu lagi, tapi kali ini dengan daftar.

Karena tidak ada cara mudah untuk menggabungkan tabel dari database yang berbeda dalam satu kueri SQLAlchemy, kami tidak dapat menggunakan pendekatan yang diuraikan dalam Bagian 1 dari seri ini. Oleh karena itu, kita memerlukan objek untuk menyimpan nilai yang diperlukan untuk berkomunikasi antara dua database ini. Saya memutuskan untuk menggunakan daftar, karena daftar tersebut umum dan berfungsi.

Pertama, kita akan memuat country dan city tabel dari database langsung ke objek yang relevan.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

Selanjutnya, kita akan memuat dim_city tabel dari DWH ke dalam daftar:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

Kemudian kita akan melakukan hal yang sama untuk nilai dari database langsung. Kami akan bergabung dengan tabel country dan city jadi kami memiliki semua data yang dibutuhkan dalam daftar ini:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Sekarang kita akan mengulang daftar yang berisi data dari database langsung. Untuk setiap catatan, kami akan membandingkan nilai (city_name , postal_code , dan country_name ). Jika kami tidak menemukan nilai tersebut, kami akan menambahkan catatan baru ke dalam dim_city tabel.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

Untuk menentukan apakah nilainya sudah ada di DWH, kami menguji kombinasi atribut yang harus unik. (Kunci utama dari database langsung tidak banyak membantu kami di sini.) Kami dapat menggunakan kode serupa untuk memperbarui kamus lain. Ini bukan solusi terbaik, tetapi masih merupakan solusi yang cukup elegan. Dan itu akan melakukan apa yang kita butuhkan.

Memperbarui fakta_pelanggan_berlangganan Tabel

Tujuan:Jika kita memiliki data lama untuk tanggal kemarin, hapus dulu. Tambahkan data kemarin ke dalam DWH – terlepas dari apakah kita telah menghapus sesuatu di langkah sebelumnya atau tidak.

Setelah memperbarui semua tabel dimensi, kita harus memperbarui tabel fakta. Dalam skrip kami, kami hanya akan memperbarui satu tabel fakta. Alasannya sama seperti di bagian sebelumnya:memperbarui tabel lain akan mengikuti pola yang sama, jadi kami kebanyakan akan mengulangi kodenya.

Sebelum memasukkan nilai dalam tabel fakta, kita perlu mengetahui nilai kunci terkait dari tabel dimensi. Untuk melakukannya, kami akan memuat kembali dimensi ke dalam daftar dan membandingkannya dengan nilai dari database langsung.

Hal pertama yang akan kita lakukan adalah memuat pelanggan dan fact_customer_subscribed tabel menjadi objek:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Sekarang, kita perlu menemukan kunci untuk dimensi waktu terkait. Karena kami selalu memasukkan data untuk kemarin, kami akan mencari tanggal tersebut di dim_time tabel dan gunakan ID-nya. Kueri mengembalikan 1 baris, dan ID berada di posisi pertama (indeks dimulai dari 0, jadi result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

Untuk saat itu, kami akan menghapus semua catatan terkait dari tabel fakta:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Oke, sekarang kita memiliki ID dimensi waktu yang disimpan di dim_time_id variabel. Ini mudah karena kita hanya dapat memiliki satu nilai dimensi waktu. Ceritanya akan berbeda untuk dimensi kota. Pertama, kami akan memuat semua nilai yang kita butuhkan – nilai yang secara unik menggambarkan kota (bukan ID), dan nilai gabungan:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Ada beberapa hal yang ingin saya tekankan tentang kueri di atas:

  • func.sum(...) adalah SUM(...) dari "SQL standar".
  • case(...) sintaks menggunakan and_ sebelum kondisi, bukan di antara keduanya.
  • .label(...) berfungsi seperti alias SQL AS.
  • Kami menggunakan \ untuk pindah ke baris berikutnya dan meningkatkan keterbacaan kueri. (Percayalah, ini tidak bisa dibaca tanpa garis miring – saya sudah mencobanya :) )
  • .group_by(...) memainkan peran GROUP BY SQL.

Selanjutnya, kita akan mengulang setiap record yang dikembalikan menggunakan kueri sebelumnya. Untuk setiap catatan, kami akan membandingkan nilai yang secara unik mendefinisikan sebuah kota (city_name , postal_code , country_name ) dengan nilai yang disimpan dalam daftar yang dibuat dari dim_city DWH meja. Jika ketiga nilai cocok, kami akan menyimpan ID dari daftar dan menggunakannya saat memasukkan data baru. Dengan cara ini, untuk setiap record, kita akan memiliki ID untuk kedua dimensi:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

Dan hanya itu. Kami telah memperbarui DWH kami. Skrip akan lebih panjang jika kita memperbarui semua tabel dimensi dan fakta. Kompleksitas juga akan lebih besar ketika tabel fakta terkait dengan lebih banyak tabel dimensi. Dalam hal ini, kita memerlukan untuk loop untuk setiap tabel dimensi.

Ini Tidak Berfungsi!

Saya sangat kecewa ketika saya menulis skrip ini dan kemudian menemukan bahwa sesuatu seperti ini tidak akan berhasil:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

Dalam contoh ini, saya mencoba menggunakan tabel dari dua database yang berbeda. Jika kita membuat dua koneksi terpisah, koneksi pertama tidak akan "melihat" tabel dari koneksi lain. Jika kita terhubung langsung ke server, dan bukan ke database, kita tidak akan dapat memuat tabel.

Sampai perubahan ini (semoga segera), Anda harus menggunakan beberapa jenis struktur (misalnya apa yang kami lakukan hari ini) untuk berkomunikasi antara dua database. Ini memperumit kode, karena Anda perlu mengganti satu kueri dengan dua daftar dan untuk bersarang loop.

Bagikan Pendapat Anda Tentang SQLAlchemy dan Python

Ini adalah artikel terakhir dalam seri ini. Tapi siapa yang tahu? Mungkin kami akan mencoba pendekatan lain di artikel mendatang, jadi pantau terus. Sementara itu, silakan bagikan pemikiran Anda tentang SQLAlchemy dan Python dalam kombinasi dengan database. Menurut Anda apa kekurangan kami dalam artikel ini? Apa yang akan Anda tambahkan? Beri tahu kami di komentar di bawah.

Anda dapat mengunduh skrip lengkap yang kami gunakan dalam artikel ini di sini.

Dan terima kasih khusus kepada Dirk J Bosman (@dirkjobosman), yang merekomendasikan seri artikel ini.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Blog dan Situs Web MySQL Teratas untuk Administrator Basis Data

  2. MySQL Tampilkan Hibah untuk semua Pengguna

  3. Cara memulai, memulai ulang, memeriksa status, dan menghentikan server MySQL

  4. Bisakah angka digunakan untuk memberi nama kolom tabel MySQL?

  5. Ekstensi mysql tidak digunakan lagi dan akan dihapus di masa mendatang:gunakan mysqli atau PDO sebagai gantinya