public class KafkaHelper extends Object
| Modifier and Type | Class and Description |
|---|---|
| static class | KafkaHelper.RecordConsumer<K,V> |
| Modifier and Type | Method | Description |
|---|---|---|
| <K,V> com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | consume(String topic, org.apache.kafka.clients.consumer.KafkaConsumer<K,V> consumer, int numMessagesToConsume) | Attempt to consume the specified number of messages |
| Properties | consumerConfig() | Get the consumer configuration (with auto-commit enabled) |
| Properties | consumerConfig(boolean enableAutoCommit) | Get the consumer configuration |
| com.google.common.util.concurrent.ListenableFuture<List<String>> | consumeStrings(String topic, int numMessagesToConsume) | Consume specified number of string messages |
| org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> | createByteConsumer() | Create a consumer that reads bytes |
| org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> | createByteConsumer(Properties overrideConfig) | Create a consumer that reads bytes |
| org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> | createByteProducer() | Create a producer that writes byte keys and values |
| org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> | createByteProducer(Properties overrideConfig) | Create a producer that writes byte keys and values |
| <K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> | createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, Properties overrideConfig) | Create a consumer that can read from this broker |
| static KafkaHelper | createFor(EphemeralKafkaBroker broker) | |
| <K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> | createProducer(org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, Properties overrideConfig) | Create a producer that can write to this broker |
| org.apache.kafka.clients.consumer.KafkaConsumer<String,String> | createStringConsumer() | Create a consumer that reads strings |
| org.apache.kafka.clients.consumer.KafkaConsumer<String,String> | createStringConsumer(Properties overrideConfig) | Create a consumer that reads strings |
| org.apache.kafka.clients.producer.KafkaProducer<String,String> | createStringProducer() | Create a producer that writes String keys and values |
| org.apache.kafka.clients.producer.KafkaProducer<String,String> | createStringProducer(Properties overrideConfig) | Create a producer that writes String keys and values |
| int | kafkaPort() | Get the broker listener port |
| <K,V> void | produce(String topic, org.apache.kafka.clients.producer.KafkaProducer<K,V> producer, Map<K,V> data) | Produce data to the specified topic |
| Properties | producerConfig() | Get the producer configuration |
| void | produceStrings(String topic, String... values) | Convenience method to produce a set of strings to the specified topic |
| String | zookeeperConnectionString() | Get the zookeeper connection string |
| int | zookeeperPort() | Get the zookeeper port |
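The `consume` and `consumeStrings` methods return a future that resolves to a list of the requested number of messages. The following is a minimal sketch of that "collect N items, then complete a future" pattern, not the library's implementation. It uses only the JDK: `CompletableFuture` stands in for Guava's `ListenableFuture`, a `BlockingQueue` stands in for a topic read by a `KafkaConsumer`, and the class name `ConsumeSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ConsumeSketch {

    // Hypothetical stand-in for consume(topic, consumer, numMessagesToConsume):
    // polls the source on a background thread until n items have been collected.
    static <T> CompletableFuture<List<T>> consume(BlockingQueue<T> source, int n) {
        return CompletableFuture.supplyAsync(() -> {
            List<T> collected = new ArrayList<>(n);
            try {
                while (collected.size() < n) {
                    // Poll with a short timeout, much like a consumer poll loop.
                    T item = source.poll(100, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        collected.add(item);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while consuming", e);
            }
            return collected;
        });
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        CompletableFuture<List<String>> future = consume(topic, 2);

        topic.add("a");
        topic.add("b");
        topic.add("c"); // never read: only two messages were requested

        // Bound the wait so a test fails instead of hanging when messages are missing.
        List<String> messages = future.get(5, TimeUnit.SECONDS);
        System.out.println(messages); // prints [a, b]
    }
}
```

Because the result is a future, a test would typically call `get` with a timeout, as in `main` above, so that a missing message surfaces as a `TimeoutException` rather than a hung test.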
public static KafkaHelper createFor(EphemeralKafkaBroker broker)
public Properties producerConfig()
public Properties consumerConfig()
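The two `consumerConfig` overloads differ only in the auto-commit setting: the no-argument form enables it. The sketch below illustrates that relationship with plain `java.util.Properties`. It is an assumption about the shape of the result, not the library's code: `enable.auto.commit` is the standard Kafka consumer setting, but the other properties the real method populates are not listed on this page and are omitted here. The class name `ConsumerConfigSketch` is hypothetical.

```java
import java.util.Properties;

class ConsumerConfigSketch {

    // Mirrors consumerConfig(): delegates with auto-commit enabled.
    static Properties consumerConfig() {
        return consumerConfig(true);
    }

    // Mirrors consumerConfig(boolean enableAutoCommit).
    // Only the auto-commit key is shown; any other settings are outside this sketch.
    static Properties consumerConfig(boolean enableAutoCommit) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", String.valueOf(enableAutoCommit));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfig().getProperty("enable.auto.commit"));      // prints true
        System.out.println(consumerConfig(false).getProperty("enable.auto.commit")); // prints false
    }
}
```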
public Properties consumerConfig(boolean enableAutoCommit)
    enableAutoCommit - Enable auto-commit
public String zookeeperConnectionString()
    Throws IllegalStateException if the broker is not running
public int zookeeperPort()
    Throws IllegalStateException if the broker is not running
public int kafkaPort()
    Throws IllegalStateException if the broker is not running
public <K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> createProducer(org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, Properties overrideConfig)
    K - Type of key
    V - Type of value
    keySerializer - Key serializer
    valueSerializer - Value serializer
    overrideConfig - Producer config to override. Pass null if there aren't any.
public org.apache.kafka.clients.producer.KafkaProducer<String,String> createStringProducer()
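Several factory methods accept an `overrideConfig` argument that may be null. As a rough illustration of what "config to override" can mean, the sketch below layers caller-supplied overrides on top of a base configuration and treats null as "no overrides". Whether the library merges properties exactly this way is an assumption; the class name `OverrideConfigSketch` and the `merge` helper are hypothetical, and `acks` and `linger.ms` are ordinary Kafka producer settings used only as sample keys.

```java
import java.util.Properties;

class OverrideConfigSketch {

    // Copies the base configuration, then applies overrides on top of it.
    // A null overrideConfig leaves the base configuration unchanged.
    static Properties merge(Properties base, Properties overrideConfig) {
        Properties merged = new Properties();
        merged.putAll(base);
        if (overrideConfig != null) {
            merged.putAll(overrideConfig);
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("acks", "1");
        base.setProperty("linger.ms", "0");

        Properties overrides = new Properties();
        overrides.setProperty("acks", "all");

        Properties merged = merge(base, overrides);
        System.out.println(merged.getProperty("acks"));           // prints all
        System.out.println(merged.getProperty("linger.ms"));      // prints 0
        System.out.println(merge(base, null).getProperty("acks")); // prints 1
    }
}
```

Copying into a fresh `Properties` object keeps the base configuration untouched, so the same defaults can be reused across several producers or consumers in one test.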
public org.apache.kafka.clients.producer.KafkaProducer<String,String> createStringProducer(Properties overrideConfig)
    overrideConfig - Producer config to override
public org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> createByteProducer()
public org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> createByteProducer(Properties overrideConfig)
    overrideConfig - Producer config to override
public <K,V> void produce(String topic, org.apache.kafka.clients.producer.KafkaProducer<K,V> producer, Map<K,V> data)
    K - Type of key
    V - Type of value
    topic - Topic to produce to
    producer - Producer to use
    data - Data to produce
public void produceStrings(String topic, String... values)
    topic - Topic to produce to
    values - Values to produce
public <K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, Properties overrideConfig)
    K - Type of key
    V - Type of value
    keyDeserializer - Key deserializer
    valueDeserializer - Value deserializer
    overrideConfig - Consumer config to override. Pass null if there aren't any.
public org.apache.kafka.clients.consumer.KafkaConsumer<String,String> createStringConsumer()
public org.apache.kafka.clients.consumer.KafkaConsumer<String,String> createStringConsumer(Properties overrideConfig)
    overrideConfig - Consumer config to override
public org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> createByteConsumer()
public org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> createByteConsumer(Properties overrideConfig)
    overrideConfig - Consumer config to override
public <K,V> com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> consume(String topic, org.apache.kafka.clients.consumer.KafkaConsumer<K,V> consumer, int numMessagesToConsume)
    K - Type of key
    V - Type of value
    topic - Topic to consume
    consumer - Consumer to use
    numMessagesToConsume - Number of messages to consume
public com.google.common.util.concurrent.ListenableFuture<List<String>> consumeStrings(String topic, int numMessagesToConsume)
    topic - Topic to consume from
    numMessagesToConsume - Number of messages to consume

Copyright © 2017. All rights reserved.