public class KafkaHelper extends Object
Modifier and Type | Class and Description |
---|---|
static class | KafkaHelper.RecordConsumer<K,V> |
Modifier and Type | Method | Description |
---|---|---|
<K,V> com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | consume(String topic, org.apache.kafka.clients.consumer.KafkaConsumer<K,V> consumer, int numMessagesToConsume) | Attempt to consume the specified number of messages |
Properties | consumerConfig() | Get the consumer configuration (with auto-commit enabled) |
Properties | consumerConfig(boolean enableAutoCommit) | Get the consumer configuration |
com.google.common.util.concurrent.ListenableFuture<List<String>> | consumeStrings(String topic, int numMessagesToConsume) | Consume specified number of string messages |
org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> | createByteConsumer() | Create a consumer that reads bytes |
org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> | createByteConsumer(Properties overrideConfig) | Create a consumer that reads bytes |
org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> | createByteProducer() | Create a producer that writes byte keys and values |
org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> | createByteProducer(Properties overrideConfig) | Create a producer that writes byte keys and values |
<K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> | createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, Properties overrideConfig) | Create a consumer that can read from this broker |
static KafkaHelper | createFor(EphemeralKafkaBroker broker) | |
<K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> | createProducer(org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, Properties overrideConfig) | Create a producer that can write to this broker |
org.apache.kafka.clients.consumer.KafkaConsumer<String,String> | createStringConsumer() | Create a consumer that reads strings |
org.apache.kafka.clients.consumer.KafkaConsumer<String,String> | createStringConsumer(Properties overrideConfig) | Create a consumer that reads strings |
org.apache.kafka.clients.producer.KafkaProducer<String,String> | createStringProducer() | Create a producer that writes String keys and values |
org.apache.kafka.clients.producer.KafkaProducer<String,String> | createStringProducer(Properties overrideConfig) | Create a producer that writes String keys and values |
int | kafkaPort() | Get the broker listener port |
<K,V> void | produce(String topic, org.apache.kafka.clients.producer.KafkaProducer<K,V> producer, Map<K,V> data) | Produce data to the specified topic |
Properties | producerConfig() | Get the producer configuration |
void | produceStrings(String topic, String... values) | Convenience method to produce a set of strings to the specified topic |
String | zookeeperConnectionString() | Get the zookeeper connection string |
int | zookeeperPort() | Get the zookeeper port |
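Both consume and consumeStrings return a ListenableFuture, which extends java.util.concurrent.Future, so a test can bound how long it waits for messages instead of blocking indefinitely. The following is a minimal sketch under stated assumptions: awaitOrEmpty is a hypothetical helper that is not part of KafkaHelper, and a CompletableFuture stands in for the returned future so the example runs without a broker.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {
    // Hypothetical helper (not part of KafkaHelper): waits up to timeoutMs
    // for the future to complete and returns an empty list on timeout.
    static <T> List<T> awaitOrEmpty(Future<List<T>> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up on the pending work so it does not linger after the test.
            future.cancel(true);
            return Collections.emptyList();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a future that already holds two consumed messages.
        Future<List<String>> done =
                CompletableFuture.completedFuture(Arrays.asList("a", "b"));
        System.out.println(awaitOrEmpty(done, 100)); // prints [a, b]

        // Stand-in for a future that never completes (no messages arrive).
        Future<List<String>> never = new CompletableFuture<>();
        System.out.println(awaitOrEmpty(never, 50)); // prints []
    }
}
```

With a running broker, the same helper could presumably wrap the future returned by consumeStrings(topic, numMessagesToConsume). Returning an empty list on timeout is a design choice for the sketch; a test that must distinguish "timed out" from "no data" may prefer to rethrow the TimeoutException.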
public static KafkaHelper createFor(EphemeralKafkaBroker broker)

public Properties producerConfig()

public Properties consumerConfig()

public Properties consumerConfig(boolean enableAutoCommit)
Parameters:
enableAutoCommit - Enable auto-commit

public String zookeeperConnectionString()
Throws:
IllegalStateException - if the broker is not running

public int zookeeperPort()
Throws:
IllegalStateException - if the broker is not running

public int kafkaPort()
Throws:
IllegalStateException - if the broker is not running

public <K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> createProducer(org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, Properties overrideConfig)
Type Parameters:
K - Type of key
V - Type of value
Parameters:
keySerializer - Key serializer
valueSerializer - Value serializer
overrideConfig - Producer config to override. Pass null if there aren't any.

public org.apache.kafka.clients.producer.KafkaProducer<String,String> createStringProducer()
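The overrideConfig parameter lets a caller adjust producer settings, with null meaning there are no overrides. One plausible way such layering could work is sketched below. The merge helper is hypothetical, and this reference does not show how the library actually combines the two sets of properties. The keys bootstrap.servers and acks are standard Kafka producer settings; the values are illustrative only.

```java
import java.util.Properties;

class OverrideConfigSketch {
    // Hypothetical helper: copies the defaults, then applies the overrides
    // on top. A null overrideConfig means "no overrides".
    static Properties merge(Properties defaults, Properties overrideConfig) {
        Properties merged = new Properties();
        merged.putAll(defaults);
        if (overrideConfig != null) {
            merged.putAll(overrideConfig);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Illustrative defaults; a real broker's address and port will differ.
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092");
        defaults.setProperty("acks", "1");

        // Override a single setting and leave the rest untouched.
        Properties overrides = new Properties();
        overrides.setProperty("acks", "all");

        System.out.println(merge(defaults, overrides).getProperty("acks")); // prints all
        System.out.println(merge(defaults, null).getProperty("acks"));      // prints 1
    }
}
```

Copying into a fresh Properties object keeps the defaults unmodified, so the same base configuration can be reused across several producers in one test.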

public org.apache.kafka.clients.producer.KafkaProducer<String,String> createStringProducer(Properties overrideConfig)
Parameters:
overrideConfig - Producer config to override

public org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> createByteProducer()

public org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> createByteProducer(Properties overrideConfig)
Parameters:
overrideConfig - Producer config to override

public <K,V> void produce(String topic, org.apache.kafka.clients.producer.KafkaProducer<K,V> producer, Map<K,V> data)
Type Parameters:
K - Type of key
V - Type of value
Parameters:
topic - Topic to produce to
producer - Producer to use
data - Data to produce

public void produceStrings(String topic, String... values)
Parameters:
topic - Topic to produce to
values - Values to produce

public <K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, Properties overrideConfig)
Type Parameters:
K - Type of key
V - Type of value
Parameters:
keyDeserializer - Key deserializer
valueDeserializer - Value deserializer
overrideConfig - Consumer config to override. Pass null if there aren't any.

public org.apache.kafka.clients.consumer.KafkaConsumer<String,String> createStringConsumer()

public org.apache.kafka.clients.consumer.KafkaConsumer<String,String> createStringConsumer(Properties overrideConfig)
Parameters:
overrideConfig - Consumer config to override

public org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> createByteConsumer()

public org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> createByteConsumer(Properties overrideConfig)
Parameters:
overrideConfig - Consumer config to override

public <K,V> com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> consume(String topic, org.apache.kafka.clients.consumer.KafkaConsumer<K,V> consumer, int numMessagesToConsume)
Type Parameters:
K - Type of key
V - Type of value
Parameters:
topic - Topic to consume
consumer - Consumer to use
numMessagesToConsume - Number of messages to consume

public com.google.common.util.concurrent.ListenableFuture<List<String>> consumeStrings(String topic, int numMessagesToConsume)
Parameters:
topic - Topic to consume from
numMessagesToConsume - Number of messages to consume

Copyright © 2017. All rights reserved.