Create a MongoDB sink connector for Aiven for Apache Kafka®
Use the MongoDB sink connector to move data from an Aiven for Apache Kafka® service to a MongoDB database.
Aiven provides two MongoDB sink connectors with different capabilities:
- MongoDB Kafka Sink Connector (by MongoDB): The standard connector maintained by MongoDB.
- MongoDB Sink Connector (by Lenses.io): Supports KCQL transformations for topic data before writing to MongoDB.
For information about the Lenses.io connector, see MongoDB sink connector (Lenses.io).
For detailed configuration parameters, refer to the MongoDB Kafka connector documentation.
Prerequisites
- An Aiven for Apache Kafka® service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service
- A running MongoDB database with valid credentials and network access from the Kafka Connect service
- The following MongoDB connection details:
  - `MONGODB_USERNAME`: Database username
  - `MONGODB_PASSWORD`: Database password
  - `MONGODB_HOST`: MongoDB host name
  - `MONGODB_PORT`: MongoDB port
  - `MONGODB_DATABASE_NAME`: Target database name
- A source Kafka topic with data to write to MongoDB
- Access to one of the following setup methods: the Aiven Console, the Aiven CLI, or the Aiven Provider for Terraform
- Authentication configured for your project (for example, set the `AIVEN_API_TOKEN` environment variable if using the CLI or Terraform)
Additional details for Avro data format
If you use Avro serialization, collect the following Schema Registry details:
- `SCHEMA_REGISTRY_URL`: Schema Registry URL, for example `https://HOST:PORT`
- `SCHEMA_REGISTRY_PORT`: Schema Registry port
- `SCHEMA_REGISTRY_USER`: Schema Registry username
- `SCHEMA_REGISTRY_PASSWORD`: Schema Registry password
Create a MongoDB sink connector configuration file
Create a file named mongodb_sink_config.json with the following configuration:
{
  "name": "mongodb-sink",
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "topics": "students",
  "connection.uri": "mongodb://USERNAME:PASSWORD@HOST:PORT",
  "database": "school",
  "collection": "students",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://SCHEMA_REGISTRY_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://SCHEMA_REGISTRY_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
}
Parameters:
- `name`: Name of the connector
- `connector.class`: Class name of the MongoDB sink connector
- `topics`: Comma-separated list of Kafka topics to sink
- `connection.uri`: MongoDB connection URI
- `database`: Target MongoDB database name
- `collection`: Target MongoDB collection name. If not specified, the connector uses the topic name by default
- `tasks.max`: Maximum number of parallel tasks for writing data
- `key.converter` and `value.converter`: Define the serialization format for records (Avro, JSON, or others supported by your Kafka service)
- `key.converter.schema.registry.url` and `value.converter.schema.registry.url`: URL of the Schema Registry
- `key.converter.basic.auth.credentials.source` and `value.converter.basic.auth.credentials.source`: Method used to supply Schema Registry credentials
- `key.converter.schema.registry.basic.auth.user.info` and `value.converter.schema.registry.basic.auth.user.info`: Schema Registry credentials in the format `username:password`
For advanced configuration options, including batch size, document mapping, and topic management, refer to the MongoDB Kafka connector documentation.
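As a sketch, a configuration using a few of these options could look like the following. The `max.batch.size`, `document.id.strategy`, and `delete.on.null.values` settings shown here are options of the MongoDB Kafka connector; check the connector documentation for their exact semantics and defaults before relying on them:

{
  "name": "mongodb-sink",
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "topics": "students",
  "connection.uri": "mongodb://USERNAME:PASSWORD@HOST:PORT",
  "database": "school",
  "collection": "students",
  "tasks.max": "1",
  "max.batch.size": "100",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",
  "delete.on.null.values": "true"
}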
Create the connector
- Console
- CLI
- Terraform
- Access the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector if Kafka Connect is enabled on the service. If not, enable it under Service settings > Actions > Enable Kafka Connect (you can also enable it from the CLI, as shown after these steps).
- From the list of sink connectors, select MongoDB sink connector, and click Get started.
- In the Common tab, locate the Connector configuration text box and click Edit.
- Paste the configuration from your `mongodb_sink_config.json` file into the text box.
- Click Create connector.
- Verify the connector status on the Connectors page.
- Verify that data from the Kafka topic appears in MongoDB.
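If Kafka Connect is not yet enabled on your Aiven for Apache Kafka service (step 4), you can also enable it with the Aiven CLI. For example, assuming `SERVICE_NAME` is your Kafka service:

avn service update SERVICE_NAME -c kafka_connect=true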
To create the MongoDB sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @mongodb_sink_config.json
Replace:
- `SERVICE_NAME`: Name of your Aiven for Apache Kafka or Kafka Connect service
- `@mongodb_sink_config.json`: Path to your JSON configuration file
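Once created, you can check the connector from the CLI as well. The following commands list the connectors on the service and show the status of the connector named `mongodb-sink` (the `name` value from the configuration file):

avn service connector list SERVICE_NAME
avn service connector status SERVICE_NAME mongodb-sink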
You can configure this connector using the `aiven_kafka_connector` resource in the Aiven Provider for Terraform.
For configuration examples, see the MongoDB sink connector Terraform example.
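As a minimal sketch, assuming placeholder project and service names, the resource could mirror the JSON configuration file as follows:

resource "aiven_kafka_connector" "mongodb_sink" {
  project        = "PROJECT_NAME"
  service_name   = "SERVICE_NAME"
  connector_name = "mongodb-sink"

  # Mirrors mongodb_sink_config.json
  config = {
    "name"            = "mongodb-sink"
    "connector.class" = "com.mongodb.kafka.connect.MongoSinkConnector"
    "topics"          = "students"
    "connection.uri"  = "mongodb://USERNAME:PASSWORD@HOST:PORT"
    "database"        = "school"
    "collection"      = "students"
    "tasks.max"       = "1"
  }
}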
Example: Create a MongoDB sink connector
The following example creates a MongoDB sink connector that writes data
from the Kafka topic students to a MongoDB database named school.
Kafka topic (students):
key: 1 value: {"name": "carlo"}
key: 2 value: {"name": "lucy"}
key: 3 value: {"name": "mary"}
Connector configuration:
{
  "name": "mongodb-sink",
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "topics": "students",
  "connection.uri": "mongodb://USERNAME:PASSWORD@HOST:PORT",
  "database": "school",
  "tasks.max": "1"
}
This configuration writes records from the Kafka topic students to a collection
named students in the MongoDB database school.
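With the connector's default document ID strategy, which generates a new ObjectId for each record, the collection would contain documents similar to the following (the `_id` values here are illustrative):

{ "_id": ObjectId("..."), "name": "carlo" }
{ "_id": ObjectId("..."), "name": "lucy" }
{ "_id": ObjectId("..."), "name": "mary" }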
Verify data flow
After creating the connector:
- Check the connector status on the Connectors page in the Aiven Console.
- Verify that a `students` collection exists in the MongoDB database.
- Confirm that records from the Kafka topic are written to the collection (see the example below).
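For example, you can inspect the collection with `mongosh`, using the same connection details as the connector configuration:

mongosh "mongodb://USERNAME:PASSWORD@HOST:PORT"
use school
db.students.find()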