Availability : Change streams are available for replica sets and sharded clusters 前置条件: 更改流可用于副本集和分片集群
下面的 Java 示例假定您已连接到 MongoDB 副本集并访问了包含集合的 inventory 数据库。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
该 pipeline 列表包括一个 $match 阶段,用于筛选 is username alice 的任何操作或 is operationType 为 的操作 delete 。 将 pipeline 传递给 watch() 该方法会指示更改流在通过指定的 pipeline .
默认情况下,更改流仅在更新操作期间返回字段的增量。但是,您可以将更改流配置为返回更新文档的最新多数提交版本。 若要返回更新文档的最新多数提交版本,请传递给 FullDocument.UPDATE_LOOKUP 该 db.collection.watch.fullDocument() 方法。 在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();
- Reference: Operation Types