MongoDB
 sql >> Teknologi Basis Data >  >> NoSQL >> MongoDB

Impor CSV Menggunakan Skema Mongoose

Anda dapat melakukannya dengan csv cepat dengan mendapatkan headers dari definisi skema yang akan mengembalikan garis yang diuraikan sebagai "objek". Anda sebenarnya memiliki beberapa ketidakcocokan, jadi saya telah menandainya dengan koreksi:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Selama skema benar-benar sesuai dengan CSV yang disediakan maka tidak apa-apa. Ini adalah koreksi yang dapat saya lihat tetapi jika Anda membutuhkan nama bidang yang sebenarnya disejajarkan secara berbeda maka Anda perlu menyesuaikan. Tapi pada dasarnya ada Number di posisi di mana ada String dan pada dasarnya bidang tambahan, yang saya anggap kosong di CSV.

Hal-hal umum mendapatkan array nama bidang dari skema dan meneruskannya ke opsi saat membuat instance parser csv:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Setelah Anda benar-benar melakukannya, Anda akan mendapatkan "Objek" kembali alih-alih array:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Jangan khawatir tentang "tipe" karena Mongoose akan memberikan nilai sesuai skema.

Sisanya terjadi di dalam handler untuk data peristiwa. Untuk efisiensi maksimum kami menggunakan insertMany() untuk hanya menulis ke database sekali setiap 10.000 baris. Bagaimana itu benar-benar masuk ke server dan prosesnya tergantung pada versi MongoDB, tetapi 10.000 harus cukup masuk akal berdasarkan jumlah rata-rata bidang yang akan Anda impor untuk satu koleksi dalam hal "pertukaran" untuk penggunaan memori dan penulisan permintaan jaringan yang wajar. Kecilkan angkanya jika perlu.

Bagian yang penting adalah menandai panggilan ini sebagai async fungsi dan await hasil dari insertMany() sebelum melanjutkan. Kita juga perlu pause() aliran dan resume() pada setiap item jika tidak, kami berisiko menimpa buffer dokumen untuk dimasukkan sebelum benar-benar dikirim. pause() dan resume() diperlukan untuk menempatkan "tekanan balik" pada pipa, jika tidak item akan terus "keluar" dan menembakkan data acara.

Secara alami kontrol untuk 10.000 entri mengharuskan kami memeriksa bahwa baik pada setiap iterasi dan penyelesaian aliran untuk mengosongkan buffer dan mengirim dokumen yang tersisa ke server.

Itulah yang sebenarnya ingin Anda lakukan, karena Anda tentu tidak ingin menjalankan permintaan asinkron ke server baik pada "setiap" iterasi melalui data acara atau pada dasarnya tanpa menunggu setiap permintaan selesai. Anda akan lolos dengan tidak memeriksanya untuk "file yang sangat kecil", tetapi untuk beban dunia nyata apa pun, Anda pasti akan melebihi tumpukan panggilan karena panggilan asinkron "dalam penerbangan" yang belum selesai.

FYI - sebuah package.json digunakan. mz bersifat opsional karena hanya Promise yang dimodernisasi pustaka yang diaktifkan dari pustaka "bawaan" simpul standar yang biasa saya gunakan. Kode ini tentu saja sepenuhnya dapat dipertukarkan dengan fs modul.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Sebenarnya dengan Node v8.9.x ke atas maka kita bahkan bisa membuatnya lebih sederhana dengan implementasi AsyncIterator melalui stream-to-iterator modul. Masih dalam Iterator<Promise<T>> mode, tetapi harus dilakukan sampai Node v10.x menjadi LTS stabil:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Pada dasarnya, semua aliran "acara" yang menangani dan menjeda dan melanjutkan akan diganti dengan for sederhana lingkaran:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Mudah! Ini akan dibersihkan dalam implementasi node selanjutnya dengan for..await..of ketika menjadi lebih stabil. Tapi di atas berjalan dengan baik pada dari versi yang ditentukan dan di atasnya.



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. server mengembalikan kesalahan pada langkah otentikasi SASL:Otentikasi gagal

  2. MongoDB $kalikan

  3. Operator MongoDB $gt/$lt dengan harga disimpan sebagai string

  4. MongoDB 2.1 Kerangka Agregat Jumlah Elemen Array yang cocok dengan nama

  5. Bagaimana cara memvalidasi anggota bidang array?