Anda dapat melakukannya dengan csv cepat dengan mendapatkan headers
dari definisi skema yang akan mengembalikan garis yang diuraikan sebagai "objek". Anda sebenarnya memiliki beberapa ketidakcocokan, jadi saya telah menandainya dengan koreksi:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Selama skema benar-benar sesuai dengan CSV yang disediakan maka tidak apa-apa. Ini adalah koreksi yang dapat saya lihat tetapi jika Anda membutuhkan nama bidang yang sebenarnya disejajarkan secara berbeda maka Anda perlu menyesuaikan. Tapi pada dasarnya ada Number
di posisi di mana ada String
dan pada dasarnya bidang tambahan, yang saya anggap kosong di CSV.
Hal-hal umum mendapatkan array nama bidang dari skema dan meneruskannya ke opsi saat membuat instance parser csv:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
Setelah Anda benar-benar melakukannya, Anda akan mendapatkan "Objek" kembali alih-alih array:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Jangan khawatir tentang "tipe" karena Mongoose akan memberikan nilai sesuai skema.
Sisanya terjadi di dalam handler untuk data
peristiwa. Untuk efisiensi maksimum kami menggunakan insertMany()
untuk hanya menulis ke database sekali setiap 10.000 baris. Bagaimana itu benar-benar masuk ke server dan prosesnya tergantung pada versi MongoDB, tetapi 10.000 harus cukup masuk akal berdasarkan jumlah rata-rata bidang yang akan Anda impor untuk satu koleksi dalam hal "pertukaran" untuk penggunaan memori dan penulisan permintaan jaringan yang wajar. Kecilkan angkanya jika perlu.
Bagian yang penting adalah menandai panggilan ini sebagai async
fungsi dan await
hasil dari insertMany()
sebelum melanjutkan. Kita juga perlu pause()
aliran dan resume()
pada setiap item jika tidak, kami berisiko menimpa buffer
dokumen untuk dimasukkan sebelum benar-benar dikirim. pause()
dan resume()
diperlukan untuk menempatkan "tekanan balik" pada pipa, jika tidak item akan terus "keluar" dan menembakkan data
acara.
Secara alami kontrol untuk 10.000 entri mengharuskan kami memeriksa bahwa baik pada setiap iterasi dan penyelesaian aliran untuk mengosongkan buffer dan mengirim dokumen yang tersisa ke server.
Itulah yang sebenarnya ingin Anda lakukan, karena Anda tentu tidak ingin menjalankan permintaan asinkron ke server baik pada "setiap" iterasi melalui data
acara atau pada dasarnya tanpa menunggu setiap permintaan selesai. Anda akan lolos dengan tidak memeriksanya untuk "file yang sangat kecil", tetapi untuk beban dunia nyata apa pun, Anda pasti akan melebihi tumpukan panggilan karena panggilan asinkron "dalam penerbangan" yang belum selesai.
FYI - sebuah package.json
digunakan. mz
bersifat opsional karena hanya Promise
yang dimodernisasi pustaka yang diaktifkan dari pustaka "bawaan" simpul standar yang biasa saya gunakan. Kode ini tentu saja sepenuhnya dapat dipertukarkan dengan fs
modul.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Sebenarnya dengan Node v8.9.x ke atas maka kita bahkan bisa membuatnya lebih sederhana dengan implementasi AsyncIterator
melalui stream-to-iterator
modul. Masih dalam Iterator<Promise<T>>
mode, tetapi harus dilakukan sampai Node v10.x menjadi LTS stabil:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Pada dasarnya, semua aliran "acara" yang menangani dan menjeda dan melanjutkan akan diganti dengan for
sederhana lingkaran:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Mudah! Ini akan dibersihkan dalam implementasi node selanjutnya dengan for..await..of
ketika menjadi lebih stabil. Tapi di atas berjalan dengan baik pada dari versi yang ditentukan dan di atasnya.