Skip to content

Kafka

Install

1
npm install @testcontainers/kafka --save-dev

Examples

Kafka 8.x

These examples use the following libraries: kafkajs (the client that provides `Kafka`, `KafkaConfig` and `logLevel` in the snippets below).

Choose an image from the container registry and substitute IMAGE.

Produce/consume a message

1
2
3
// Start a Kafka container from IMAGE. `await using` binds the started container to the
// enclosing scope, so it is asynchronously disposed when that scope exits.
await using container = await new KafkaContainer(IMAGE).start();

// Round-trip one message through the broker with the assertMessageProducedAndConsumed helper.
await assertMessageProducedAndConsumed(container);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Produces one message to `test-topic` and asserts that a consumer in group
 * `test-group` reads the same payload back.
 *
 * @param container - Started Kafka container; its host and the mapped port for 9093
 *   form the default broker address.
 * @param additionalConfig - Extra client settings spread over the defaults, so a
 *   `brokers` entry here replaces the default broker address.
 * @throws If connecting, sending, subscribing or consuming fails, or if the consumed
 *   value is not `"test message"`.
 */
export async function assertMessageProducedAndConsumed(
  container: StartedKafkaContainer,
  additionalConfig: Partial<KafkaConfig> = {}
): Promise<void> {
  const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
  const kafka = new Kafka({ logLevel: logLevel.NOTHING, brokers, ...additionalConfig });

  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: "test-group" });

  try {
    await producer.connect();
    await consumer.connect();

    await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
    await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

    const consumedMessage = await new Promise<string | undefined>((resolve, reject) => {
      consumer
        .run({
          eachMessage: async ({ message }) => resolve(message.value?.toString()),
        })
        // Surface a failed consumer start instead of leaving this promise pending forever.
        .catch(reject);
    });
    expect(consumedMessage).toBe("test message");
  } finally {
    // Always release both clients, even when a step above (or the assertion) throws;
    // the nested finally keeps a consumer disconnect error from skipping the producer.
    try {
      await consumer.disconnect();
    } finally {
      await producer.disconnect();
    }
  }
}

With SSL

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Loads a file from the certificates directory as raw bytes.
const readCertificate = (fileName: string) => fs.readFileSync(path.resolve(certificatesDir, fileName));

// Values used on both the broker listener and the client.
const saslSslPort = 9096;
const username = "app-user";
const password = "userPassword";

// Broker-side listener: SCRAM-SHA-512 credentials plus the server keystore and truststore.
const saslConfig: SaslSslListenerOptions = {
  port: saslSslPort,
  sasl: {
    mechanism: "SCRAM-SHA-512",
    user: { name: username, password },
  },
  keystore: {
    content: readCertificate("kafka.server.keystore.pfx"),
    passphrase: "serverKeystorePassword",
  },
  truststore: {
    content: readCertificate("kafka.server.truststore.pfx"),
    passphrase: "serverTruststorePassword",
  },
};

await using container = await new KafkaContainer(IMAGE).withSaslSslListener(saslConfig).start();

// Client side: point at the mapped listener port and pass the matching credentials and CA.
const saslSslBroker = `${container.getHost()}:${container.getMappedPort(saslSslPort)}`;
await assertMessageProducedAndConsumed(container, {
  brokers: [saslSslBroker],
  sasl: { username, password, mechanism: "scram-sha-512" },
  ssl: { ca: [readCertificate("kafka.client.truststore.pem")] },
});
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Produces one message to `test-topic` and asserts that a consumer in group
 * `test-group` reads the same payload back.
 *
 * @param container - Started Kafka container; its host and the mapped port for 9093
 *   form the default broker address.
 * @param additionalConfig - Extra client settings spread over the defaults, so a
 *   `brokers` entry here replaces the default broker address.
 * @throws If connecting, sending, subscribing or consuming fails, or if the consumed
 *   value is not `"test message"`.
 */
export async function assertMessageProducedAndConsumed(
  container: StartedKafkaContainer,
  additionalConfig: Partial<KafkaConfig> = {}
): Promise<void> {
  const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
  const kafka = new Kafka({ logLevel: logLevel.NOTHING, brokers, ...additionalConfig });

  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: "test-group" });

  try {
    await producer.connect();
    await consumer.connect();

    await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
    await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

    const consumedMessage = await new Promise<string | undefined>((resolve, reject) => {
      consumer
        .run({
          eachMessage: async ({ message }) => resolve(message.value?.toString()),
        })
        // Surface a failed consumer start instead of leaving this promise pending forever.
        .catch(reject);
    });
    expect(consumedMessage).toBe("test message");
  } finally {
    // Always release both clients, even when a step above (or the assertion) throws;
    // the nested finally keeps a consumer disconnect error from skipping the producer.
    try {
      await consumer.disconnect();
    } finally {
      await producer.disconnect();
    }
  }
}

Kafka 7.x

These examples use the following libraries: kafkajs (the client that provides `Kafka`, `KafkaConfig` and `logLevel` in the snippets below).

Choose an image from the container registry and substitute IMAGE.

Produce/consume a message

