Kafka Module
Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Install
npm install @testcontainers/kafka --save-dev
Examples
// Starts a Kafka container without configuring an external ZooKeeper
// (the "in-built" one, per the test title) and runs the pub/sub check on it.
it("should connect using in-built zoo-keeper", async () => {
  const kafkaContainer = await new KafkaContainer().withExposedPorts(9093).start();

  try {
    await testPubSub(kafkaContainer);
  } finally {
    // Stop the container even when the pub/sub check throws, so a failing
    // test does not leave a broker container running.
    await kafkaContainer.stop();
  }
});
// Starts a dedicated network and a ZooKeeper container on it, points a Kafka
// container at that ZooKeeper through its network alias, and runs the pub/sub
// check against Kafka.
it("should connect using provided zoo-keeper and network", async () => {
  const zooKeeperHost = "zookeeper";
  const zooKeeperPort = 2181;

  // Each resource is released in a `finally` right after it is acquired, so a
  // failure at any later step (container start or the pub/sub check) still
  // tears down everything that was already started. Teardown runs in reverse
  // order of start-up: Kafka, then ZooKeeper, then the network.
  const network = await new Network().start();
  try {
    const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
      .withNetwork(network)
      .withNetworkAliases(zooKeeperHost)
      .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
      .withExposedPorts(zooKeeperPort)
      .start();
    try {
      const kafkaContainer = await new KafkaContainer()
        .withNetwork(network)
        .withZooKeeper(zooKeeperHost, zooKeeperPort)
        .withExposedPorts(9093)
        .start();
      try {
        await testPubSub(kafkaContainer);
      } finally {
        await kafkaContainer.stop();
      }
    } finally {
      await zookeeperContainer.stop();
    }
  } finally {
    await network.stop();
  }
});
// Configures a Kafka container with a SASL_SSL listener (SCRAM-SHA-512 user
// plus keystore/truststore read from `certificatesDir`), starts it, and runs
// the pub/sub check through that listener's mapped port using matching client
// credentials and CA certificate.
it("should connect locally", async () => {
  // Shared by the listener definition and the client config below so the two
  // sides cannot drift apart.
  const saslSslPort = 9096;
  const username = "app-user";
  const password = "userPassword";

  // No `await` here: the container is only being configured at this point
  // and is started explicitly further down.
  const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({
    port: saslSslPort,
    sasl: {
      mechanism: "SCRAM-SHA-512",
      user: {
        name: username,
        password,
      },
    },
    keystore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
      passphrase: "serverKeystorePassword",
    },
    truststore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
      passphrase: "serverTruststorePassword",
    },
  });
  configure(kafkaContainer);

  const startedKafkaContainer = await kafkaContainer.start();
  try {
    await testPubSub(startedKafkaContainer, {
      brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(saslSslPort)}`],
      sasl: {
        username,
        password,
        mechanism: "scram-sha-512",
      },
      ssl: {
        ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
      },
    });
  } finally {
    // Stop the container even when the pub/sub check throws.
    await startedKafkaContainer.stop();
  }
});
// Starts a Kafka container with `withKraft()` enabled and runs the pub/sub
// check against it.
it("should connect using kraft", async () => {
  const kafkaContainer = await new KafkaContainer().withKraft().withExposedPorts(9093).start();

  try {
    await testPubSub(kafkaContainer);
  } finally {
    // Stop the container even when the pub/sub check throws, so a failing
    // test does not leave a broker container running.
    await kafkaContainer.stop();
  }
});