将字段转换为另一种类型并以批量更新整个集合
通常情况下,当人们想要将字段类型更改为另一个时,例如原始集合可能将数字或日期字段保存为字符串:
{
"name": "Alice",
"salary": "57871",
"dob": "1986-08-21"
},
{
"name": "Bob",
"salary": "48974",
"dob": "1990-11-04"
}
目标是更新像上面这样的大型集合
{
"name": "Alice",
"salary": 57871,
"dob": ISODate("1986-08-21T00:00:00.000Z")
},
{
"name": "Bob",
"salary": 48974,
"dob": ISODate("1990-11-04T00:00:00.000Z")
}
对于相对较小的数据,可以通过使用 snapshot
使用光标的 forEach()
方法迭代集合并按如下方式更新每个文档来实现上述目的 :
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
db.test.updateOne(
{ "_id": doc._id },
{ "$set": { "salary": newSalary, "dob": newDob } }
);
});
虽然这对于小型集合来说是最佳的,但是由于循环遍历大型数据集并且每个请求向服务器发送每个更新操作会导致计算损失,因此大型集合的性能大大降低。
该 Bulk()
API 来救援,大大提高性能,因为写操作是成批只有一次发送到服务器。由于该方法不向服务器发送每个写入请求(如同 forEach()
循环中的当前更新语句 ),而是每 1000 个请求中只发送一次,因此实现了效率,从而使更新比当前更有效和更快。
使用与 forEach()
循环相同的概念来创建批次,我们可以按如下方式批量更新集合。在本演示中,MongoDB 版本 >= 2.6
和 < 3.2
中提供的 Bulk()
API 使用 initializeUnorderedBulkOp()
方法并行执行,以及非确定性顺序执行批处理中的写操作。
它通过将 salary
和 dob
字段分别更改为 numerical
和 datetime
值来更新客户端集合中的所有文档:
var bulk = db.test.initializeUnorderedBulkOp(),
counter = 0; // counter to keep track of the batch update size
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulk.find({ "_id": doc._id }).updateOne({
"$set": { "salary": newSalary, "dob": newDob }
});
counter++; // increment counter
if (counter % 1000 == 0) {
bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
bulk = db.test.initializeUnorderedBulkOp();
}
});
下一个示例适用于新的 MongoDB 版本 3.2
,该版本已经弃用了 Bulk()
API 并使用 bulkWrite()
提供了一套更新的 api。
它使用与上面相同的游标,但使用相同的 forEach()
游标方法创建具有批量操作的数组,以将每个批量写入文档推送到数组。因为写入命令可以接受不超过 1000 次操作,所以需要将操作分组以进行最多 1000 次操作,并在循环达到 1000 次迭代时重新初始化数组:
var cursor = db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}),
bulkUpdateOps = [];
cursor.snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulkUpdateOps.push({
"updateOne": {
"filter": { "_id": doc._id },
"update": { "$set": { "salary": newSalary, "dob": newDob } }
}
});
if (bulkUpdateOps.length === 1000) {
db.test.bulkWrite(bulkUpdateOps);
bulkUpdateOps = [];
}
});
if (bulkUpdateOps.length > 0) { db.test.bulkWrite(bulkUpdateOps); }