PostgreSQL
 sql >> Teknologi Basis Data >  >> RDS >> PostgreSQL

INSERT atau UPDATE data massal dari dataframe/CSV ke database PostgreSQL

Dalam kasus khusus ini lebih baik turun ke level DB-API, karena Anda memerlukan beberapa alat yang tidak diekspos bahkan oleh SQLAlchemy Core secara langsung, seperti copy_expert() . Itu dapat dilakukan menggunakan raw_connection() . Jika data sumber Anda adalah file CSV, Anda tidak memerlukan panda dalam hal ini sama sekali. Mulailah dengan membuat tabel staging sementara, salin data ke tabel temp, dan masukkan ke tabel tujuan dengan penanganan konflik:

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()

Jika di sisi lain data sumber Anda adalah DataFrame , Anda masih dapat menggunakan COPY oleh melewati fungsi sebagai metode method= ke to_sql() . Fungsi tersebut bahkan dapat menyembunyikan semua logika di atas:

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

Anda kemudian akan memasukkan DataFrame baru menggunakan

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

Menggunakan metode ini, penyisipan ~1.000.000 baris membutuhkan waktu sekitar 16 detik pada mesin ini dengan database lokal.




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Pendekatan yang kuat untuk membuat kueri SQL secara terprogram

  2. Bagaimana cara mengetahui apakah suatu urutan diinisialisasi dalam sesi ini?

  3. postgres - pilih * dari tabel yang ada - psql mengatakan tabel tidak ada

  4. Kesalahan:INTO ditentukan lebih dari sekali pada atau di dekat INTO

  5. Bagaimana cara menulis kueri sql parameter untuk mencegah injeksi SQL?