Kafka Module
Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Install
npm install @testcontainers/kafka --save-dev
Kafka 8.x
Examples
// Minimal example: start a Kafka container from IMAGE and pass the started
// container to the testPubSub helper.
it("should connect", async () => {
  const container = new KafkaContainer(IMAGE);

  // `await using` asynchronously disposes the started container when this
  // test's scope exits, whether it passes or throws.
  await using broker = await container.start();

  await testPubSub(broker);
});
// Starts a Kafka container with an additional SASL_SSL listener and then
// connects a client through that listener using SCRAM credentials over TLS.
it(`should connect with SASL`, async () => {
  // Listener definition: the container-side port, the SCRAM user to create,
  // and the PKCS#12 key/trust stores (read from certificatesDir) with their
  // passphrases.
  const saslConfig: SaslSslListenerOptions = {
    port: 9096,
    sasl: {
      mechanism: "SCRAM-SHA-512",
      user: {
        name: "app-user",
        password: "userPassword",
      },
    },
    keystore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
      passphrase: "serverKeystorePassword",
    },
    truststore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
      passphrase: "serverTruststorePassword",
    },
  };

  // This example belongs to the Kafka 8.x section, so it uses the same IMAGE
  // constant as the plain "should connect" example rather than a pinned
  // 7.x tag.
  const kafkaContainer = new KafkaContainer(IMAGE).withSaslSslListener(saslConfig);

  // `await using` disposes the started container when the test scope exits.
  await using startedKafkaContainer = await kafkaContainer.start();

  await testPubSub(startedKafkaContainer, {
    // 9096 is the listener port declared in saslConfig; ask the container for
    // the host-side port it was mapped to.
    brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
    // Client credentials must match the user declared in saslConfig.
    sasl: {
      username: "app-user",
      password: "userPassword",
      mechanism: "scram-sha-512",
    },
    // CA used by the client to verify the broker's certificate.
    ssl: {
      ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
    },
  });
});
Kafka 7.x
Examples
// Default setup: no ZooKeeper is configured by the test itself; the container
// is started as-is from IMAGE and handed to the testPubSub helper.
it("should connect using in-built zoo-keeper", async () => {
  const unstarted = new KafkaContainer(IMAGE);

  // Disposed automatically at the end of the test scope.
  await using started = await unstarted.start();

  await testPubSub(started);
});
// Runs Kafka against a separately started ZooKeeper container. Both
// containers join the same network so Kafka can reach ZooKeeper through its
// network alias.
it("should connect using provided zoo-keeper and network", async () => {
  const zkAlias = "zookeeper";
  const zkPort = 2181;

  // Resources declared with `await using` are disposed in reverse declaration
  // order at scope exit: Kafka first, then ZooKeeper, then the network.
  await using sharedNetwork = await new Network().start();

  const zooKeeper = new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
    .withNetwork(sharedNetwork)
    .withNetworkAliases(zkAlias)
    .withEnvironment({ ZOOKEEPER_CLIENT_PORT: String(zkPort) })
    .withExposedPorts(zkPort);
  // The binding is never read; it exists only to tie the container's lifetime
  // to this scope.
  await using _zooKeeper = await zooKeeper.start();

  const kafka = new KafkaContainer(IMAGE)
    .withNetwork(sharedNetwork)
    .withZooKeeper(zkAlias, zkPort);
  await using startedKafka = await kafka.start();

  await testPubSub(startedKafka);
});
// Starts a Kafka 7.5.0 container with an additional SASL_SSL listener and
// connects a client from the host through that listener.
it(`should connect locally`, async () => {
  // withSaslSslListener only configures the container; the result is still
  // unstarted and is started explicitly below, so there is nothing to await
  // at this point.
  const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({
    port: 9096,
    sasl: {
      mechanism: "SCRAM-SHA-512",
      user: {
        name: "app-user",
        password: "userPassword",
      },
    },
    // PKCS#12 key/trust stores for the broker, read from certificatesDir.
    keystore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
      passphrase: "serverKeystorePassword",
    },
    truststore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
      passphrase: "serverTruststorePassword",
    },
  });

  // `configure` is defined outside this example and is applied to the
  // unstarted container before it is started.
  // NOTE(review): presumably it selects the ZooKeeper or KRaft variant — confirm.
  configure(kafkaContainer);

  // `await using` disposes the started container when the test scope exits.
  await using startedKafkaContainer = await kafkaContainer.start();

  await testPubSub(startedKafkaContainer, {
    // 9096 is the listener port declared above; resolve its host-side mapping.
    brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
    // Client credentials must match the user declared on the listener.
    sasl: {
      username: "app-user",
      password: "userPassword",
      mechanism: "scram-sha-512",
    },
    // CA used by the client to verify the broker's certificate.
    ssl: {
      ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
    },
  });
});
// Same flow as the basic connection example, but with withKraft() applied to
// the container before it is started.
it("should connect using kraft", async () => {
  const kraftContainer = new KafkaContainer(IMAGE).withKraft();

  // Disposed automatically at the end of the test scope.
  await using startedKraftContainer = await kraftContainer.start();

  await testPubSub(startedKraftContainer);
});