Last updated 3rd June 2021
Objective
Apache Kafka is an open-source stream-processing software platform.
It is a framework for storing, reading and analyzing streaming data. See the Kafka documentation for more information.
Supported versions
Grid |
---|
2.1 |
2.2 |
2.3 |
2.4 |
2.5 |
Relationship
The format exposed in the $PLATFORM_RELATIONSHIPS
environment variable:
{
"service": "kafka25",
"ip": "169.254.46.170",
"hostname": "t7lv3t3ttyh3vyrzgqguj5upwy.kafka25.service._.eu-3.platformsh.site",
"cluster": "rjify4yjcwxaa-master-7rqtwti",
"host": "kafka.internal",
"rel": "kafka",
"scheme": "kafka",
"type": "kafka:2.5",
"port": 9092
}
Usage example
In your .platform/services.yaml
:
queuekafka:
type: kafka:2.5
disk: 512
In your .platform.app.yaml
:
relationships:
kafkaqueue: "queuekafka:kafka"
You will need to use
kafka
type when defining the service```yaml
.platform/services.yaml
service_name: type: kafka:version disk:512 ```
and the endpoint
kafka
when defining the relationship```yaml
.platform.app.yaml
relationships: relationship_name: “service_name:kafka” ```
Your
service_name
andrelationship_name
are defined by you, but we recommend making them distinct from each other.
You can then use the service in a configuration file of your application with something like:
package sh.platform.languages.sample;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import sh.platform.config.Config;
import sh.platform.config.Kafka;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
public class KafkaSample implements Supplier<String> {
@Override
public String get() {
StringBuilder logger = new StringBuilder();
// Create a new config object to ease reading the Web PaaS environment variables.
// You can alternatively use getenv() yourself.
Config config = new Config();
try {
// Get the credentials to connect to the Kafka service.
final Kafka kafka = config.getCredential("kafka", Kafka::new);
Map<String, Object> configProducer = new HashMap<>();
configProducer.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "animals");
final Producer<Long, String> producer = kafka.getProducer(configProducer);
// Sending data into the stream.
RecordMetadata metadata = producer.send(new ProducerRecord<>("animals", "lion")).get();
logger.append("Record sent with to partition ").append(metadata.partition())
.append(" with offset ").append(metadata.offset()).append('\n');
metadata = producer.send(new ProducerRecord<>("animals", "dog")).get();
logger.append("Record sent with to partition ").append(metadata.partition())
.append(" with offset ").append(metadata.offset()).append('\n');
metadata = producer.send(new ProducerRecord<>("animals", "cat")).get();
logger.append("Record sent with to partition ").append(metadata.partition())
.append(" with offset ").append(metadata.offset()).append('\n');
// Consumer, read data from the stream.
final HashMap<String, Object> configConsumer = new HashMap<>();
configConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
configConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Long, String> consumer = kafka.getConsumer(configConsumer, "animals");
ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(3));
// Print each record.
consumerRecords.forEach(record -> {
logger.append("Record: Key " + record.key());
logger.append(" value " + record.value());
logger.append(" partition " + record.partition());
logger.append(" offset " + record.offset()).append('\n');
});
// Commits the offset of record to broker.
consumer.commitSync();
return logger.toString();
} catch (Exception exp) {
throw new RuntimeException("An error when execute Kafka", exp);
}
}
}
from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config
def usage_example():
# Create a new Config object to ease reading the Web PaaS environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# Get the credentials to connect to the Kafka service.
credentials = config.credentials('kafka')
try:
kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])
# Producer
producer = KafkaProducer(
bootstrap_servers=[kafka_server],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
for e in range(10):
data = {'number' : e}
producer.send('numtest', value=data)
# Consumer
consumer = KafkaConsumer(
bootstrap_servers=[kafka_server],
auto_offset_reset='earliest'
)
consumer.subscribe(['numtest'])
output = ''
# For demonstration purposes so it doesn't block.
for e in range(10):
message = next(consumer)
output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '
# What a real implementation would do instead.
# for message in consumer:
# output += loads(message.value.decode('UTF-8'))["number"]
return output
except Exception as e:
return e
## With the ruby-kafka gem
# Producer
require "kafka"
kafka = Kafka.new(["kafka.internal:9092"], client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")
# Consumer
kafka.each_message(topic: "greetings") do |message|
puts message.offset, message.key, message.value
end
(The specific way to inject configuration into your application will vary. Consult your application or framework's documentation.)
Did you find this guide useful?
Please feel free to give any suggestions in order to improve this documentation.
Whether your feedback is about images, content, or structure, please share it, so that we can improve it together.
Your support requests will not be processed via this form. To do this, please use the "Create a ticket" form.
Thank you. Your feedback has been received.