uncategorized

Aggregation In MongoDB

MongoDB

Aggregation Pipeline

Aggregation Pipeline 是基于数据处理管道思想的数据聚合框架。把数据处理分为不同的阶段,数据依次经过每个阶段处理,最后得到结果。例如:

1
2
3
4
5
// SELECT customer_id AS _id, SUM(amount) AS total FROM orders WHERE status = "DONE" GROUP BY customer_id
db.orders.aggregate([
{ $match: { status: "DONE" } }, // 筛选阶段
{ $group: { _id: "$customer_id", total: { $sum: "$amount" } } }, // 分组阶段
])

Aggregation Pipline性能优化

  1. 第一阶段出现的$match,$sort,$geoNear可以利用相关索引而不必扫描全部文档
  2. 如果可以,在尽可能早的阶段筛选数据

Aggregation Pipline内部优化

  1. 映射优化:只读取使用到的字段,减少进入pipeline的数据量
  2. $project + $match: 提取$match中不依赖$project结果的筛选条件组成一个新的$match阶段,并放到$project前面
  3. $sort + $match: 移动$match到$sort前面
  4. $redact + $match: 提取部分$match的条件到$redact前面
  5. $project + $skip: 移动$skip到$project前面
  6. $sort + $limit: 如果可以,合并$limit到$sort中
  7. $limit + $limit: 合并为一个$limit,取较小值
  8. $skip + $skip: 合并为一个$skip,取和
  9. $match + $match: 合并为一个$match,使用$and合并条件
  10. $sort + $skip + $limit: $limit合并到$sort中,并加上$skip的值
  11. $lookup + $unwind: 把$unwind合并到$lookup中

Aggregation Pipline限制

  1. aggregate的结果集中的每个文档大小受BSON文档大小限制,当前为16MB,超出这个大小将报错。
  2. 处理数据过程中内存使用不能超过100MB,处理大量数据时需要添加allowDiskUse: true选项来允许向磁盘写入缓存(此选项对$graphLookup无效)。

Map-Reduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// map-reduce
db.orders.mapReduce(
function map() { emit(this.customer_id, this.amount) },
function reduce(customerId, amountValues) { return Array.sum(amountValues) },
{
query: { status: "DONE" },
out: "output_collection"
}
)

// 增量式map-reduce
db.orders.mapReduce(
function map() { emit(this.customer_id, this.amount) },
function reduce(customerId, amountValues) { return Array.sum(amountValues) },
{
query: {
status: "DONE",
timestamp: { $gt: ISODate('2018-11-14 00:00:00') }, // 筛选新的文档
},
out: { reduce: "output_collection" }, // 设置reduce合并到已有结果集
}
)

Map-Reduce限制

  1. 结果集中的每个文档大小受BSON文档大小限制,当前为16MB,超出这个大小将报错。

Tips

  • Aggregation Pipeline使用各种操作符($match, $group, …)来完成各阶段的处理任务,性能和复杂度都要好于Map-Reduce。
  • Map-Reduce更加灵活,可以实现Aggregation Pipeline没有提供的功能。
  • Map-Reduce和Aggregation Pipeline都可以应用到分片(Shards)上面。
Share