Kafka admin client example

Managing Apache Kafka Programmatically

Clairvoyant has extensive experience in Big Data and Cloud technologies. We aim to explore the core concepts of Apache Kafka and other Big Data technologies to provide the best-optimized solutions to our clients. With the introduction of the AdminClient API in Kafka, we have used it for essential topic management, giving clients a better way to manage Kafka programmatically.

Introduction

Apache Kafka added the AdminClient in version 0.11 to provide a programmatic API for administrative functionality that was previously only available through command-line tools: listing, creating, and deleting topics; describing the cluster; managing ACLs; and modifying configurations.

Here’s one example: your application is going to produce events to a specific topic, which means the topic has to exist before the first event is produced. Before Apache Kafka added the AdminClient, there were only a few options, none of them particularly user-friendly:

  1. You could capture exceptions from the producer and let your user know that they need to create the topic
  2. You could hope that the Kafka cluster you are writing to has automatic topic creation enabled
  3. You could try to rely on internal APIs and deal with the consequences of no compatibility guarantees

Now that Apache Kafka provides AdminClient, there is a much better solution: Use AdminClient to check whether the topic exists, and if it does not, create it on the spot.

AdminClient Lifecycle: Creating, Configuring, and Closing

In order to use Kafka’s AdminClient, the first thing you have to do is construct an instance of the AdminClient class. This is quite straightforward:

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
adminClient.close(Duration.ofSeconds(30));

The static create() method takes a Properties object with the configuration. The only mandatory configuration is bootstrap.servers: the URI for your cluster, given as a comma-separated list of brokers to connect to.

If you start an AdminClient, eventually you will want to close it. It is important to remember that when you call close(), there could still be some AdminClient operations in progress. Therefore, the close() method accepts a timeout parameter. Once you call close(), you can’t call any other methods or send any more requests, but the client will wait for responses until the timeout expires. After the timeout expires, the client aborts all ongoing operations with a timeout exception and releases all resources. Calling close() without a timeout implies that you’ll wait as long as it takes for all ongoing operations to complete.
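
Because AdminClient is auto-closeable, a common pattern is to scope it with try-with-resources so it is always closed, even if an operation throws. Here is a minimal sketch, assuming a broker is reachable on localhost:9092:

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// try-with-resources calls close() automatically; without an explicit timeout
// the client waits for all in-flight requests to finish before returning
try (AdminClient adminClient = AdminClient.create(props)) {
    String clusterId = adminClient.describeCluster().clusterId().get();
    System.out.println("Connected to cluster " + clusterId);
}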

Essential Topic Management

Now that we have created and configured an AdminClient, it is time to see what we can do with it. The most common use case for Kafka’s AdminClient is topic management: listing topics, describing them, creating them, and deleting them.

  • Listing all Topics in the Cluster
ListTopicsResult topics = adminClient.listTopics();
topics.names().get().forEach(System.out::println);

Note that listTopics() returns a ListTopicsResult object, which is a thin wrapper over a collection of futures. names() returns a future set of topic names. When we call get() on this future, the executing thread will wait until the server responds with a set of topic names, or until we get a timeout exception. Once we get the set, we iterate over it to print all the topic names.

  • Describing Topics in the Cluster
DescribeTopicsResult demoTopic = adminClient.describeTopics(TOPIC_LIST);
TopicDescription topicDescription = demoTopic.values().get(TOPIC_NAME).get();
  1. To check that the topic exists with the correct configuration, we call describeTopics() with a list of topic names that we want to validate. This returns a DescribeTopicsResult object, which wraps a map of topic names to future descriptions.
  2. If the topic does exist, the future completes by returning a TopicDescription object, which contains a list of all the partitions of the topic and, for each partition, which broker is the leader, a list of replicas, and a list of in-sync replicas. Note: this does not include the configuration of the topic.
  3. If the topic does not exist, the server can’t respond with its description. In this case, the server will send an error and the future will complete by throwing an ExecutionException. The actual error sent by the server will be the cause of the exception. Since we want to handle the case where the topic doesn’t exist, we handle these exceptions by creating the topic on the spot.
  4. Note that all AdminClient result objects throw ExecutionException when Kafka responds with an error. This is because AdminClient results are wrapped Future objects, and those wrap exceptions. You always need to examine the cause of an ExecutionException to get the error that Kafka returned; a sketch of this unwrapping follows this list.
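
A minimal sketch of that unwrapping, reusing the adminClient, TOPIC_LIST, and TOPIC_NAME placeholders from the snippet above; UnknownTopicOrPartitionException is the cause Kafka typically returns for a missing topic:

try {
    TopicDescription description = adminClient.describeTopics(TOPIC_LIST)
            .values().get(TOPIC_NAME).get();
    System.out.println("Topic exists with " + description.partitions().size() + " partitions");
} catch (ExecutionException e) {
    // The error Kafka sent is the cause, not the ExecutionException itself
    if (e.getCause() instanceof UnknownTopicOrPartitionException) {
        System.out.println(TOPIC_NAME + " does not exist yet; create it here");
    } else {
        throw e;
    }
}
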
  • Creating Topics in the Cluster
CreateTopicsResult newTopic = adminClient.createTopics(
        Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR)));
  1. If the topic does not exist, we create a new one. When creating a topic, you can specify just the name and use default values for all the details. You can also specify the number of partitions, the number of replicas, and the configuration.
  2. Finally, you want to wait for topic creation to return, and perhaps validate the result. Checking the result is more common if you relied on broker defaults when creating the topic.
  3. If a topic with the same name already exists, or if we call a method such as all().get() to check the result of createTopics(), a TopicExistsException can be thrown (wrapped in an ExecutionException). It can be handled by describing the topic to check for the correct configuration. The combined check-and-create flow is sketched below.
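
Putting these pieces together, here is a rough sketch of the check-and-create flow referenced above. TOPIC_NAME, NUM_PARTITIONS, and REPLICATION_FACTOR are the same placeholders used in the earlier snippets, and the exception handling relies on the causes described above:

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient adminClient = AdminClient.create(props)) {
    try {
        // Does the topic already exist?
        adminClient.describeTopics(Collections.singletonList(TOPIC_NAME))
                .values().get(TOPIC_NAME).get();
    } catch (ExecutionException e) {
        if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
            throw e; // a real problem talking to the cluster
        }
        // Topic is missing: create it on the spot
        NewTopic newTopic = new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR);
        try {
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (ExecutionException creation) {
            // Another client may have created it first; anything else is a real failure
            if (!(creation.getCause() instanceof TopicExistsException)) {
                throw creation;
            }
        }
    }
}

If another client creates the topic between the describe and the create calls, the create fails with TopicExistsException, which this sketch simply treats as success.
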
  • Deleting Topics in the Cluster
adminClient.deleteTopics(TOPIC_LIST).all().get();
  1. We call deleteTopics() with a list of topic names to delete, and we use all().get() to wait for the operation to complete.
  2. In Kafka, deletion of topics is final: there is no “recycle bin” or “trash can” to help you rescue a deleted topic, and there are no checks that the topic is empty or that you really meant to delete it. Deleting the wrong topic can mean unrecoverable data loss, so handle this method with extra care; a sketch of verifying a deletion follows this list.
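
Because deletion is irreversible, it can be worth double-checking afterwards that the topic is gone. A small sketch, reusing the adminClient and TOPIC_LIST placeholders from above (topic deletion completes asynchronously on the brokers, so a just-deleted topic may briefly still appear):

adminClient.deleteTopics(TOPIC_LIST).all().get();

// Verify: the deleted names should no longer show up in the listing
Set<String> remaining = adminClient.listTopics().names().get();
for (String topic : TOPIC_LIST) {
    if (remaining.contains(topic)) {
        System.out.println(topic + " is still listed; deletion may still be in progress");
    }
}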

Conclusion

  • AdminClient is useful when you want to execute administrative commands from within your client application rather than using CLI and GUI tools to manage Kafka.
  • Creating new topics on demand based on user input or data is an especially common use case.
  • IoT applications often receive events from user devices and write them to topics based on the device type. When the manufacturer introduces a new type of device, you either have to remember, through some process, to create the matching topic, or the application can dynamically create a new topic when it receives events with an unrecognized device type.
  • Automatic topic creation using AdminClient has downsides, but avoiding the dependency on an additional process to create topics is an attractive feature in the right scenarios.

References

Source: https://blog.clairvoyantsoft.com/managing-apache-kafka-programmatically-ee5423aa0730

Best Java code snippets using org.apache.kafka.clients.admin.AdminClient (showing top 20 results out of 513)

    @Override public AdminClient getAdminClient(Map<String, Object> config) { return AdminClient.create(config); }
    void createTopic(final String topic, finalint partitions, finalint replication, final Map<String, String> topicConfig) { log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", topic, partitions, replication, topicConfig); final ImmutableMap<String, Object> props = ImmutableMap.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList(), AdminClientConfig.RETRIES_CONFIG, 5); try (AdminClient adminClient = AdminClient.create(props)) { final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication); newTopic.configs(topicConfig); try { final CreateTopicsResult result = adminClient.createTopics(ImmutableList.of(newTopic)); result.all().get(); } catch (final Exception e) { thrownew RuntimeException("Failed to create topic:" + topic, e); } } }
    @Override public void close() { close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); }
    private Config getKafkaBrokerConfig(AdminClient admin) throws Exception { final Collection<Node> nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); if (nodes.isEmpty()) { thrownew ConnectException("No brokers available to obtain default settings"); } String nodeId = nodes.iterator().next().idString(); Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodeId)); final Map<ConfigResource, Config> configs = admin.describeConfigs(resources).all().get( KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS ); if (configs.isEmpty()) { thrownew ConnectException("No configs have been received"); } return configs.values().iterator().next(); } }
    privatevoid tryDelete(AdminClient adminClient, String topic) throws Exception { try { adminClient.deleteTopics(Collections.singleton(topic)).all().get(DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (TimeoutException e) { LOG.info("Did not receive delete topic response within %d seconds. Checking if it succeeded", DELETE_TIMEOUT_SECONDS); if (adminClient.listTopics().names().get(DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS).contains(topic)) { thrownew Exception("Topic still exists after timeout"); } } }
    privatevoid waitForTopics(String bootstrapServers) throws Exception { while (true) { TimeUnit.SECONDS.sleep(5); Properties properties = new Properties(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); AdminClient adminClient = AdminClient.create(properties); if (adminClient.listTopics().names().get().containsAll(TOPICS)) { return; } System.out.println("Waiting for data"); } }
    publicstaticvoid createTopic(String topic, int numPartitions, short replicationFactor, Properties props) { NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor); AdminClient adminClient = AdminClient.create(props); adminClient.createTopics(Collections.singletonList(newTopic)); adminClient.close(); } }
    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) { return createTopics(newTopics, new CreateTopicsOptions()); }
    publicvoid doWithAdmin(java.util.function.Consumer<AdminClient> callback) { Map<String, Object> adminConfigs = new HashMap<>(); adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); AdminClient admin = null; try { admin = AdminClient.create(adminConfigs); callback.accept(admin); } finally { if (admin != null) { admin.close(this.adminTimeout, TimeUnit.SECONDS); } } }
    @Test publicvoid testAdminClientWithInvalidCredentials() { Map<String, Object> props = new HashMap<>(saslClientConfigs); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); try (AdminClient client = AdminClient.create(props)) { DescribeTopicsResult result = client.describeTopics(Collections.singleton("test")); result.all().get(); fail("Expected an authentication error!"); } catch (Exception e) { assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(), e.getCause() instanceof SaslAuthenticationException); } }
    @Test publicvoid testAddTopics() throws Exception { AdminClient adminClient = AdminClient.create(this.admin.getConfig()); DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar")); topics.all().get(); new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", 2); new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", 3); this.admin.initialize(); topics = adminClient.describeTopics(Arrays.asList("foo", "bar")); Map<String, TopicDescription> results = topics.all().get(); results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3)); adminClient.close(10, TimeUnit.SECONDS); }
    @SuppressWarnings("TryFinallyCanBeTryWithResources") publicstaticboolean kafkaDetected() { AdminClient client = AdminClient.create(getDefaultAdminProperties()); try { client.describeCluster().nodes().get(5, TimeUnit.SECONDS); returntrue; } catch (InterruptedException e) { Thread.currentThread().interrupt(); thrownew StreamRuntimeException(e); } catch (ExecutionException e) { thrownew StreamRuntimeException(e); } catch (TimeoutException e) { returnfalse; } finally { client.close(0, TimeUnit.SECONDS); } }
    @Test publicvoid testInvalidTopicNames() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); List<String> sillyTopicNames = asList("", null); Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values(); for (String sillyTopicName : sillyTopicNames) { TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); Map<String, KafkaFuture<TopicDescription>> describeFutures = env.adminClient().describeTopics(sillyTopicNames).values(); for (String sillyTopicName : sillyTopicNames) { TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); List<NewTopic> newTopics = new ArrayList<>(); for (String sillyTopicName : sillyTopicNames) { newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1)); } Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values(); for (String sillyTopicName : sillyTopicNames) { TestUtils.assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); } }
    publicvoid createTopics(List<String> topicNames, int numPartitions) { List<NewTopic> newTopics = new ArrayList<>(); for (String topicName: topicNames) { NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1); newTopics.add(newTopic); } getAdminClient().createTopics(newTopics); DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames); try { dtr.all().get(10, TimeUnit.SECONDS); } catch (Exception e) { thrownew RuntimeException("Error getting topic info", e); } } publicvoid deleteTopic(String topicName) {
    public DescribeTopicsResult describeTopics(Collection<String> topicNames) { return describeTopics(topicNames, new DescribeTopicsOptions()); }
    @Override public Mono<Health> health() { Health.Builder builder = new Health.Builder(); Properties properties = new Properties(); properties.put("bootstrap.servers", props.getBootstrapServers()); try (AdminClient adminClient = AdminClient.create(properties)) { DescribeClusterResult result = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(3000)); builder.withDetail("clusterId", result.clusterId().get()); builder.up(); } catch (Exception e) { builder.down(); } return Mono.just(builder.build()); } }
    public ListTopicsResult listTopics() { return listTopics(new ListTopicsOptions()); }
    privatevoid callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException { try { env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)).all().get(); counts.put("my_topic", NewPartitions.increaseTo(3)); counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3)))); env.adminClient().createPartitions(counts).all().get(); fail("Expected an authentication error."); } catch (ExecutionException e) { env.adminClient().createAcls(asList(ACL1, ACL2)).all().get(); fail("Expected an authentication error."); } catch (ExecutionException e) { env.adminClient().describeAcls(FILTER1).values().get(); fail("Expected an authentication error."); } catch (ExecutionException e) { env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get(); fail("Expected an authentication error."); } catch (ExecutionException e) { env.adminClient().describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get(); fail("Expected an authentication error."); } catch (ExecutionException e) {
    @Override boolean runInternal(StopWatch stopWatch) throws InterruptedException, ExecutionException { stopWatch.start("adminClient.listTopics()"); Collection<String> topicNames = adminClient.listTopics().listings().get() .stream().map(TopicListing::name).filter(this::shouldCollectEvent).collect(Collectors.toList()); topicsMap.removeAll(new RemoveTopicPredicate(topicNames)); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicNames); describeTopicsResult.all().get().forEach( (topic, topicDescription) -> topicsMap.executeOnKey(topic, new SetTopicPartitionsProcessor( topicDescription.partitions().stream().map(TopicPartitionInfo::partition).collect(Collectors.toList())) ) ); metaMap.set(this.getName() + TopicServiceScheduler.LAST_SUCCESS_PREFIX, System.currentTimeMillis()); log.debug("Topics:" + topicsMap.entrySet()); log.debug(stopWatch.prettyPrint()); returntrue; }
    public DescribeClusterResult describeCluster() { return describeCluster(new DescribeClusterOptions()); }
    Source: https://www.tabnine.com/code/java/classes/org.apache.kafka.clients.admin.AdminClient

    Best Java code snippets using org.apache.kafka.clients.admin.AdminClient.createTopics (showing top 20 results out of 315)

      public void createTopic(String topicName) throws Exception { kafkaAdminClient.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short)1))) .all() .get(30, TimeUnit.SECONDS); }
      @Override publicvoid createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties) { LOG.info("Creating topic {}", topic); try (AdminClient adminClient = AdminClient.create(getStandardProperties())) { NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor); adminClient.createTopics(Collections.singleton(topicObj)).all().get(); } catch (Exception e) { e.printStackTrace(); fail("Create test topic : " + topic + " failed, " + e.getMessage()); } }
      @Override publicvoid initializeStorage() { super.initializeStorage(); try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) { Config brokerConfig = getKafkaBrokerConfig(admin); finalshort replicationFactor = Short.parseShort(brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value()); final NewTopic topic = new NewTopic(topicName, (short)1, replicationFactor); topic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(Long.MAX_VALUE), "retention.bytes", "-1")); admin.createTopics(Collections.singleton(topic)); logger.info("Database history topic '{}' created", topic); } catch (Exception e) { thrownew ConnectException("Creation of database history topic failed, please create the topic manually", e); } }
      @Test publicvoid testCreateTopics() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); KafkaFuture<Void> future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } }
      @Test publicvoid testTimeoutWithoutMetadata() throws Exception { try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(), newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); KafkaFuture<Void> future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); TestUtils.assertFutureError(future, TimeoutException.class); } }
      @Test publicvoid testUnreachableBootstrapServer() throws Exception { Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121))); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) { Cluster discoveredCluster = mockCluster(0); env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200); env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest, new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(), 1, Collections.emptyList())); env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); KafkaFuture<Void> future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } }
      @Test publicvoid testPropagatedMetadataFetchException() throws Exception { Cluster cluster = mockCluster(0); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121", AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0), TimeUnit.DAYS.toMillis(1)); env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); KafkaFuture<Void> future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); TestUtils.assertFutureError(future, SaslAuthenticationException.class); } }
      private Map<String, TopicDescription> createTopics() throws InterruptedException { AdminClient adminClient = getAdminClient(broker(0).getPlaintextAddr()); adminClient.createTopics(Arrays.asList(new NewTopic(TOPIC0, 1, (short) 1), new NewTopic(TOPIC1, 1, (short) 2))); Map<String, TopicDescription> topicDescriptions0 = null; Map<String, TopicDescription> topicDescriptions1 = null; do { try (AdminClient adminClient0 = getAdminClient(broker(0).getPlaintextAddr()); AdminClient adminClient1 = getAdminClient(broker(1).getPlaintextAddr())) { topicDescriptions0 = adminClient0.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get(); topicDescriptions1 = adminClient1.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } catch (ExecutionException ee) { } } while (topicDescriptions0 == null || topicDescriptions0.size() < 2 || topicDescriptions1 == null || topicDescriptions1.size() < 2); return topicDescriptions0; }
      @Test publicvoid testConnectionFailureOnMetadataUpdate() throws Exception { Cluster cluster = mockBootstrapCluster(); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) { Cluster discoveredCluster = mockCluster(0); env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true); env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(), 1, Collections.emptyList())); env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); KafkaFuture<Void> future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } }
      @Test publicvoid testCreateTopicsHandleNotControllerException() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse( Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))), env.cluster().nodeById(0)); env.kafkaClient().prepareResponse(new MetadataResponse(env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 1, Collections.<MetadataResponse.TopicMetadata>emptyList())); env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse( Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))), env.cluster().nodeById(1)); KafkaFuture<Void> future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } }
      privatevoid addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) { CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd); try { topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Interrupted while waiting for topic creation results", e); } catch (TimeoutException e) { thrownew KafkaException("Timed out waiting for create topics results", e); } catch (ExecutionException e) { if (e.getCause() instanceof TopicExistsException) { logger.debug("Failed to create topics", e.getCause()); } else { logger.error("Failed to create topics", e.getCause()); thrownew KafkaException("Failed to create topics", e.getCause()); } } }
      privatevoid createTopics(AdminClient admin, List<NewTopic> newTopics) { CreateTopicsResult createTopics = admin.createTopics(newTopics); try { createTopics.all().get(this.adminTimeout, TimeUnit.SECONDS); } catch (Exception e) { thrownew KafkaException(e); } }
      publicvoid createTopics(String... topics) throws ExecutionException, InterruptedException { Map<String, Object> adminConfigs = new HashMap<>(); adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString()); try(AdminClient admin = AdminClient.create(adminConfigs)) { List<NewTopic> newTopics = Stream.of(topics) .map(t -> new NewTopic(t, numBroker, (short) numBroker)) .collect(Collectors.toList()); CreateTopicsResult createTopics = admin.createTopics(newTopics); createTopics.all().get(); } }
      @Override publicvoid createTopic(Topic topic, Handler<AsyncResult<Void>> handler) { NewTopic newTopic = TopicSerialization.toNewTopic(topic, null); LOGGER.debug("Creating topic {}", newTopic); KafkaFuture<Void> future = adminClient.createTopics( Collections.singleton(newTopic)).values().get(newTopic.name()); queueWork(new UniWork<>("createTopic", future, handler)); }
      Source: https://www.tabnine.com/code/java/methods/org.apache.kafka.clients.admin.AdminClient/createTopics

      Java Code Examples for org.apache.kafka.clients.admin.AdminClient

      The following examples show how to use org.apache.kafka.clients.admin.AdminClient. They are extracted from open source projects.

      Example 1

      private void createTopics(String bootstrapServers) { Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("connections.max.idle.ms", 10000); properties.put("request.timeout.ms", 5000); try (AdminClient client = AdminClient.create(properties)) { CreateTopicsResult result = client.createTopics(Arrays.asList( new NewTopic("storage-topic", 1, (short) 1), new NewTopic("global-id-topic", 1, (short) 1), new NewTopic("snapshot-topic", 1, (short) 1) )); try { result.all().get(); } catch ( InterruptedException | ExecutionException e ) { throw new IllegalStateException(e); } } }

      Example 2

      public DefaultCollector() throws ConfigFileNotFoundException { CollectorConfig config = CollectorConfig.INSTANCE(); config.init(); // init meta client metaClient = new MetaClient(config.getMetaServerHost(), config.getMetaServerPort()); // init kafka admin client Properties properties = new Properties(); properties.setProperty("bootstrap.servers", config.getKafkaBootstrapServers()); properties.setProperty("client.id", "producerAdmin"); properties.setProperty("metadata.max.age.ms", "3000"); kafkaAdminClient = AdminClient.create(properties); this.collectorRuntime = new CollectorRuntime(config); init(); }

      Example 3

      @SuppressWarnings({"rawtypes", "unchecked"}) private MiniKafkaCluster(List<String> brokerIds) throws IOException, InterruptedException { this.zkServer = new EmbeddedZooKeeper(); this.tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "mini-kafka-cluster"); this.kafkaServer = new ArrayList<>(); int port = 0; for (String id : brokerIds) { port = getAvailablePort(); KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port)); Seq seq = scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq(); kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq)); } Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + port); adminClient = AdminClient.create(props); }

      Example 4

      /** * Simple smoke test to ensure broker running appropriate listeners. */ @Test void validateListener() throws ExecutionException, InterruptedException { try (final AdminClient adminClient = getKafkaTestUtils().getAdminClient()) { final ConfigResource broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1"); // Pull broker configs final Config configResult = adminClient .describeConfigs(Collections.singletonList(broker1Resource)) .values() .get(broker1Resource) .get(); // Check listener final String actualListener = configResult.get("listeners").value(); Assertions.assertTrue( actualListener.contains(getExpectedListenerProtocol() + "://"), "Expected " + getExpectedListenerProtocol() + ":// and found: " + actualListener); // Check inter broker protocol final String actualBrokerProtocol = configResult.get("security.inter.broker.protocol").value(); Assertions.assertEquals(getExpectedListenerProtocol(), actualBrokerProtocol, "Unexpected inter-broker protocol"); } }

      Example 5

      /** * Test that KafkaAdminFactory can create a working AdminClient when connecting to a non-ssl cluster. */ @Test public void testCreateNonSslAdminClient() throws ExecutionException, InterruptedException { // Create Cluster config final ClusterConfig clusterConfig = ClusterConfig.newBuilder() .withBrokerHosts(sharedKafkaTestResource.getKafkaConnectString()) .build(); final KafkaAdminFactory kafkaAdminFactory = new KafkaAdminFactory( new KafkaClientConfigUtil("not/used", "MyPrefix") ); // Create instance try (final AdminClient adminClient = kafkaAdminFactory.create(clusterConfig, "MyClientId")) { // Call method to validate things work as expected final DescribeClusterResult results = adminClient.describeCluster(); assertNotNull("Should have a non-null result", results); // Request future result final Collection<Node> nodes = results.nodes().get(); assertNotNull("Should have non-null node result", nodes); assertFalse("Should have non-empty node", nodes.isEmpty()); } }

      Example 6

      private void createTopics(Properties envProps) { Map<String, Object> config = new HashMap<>(); config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers")); AdminClient client = AdminClient.create(config); List<NewTopic> topics = new ArrayList<>(); topics.add(new NewTopic( envProps.getProperty("input.avro.movies.topic.name"), parseInt(envProps.getProperty("input.avro.movies.topic.partitions")), parseShort(envProps.getProperty("input.avro.movies.topic.replication.factor")))); topics.add(new NewTopic( envProps.getProperty("output.proto.movies.topic.name"), parseInt(envProps.getProperty("output.proto.movies.topic.partitions")), parseShort(envProps.getProperty("output.proto.movies.topic.replication.factor")))); client.createTopics(topics); client.close(); }

      Example 7

      private void createTopic(AdminClient adminClient, String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker, KafkaTopicProperties properties) { try { createTopicIfNecessary(adminClient, name, partitionCount, tolerateLowerPartitionsOnBroker, properties); } // TODO: Remove catching Throwable. See this thread: // TODO: // https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/514#discussion_r241075940 catch (Throwable throwable) { if (throwable instanceof Error) { throw (Error) throwable; } else { // TODO: // https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/514#discussion_r241075940 throw new ProvisioningException("Provisioning exception", throwable); } } }

      Example 8

      public void createTopic(String name) throws Exception { try (AdminClient admin = AdminClient.create(getAdminConfig())) { int partitions = 3; int replication = this.replicationFactor; Matcher matcher = Pattern.compile("(-\\d+[p|r])").matcher(name); while (matcher.find()) { String group = matcher.group(); if (group.endsWith("p")) { partitions = getNumber(group); } else { replication = getNumber(group); } } NewTopic topic = new NewTopic(name, partitions, (short) replication); CreateTopicsResult topicsResult = admin.createTopics(singleton(topic)); topicsResult.all().get(5, TimeUnit.SECONDS); } catch (Exception e) { LOGGER.log(Level.SEVERE, "Error on create topics " + name, e); throw e; } }

      Example 9

      @Override public void deleteTestTopic(String topic) { LOG.info("Deleting topic {}", topic); Properties props = getSecureProperties(); props.putAll(getStandardProperties()); String clientId = Long.toString(new Random().nextLong()); props.put("client.id", clientId); AdminClient adminClient = AdminClient.create(props); // We do not use a try-catch clause here so we can apply a timeout to the admin client closure. try { tryDelete(adminClient, topic); } catch (Exception e) { e.printStackTrace(); fail(String.format("Delete test topic : %s failed, %s", topic, e.getMessage())); } finally { adminClient.close(Duration.ofMillis(5000L)); maybePrintDanglingThreadStacktrace(clientId); } }

      Example 10

      public void createTopics(Properties envProps) { Map<String, Object> config = new HashMap<>(); config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers")); AdminClient client = AdminClient.create(config); List<NewTopic> topics = new ArrayList<>(); Map<String, String> topicConfigs = new HashMap<>(); topicConfigs.put("retention.ms", Long.toString(Long.MAX_VALUE)); NewTopic ratings = new NewTopic(envProps.getProperty("rating.topic.name"), Integer.parseInt(envProps.getProperty("rating.topic.partitions")), Short.parseShort(envProps.getProperty("rating.topic.replication.factor"))); ratings.configs(topicConfigs); topics.add(ratings); NewTopic counts = new NewTopic(envProps.getProperty("rating.count.topic.name"), Integer.parseInt(envProps.getProperty("rating.count.topic.partitions")), Short.parseShort(envProps.getProperty("rating.count.topic.replication.factor"))); counts.configs(topicConfigs); topics.add(counts); client.createTopics(topics); client.close(); }

      Example 11

      public void createTopics(Properties envProps) { Map<String, Object> config = new HashMap<>(); config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers")); AdminClient client = AdminClient.create(config); List<NewTopic> topics = new ArrayList<>(); topics.add(new NewTopic( envProps.getProperty("input.topic.name"), Integer.parseInt(envProps.getProperty("input.topic.partitions")), Short.parseShort(envProps.getProperty("input.topic.replication.factor")))); topics.add(new NewTopic( envProps.getProperty("output.topic.name"), Integer.parseInt(envProps.getProperty("output.topic.partitions")), Short.parseShort(envProps.getProperty("output.topic.replication.factor")))); client.createTopics(topics); client.close(); }

      Example 12

      /** * Create topics using AdminClient API */ private void createTopics(Properties envProps) { Map<String, Object> config = new HashMap<>(); config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers")); AdminClient client = AdminClient.create(config); List<NewTopic> topics = new ArrayList<>(); topics.add(new NewTopic( envProps.getProperty("input.ratings.topic.name"), parseInt(envProps.getProperty("input.ratings.topic.partitions")), parseShort(envProps.getProperty("input.ratings.topic.replication.factor")))); topics.add(new NewTopic( envProps.getProperty("output.rating-averages.topic.name"), parseInt(envProps.getProperty("output.rating-averages.topic.partitions")), parseShort(envProps.getProperty("output.rating-averages.topic.replication.factor")))); client.createTopics(topics); client.close(); }

      Example 13

      public void createTopics(Properties envProps) { Map<String, Object> config = new HashMap<>(); config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers")); AdminClient client = AdminClient.create(config); List<NewTopic> topics = new ArrayList<>(); topics.add(new NewTopic( envProps.getProperty("input.rock.topic.name"), Integer.parseInt(envProps.getProperty("input.rock.topic.partitions")), Short.parseShort(envProps.getProperty("input.rock.topic.replication.factor")))); topics.add(new NewTopic( envProps.getProperty("input.classical.topic.name"), Integer.parseInt(envProps.getProperty("input.classical.topic.partitions")), Short.parseShort(envProps.getProperty("input.classical.topic.replication.factor")))); topics.add(new NewTopic( envProps.getProperty("output.topic.name"), Integer.parseInt(envProps.getProperty("output.topic.partitions")), Short.parseShort(envProps.getProperty("output.topic.replication.factor")))); client.createTopics(topics); client.close(); }

      Example 14

      public ClusterTopicManipulationService(String name, AdminClient adminClient) { LOGGER.info("ClusterTopicManipulationService constructor initiated {}", this.getClass().getName()); _isOngoingTopicCreationDone = true; _isOngoingTopicDeletionDone = true; _adminClient = adminClient; _executor = Executors.newSingleThreadScheduledExecutor(); _reportIntervalSecond = Duration.ofSeconds(1); _running = new AtomicBoolean(false); _configDefinedServiceName = name; // TODO: instantiate a new instance of ClusterTopicManipulationMetrics(..) here. MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS); List<MetricsReporter> reporters = new ArrayList<>(); reporters.add(new JmxReporter(Service.JMX_PREFIX)); Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime()); Map<String, String> tags = new HashMap<>(); tags.put("name", name); _clusterTopicManipulationMetrics = new ClusterTopicManipulationMetrics(metrics, tags); }

      Example 15

      @Override public KafkaClientModel load(LoadingKey key) throws Exception { KafkaMediaSrcParameter kafkaMediaSrcParameter = key.mediaSourceInfo.getParameterObj(); PluginWriterParameter pluginWriterParameter = key.getPluginWriterParameter(); Map<String, String> mapparamters = kafkaMediaSrcParameter.getMapparamters(); String bootstrapServers = kafkaMediaSrcParameter.getBootstrapServers(); Map<String, Object> conf = new HashMap<>(8); conf.put("bootstrap.servers", bootstrapServers); conf.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); conf.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); conf.put("acks", "all"); conf.put("batch.size", pluginWriterParameter.getBatchSize()); conf.putAll(mapparamters); KafkaProducer producer = new KafkaProducer<>(conf); AdminClient client = AdminClient.create(conf); return new KafkaFactory.KafkaClientModel(producer, client); }

      Example 16

      public void createTopics(final Properties envProps) { final Map<String, Object> config = new HashMap<>(); config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers")); try (final AdminClient client = AdminClient.create(config)) { final List<NewTopic> topics = new ArrayList<>(); topics.add(new NewTopic( envProps.getProperty("input.topic.name"), Integer.parseInt(envProps.getProperty("input.topic.partitions")), Short.parseShort(envProps.getProperty("input.topic.replication.factor")))); topics.add(new NewTopic( envProps.getProperty("output.topic.name"), Integer.parseInt(envProps.getProperty("output.topic.partitions")), Short.parseShort(envProps.getProperty("output.topic.replication.factor")))); topics.add(new NewTopic( envProps.getProperty("special.order.topic.name"), Integer.parseInt(envProps.getProperty("special.order.topic.partitions")), Short.parseShort(envProps.getProperty("special.order.topic.replication.factor")))); client.createTopics(topics); } }

      Example 17

      @BeforeEach void setUp() throws ExecutionException, InterruptedException { testBucketAccessor.clear(gcsPrefix); final Properties adminClientConfig = new Properties(); adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); adminClient = AdminClient.create(adminClientConfig); final Map<String, Object> producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); producer = new KafkaProducer<>(producerProps); final NewTopic newTopic0 = new NewTopic(TEST_TOPIC_0, 4, (short) 1); final NewTopic newTopic1 = new NewTopic(TEST_TOPIC_1, 4, (short) 1); adminClient.createTopics(Arrays.asList(newTopic0, newTopic1)).all().get(); connectRunner = new ConnectRunner(pluginDir, kafka.getBootstrapServers(), OFFSET_FLUSH_INTERVAL_MS); connectRunner.start(); }

      Example 18

      private KafkaFactory.KafkaClientModel get(){ Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.104.156.83:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='kafka';"); KafkaProducer<String, Byte[]> producer = new KafkaProducer<>(props); AdminClient client = AdminClient.create(props); KafkaFactory.KafkaClientModel kafkaClientModel = new KafkaFactory.KafkaClientModel(producer, client); return kafkaClientModel; }

      Example 19

      public static void createTopics(String brokers, String topicName) throws IOException { // Set properties used to configure admin client Properties properties = getProperties(brokers); try (final AdminClient adminClient = KafkaAdminClient.create(properties)) { int numPartitions = 8; short replicationFactor = (short)3; final NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor); final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic)); createTopicsResult.values().get(topicName).get(); System.out.print("Topic " + topicName + " created"); } catch (Exception e) { System.out.print("Create Topics denied\n"); System.out.print(e.getMessage()); //throw new RuntimeException(e.getMessage(), e); } }

      Example 20

      @Test(expected = TopicExistsException.class) public void testSameTopicCannotBeProvisionedAgain() throws Throwable { try (AdminClient admin = AdminClient.create( Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getEmbeddedKafka().getBrokersAsString()))) { admin.createTopics(Collections .singletonList(new NewTopic("fooUniqueTopic", 1, (short) 1))).all() .get(); try { admin.createTopics(Collections .singletonList(new NewTopic("fooUniqueTopic", 1, (short) 1))) .all().get(); } catch (Exception ex) { assertThat(ex.getCause() instanceof TopicExistsException).isTrue(); throw ex.getCause(); } } }

      Example 21

      @Before public void before() throws Exception { Map<String, Object> configMap = new HashMap<>(); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); configMap.put("application.id", "KSQL"); configMap.put("commit.interval.ms", 0); configMap.put("cache.max.bytes.buffering", 0); configMap.put("auto.offset.reset", "earliest"); KsqlConfig ksqlConfig = new KsqlConfig(configMap); adminClient = AdminClient.create(ksqlConfig.getKsqlAdminClientConfigProps()); topicClient = new KafkaTopicClientImpl(adminClient); ksqlEngine = new KsqlEngine(ksqlConfig, topicClient); metaStore = ksqlEngine.getMetaStore(); createInitTopics(); produceInitData(); execInitCreateStreamQueries(); }

      Example 22

      /** * Get information about all topics in Kafka. * @return Set of topics found in Kafka. */ public List<TopicListing> getTopics() { try (final AdminClient adminClient = getAdminClient()) { return new ArrayList<>(adminClient.listTopics().listings().get()); } catch (final InterruptedException | ExecutionException e) { throw new RuntimeException(e.getMessage(), e); } }

      Example 23

      @SuppressWarnings("rawtypes") @Test public void bootPropertiesOverriddenExceptServers() throws Exception { KafkaProperties bootConfig = new KafkaProperties(); bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); bootConfig.setBootstrapServers(Collections.singletonList("localhost:1234")); KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties( bootConfig); binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); ClassPathResource ts = new ClassPathResource("test.truststore.ks"); binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath()); binderConfig.setBrokers("localhost:9092"); KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig); AdminClient adminClient = provisioner.createAdminClient(); assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class); Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class); assertThat( ((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)) .isEqualTo("localhost:1234"); adminClient.close(); }

      Example 24

      /** * Get the data for the topic partition in the specified consumer group */ public Map<Integer, Long> getKafkaOffset(String clusterAlias, String group, String topic, Set<Integer> partitionids) { Map<Integer, Long> partitionOffset = new HashMap<>(); Properties prop = new Properties(); prop.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, parseBrokerServer(clusterAlias)); if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.sasl.enable")) { sasl(prop, clusterAlias); } if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.ssl.enable")) { ssl(prop, clusterAlias); } AdminClient adminClient = null; try { adminClient = AdminClient.create(prop); List<TopicPartition> tps = new ArrayList<>(); for (int partitionid : partitionids) { TopicPartition tp = new TopicPartition(topic, partitionid); tps.add(tp); } ListConsumerGroupOffsetsOptions consumerOffsetOptions = new ListConsumerGroupOffsetsOptions(); consumerOffsetOptions.topicPartitions(tps); ListConsumerGroupOffsetsResult offsets = adminClient.listConsumerGroupOffsets(group, consumerOffsetOptions); for (Entry<TopicPartition, OffsetAndMetadata> entry : offsets.partitionsToOffsetAndMetadata().get().entrySet()) { if (topic.equals(entry.getKey().topic())) { partitionOffset.put(entry.getKey().partition(), entry.getValue().offset()); } } } catch (Exception e) { LOG.error("Get consumer offset has error, msg is " + e.getMessage()); e.printStackTrace(); } finally { adminClient.close(); } return partitionOffset; }

      Example 25

      /** Alter topic config. */ public String changeTopicConfig(String clusterAlias, String topic, String type, ConfigEntry configEntry) { JSONObject object = new JSONObject(); Properties prop = new Properties(); prop.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, parseBrokerServer(clusterAlias)); if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.sasl.enable")) { kafkaService.sasl(prop, clusterAlias); } if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.ssl.enable")) { kafkaService.ssl(prop, clusterAlias); } try { switch (type) { case Topic.ADD: AdminClient adminClientAdd = AdminClient.create(prop); object.put("type", type); object.put("value", addTopicConfig(clusterAlias, adminClientAdd, topic, configEntry)); adminClientAdd.close(); break; case Topic.DELETE: AdminClient adminClientDelete = AdminClient.create(prop); object.put("type", type); object.put("value", deleteTopicConfig(clusterAlias, adminClientDelete, topic, configEntry)); adminClientDelete.close(); break; case Topic.DESCRIBE: object.put("type", type); object.put("value", describeTopicConfig(clusterAlias, topic)); break; default: break; } } catch (Exception e) { e.printStackTrace(); LOG.error("Type[" + type + "] topic config has error, msg is " + e.getMessage()); } return object.toJSONString(); }

      Example 26

      /** * Close the given AdminClient with the given timeout. * * @param adminClient AdminClient to be closed. * @param timeoutMs the timeout. */ public static void closeAdminClientWithTimeout(AdminClient adminClient, long timeoutMs) { closeClientWithTimeout(() -> { try { ((AutoCloseable) adminClient).close(); } catch (Exception e) { throw new IllegalStateException("Failed to close the Admin Client.", e); } }, timeoutMs); }

      Example 27

      /** * Creates the given topic if it does not exist. * * @param adminClient The adminClient to send createTopics request. * @param topicToBeCreated A wrapper around the topic to be created. * @return {@code false} if the topic to be created already exists, {@code true} otherwise. */ protected static boolean createTopic(AdminClient adminClient, NewTopic topicToBeCreated) { try { CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topicToBeCreated)); createTopicsResult.values().get(topicToBeCreated.name()).get(CruiseControlMetricsUtils.CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); LOG.info("Topic {} has been created.", topicToBeCreated.name()); } catch (InterruptedException | ExecutionException | TimeoutException e) { if (e.getCause() instanceof TopicExistsException) { return false; } throw new IllegalStateException(String.format("Unable to create topic %s.", topicToBeCreated.name()), e); } return true; }

      Example 28

      @Before public void before() { kafkaAdminClient = AdminClient.create(config()); TopologyBuilderAdminClient adminClient = new TopologyBuilderAdminClient(kafkaAdminClient); topicManager = new TopicManager(adminClient); }

      Example 29

      @Test public void testAlwaysUseLargeMessageEnvelope() throws Exception { //create the test topic try (AdminClient adminClient = createRawAdminClient(null)) { adminClient.createTopics(Collections.singletonList(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1))).all().get(1, TimeUnit.MINUTES); } { long startTime = System.currentTimeMillis(); Properties props = new Properties(); props.setProperty(LARGE_MESSAGE_ENABLED_CONFIG, "true"); props.setProperty(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, "true"); props.setProperty(MAX_MESSAGE_SEGMENT_BYTES_CONFIG, "200"); props.setProperty(CLIENT_ID_CONFIG, "testProducer"); LiKafkaProducer<String, String> largeMessageProducer = createProducer(props); // This is how large we expect the final message to be, including the version byte, checksum, segment info and // the user payload itself. final int expectedProducedMessageSize = + Byte.BYTES + Integer.BYTES + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + "hello".length(); largeMessageProducer.send(new ProducerRecord<>(TOPIC, "hello"), (recordMetadata, e) -> { assertEquals(recordMetadata.serializedValueSize(), expectedProducedMessageSize); }); largeMessageProducer.close(); } }

      Example 30

@Override
public int run() {
    try (AdminClient kafka = KafkaAdminClient.create(kafkaConfig())) {
        TopicService topicService = new TopicServiceImpl(kafka, true);
        Collection<ConfiguredTopic> existing = topicService.listExisting(true).values()
            .stream()
            .sorted(Comparator.comparing(ConfiguredTopic::getName))
            .collect(Collectors.toList());
        System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(existing));
        return SUCCESS;
    } catch (JsonProcessingException e) {
        LOG.error("Failed to dump config", e);
        return FAILURE;
    }
}
Source: https://www.programcreek.com/java-api-examples/?api=org.apache.kafka.clients.admin.AdminClient

Kafka admin client example

Best Java code snippets using org.apache.kafka.clients.admin.AdminClient.describeTopics (showing top 20 results out of 315):


public DescribeTopicsResult describeTopics(Collection<String> topicNames) { return describeTopics(topicNames, new DescribeTopicsOptions()); }
@Test public void testAdminClientWithInvalidCredentials() { Map<String, Object> props = new HashMap<>(saslClientConfigs); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); try (AdminClient client = AdminClient.create(props)) { DescribeTopicsResult result = client.describeTopics(Collections.singleton("test")); result.all().get(); fail("Expected an authentication error!"); } catch (Exception e) { assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(), e.getCause() instanceof SaslAuthenticationException); } }
@Override public int getLeaderToShutDown(String topic) throws Exception { AdminClient client = AdminClient.create(getStandardProperties()); TopicDescription result = client.describeTopics(Collections.singleton(topic)).all().get().get(topic); return result.partitions().get(0).leader().id(); }
public void createTopics(List<String> topicNames, int numPartitions) { List<NewTopic> newTopics = new ArrayList<>(); for (String topicName: topicNames) { NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1); newTopics.add(newTopic); } getAdminClient().createTopics(newTopics); DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames); try { dtr.all().get(10, TimeUnit.SECONDS); } catch (Exception e) { throw new RuntimeException("Error getting topic info", e); } } public void deleteTopic(String topicName) {
        singletonList(partitionMetadata))))); DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic)); Map<String, TopicDescription> topicDescriptions = result.all().get(); assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
        private Map<String, TopicDescription> createTopics() throws InterruptedException { AdminClient adminClient = getAdminClient(broker(0).getPlaintextAddr()); adminClient.createTopics(Arrays.asList(new NewTopic(TOPIC0, 1, (short) 1), new NewTopic(TOPIC1, 1, (short) 2))); Map<String, TopicDescription> topicDescriptions0 = null; Map<String, TopicDescription> topicDescriptions1 = null; do { try (AdminClient adminClient0 = getAdminClient(broker(0).getPlaintextAddr()); AdminClient adminClient1 = getAdminClient(broker(1).getPlaintextAddr())) { topicDescriptions0 = adminClient0.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get(); topicDescriptions1 = adminClient1.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } catch (ExecutionException ee) { } } while (topicDescriptions0 == null || topicDescriptions0.size() < 2 || topicDescriptions1 == null || topicDescriptions1.size() < 2); return topicDescriptions0; }
@Test public void testInvalidTopicNames() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); List<String> sillyTopicNames = asList("", null); Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values(); for (String sillyTopicName : sillyTopicNames) { TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); Map<String, KafkaFuture<TopicDescription>> describeFutures = env.adminClient().describeTopics(sillyTopicNames).values(); for (String sillyTopicName : sillyTopicNames) { TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); List<NewTopic> newTopics = new ArrayList<>(); for (String sillyTopicName : sillyTopicNames) { newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1)); } Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values(); for (String sillyTopicName : sillyTopicNames) { TestUtils.assertFutureError(createFutures.get(sillyTopicName), InvalidTopicException.class); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); } }
private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) { if (topics.size() > 0) { Map<String, NewTopic> topicNameToTopic = new HashMap<>(); topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> v = t)); DescribeTopicsResult topicInfo = adminClient.describeTopics(topics.stream().map(NewTopic::name).collect(Collectors.toList())); List<NewTopic> topicsToAdd = new ArrayList<>(); Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd); if (topicsToAdd.size() > 0) { addTopics(adminClient, topicsToAdd); } if (topicsToModify.size() > 0) { modifyTopics(adminClient, topicsToModify); } } }
@Test public void testAddTopics() throws Exception { AdminClient adminClient = AdminClient.create(this.admin.getConfig()); DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar")); topics.all().get(); new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", 2); new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", 3); this.admin.initialize(); topics = adminClient.describeTopics(Arrays.asList("foo", "bar")); Map<String, TopicDescription> results = topics.all().get(); results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3)); adminClient.close(10, TimeUnit.SECONDS); }
private int getDefaultNumberOfPartitions(AdminClient adminClient) { try { return adminClient.describeTopics(Collections.singleton(Operation.OPERATION_EVENTS)) .all().get().values().iterator().next() .partitions().size(); } catch (InterruptedException | ExecutionException | NullPointerException e) { log.warn("Error while Calculating Number of Partitions from Topic: " + Operation.OPERATION_EVENTS + " Assuming " + DEFAULT_NUM_PARTITIONS); return DEFAULT_NUM_PARTITIONS; } } }
public TopicDescription describeTopic(final String topicName) { try (final AdminClient adminClient = getAdminClient()) { final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName)); return describeTopicsResult.values().get(topicName).get(); } catch (final InterruptedException | ExecutionException e) { throw new RuntimeException(e.getMessage(), e); } }
TopicDescription topicDescription(String topicName) { DescribeTopicsResult dt = adminClient.describeTopics(singleton(topicName)); try { dt.all().get(); TopicDescription topicDescription = dt.values().get(topicName).get(); return topicDescription; } catch (InterruptedException | ExecutionException e) { if (e.getCause() != null && e.getCause() instanceof UnknownTopicOrPartitionException) { return null; } throw new IllegalStateException("Exception occurred during topic details retrieval. name: " + topicName, e); } }
public int getNumberOfPartitions(String topic) { DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic)); try { return descriptions.values().get(topic).get().partitions().size(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new StreamRuntimeException(e); } catch (ExecutionException e) { throw new StreamRuntimeException(e); } }
private int numberOfPartitionsForTopic(Topic topic) throws ExecutionException, InterruptedException { List<String> kafkaTopicsNames = kafkaNamesMapper.toKafkaTopics(topic).stream() .map(kafkaTopic -> kafkaTopic.name().asString()) .collect(Collectors.toList()); return adminClient.describeTopics(kafkaTopicsNames).all().get().values().stream() .map(v -> v.partitions().size()) .reduce(0, Integer::sum); } }
        public TopicDescription getDescriptionByTopicName(String topic) throws Exception { List<String> topics = new ArrayList<String>(); topics.add(topic); DescribeTopicsOptions dto = new DescribeTopicsOptions(); dto.timeoutMs(5 * 1000); DescribeTopicsResult dtr = adminClient.describeTopics(topics, dto); return dtr.all().get().get(topic); }
private void maybeCreateConsumerOffsets() throws InterruptedException { final String consumerOffsets = "__consumer_offsets"; retry("create consumer offsets", () -> { Map<String, TopicDescription> description = admin.describeTopics(Collections.singleton(consumerOffsets)).all().get(10, TimeUnit.SECONDS); if (description.isEmpty()) { createTopic(consumerOffsets); } }); }
private int numberOfPartitions(final String topic, final String brokerList) throws ExecutionException, InterruptedException { if (!brokerConfig.isEnabled()) { return 1; } final AdminClient client = makeAdminClient(brokerList); final DescribeTopicsResult result = client.describeTopics(Lists.newArrayList(topic)); final TopicDescription topicDescription = result.all().get().get(topic); return topicDescription.partitions().size(); }
@Override public void topicMetadata(TopicName topicName, Handler<AsyncResult<TopicMetadata>> handler) { LOGGER.debug("Getting metadata for topic {}", topicName); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName.toString()); KafkaFuture<TopicDescription> descriptionFuture = adminClient.describeTopics( Collections.singleton(topicName.toString())).values().get(topicName.toString()); KafkaFuture<Config> configFuture = adminClient.describeConfigs( Collections.singleton(resource)).values().get(resource); queueWork(new MetadataWork(descriptionFuture, configFuture, result -> handler.handle(result))); }
@Override boolean runInternal(StopWatch stopWatch) throws InterruptedException, ExecutionException { stopWatch.start("adminClient.listTopics()"); Collection<String> topicNames = adminClient.listTopics().listings().get() .stream().map(TopicListing::name).filter(this::shouldCollectEvent).collect(Collectors.toList()); topicsMap.removeAll(new RemoveTopicPredicate(topicNames)); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicNames); describeTopicsResult.all().get().forEach( (topic, topicDescription) -> topicsMap.executeOnKey(topic, new SetTopicPartitionsProcessor( topicDescription.partitions().stream().map(TopicPartitionInfo::partition).collect(Collectors.toList())) ) ); metaMap.set(this.getName() + TopicServiceScheduler.LAST_SUCCESS_PREFIX, System.currentTimeMillis()); log.debug("Topics:" + topicsMap.entrySet()); log.debug(stopWatch.prettyPrint()); return true; }
        public Map<String, TopicDescription> getTopicAndDescriptions() throws Exception { try { ListTopicsOptions lto = new ListTopicsOptions(); lto.timeoutMs(10 * 1000); ListTopicsResult ltr = adminClient.listTopics(lto); DescribeTopicsOptions dto = new DescribeTopicsOptions(); dto.timeoutMs(15 * 1000); DescribeTopicsResult dtr = adminClient.describeTopics(ltr.names().get(), dto); return dtr.all().get(); } catch (Exception e) { throw e; } }
Source: https://www.codota.com/code/java/methods/org.apache.kafka.clients.admin.AdminClient/describeTopics
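
Distilled from the snippets above, here is a minimal sketch of the recurring "does this topic exist?" check. The topicExists method name and the adminClient parameter are illustrative; only the describeTopics call and the UnknownTopicOrPartitionException handling come from the snippets themselves.

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// Returns true if the topic can be described, false if the broker reports it as unknown.
static boolean topicExists(AdminClient adminClient, String topicName) {
    try {
        adminClient.describeTopics(Collections.singleton(topicName))
                   .values().get(topicName).get();
        return true;
    } catch (InterruptedException | ExecutionException e) {
        if (e.getCause() instanceof UnknownTopicOrPartitionException) {
            return false;
        }
        throw new IllegalStateException("Failed to describe topic " + topicName, e);
    }
}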

        Kafka - Introduction to Kafka Admin API

The Admin API supports managing and inspecting topics, brokers, ACLs, and other Kafka objects.

In this tutorial, we will see getting-started examples of how to use the Kafka Admin API.

Start the Kafka server as described here.

        How to list Kafka configuration?

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListingConfigs {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient admin = AdminClient.create(config);
        for (Node node : admin.describeCluster().nodes().get()) {
            System.out.println("-- node: " + node.id() + " --");
            // describe the broker we are currently iterating over
            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(node.id()));
            DescribeConfigsResult dcr = admin.describeConfigs(Collections.singleton(cr));
            dcr.all().get().forEach((k, c) -> {
                c.entries()
                 .forEach(configEntry -> System.out.println(configEntry.name() + "= " + configEntry.value()));
            });
        }
    }
}

-- node: 0 --
        advertised.host.name= null
        log.cleaner.min.compaction.lag.ms= 0
        metric.reporters=
        quota.producer.default= 9223372036854775807
        offsets.topic.num.partitions= 50
        log.flush.interval.messages= 9223372036854775807
        controller.socket.timeout.ms= 30000
        auto.create.topics.enable= true
        log.flush.interval.ms= null
        principal.builder.class= null
        replica.socket.receive.buffer.bytes= 65536
        min.insync.replicas= 1
        replica.fetch.wait.max.ms= 500
        num.recovery.threads.per.data.dir= 1
        ssl.keystore.type= JKS
        password.encoder.iterations= 4096
        sasl.mechanism.inter.broker.protocol= GSSAPI
        default.replication.factor= 1
        ssl.truststore.password= null
        log.preallocate= false
        sasl.kerberos.principal.to.local.rules= DEFAULT
        fetch.purgatory.purge.interval.requests= 1000
        ssl.endpoint.identification.algorithm= https
        replica.socket.timeout.ms= 30000
        message.max.bytes= 1000012
        transactional.id.expiration.ms= 604800000
        transaction.state.log.replication.factor= 1
        control.plane.listener.name= null
        num.io.threads= 8
        sasl.login.refresh.buffer.seconds= 300
        connections.max.reauth.ms= 0
        connection.failed.authentication.delay.ms= 100
        offsets.commit.required.acks= -1
        log.flush.offset.checkpoint.interval.ms= 60000
        delete.topic.enable= true
        quota.window.size.seconds= 1
        ssl.truststore.type= JKS
        offsets.commit.timeout.ms= 5000
        quota.window.num= 11
        zookeeper.connect= localhost:2181
        authorizer.class.name=
        password.encoder.secret= null
        log.cleaner.max.compaction.lag.ms= 9223372036854775807
        num.replica.fetchers= 1
        alter.log.dirs.replication.quota.window.size.seconds= 1
        log.retention.ms= null
        alter.log.dirs.replication.quota.window.num= 11
        log.roll.jitter.hours= 0
        password.encoder.old.secret= null
        log.cleaner.enable= true
        offsets.load.buffer.size= 5242880
        log.cleaner.delete.retention.ms= 86400000
        ssl.client.auth= none
        controlled.shutdown.max.retries= 3
        offsets.topic.replication.factor= 1
        queued.max.requests= 500
        transaction.state.log.min.isr= 1
        log.cleaner.threads= 1
        ssl.secure.random.implementation= null
        sasl.kerberos.service.name= null
        sasl.kerberos.ticket.renew.jitter= 0.05
        socket.request.max.bytes= 104857600
        ssl.trustmanager.algorithm= PKIX
        zookeeper.session.timeout.ms= 6000
        log.retention.bytes= -1
        sasl.jaas.config= null
        log.message.timestamp.type= CreateTime
        sasl.kerberos.min.time.before.relogin= 60000
        zookeeper.set.acl= false
        connections.max.idle.ms= 600000
        offsets.retention.minutes= 10080
        max.connections= 2147483647
        delegation.token.expiry.time.ms= 86400000
        transaction.state.log.num.partitions= 50
        replica.fetch.backoff.ms= 1000
        inter.broker.protocol.version= 2.4-IV1
        kafka.metrics.reporters=
        listener.security.protocol.map= PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        log.retention.hours= 168
        num.partitions= 1
        client.quota.callback.class= null
        broker.id.generation.enable= true
        listeners= null
        ssl.provider= null
        ssl.enabled.protocols= TLSv1.2,TLSv1.1,TLSv1
        inter.broker.listener.name= null
        delete.records.purgatory.purge.interval.requests= 1
        log.roll.ms= null
        alter.config.policy.class.name= null
        delegation.token.expiry.check.interval.ms= 3600000
        ssl.cipher.suites=
        zookeeper.max.in.flight.requests= 10
        log.flush.scheduler.interval.ms= 9223372036854775807
        log.index.size.max.bytes= 10485760
        ssl.keymanager.algorithm= SunX509
        sasl.login.callback.handler.class= null
        security.inter.broker.protocol= PLAINTEXT
        replica.fetch.max.bytes= 1048576
        sasl.server.callback.handler.class= null
        advertised.port= null
        log.cleaner.dedupe.buffer.size= 134217728
        replica.high.watermark.checkpoint.interval.ms= 5000
        replication.quota.window.size.seconds= 1
        log.cleaner.io.buffer.size= 524288
        sasl.kerberos.ticket.renew.window.factor= 0.8
        create.topic.policy.class.name= null
        zookeeper.connection.timeout.ms= 6000
        metrics.recording.level= INFO
        password.encoder.cipher.algorithm= AES/CBC/PKCS5Padding
        controlled.shutdown.retry.backoff.ms= 5000
        security.providers= null
        log.roll.hours= 168
        log.cleanup.policy= delete
        log.flush.start.offset.checkpoint.interval.ms= 60000
        ssl.principal.mapping.rules= DEFAULT
        host.name=
        replica.selector.class= null
        log.roll.jitter.ms= null
        transaction.state.log.segment.bytes= 104857600
        max.connections.per.ip= 2147483647
        offsets.topic.segment.bytes= 104857600
        background.threads= 10
        quota.consumer.default= 9223372036854775807
        request.timeout.ms= 30000
        group.initial.rebalance.delay.ms= 0
        log.message.format.version= 2.4-IV1
        sasl.login.class= null
        log.index.interval.bytes= 4096
        log.dir= /tmp/kafka-logs
        log.segment.bytes= 1073741824
        log.cleaner.backoff.ms= 15000
        offset.metadata.max.bytes= 4096
        ssl.truststore.location= null
        replica.fetch.response.max.bytes= 10485760
        group.max.session.timeout.ms= 1800000
        ssl.keystore.password= null
        port= 9092
        zookeeper.sync.time.ms= 2000
        log.retention.minutes= null
        log.segment.delete.delay.ms= 60000
        log.dirs= /tmp/kafka-logs
        controlled.shutdown.enable= true
        compression.type= producer
        max.connections.per.ip.overrides=
        log.message.timestamp.difference.max.ms= 9223372036854775807
        sasl.login.refresh.min.period.seconds= 60
        password.encoder.key.length= 128
        sasl.login.refresh.window.factor= 0.8
        kafka.metrics.polling.interval.secs= 10
        transaction.abort.timed.out.transaction.cleanup.interval.ms= 60000
        sasl.kerberos.kinit.cmd= /usr/bin/kinit
        log.cleaner.io.max.bytes.per.second= 1.7976931348623157E308
        auto.leader.rebalance.enable= true
        leader.imbalance.check.interval.seconds= 300
        log.cleaner.min.cleanable.ratio= 0.5
        replica.lag.time.max.ms= 10000
        max.incremental.fetch.session.cache.slots= 1000
        delegation.token.master.key= null
        num.network.threads= 3
        ssl.key.password= null
        reserved.broker.max.id= 1000
        sasl.client.callback.handler.class= null
        metrics.num.samples= 2
        transaction.remove.expired.transaction.cleanup.interval.ms= 3600000
        socket.send.buffer.bytes= 102400
        log.message.downconversion.enable= true
        ssl.protocol= TLS
        password.encoder.keyfactory.algorithm= null
        transaction.state.log.load.buffer.size= 5242880
        socket.receive.buffer.bytes= 102400
        ssl.keystore.location= null
        replica.fetch.min.bytes= 1
        broker.rack= null
        unclean.leader.election.enable= false
        num.replica.alter.log.dirs.threads= null
        sasl.enabled.mechanisms= GSSAPI
        group.min.session.timeout.ms= 6000
        offsets.retention.check.interval.ms= 600000
        log.cleaner.io.buffer.load.factor= 0.9
        transaction.max.timeout.ms= 900000
        producer.purgatory.purge.interval.requests= 1000
        metrics.sample.window.ms= 30000
        group.max.size= 2147483647
        broker.id= 0
        offsets.topic.compression.codec= 0
        delegation.token.max.lifetime.ms= 604800000
        replication.quota.window.num= 11
        log.retention.check.interval.ms= 300000
        advertised.listeners= null
        leader.imbalance.per.broker.percentage= 10
        sasl.login.refresh.window.jitter= 0.05
        queued.max.request.bytes= -1
Source: https://www.logicbig.com/tutorials/misc/kafka/admin-api-getting-started.html
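
The tutorial above notes that the Admin API also covers ACLs, which none of the examples in this compilation touch. Below is a minimal, hedged sketch of listing all ACL bindings; the broker address and class name are assumptions, and the call only succeeds if an authorizer is configured on the broker (otherwise it completes exceptionally with SecurityDisabledException).

import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;

public class ListAclsExample { // hypothetical class name
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // AclBindingFilter.ANY matches every ACL binding stored on the cluster.
            Collection<AclBinding> acls = admin.describeAcls(AclBindingFilter.ANY).values().get();
            acls.forEach(System.out::println);
        }
    }
}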

Finally, a fuller end-to-end example from the kafka-examples repository, exercising listTopics, createTopics, deleteTopics, describeConfigs, and alterConfigs:

package com.trivadis.kafka.sample;

import static org.junit.Assert.*;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import kafka.log.LogConfig;

public class TestKafkaAdminClient {

    private AdminClient client = null;

    @Before
    public void setup() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.69.154:19092");
        conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        client = AdminClient.create(conf);
    }

    @After
    public void teardown() {
        client.close();
    }

    @Test
    public void testNames() throws InterruptedException, ExecutionException {
        ListTopicsResult ltr = client.listTopics();
        KafkaFuture<Set<String>> names = ltr.names();
        System.out.println(names.get());
    }

    //@Test
    public void testCreateTopic() {
        int partitions = 8;
        short replicationFactor = 2;
        try {
            KafkaFuture<Void> future = client
                .createTopics(Collections.singleton(new NewTopic("tweet", partitions, replicationFactor)),
                    new CreateTopicsOptions().timeoutMs(10000))
                .all();
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    //@Test
    public void testDeleteTopic() {
        KafkaFuture<Void> future = client.deleteTopics(Collections.singleton("tweet")).all();
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Test
    public void testChangeProperties() throws InterruptedException, ExecutionException {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "tweet");

        // get the current topic configuration
        DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singleton(resource));
        Map<ConfigResource, Config> config;
        config = describeConfigsResult.all().get();
        System.out.println(config);

        // create a new entry for updating the retention.ms value on the same topic
        ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "60000");
        Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
        updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
        AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
        alterConfigsResult.all();

        describeConfigsResult = client.describeConfigs(Collections.singleton(resource));
        config = describeConfigsResult.all().get();
    }
}
Source: https://github.com/gschmutz/kafka-examples/blob/master/kafka-admin-client/src/test/java/com/trivadis/kafka/sample/TestKafkaAdminClient.java
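
One caveat about the test above: alterConfigs has been deprecated since Apache Kafka 2.3 in favor of incrementalAlterConfigs, which changes only the listed entries instead of replacing the whole configuration. Below is a minimal sketch of the incremental variant; the broker address, class name, topic name, and retention value are illustrative assumptions, and it requires brokers on 2.3 or newer.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class IncrementalAlterConfigExample { // hypothetical class name
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "tweet");
            // OpType.SET updates just this one entry; other topic configs stay untouched.
            AlterConfigOp setRetention =
                new AlterConfigOp(new ConfigEntry("retention.ms", "60000"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(topicResource, Collections.singletonList(setRetention));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}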

