Kafka (Message queue service)

Last updated 3rd June 2021

Objective

Apache Kafka is an open-source stream-processing software platform.

It is a framework for storing, reading and analyzing streaming data. See the Kafka documentation for more information.

Supported versions

Grid: 2.1, 2.2, 2.3, 2.4, 2.5

Relationship

The service is exposed in the $PLATFORM_RELATIONSHIPS environment variable in the following format:

{
    "service": "kafka25",
    "ip": "169.254.46.170",
    "hostname": "t7lv3t3ttyh3vyrzgqguj5upwy.kafka25.service._.eu-3.platformsh.site",
    "cluster": "rjify4yjcwxaa-master-7rqtwti",
    "host": "kafka.internal",
    "rel": "kafka",
    "scheme": "kafka",
    "type": "kafka:2.5",
    "port": 9092
}
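
As a minimal sketch of reading this value from Python, the snippet below assumes that $PLATFORM_RELATIONSHIPS holds a base64-encoded JSON object mapping each relationship name to a list of endpoint objects like the one above. The helper names `decode_relationships` and `bootstrap_server` are hypothetical and not part of any library, and `kafkaqueue` is only an example relationship name.

```python
import base64
import json
import os


def decode_relationships(environ=os.environ):
    """Decode $PLATFORM_RELATIONSHIPS (assumed to be base64-encoded JSON)."""
    raw = environ.get("PLATFORM_RELATIONSHIPS")
    if not raw:
        # Variable is absent, for example when running outside the platform.
        return {}
    return json.loads(base64.b64decode(raw).decode("utf-8"))


def bootstrap_server(relationships, name):
    """Return "host:port" for the first endpoint of the named relationship."""
    endpoint = relationships[name][0]
    return "{}:{}".format(endpoint["host"], endpoint["port"])
```

Calling `bootstrap_server(decode_relationships(), "kafkaqueue")` would then give the address to pass to a Kafka client, provided a relationship with that name exists.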

Usage example

In your .platform/services.yaml:

queuekafka:
    type: kafka:2.5
    disk: 512

In your .platform.app.yaml:

relationships:
    kafkaqueue: "queuekafka:kafka"

You need to use the kafka type when defining the service:

```yaml
# .platform/services.yaml
service_name:
    type: kafka:version
    disk: 512
```

and the kafka endpoint when defining the relationship:

```yaml
# .platform.app.yaml
relationships:
    relationship_name: "service_name:kafka"
```

You choose the service_name and relationship_name yourself, but we recommend making them distinct from each other.

You can then use the service in your application. The Java, Python, and Ruby examples below each show one way to do it:

package sh.platform.languages.sample;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import sh.platform.config.Config;
import sh.platform.config.Kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class KafkaSample implements Supplier<String> {

    @Override
    public String get() {
        StringBuilder logger = new StringBuilder();

        // Create a new config object to ease reading the Web PaaS environment variables.
        // You can alternatively use getenv() yourself.
        Config config = new Config();

        try {
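            // Get the credentials to connect to the Kafka service.
            // "kafka" is the relationship name; use the one you defined in
            // .platform.app.yaml (kafkaqueue in the usage example above).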
            final Kafka kafka = config.getCredential("kafka", Kafka::new);
            Map<String, Object> configProducer = new HashMap<>();
            configProducer.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "animals");
            final Producer<Long, String> producer = kafka.getProducer(configProducer);

            // Sending data into the stream.
            RecordMetadata metadata = producer.send(new ProducerRecord<>("animals", "lion")).get();
            logger.append("Record sent to partition ").append(metadata.partition())
                    .append(" with offset ").append(metadata.offset()).append('\n');

            metadata = producer.send(new ProducerRecord<>("animals", "dog")).get();
            logger.append("Record sent to partition ").append(metadata.partition())
                    .append(" with offset ").append(metadata.offset()).append('\n');

            metadata = producer.send(new ProducerRecord<>("animals", "cat")).get();
            logger.append("Record sent to partition ").append(metadata.partition())
                    .append(" with offset ").append(metadata.offset()).append('\n');

            // Consumer, read data from the stream.
            final HashMap<String, Object> configConsumer = new HashMap<>();
            configConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
            configConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            Consumer<Long, String> consumer = kafka.getConsumer(configConsumer, "animals");
            ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(3));

            // Print each record.
            consumerRecords.forEach(record -> {
                logger.append("Record: Key " + record.key());
                logger.append(" value " + record.value());
                logger.append(" partition " + record.partition());
                logger.append(" offset " + record.offset()).append('\n');
            });

            // Commit the offsets of the consumed records to the broker.
            consumer.commitSync();

            return logger.toString();
        } catch (Exception exp) {
            throw new RuntimeException("An error occurred while executing the Kafka sample", exp);
        }
    }
}

from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config


def usage_example():
    # Create a new Config object to ease reading the Web PaaS environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()
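    # Get the credentials to connect to the Kafka service.
    # 'kafka' is the relationship name; use the one you defined in
    # .platform.app.yaml (kafkaqueue in the usage example above).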
    credentials = config.credentials('kafka')

    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])

        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        for e in range(10):
            data = {'number': e}
            producer.send('numtest', value=data)
        # send() is asynchronous; flush() blocks until buffered messages are delivered.
        producer.flush()

        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )

        consumer.subscribe(['numtest'])

        output = ''
        # For demonstration purposes so it doesn't block.
        for e in range(10):
            message = next(consumer)
            output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        # What a real implementation would do instead.
        # for message in consumer:
        #     output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        return output

    except Exception as e:
        return e

## With the ruby-kafka gem

# Producer
require "kafka"
kafka = Kafka.new(["kafka.internal:9092"], client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")

# Consumer
kafka.each_message(topic: "greetings") do |message|
  puts message.offset, message.key, message.value
end

(The specific way to inject configuration into your application will vary. Consult your application or framework's documentation.)
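
As one hedged illustration of such injection in Python, the sketch below turns a single relationship endpoint (shaped like the JSON in the Relationship section) into keyword arguments for a Kafka client. The helper name `kafka_client_settings` and the default client ID are hypothetical; `bootstrap_servers` and `client_id` are the parameter names used by the kafka-python client, which is assumed here.

```python
def kafka_client_settings(endpoint, client_id="my-application"):
    """Build keyword arguments for a Kafka client from one relationship endpoint."""
    if endpoint.get("scheme") != "kafka":
        raise ValueError("not a Kafka endpoint: {!r}".format(endpoint.get("scheme")))
    return {
        "bootstrap_servers": ["{}:{}".format(endpoint["host"], endpoint["port"])],
        "client_id": client_id,
    }


# Endpoint values copied from the Relationship section of this guide.
endpoint = {"host": "kafka.internal", "port": 9092, "scheme": "kafka"}
settings = kafka_client_settings(endpoint)
```

With kafka-python installed, the result could be passed on as `KafkaProducer(**settings)`. Keeping the settings in a plain dictionary means the same helper can feed whichever framework configuration mechanism your application uses.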

