Kafka
Install
```bash
npm install @testcontainers/kafka --save-dev
```
Examples
Kafka 8.x
These examples use the following libraries:

- kafkajs
Choose an image from the container registry and substitute `IMAGE`.
Produce/consume a message
With SSL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Reads one certificate fixture from certificatesDir (returned as a Buffer).
const loadCertificate = (fileName: string) => fs.readFileSync(path.resolve(certificatesDir, fileName));

// Broker side: an extra SASL_SSL listener on port 9096 with a single SCRAM-SHA-512 user,
// plus the server keystore/truststore and their passphrases.
const saslConfig: SaslSslListenerOptions = {
  port: 9096,
  sasl: {
    mechanism: "SCRAM-SHA-512",
    user: { name: "app-user", password: "userPassword" },
  },
  keystore: {
    content: loadCertificate("kafka.server.keystore.pfx"),
    passphrase: "serverKeystorePassword",
  },
  truststore: {
    content: loadCertificate("kafka.server.truststore.pfx"),
    passphrase: "serverTruststorePassword",
  },
};

// `await using` disposes the started container when the enclosing scope exits.
const kafkaWithSaslSsl = new KafkaContainer(IMAGE).withSaslSslListener(saslConfig);
await using container = await kafkaWithSaslSsl.start();

// Client side: connect through the mapped SASL_SSL port with the same credentials,
// trusting the CA bundle from the client truststore.
const saslSslBroker = `${container.getHost()}:${container.getMappedPort(9096)}`;
await assertMessageProducedAndConsumed(container, {
  brokers: [saslSslBroker],
  sasl: {
    username: "app-user",
    password: "userPassword",
    mechanism: "scram-sha-512",
  },
  ssl: {
    ca: [loadCertificate("kafka.client.truststore.pem")],
  },
});
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Round-trips a single message through the broker and asserts that it arrives intact.
 *
 * A producer sends `"test message"` to `test-topic`; a consumer in group `test-group`
 * subscribes from the beginning of the topic and the first message it receives is compared
 * against the sent value. Both clients are disconnected even when a step fails, so a failing
 * assertion does not leave open connections behind.
 *
 * @param container - Started Kafka container; its host and mapped port 9093 form the default
 *   broker address.
 * @param additionalConfig - Extra client settings spread over the defaults. A `brokers` entry
 *   here replaces the default address.
 */
export async function assertMessageProducedAndConsumed(
  container: StartedKafkaContainer,
  additionalConfig: Partial<KafkaConfig> = {}
) {
  const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
  // additionalConfig is spread last so that callers can override `brokers`.
  const kafka = new Kafka({ logLevel: logLevel.NOTHING, brokers, ...additionalConfig });

  const producer = kafka.producer();
  await producer.connect();
  try {
    const consumer = kafka.consumer({ groupId: "test-group" });
    await consumer.connect();
    try {
      await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
      await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

      const consumedMessage = await new Promise<string | undefined>((resolve, reject) => {
        consumer
          .run({
            eachMessage: async ({ message }) => {
              resolve(message.value?.toString());
            },
          })
          // Surface a failure to start the consumer instead of waiting forever.
          .catch(reject);
      });

      expect(consumedMessage).toBe("test message");
    } finally {
      await consumer.disconnect();
    }
  } finally {
    await producer.disconnect();
  }
}
|
Kafka 7.x
These examples use the following libraries:

- kafkajs
Choose an image from the container registry and substitute `IMAGE`.
Produce/consume a message
With SSL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Reads one certificate fixture from certificatesDir (returned as a Buffer).
const loadCertificate = (fileName: string) => fs.readFileSync(path.resolve(certificatesDir, fileName));

// Broker side: an extra SASL_SSL listener on port 9096 with a single SCRAM-SHA-512 user,
// plus the server keystore/truststore and their passphrases.
const kafkaWithSaslSsl = new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({
  port: 9096,
  sasl: {
    mechanism: "SCRAM-SHA-512",
    user: { name: "app-user", password: "userPassword" },
  },
  keystore: {
    content: loadCertificate("kafka.server.keystore.pfx"),
    passphrase: "serverKeystorePassword",
  },
  truststore: {
    content: loadCertificate("kafka.server.truststore.pfx"),
    passphrase: "serverTruststorePassword",
  },
});

// `await using` disposes the started container when the enclosing scope exits.
await using container = await kafkaWithSaslSsl.start();

// Client side: connect through the mapped SASL_SSL port with the same credentials,
// trusting the CA bundle from the client truststore.
const saslSslBroker = `${container.getHost()}:${container.getMappedPort(9096)}`;
await assertMessageProducedAndConsumed(container, {
  brokers: [saslSslBroker],
  sasl: {
    username: "app-user",
    password: "userPassword",
    mechanism: "scram-sha-512",
  },
  ssl: {
    ca: [loadCertificate("kafka.client.truststore.pem")],
  },
});
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Round-trips a single message through the broker and asserts that it arrives intact.
 *
 * A producer sends `"test message"` to `test-topic`; a consumer in group `test-group`
 * subscribes from the beginning of the topic and the first message it receives is compared
 * against the sent value. Both clients are disconnected even when a step fails, so a failing
 * assertion does not leave open connections behind.
 *
 * @param container - Started Kafka container; its host and mapped port 9093 form the default
 *   broker address.
 * @param additionalConfig - Extra client settings spread over the defaults. A `brokers` entry
 *   here replaces the default address.
 */
export async function assertMessageProducedAndConsumed(
  container: StartedKafkaContainer,
  additionalConfig: Partial<KafkaConfig> = {}
) {
  const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
  // additionalConfig is spread last so that callers can override `brokers`.
  const kafka = new Kafka({ logLevel: logLevel.NOTHING, brokers, ...additionalConfig });

  const producer = kafka.producer();
  await producer.connect();
  try {
    const consumer = kafka.consumer({ groupId: "test-group" });
    await consumer.connect();
    try {
      await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
      await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

      const consumedMessage = await new Promise<string | undefined>((resolve, reject) => {
        consumer
          .run({
            eachMessage: async ({ message }) => {
              resolve(message.value?.toString());
            },
          })
          // Surface a failure to start the consumer instead of waiting forever.
          .catch(reject);
      });

      expect(consumedMessage).toBe("test message");
    } finally {
      await consumer.disconnect();
    }
  } finally {
    await producer.disconnect();
  }
}
|
With provided ZooKeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Both containers join this network; ZooKeeper is registered on it under the alias below,
// and the same host/port pair is handed to the Kafka container.
await using network = await new Network().start();

const zooKeeperHost = "zookeeper";
const zooKeeperPort = 2181;

const zooKeeperDefinition = new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
  .withNetwork(network)
  .withNetworkAliases(zooKeeperHost)
  .withEnvironment({ ZOOKEEPER_CLIENT_PORT: `${zooKeeperPort}` })
  .withExposedPorts(zooKeeperPort);
// Bound to `_` only so that `await using` disposes the ZooKeeper container on scope exit.
await using _ = await zooKeeperDefinition.start();

const kafkaDefinition = new KafkaContainer(IMAGE)
  .withNetwork(network)
  .withZooKeeper(zooKeeperHost, zooKeeperPort);
await using container = await kafkaDefinition.start();
|
With Kraft
// Configure the container with withKraft(), then start it; `await using` disposes it on scope exit.
const kraftKafka = new KafkaContainer(IMAGE).withKraft();
await using container = await kraftKafka.start();
|