跳转至

7 MongoDB Change Stream & 开发最佳实践

1 Change Stream

什么是 Change Stream

Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:

Alt Image Text

Change Stream 的实现原理

Change Stream 是基于 oplog 实现的。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。

被追踪的变更事件主要包括:

  • insert/update/delete: 插入、更新、删除;
  • drop: 集合被删除;
  • rename: 集合被重命名;
  • dropDatabase: 数据库被删除;
  • invalidate: drop/rename/dropDatabase将导致invalidate被触发, 并关闭 change stream;

Alt Image Text

Change Stream 与可重复读

Change Stream 只推送已经在大多数节点上提交的变更操作。即“可重复读”的变更。 这个验证是通过 {readConcern: "majority"} 实现的。因此:

  • 未开启 majorityreadConcern 的集群无法使用ChangeStream;
  • 当集群无法满足{w:"majority"}时,不会触发ChangeStream(例如PSA架构 中的 S 因故障宕机)。

Alt Image Text

Change Stream 变更过滤

如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件。

例如:

var cs = db.collection.watch([{
    $match: {
        operationType: {
            $in: ['insert', 'delete']
    } 
   }
}])

Change Stream 示例

Console 1

> db.collection.watch([],
{maxAwaitTimeMS: 30000}).pretty()

Console 2

> db.collection.insert({
    _id: 1,
    text: "hello"
  })
{
    _id : (resumeToken), 
    operationType : "insert", 
    ...
    fullDocument : {
        _id : 1,
        text: "hello" 
    }
}

Alt Image Text

Alt Image Text

Alt Image Text

Alt Image Text

Change Stream 故障恢复

假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?

Alt Image Text

想要从上次中断的地方继续获取变更流,只需要保留上次 变更通知中的 _id 即可。

所示是一次 Change Stream 回调所返回的数据。每 条这样的数据都带有一个 _id,这个 _id 可以用于断点恢 复。例如

var cs = db.collection.watch([], {resumeAfter: <_id>})

即可从上一条通知中断处继续获取后续的变更通知。

{
    id : { "_data" : BinData(0,"gl3dBycAAAABRmRfaWQAZF3dB ycMPJGagxioigBaEASFV+UNaJtBdbJ2j5fOD+3 GBA==") },

    operationType : "insert", 
    ns : {
        db : "test",
        coll : "collection"
    }, 
    documentKey : { 
        _id:1
    },
    fullDocument : 
    { 
        _id : 1,
        text: "hello"
    } 
}

Change Stream 使用场景

  • 跨集群的变更复制——在源集群中订阅ChangeStream,一旦得到任何变更立即写入目标集群。
  • 微服务联动——当一个微服务变更数据库时,其他微服务得到通知并做出相应的变更。
  • 其他任何需要系统联动的场景。

注意事项

  • ChangeStream依赖于oplog,因此中断时间不可超过oplog回收的最大时间窗;
  • 在执行update操作时,如果只更新了部分数据,那么ChangeStream通知的也 是增量部分;
  • 同理,删除数据时通知的仅是删除数据的_id

2 MongoDB 开发最佳实践

连接到 MongoDB

关于驱动程序:总是选择与所用之MongoDB相兼容的驱动程序。这可以很容易地从驱动 兼容对照表中查到;

  • 如果使用第三方框架(如SpringData),则还需要考虑框架版本与驱动的兼容性;

关于连接对象MongoClient:使用MongoClient对象连接到MongoDB实例时总是应该 保证它单例,并且在整个生命周期中都从它获取其他操作对象

  • 关于连接字符串:连接字符串中可以配置大部分连接选项,建议总是在连接字符串中配置 这些选项;
// 连接到复制集 
mongodb://节点1,节点2,节点3.../database?[options]

// 连接到分片集
 mongodb://mongos1,mongos2,mongos3.../database?[options]

常见连接字符串参数

  • maxPoolSize: 连接池大小
  • MaxWaitTime: 建议设置,自动杀掉太慢的查询
  • WriteConcern: 建议majority保证数据安全
  • ReadConcern: 对于数据一致性要求高的场景适当使用

