Skip site navigation (1)Skip section navigation (2)

FreeBSD Manual Pages

  
 
  

home | help
MONGOC_CHANGE_STREAM_T(3)	   libmongoc	     MONGOC_CHANGE_STREAM_T(3)

SYNOPSIS
	  #include <mongoc/mongoc.h>

	  typedef struct _mongoc_change_stream_t mongoc_change_stream_t;

       mongoc_change_stream_t  is  a  handle  to a change stream. A collection
       change stream can be obtained using mongoc_collection_watch().

       It is recommended to use a mongoc_change_stream_t and its functions
       instead of a raw aggregation with a $changeStream stage. For more
       information see the MongoDB Manual Entry on Change Streams.

EXAMPLE
       example-collection-watch.c

	  #include <mongoc/mongoc.h>

	  /* Opens a change stream on db.coll, inserts one document using a majority
	   * write concern, and prints every change event the stream yields to stderr.
	   *
	   * Returns EXIT_SUCCESS if the stream ends without reporting an error and
	   * EXIT_FAILURE otherwise. Every exit path runs through the cleanup label so
	   * that all handles and buffers are released. */
	  int
	  main (void)
	  {
	     int exit_code = EXIT_FAILURE;
	     bson_t empty = BSON_INITIALIZER;
	     const bson_t *doc;
	     bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
	     const bson_t *err_doc;
	     bson_error_t error;
	     const char *uri_string;
	     mongoc_uri_t *uri = NULL;
	     mongoc_client_t *client = NULL;
	     mongoc_collection_t *coll = NULL;
	     mongoc_change_stream_t *stream = NULL;
	     mongoc_write_concern_t *wc = mongoc_write_concern_new ();
	     bson_t opts = BSON_INITIALIZER;
	     bool r;

	     mongoc_init ();

	     uri_string = "mongodb://"
	                  "localhost:27017,localhost:27018,localhost:"
	                  "27019/db?replicaSet=rs0";

	     uri = mongoc_uri_new_with_error (uri_string, &error);
	     if (!uri) {
	        fprintf (stderr,
	                 "failed to parse URI: %s\n"
	                 "error message:       %s\n",
	                 uri_string,
	                 error.message);
	        goto cleanup;
	     }

	     client = mongoc_client_new_from_uri (uri);
	     if (!client) {
	        goto cleanup;
	     }

	     /* Start watching before inserting so the insert shows up as an event. */
	     coll = mongoc_client_get_collection (client, "db", "coll");
	     stream = mongoc_collection_watch (coll, &empty, NULL);

	     mongoc_write_concern_set_wmajority (wc, 10000);
	     mongoc_write_concern_append (wc, &opts);
	     r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
	     if (!r) {
	        fprintf (stderr, "Error: %s\n", error.message);
	        goto cleanup;
	     }

	     while (mongoc_change_stream_next (stream, &doc)) {
	        char *as_json = bson_as_relaxed_extended_json (doc, NULL);
	        fprintf (stderr, "Got document: %s\n", as_json);
	        bson_free (as_json);
	     }

	     /* The loop above also ends when the stream fails, so check for an error. */
	     if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
	        if (!bson_empty (err_doc)) {
	           /* Keep the JSON in a variable so it can be freed after printing. */
	           char *err_json = bson_as_relaxed_extended_json (err_doc, NULL);
	           fprintf (stderr, "Server Error: %s\n", err_json);
	           bson_free (err_json);
	        } else {
	           fprintf (stderr, "Client Error: %s\n", error.message);
	        }
	        goto cleanup;
	     }

	     exit_code = EXIT_SUCCESS;

	  cleanup:
	     bson_destroy (to_insert);
	     mongoc_write_concern_destroy (wc);
	     bson_destroy (&opts);
	     bson_destroy (&empty);
	     mongoc_change_stream_destroy (stream);
	     mongoc_collection_destroy (coll);
	     mongoc_uri_destroy (uri);
	     mongoc_client_destroy (client);
	     mongoc_cleanup ();

	     return exit_code;
	  }

   Starting and Resuming
       All watch functions accept several options to indicate where a change
       stream should start returning changes from: resumeAfter, startAfter,
       and startAtOperationTime.

       All changes returned by mongoc_change_stream_next() include a resume
       token in the _id field. MongoDB 4.2 also includes an additional resume
       token in each "aggregate" and "getMore" command response, which points
       to the end of that response's batch. The current token is automatically
       cached by libmongoc. In the event of an error, libmongoc attempts to
       recreate the change stream starting where it left off by passing the
       cached resume token. libmongoc only attempts to resume once, but client
       applications can access the cached resume token with
       mongoc_change_stream_get_resume_token() and use it for their own resume
       logic by passing it as either the resumeAfter or startAfter option.

       Additionally, change streams can start returning changes at an
       operation time by using the startAtOperationTime field. This can be the
       timestamp returned in the operationTime field of a command reply.

       resumeAfter, startAfter, and startAtOperationTime are mutually
       exclusive options. Setting more than one will result in a server error.

       The following example implements	custom resuming	logic, persisting  the
       resume token in a file.

       example-resume.c

	  #include <mongoc/mongoc.h>

	  /* An example implementation of custom resume logic in a change stream.
	   * example-resume starts a client-wide change stream and persists the resume
	   * token in a file "resume-token.json". On restart, if "resume-token.json"
	   * exists, the change stream starts watching after the persisted resume token.
	   *
	   * This behavior allows a user to exit example-resume, and restart it later
	   * without missing any change events.
	   */
	  #include <unistd.h>

	  /* File that _save_resume_token writes the latest resume token to and that
	   * _load_resume_token reads it back from. */
	  static const char *RESUME_TOKEN_PATH = "resume-token.json";

	  /* Persists the resume token of a change event to RESUME_TOKEN_PATH.
	   *
	   * doc: a change stream document; its "_id" field holds the resume token.
	   *
	   * The token is stored as canonical extended JSON in the form
	   * { "resumeAfter": <resume token> } so that it can later be appended to the
	   * change stream options unchanged.
	   *
	   * Returns true on success. Returns false, after printing a message to
	   * stderr, if doc has no "_id" field, the JSON cannot be produced, or the
	   * file cannot be opened, written or closed. */
	  static bool
	  _save_resume_token (const bson_t *doc)
	  {
	     FILE *file_stream = NULL;
	     bson_iter_t iter;
	     bson_t resume_token_doc;
	     char *as_json = NULL;
	     size_t as_json_len = 0;
	     size_t n_written = 0;
	     bool ok = false;

	     if (!bson_iter_init_find (&iter, doc, "_id")) {
	        fprintf (stderr, "change document does not contain a resume token (_id).\n");
	        return false;
	     }

	     /* Build the JSON before touching the file, so that a conversion failure
	      * does not truncate a previously saved token. */
	     bson_init (&resume_token_doc);
	     BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", bson_iter_value (&iter));
	     as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
	     bson_destroy (&resume_token_doc);
	     if (!as_json) {
	        fprintf (stderr, "failed to convert resume token to JSON\n");
	        return false;
	     }

	     file_stream = fopen (RESUME_TOKEN_PATH, "w+");
	     if (!file_stream) {
	        fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
	        goto done;
	     }

	     while (n_written < as_json_len) {
	        /* fwrite returns the number of items written and never -1; a return of
	         * zero with bytes still pending means the write failed. */
	        size_t r = fwrite (as_json + n_written, sizeof (char), as_json_len - n_written, file_stream);
	        if (r == 0) {
	           fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
	           goto done;
	        }
	        n_written += r;
	     }

	     ok = true;

	  done:
	     if (file_stream) {
	        /* fclose flushes buffered data, so its failure is a write failure. */
	        if (fclose (file_stream) != 0 && ok) {
	           fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
	           ok = false;
	        }
	     }
	     bson_free (as_json);
	     return ok;
	  }

	  /* Loads a previously persisted resume token, if any, into opts.
	   *
	   * opts: the change stream options document; on success the contents of the
	   *       document stored in RESUME_TOKEN_PATH are appended to it.
	   *
	   * Returns true if the token was appended, or if there is nothing to load
	   * (the file is not readable or contains no document). Returns false, after
	   * printing a message to stderr, if the file cannot be opened or parsed or
	   * the token cannot be appended to opts. */
	  bool
	  _load_resume_token (bson_t *opts)
	  {
	     bson_error_t error;
	     bson_json_reader_t *reader;
	     bson_t doc;
	     int read_result;
	     bool ok = false;

	     /* if the file does not exist, skip. */
	     if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
	        return true;
	     }
	     reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
	     if (!reader) {
	        fprintf (stderr, "failed to open %s for reading: %s\n", RESUME_TOKEN_PATH, error.message);
	        return false;
	     }

	     bson_init (&doc);
	     /* bson_json_reader_read yields 1 when a document was read, 0 when the
	      * input ended without one, and -1 on a parse error. */
	     read_result = bson_json_reader_read (reader, &doc, &error);
	     if (read_result < 0) {
	        fprintf (stderr, "failed to read doc from %s: %s\n", RESUME_TOKEN_PATH, error.message);
	     } else if (read_result == 0) {
	        /* The file holds no document, so there is no token to resume from. */
	        ok = true;
	     } else {
	        printf ("found cached resume token in %s, resuming change stream.\n", RESUME_TOKEN_PATH);
	        ok = bson_concat (opts, &doc);
	        if (!ok) {
	           fprintf (stderr, "failed to append resume token from %s to options\n", RESUME_TOKEN_PATH);
	        }
	     }

	     bson_destroy (&doc);
	     bson_json_reader_destroy (reader);
	     return ok;
	  }

	  int
	  main (void)
	  {
	     int exit_code = EXIT_FAILURE;
	     const char	*uri_string;
	     mongoc_uri_t *uri = NULL;
	     bson_error_t error;
	     mongoc_client_t *client = NULL;
	     bson_t pipeline = BSON_INITIALIZER;
	     bson_t opts = BSON_INITIALIZER;
	     mongoc_change_stream_t *stream = NULL;
	     const bson_t *doc;

	     const int max_time	= 30; /* max amount of time, in	seconds, that
					 mongoc_change_stream_next can block. */

	     mongoc_init ();
	     uri_string	= "mongodb://localhost:27017/db?replicaSet=rs0";
	     uri = mongoc_uri_new_with_error (uri_string, &error);
	     if	(!uri) {
		fprintf	(stderr,
			 "failed to parse URI: %s\n"
			 "error	message:       %s\n",
			 uri_string,
			 error.message);
		goto cleanup;
	     }

	     client = mongoc_client_new_from_uri (uri);
	     if	(!client) {
		goto cleanup;
	     }

	     if	(!_load_resume_token (&opts)) {
		goto cleanup;
	     }
	     BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);

	     printf ("listening	for changes on the client (max %d seconds).\n",	max_time);
	     stream = mongoc_client_watch (client, &pipeline, &opts);

	     while (mongoc_change_stream_next (stream, &doc)) {
		char *as_json;

		as_json	= bson_as_canonical_extended_json (doc,	NULL);
		printf ("change	received: %s\n", as_json);
		bson_free (as_json);
		if (!_save_resume_token	(doc)) {
		   goto	cleanup;
		}
	     }

	     exit_code = EXIT_SUCCESS;

	  cleanup:
	     mongoc_uri_destroy	(uri);
	     bson_destroy (&pipeline);
	     bson_destroy (&opts);
	     mongoc_change_stream_destroy (stream);
	     mongoc_client_destroy (client);
	     mongoc_cleanup ();
	     return exit_code;
	  }

       The following example shows using startAtOperationTime to synchronize a
       change stream with another operation.

       example-start-at-optime.c

	  /* An	example	of starting a change stream with startAtOperationTime. */
	  #include <mongoc/mongoc.h>

	  int
	  main (void)
	  {
	     int exit_code = EXIT_FAILURE;
	     const char	*uri_string;
	     mongoc_uri_t *uri = NULL;
	     bson_error_t error;
	     mongoc_client_t *client = NULL;
	     mongoc_collection_t *coll = NULL;
	     bson_t pipeline = BSON_INITIALIZER;
	     bson_t opts = BSON_INITIALIZER;
	     mongoc_change_stream_t *stream = NULL;
	     bson_iter_t iter;
	     const bson_t *doc;
	     bson_value_t cached_operation_time	= {0};
	     int i;
	     bool r;

	     mongoc_init ();
	     uri_string	= "mongodb://localhost:27017/db?replicaSet=rs0";
	     uri = mongoc_uri_new_with_error (uri_string, &error);
	     if	(!uri) {
		fprintf	(stderr,
			 "failed to parse URI: %s\n"
			 "error	message:       %s\n",
			 uri_string,
			 error.message);
		goto cleanup;
	     }

	     client = mongoc_client_new_from_uri (uri);
	     if	(!client) {
		goto cleanup;
	     }

	     /*	insert five documents. */
	     coll = mongoc_client_get_collection (client, "db",	"coll");
	     for (i = 0; i < 5;	i++) {
		bson_t reply;
		bson_t *insert_cmd = BCON_NEW ("insert", "coll", "documents", "[", "{",	"x", BCON_INT64	(i), "}", "]");

		r = mongoc_collection_write_command_with_opts (coll, insert_cmd, NULL, &reply, &error);
		bson_destroy (insert_cmd);
		if (!r)	{
		   bson_destroy	(&reply);
		   fprintf (stderr, "failed to insert: %s\n", error.message);
		   goto	cleanup;
		}
		if (i == 0) {
		   /* cache the	operation time in the first reply. */
		   if (bson_iter_init_find (&iter, &reply, "operationTime")) {
		      bson_value_copy (bson_iter_value (&iter),	&cached_operation_time);
		   } else {
		      fprintf (stderr, "reply does not contain operationTime.");
		      bson_destroy (&reply);
		      goto cleanup;
		   }
		}
		bson_destroy (&reply);
	     }

	     /*	start a	change stream at the first returned operationTime. */
	     BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
	     stream = mongoc_collection_watch (coll, &pipeline,	&opts);

	     /*	since the change stream	started	at the operation time of the first
	      *	insert,	the five inserts are returned. */
	     printf ("listening	for changes on db.coll:\n");
	     while (mongoc_change_stream_next (stream, &doc)) {
		char *as_json;

		as_json	= bson_as_canonical_extended_json (doc,	NULL);
		printf ("change	received: %s\n", as_json);
		bson_free (as_json);
	     }

	     exit_code = EXIT_SUCCESS;

	  cleanup:
	     mongoc_uri_destroy	(uri);
	     bson_destroy (&pipeline);
	     bson_destroy (&opts);
	     if	(cached_operation_time.value_type) {
		bson_value_destroy (&cached_operation_time);
	     }
	     mongoc_change_stream_destroy (stream);
	     mongoc_collection_destroy (coll);
	     mongoc_client_destroy (client);
	     mongoc_cleanup ();
	     return exit_code;
	  }

AUTHOR
       MongoDB,	Inc

COPYRIGHT
       2009-present, MongoDB, Inc.

1.30.2				 Apr 12, 2025	     MONGOC_CHANGE_STREAM_T(3)

Want to link to this manual page? Use this URL:
<https://man.freebsd.org/cgi/man.cgi?query=mongoc_change_stream_t&sektion=3&manpath=FreeBSD+Ports+14.3.quarterly>

home | help