將欄位轉換為另一種型別並以批量更新整個集合

通常情況下,當人們想要將欄位型別更改為另一個時,例如原始集合可能將數字日期欄位儲存為字串:

{
    "name": "Alice",
    "salary": "57871",
    "dob": "1986-08-21"
},
{
    "name": "Bob",
    "salary": "48974",
    "dob": "1990-11-04"
}

目標是更新像上面這樣的大型集合

{
    "name": "Alice",
    "salary": 57871,
    "dob": ISODate("1986-08-21T00:00:00.000Z")
},
{
    "name": "Bob",
    "salary": 48974,
    "dob": ISODate("1990-11-04T00:00:00.000Z")
}

對於相對較小的資料,可以通過使用 snapshot 使用游標的 forEach() 方法迭代集合並按如下方式更新每個文件來實現上述目的 :

db.test.find({
    "salary": { "$exists": true, "$type": 2 },
    "dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){ 
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);        
    db.test.updateOne(
        { "_id": doc._id },
        { "$set": { "salary": newSalary, "dob": newDob } }
    );
});

雖然這對於小型集合來說是最佳的,但是由於迴圈遍歷大型資料集並且每個請求向伺服器傳送每個更新操作會導致計算損失,因此大型集合的效能大大降低。

Bulk() API 來救援,大大提高效能,因為寫操作是成批只有一次傳送到伺服器。由於該方法不向伺服器傳送每個寫入請求(如同 forEach() 迴圈中的當前更新語句 ),而是每 1000 個請求中只傳送一次,因此實現了效率,從而使更新比當前更有效和更快。

使用與 forEach() 迴圈相同的概念來建立批次,我們可以按如下方式批量更新集合。在本演示中,MongoDB 版本 >= 2.6< 3.2 中提供的 Bulk() API 使用 initializeUnorderedBulkOp() 方法並行執行,以及非確定性順序執行批處理中的寫操作。

它通過將 salarydob 欄位分別更改為 numericaldatetime 值來更新客戶端集合中的所有文件:

var bulk = db.test.initializeUnorderedBulkOp(),
    counter = 0; // counter to keep track of the batch update size

db.test.find({
    "salary": { "$exists": true, "$type": 2 },
    "dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){ 
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    bulk.find({ "_id": doc._id }).updateOne({ 
        "$set": { "salary": newSalary, "dob": newDob }
    });

    counter++; // increment counter
    if (counter % 1000 == 0) {
        bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
        bulk = db.test.initializeUnorderedBulkOp();
    }
});

下一個示例適用於新的 MongoDB 版本 3.2,該版本已經棄用了 Bulk() API 並使用 bulkWrite() 提供了一套更新的 api。

它使用與上面相同的遊標,但使用相同的 forEach() 遊標方法建立具有批量操作的陣列,以將每個批量寫入文件推送到陣列。因為寫入命令可以接受不超過 1000 次操作,所以需要將操作分組以進行最多 1000 次操作,並在迴圈達到 1000 次迭代時重新初始化陣列:

var cursor = db.test.find({
        "salary": { "$exists": true, "$type": 2 },
        "dob": { "$exists": true, "$type": 2 }
    }),
    bulkUpdateOps = [];

cursor.snapshot().forEach(function(doc){ 
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    bulkUpdateOps.push({ 
        "updateOne": {
            "filter": { "_id": doc._id },
            "update": { "$set": { "salary": newSalary, "dob": newDob } }
         }
    });

    if (bulkUpdateOps.length === 1000) {
        db.test.bulkWrite(bulkUpdateOps);
        bulkUpdateOps = [];
    }
});         

if (bulkUpdateOps.length > 0) { db.test.bulkWrite(bulkUpdateOps); }