连接字符串节点和地址

  • 无论对于复制集或分片集,连接字符串中都应尽可能多地提供节点地址,建议全部列出;

    • 复制集利用这些地址可以更有效地发现集群成员;
    • 分片集利用这些地址可以更有效地分散负载;
  • 连接字符串中尽可能使用与复制集内部配置相同的域名或IP;

使用域名连接集群

在配置集群时使用域名可以为集群变更时提供一层额外的保护。例如需要将集群整体 迁移到新网段,直接修改域名解析即可。

另外,MongoDB 提供的 mongodb+srv:// 协议可以提供额外一层的保护。该协议允许通过域名解析得到所有 mongos 或节点的地址,而不是写在连接字符串中。

mongodb+srv://server.example.com/
Record TTL Class Priority Weight Port Target _mongodb._tcp.server.example.com. 86400 IN SRV 0 5 27317 mongodb1.example.com.
_mongodb._tcp.server.example.com. 86400 IN SRV 0 5 27017 mongodb2.example.com.

不要在 mongos 前面使用负载均衡

基于前面提到的原因,驱动已经知晓在不同的 mongos 之间实现负载均衡,而复制集 则需要根据节点的角色来选择发送请求的目标。如果在 mongos 或复制集上层部署负 载均衡:

  • 驱动会无法探测具体哪个节点存活,从而无法完成自动故障恢复;
  • 驱动会无法判断游标是在哪个节点创建的,从而遍历游标时出错;

结论:不要在 mongos 或复制集上层放置负载均衡器,让驱动处理负载均衡和自动故障恢复

游标使用

如果一个游标已经遍历完,则会自动关闭;如果没有遍历完,则需要手动调用 close()方法,否则该游标将在服务器上存在 10 分钟(默认值)后超时释放,造成不必要的资源浪费

但是,如果不能遍历完一个游标,通常意味着查询条件太宽泛,更应该考虑的问题是 如何将条件收紧。

关于查询及索引

  • 每一个查询都必须要有对应的索引
  • 尽量使用覆盖索引 Covered Indexes (可以避免读数据文件)
  • 使用 projection 来减少返回到客户端的的文档的内容

关于写入

  • 在 update 语句里只包括需要更新的字段
  • 尽可能使用批量插入来提升写入性能
  • 使用TTL自动过期日志类型的数据

关于文档结构

  • 防止使用太长的字段名(浪费空间)
  • 防止使用太深的数组嵌套(超过2层操作比较复杂)
  • 不使用中文,标点符号等非拉丁字母作为字段名

处理分页问题 – 避免使用 count

尽可能不要计算总页数,特别是数据量大和查询条件不能完整命中索引时。 考虑以下场景:假设集合总共有 1000w 条数据,在没有索引的情况下考虑以下查询:

db.coll.find({x: 100}).limit(50);

db.coll.count({x: 100});
  • 前者只需要遍历前n条,直到找到50条队伍x=100的文档即可结束;
  • 后者需要遍历完1000w条找到所有符合要求的文档才能得到结果。

处理分页问题 – 巧分页

避免使用skip/limit形式的分页,特别是数据量大的时候;

替代方案:使用查询条件+唯一排序条件;

例如:

  • 第一页: db.posts.find({}).sort({_id: 1}).limit(20);
  • 第二页: db.posts.find({_id: {$gt: <第一页最后一个_id>}}).sort({_id: 1}).limit(20);
  • 第三页: db.posts.find({_id: {$gt: <第二页最后一个_id>}}).sort({_id: 1}).limit(20);

......

关于事务

使用事务的原则:

  • 无论何时,事务的使用总是能避免则避免;
  • 模型设计先于事务,尽可能用模型设计规避事务;
  • 不要使用过大的事务(尽量控制在1000个文档更新以内);
  • 当必须使用事务时,尽可能让涉及事务的文档分布在同一个分片上,这 将有效地提高效率;