db.collection.aggregate(
[
{
"$addFields": {
"indexes": {
"$range": [
0,
{
"$size": "$time_series"
}
]
},
"reversedSeries": {
"$reverseArray": "$time_series"
}
}
},
{
"$project": {
"derivatives": {
"$reverseArray": {
"$slice": [
{
"$map": {
"input": {
"$zip": {
"inputs": [
"$reversedSeries",
"$indexes"
]
}
},
"in": {
"$subtract": [
{
"$arrayElemAt": [
"$$this",
0
]
},
{
"$arrayElemAt": [
"$reversedSeries",
{
"$add": [
{
"$arrayElemAt": [
"$$this",
1
]
},
1
]
}
]
}
]
}
}
},
{
"$subtract": [
{
"$size": "$time_series"
},
1
]
}
]
}
},
"time_series": 1
}
}
]
)
Kita dapat menggunakan pipeline di atas dalam versi 3.4+ untuk melakukan ini. Dalam pipeline, kita menggunakan $addFields
tahap pipa. operator untuk menambahkan larik indeks elemen "time_series" untuk melakukan dokumen, kami juga membalik larik deret waktu dan menambahkannya ke dokumen masing-masing menggunakan $range
dan $reverseArray
operator
Kami membalikkan array di sini karena elemen pada posisi p
dalam array selalu lebih besar dari elemen pada posisi p+1
yang artinya [p] - [p+1] < 0
dan kami tidak ingin menggunakan $multiply
di sini. (lihat saluran untuk versi 3.2)
Selanjutnya kita $zipped
data deret waktu dengan larik indeks dan menerapkan substract
ekspresi ke array yang dihasilkan menggunakan $map
operator.
Kami kemudian $slice
hasil untuk membuang null/None
nilai dari array dan membalikkan hasilnya.
Di 3.2 kita dapat menggunakan $unwind
operator untuk bersantai larik kita dan sertakan indeks setiap elemen dalam larik dengan menetapkan dokumen sebagai operan alih-alih "jalur" tradisional yang diawali dengan $ .
Selanjutnya dalam pipeline, kita perlu $group
dokumen kami dan gunakan $push
operator akumulator untuk mengembalikan larik sub-dokumen yang terlihat seperti ini:
{
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
{ "value" : 10, "index" : NumberLong(0) },
{ "value" : 20, "index" : NumberLong(1) },
{ "value" : 40, "index" : NumberLong(2) },
{ "value" : 70, "index" : NumberLong(3) },
{ "value" : 110, "index" : NumberLong(4) }
]
}
Akhirnya muncul $project
panggung. Pada tahap ini, kita perlu menggunakan $map
operator untuk menerapkan serangkaian ekspresi ke setiap elemen dalam larik yang baru dihitung di $group
panggung.
Inilah yang terjadi di dalam $map
(lihat $map
sebagai perulangan for) di ekspresi:
Untuk setiap subdokumen, kami menetapkan nilai ke variabel menggunakan $let
operator variabel. Kami kemudian mengurangi nilainya dari nilai bidang "nilai" elemen berikutnya dalam array.
Karena elemen berikutnya dalam array adalah elemen pada indeks saat ini plus satu, yang kita butuhkan hanyalah bantuan $arrayElemAt
operator dan $add
sederhana
ition dari indeks elemen saat ini dan 1
.
$subtract
ekspresi mengembalikan nilai negatif jadi kita perlu mengalikan nilainya dengan -1
menggunakan $multiply
operator.
Kita juga perlu $filter
array yang dihasilkan karena elemen terakhir adalah None
atau null
. Alasannya adalah ketika elemen saat ini adalah elemen terakhir, $subtract
kembalikan None
karena indeks elemen berikutnya sama dengan ukuran array.
db.collection.aggregate([
{
"$unwind": {
"path": "$time_series",
"includeArrayIndex": "index"
}
},
{
"$group": {
"_id": "$_id",
"time_series": {
"$push": {
"value": "$time_series",
"index": "$index"
}
}
}
},
{
"$project": {
"time_series": {
"$filter": {
"input": {
"$map": {
"input": "$time_series",
"as": "el",
"in": {
"$multiply": [
{
"$subtract": [
"$$el.value",
{
"$let": {
"vars": {
"nextElement": {
"$arrayElemAt": [
"$time_series",
{
"$add": [
"$$el.index",
1
]
}
]
}
},
"in": "$$nextElement.value"
}
}
]
},
-1
]
}
}
},
"as": "item",
"cond": {
"$gte": [
"$$item",
0
]
}
}
}
}
}
])
Opsi lain yang menurut saya kurang efisien adalah melakukan operasi peta/pengurangan pada koleksi kami menggunakan map_reduce
metode.
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function() {
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++) {
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
... }
... emit(this._id, derivatives);
... }
... """)
>>> reducer = Code("""
... function(key, value) {}
... """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
... print(res) # or do something with the document.
...
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}
Anda juga dapat mengambil semua dokumen dan menggunakan numpy.diff
untuk mengembalikan turunan seperti ini:
import numpy as np
for document in collection.find({}, {'time_series': 1}):
result = np.diff(document['time_series'])