1
2
// Start a Kafka container from IMAGE. `await using` binds the started container to the
// enclosing scope, so it is asynchronously disposed when that scope exits.
await using container = await new KafkaContainer(IMAGE).start();
// Round-trip one message through the broker with the assertMessageProducedAndConsumed helper.
await assertMessageProducedAndConsumed(container);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Produces one message to `test-topic` and asserts that a consumer in group
 * `test-group` reads the same payload back.
 *
 * @param container - Started Kafka container; its host and the mapped port for 9093
 *   form the default broker address.
 * @param additionalConfig - Extra client settings spread over the defaults, so a
 *   `brokers` entry here replaces the default broker address.
 * @throws If connecting, sending, subscribing or consuming fails, or if the consumed
 *   value is not `"test message"`.
 */
export async function assertMessageProducedAndConsumed(
  container: StartedKafkaContainer,
  additionalConfig: Partial<KafkaConfig> = {}
): Promise<void> {
  const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
  const kafka = new Kafka({ logLevel: logLevel.NOTHING, brokers, ...additionalConfig });

  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: "test-group" });

  try {
    await producer.connect();
    await consumer.connect();

    await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
    await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

    const consumedMessage = await new Promise<string | undefined>((resolve, reject) => {
      consumer
        .run({
          eachMessage: async ({ message }) => resolve(message.value?.toString()),
        })
        // Surface a failed consumer start instead of leaving this promise pending forever.
        .catch(reject);
    });
    expect(consumedMessage).toBe("test message");
  } finally {
    // Always release both clients, even when a step above (or the assertion) throws;
    // the nested finally keeps a consumer disconnect error from skipping the producer.
    try {
      await consumer.disconnect();
    } finally {
      await producer.disconnect();
    }
  }
}

With SSL

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Loads a file from the certificates directory as raw bytes.
const readCertificate = (fileName: string) => fs.readFileSync(path.resolve(certificatesDir, fileName));

// Values used on both the broker listener and the client.
const saslSslPort = 9096;
const username = "app-user";
const password = "userPassword";

// Start the broker with a SASL_SSL listener: SCRAM-SHA-512 credentials plus the server
// keystore and truststore.
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.5.0")
  .withSaslSslListener({
    port: saslSslPort,
    sasl: {
      mechanism: "SCRAM-SHA-512",
      user: { name: username, password },
    },
    keystore: {
      content: readCertificate("kafka.server.keystore.pfx"),
      passphrase: "serverKeystorePassword",
    },
    truststore: {
      content: readCertificate("kafka.server.truststore.pfx"),
      passphrase: "serverTruststorePassword",
    },
  })
  .start();

// Client side: point at the mapped listener port and pass the matching credentials and CA.
const saslSslBroker = `${container.getHost()}:${container.getMappedPort(saslSslPort)}`;
await assertMessageProducedAndConsumed(container, {
  brokers: [saslSslBroker],
  sasl: { username, password, mechanism: "scram-sha-512" },
  ssl: { ca: [readCertificate("kafka.client.truststore.pem")] },
});
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Produces one message to `test-topic` and asserts that a consumer in group
 * `test-group` reads the same payload back.
 *
 * @param container - Started Kafka container; its host and the mapped port for 9093
 *   form the default broker address.
 * @param additionalConfig - Extra client settings spread over the defaults, so a
 *   `brokers` entry here replaces the default broker address.
 * @throws If connecting, sending, subscribing or consuming fails, or if the consumed
 *   value is not `"test message"`.
 */
export async function assertMessageProducedAndConsumed(
  container: StartedKafkaContainer,
  additionalConfig: Partial<KafkaConfig> = {}
): Promise<void> {
  const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
  const kafka = new Kafka({ logLevel: logLevel.NOTHING, brokers, ...additionalConfig });

  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: "test-group" });

  try {
    await producer.connect();
    await consumer.connect();

    await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
    await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

    const consumedMessage = await new Promise<string | undefined>((resolve, reject) => {
      consumer
        .run({
          eachMessage: async ({ message }) => resolve(message.value?.toString()),
        })
        // Surface a failed consumer start instead of leaving this promise pending forever.
        .catch(reject);
    });
    expect(consumedMessage).toBe("test message");
  } finally {
    // Always release both clients, even when a step above (or the assertion) throws;
    // the nested finally keeps a consumer disconnect error from skipping the producer.
    try {
      await consumer.disconnect();
    } finally {
      await producer.disconnect();
    }
  }
}

With provided ZooKeeper

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Network that both the ZooKeeper and the Kafka container are attached to.
await using network = await new Network().start();

const zkAlias = "zookeeper";
const zkClientPort = 2181;

// ZooKeeper container: registered on the network under `zkAlias`, with `zkClientPort`
// set as its client port and exposed.
const zooKeeperDefinition = new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
  .withNetwork(network)
  .withNetworkAliases(zkAlias)
  .withEnvironment({ ZOOKEEPER_CLIENT_PORT: String(zkClientPort) })
  .withExposedPorts(zkClientPort);
await using _ = await zooKeeperDefinition.start();

// Kafka container on the same network, given the same alias and port via withZooKeeper.
const kafkaDefinition = new KafkaContainer(IMAGE).withNetwork(network).withZooKeeper(zkAlias, zkClientPort);
await using container = await kafkaDefinition.start();

With Kraft

1
// Start a Kafka container from IMAGE with the withKraft() option applied; no ZooKeeper
// host is configured in this snippet. `await using` disposes the container at scope exit.
await using container = await new KafkaContainer(IMAGE).withKraft().start();