
Managing Apache Kafka Programmatically

Clairvoyant has extensive experience in Big Data and Cloud technologies. We explore the core concepts of Apache Kafka and other Big Data technologies to provide well-optimized solutions to our clients. With the introduction of the AdminClient in Kafka, we have used it for essential topic management, helping clients manage Kafka programmatically.
Introduction
Apache Kafka added the AdminClient in version 0.11 to provide a programmatic API for administrative functionality that was previously done on the command line: listing, creating, and deleting topics; describing the cluster; managing ACLs; and modifying configurations.
Here’s one example: Your application is going to produce events to a specific topic. This means that before producing the first event, the topic has to exist. Before Apache Kafka added the admin client, there were few options, and none of them particularly user-friendly:
- You could capture exceptions from the producer and let your user know that they need to create the topic
- You could hope that the Kafka cluster you are writing to enabled automatic topic creation
- You could rely on internal APIs and deal with the consequences of no compatibility guarantees
Now that Apache Kafka provides AdminClient, there is a much better solution: Use AdminClient to check whether the topic exists, and if it does not, create it on the spot.
AdminClient Lifecycle: Creating, Configuring, and Closing
In order to use Kafka’s AdminClient, the first thing you have to do is construct an instance of the AdminClient class. This is quite straightforward:
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
adminClient.close(Duration.ofSeconds(30));
The static create() method takes a Properties object argument with the configuration. The only mandatory configuration is the URI for your cluster (a comma-separated list of brokers to connect to).
If you create an AdminClient, eventually you will want to close it. It is important to remember that when you call close(), there could still be AdminClient operations in progress. Therefore, the close() method accepts a timeout parameter. Once you call close(), you can't call any other methods or send any more requests, but the client will wait for responses until the timeout expires. After the timeout expires, the client aborts all ongoing operations with a timeout exception and releases all resources. Calling close() without a timeout implies that you'll wait as long as it takes for all ongoing operations to complete.
Essential Topic Management
Now that we have created and configured an AdminClient, it is time to see what we can do with it. The most common use case for Kafka's AdminClient is topic management. This includes listing topics, describing them, creating topics, and deleting them.
- Listing all Topics in the Cluster
ListTopicsResult topics = adminClient.listTopics();
topics.names().get().forEach(System.out::println);
Note that adminClient.listTopics() returns a ListTopicsResult object, which is a thin wrapper over a collection of Futures. topics.names() returns a future set of topic names. When we call get() on this future, the executing thread waits until the server responds with a set of topic names, or we get a timeout exception. Once we get the set, we iterate over it to print all the topic names.
- Describing Topics in the Cluster
- To check that the topic exists with the correct configuration, we call describeTopics() with a list of topic names that we want to validate. This returns a DescribeTopicsResult object, which wraps a map of topic names to future descriptions.
- If the topic does exist, the future completes by returning a TopicDescription object, which contains a list of all the partitions of the topic and, for each partition, which broker is the leader, a list of replicas, and a list of in-sync replicas. Note: this does not include the configuration of the topic.
- If the topic does not exist, the server can't respond with its description. In this case, the server will send an error and the future will complete by throwing an ExecutionException. The actual error sent by the server will be the cause of the exception. Since we want to handle the case where the topic doesn't exist, we handle these exceptions by creating the topic on the spot (see the sketch after this list).
- Note that all AdminClient result objects throw an ExecutionException when Kafka responds with an error. This is because AdminClient results are wrapped Future objects, and those wrap exceptions. You always need to examine the cause of an ExecutionException to get the error that Kafka actually returned.
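Putting these pieces together, here is a minimal sketch of this check, assuming the adminClient from the lifecycle section above and a hypothetical TOPIC_NAME constant:
DescribeTopicsResult demoTopic = adminClient.describeTopics(Collections.singletonList(TOPIC_NAME));
try {
    TopicDescription topicDescription = demoTopic.values().get(TOPIC_NAME).get();
    System.out.println("Description of topic " + TOPIC_NAME + ": " + topicDescription);
} catch (ExecutionException e) {
    // Anything other than "topic does not exist" is unexpected here
    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
        throw e;
    }
    System.out.println("Topic " + TOPIC_NAME + " does not exist. Going to create it now.");
    // Topic creation is covered in the next section
}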
- Creating Topics in the Cluster
CreateTopicsResult newTopic = adminClient.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR)));
- If the topic does not exist, we create a new topic. When creating a topic, you can specify just the name and use default values for all the details. You can also specify the number of partitions, the number of replicas, and configuration.
- Finally, you want to wait for topic creation to return and perhaps validate the result. Checking the result is more common if you relied on broker defaults when creating the topic.
- If some topic already exists with the same name, or when we call a method to check the results of createTopics(), a TopicExistsException can be thrown. It can be handled by describing the topic to check for the correct configuration, as the sketch after this list shows.
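Building on the createTopics() call above, here is a sketch of validating the result and handling TopicExistsException, with the same hypothetical TOPIC_NAME and NUM_PARTITIONS constants:
try {
    // Validate that the topic was created with the expected number of partitions
    if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) {
        System.out.println("Topic " + TOPIC_NAME + " has an unexpected number of partitions.");
    }
} catch (ExecutionException e) {
    if (e.getCause() instanceof TopicExistsException) {
        // The topic already exists; describe it to verify that its configuration is correct
        System.out.println("Topic " + TOPIC_NAME + " already exists.");
    } else {
        throw e;
    }
}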
- Deleting Topics in the Cluster
- We call the deleteTopics() method with a list of topic names to delete, and we use get() to wait for this to complete, as sketched below.
- In Kafka, deletion of topics is final: there is no "recycle bin" or "trash can" to help you rescue a deleted topic, and no checks to validate that the topic is empty and that you really meant to delete it. Deleting the wrong topic can mean an unrecoverable loss of data, so handle this method with extra care.
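A minimal sketch, again assuming the hypothetical TOPIC_NAME constant:
adminClient.deleteTopics(Collections.singletonList(TOPIC_NAME)).all().get();
// Deletion is asynchronous, so describing the topic may still briefly succeed;
// once deletion completes, the describe call throws an ExecutionException
try {
    adminClient.describeTopics(Collections.singletonList(TOPIC_NAME)).values().get(TOPIC_NAME).get();
    System.out.println("Topic " + TOPIC_NAME + " is still around.");
} catch (ExecutionException e) {
    System.out.println("Topic " + TOPIC_NAME + " is gone.");
}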
Conclusion
- AdminClient is useful when you want to execute administrative commands from within your client application rather than using CLI or GUI tools to manage Kafka.
- Creating new topics on demand based on user input or data is an especially common use case.
- IoT applications often receive events from user devices and write events to topics based on the device type. If the manufacturer produces a new type of device, you either have to remember, via some process, to also create a topic, or the application can dynamically create a new topic when it receives events with an unrecognized device type.
- Automatic topic creation using AdminClient has downsides, but avoiding the dependency on an additional process to generate topics is an attractive feature in the right scenarios.
Kafka - Introduction to Kafka Admin API
The Admin API supports managing and inspecting topics, brokers, ACLs, and other Kafka objects.
In this tutorial, we will see getting-started examples of how to use the Kafka Admin API.
Start the Kafka server as described in the Kafka quick-start documentation.
How to list Kafka configuration?
package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListingConfigs {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient admin = AdminClient.create(config);
        // Describe the cluster, then list every broker's configuration
        for (Node node : admin.describeCluster().nodes().get()) {
            System.out.println("-- node: " + node.id() + " --");
            // Use the broker's own id rather than a hardcoded "0"
            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(node.id()));
            DescribeConfigsResult dcr = admin.describeConfigs(Collections.singleton(cr));
            dcr.all().get().forEach((k, c) ->
                    c.entries().forEach(configEntry ->
                            System.out.println(configEntry.name() + "= " + configEntry.value())));
        }
    }
}
Output (abridged):
-- node: 0 --
advertised.host.name= null
auto.create.topics.enable= true
broker.id= 0
delete.topic.enable= true
log.dirs= /tmp/kafka-logs
log.retention.hours= 168
num.partitions= 1
offsets.topic.replication.factor= 1
port= 9092
zookeeper.connect= localhost:2181
...