Skip to content

Kafka Module

Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Install

npm install @testcontainers/kafka --save-dev

Examples

it("should connect using in-built zoo-keeper", async () => {
  const kafkaContainer = await new KafkaContainer().withExposedPorts(9093).start();

  await testPubSub(kafkaContainer);

  await kafkaContainer.stop();
});
it("should connect using provided zoo-keeper and network", async () => {
  const network = await new Network().start();

  const zooKeeperHost = "zookeeper";
  const zooKeeperPort = 2181;
  const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
    .withNetwork(network)
    .withNetworkAliases(zooKeeperHost)
    .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
    .withExposedPorts(zooKeeperPort)
    .start();

  const kafkaContainer = await new KafkaContainer()
    .withNetwork(network)
    .withZooKeeper(zooKeeperHost, zooKeeperPort)
    .withExposedPorts(9093)
    .start();

  await testPubSub(kafkaContainer);

  await zookeeperContainer.stop();
  await kafkaContainer.stop();
  await network.stop();
});
it(`should connect locally`, async () => {
  const kafkaContainer = await new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({
    port: 9096,
    sasl: {
      mechanism: "SCRAM-SHA-512",
      user: {
        name: "app-user",
        password: "userPassword",
      },
    },
    keystore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
      passphrase: "serverKeystorePassword",
    },
    truststore: {
      content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
      passphrase: "serverTruststorePassword",
    },
  });
  configure(kafkaContainer);
  const startedKafkaContainer = await kafkaContainer.start();

  await testPubSub(startedKafkaContainer, {
    brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
    sasl: {
      username: "app-user",
      password: "userPassword",
      mechanism: "scram-sha-512",
    },
    ssl: {
      ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
    },
  });
  await startedKafkaContainer.stop();
});
it("should connect using kraft", async () => {
  const kafkaContainer = await new KafkaContainer().withKraft().withExposedPorts(9093).start();

  await testPubSub(kafkaContainer);

  await kafkaContainer.stop();
});