kafka obtiene el recuento de particiones para un tema

5 minutos de lectura

¿Cómo puedo obtener el número de particiones para cualquier tema de kafka del código? He investigado muchos enlaces pero ninguno parece funcionar.

Mencionando algunos:

http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api

http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic

http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic

que parecen discusiones similares.

También hay enlaces similares en SO que no tienen una solución funcional para esto.

  • ¿Qué versión de Kafka?

    – Marko Bonaci

    16 de febrero de 2016 a las 17:48

  • vish4071, ¿qué tal aceptar la solución que terminó usando?

    – Marko Bonaci

    7 de octubre de 2018 a las 9:09

avatar de usuario de peter.petrov
peter.petrov

Ir a tu kafka/bin directorio.

Luego ejecuta esto:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

Deberías ver lo que necesitas debajo PartitionCount.

Topic:topic_name        PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: topic_name       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001

Al usar una versión en la que Zookeeper ya no depende de Kafka

kafka-topics --describe --bootstrap-server localhost:9092 --topic topic_name

  • Voté a favor porque estaba buscando una solución sin código y esto fue perfecto.

    –David Kaczynski

    2 de marzo de 2017 a las 15:24

  • Si desea resumir todas las particiones (incluidas las réplicas) para una expresión regular de tema, consulte mi respuesta a continuación.

    – Programador pragmático

    14 de enero de 2021 a las 20:29

  • no hay ningún campo “PartitionCount” para mí. cada partición tiene su propia línea pero no un campo de conteo total de particiones para ser visto.

    – Bunyamin Şentürk

    29 de julio de 2022 a las 8:47

Avatar de usuario de Sunil Patil
Sunil Patil

En la API Producer 0.82 y la API Consumer 0.9 puedes usar algo como

Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")

En código java podemos usar AdminClient para obtener sumas de partes de un tema.

Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);

DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>>  values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);

Avatar de usuario de Marko Bonaci
Marko Bonaci

Así es como lo hago:

  /**
   * Retrieves list of all partitions IDs of the given {@code topic}.
   * 
   * @param topic
   * @param seedBrokers List of known brokers of a Kafka cluster
   * @return list of partitions or empty list if none found
   */
  public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
    for (BrokerInfo seed : seedBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<Integer> partitions = new ArrayList<>();
        // find our partition's metadata
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            partitions.add(part.partitionId());
          }
        }
        return partitions;  // leave on first successful broker (every broker has this info)
      } catch (Exception e) {
        // try all available brokers, so just report error and go to next one
        LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    throw new RuntimeError("Could not get partitions");
  }

Tenga en cuenta que solo necesitaba extraer los ID de partición, pero también puede recuperar cualquier otro metadato de partición, como leader, isr, replicas
Y BrokerInfo es solo un POJO simple que tiene host y port campos.

Avatar de usuario de MD5
MD5

Debajo de shell cmd puede imprimir el número de particiones. Debería estar en el directorio kafka bin antes de ejecutar el cmd:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'

Tenga en cuenta que debe cambiar el nombre del tema según sus necesidades. También puede validar esto usando la condición if:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{if ($1=="16") print "valid partitions"}'

El comando cmd anterior imprime particiones válidas si el recuento es 16. Puede cambiar el recuento según sus requisitos.

avatar de usuario de pjkmgs
pjkmgs

Usar PartitionList de KafkaConsumer

     //create consumer then loop through topics
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);

    ArrayList<Integer> partitionList = new ArrayList<>();
    System.out.println(partitions.get(0).partition());

    for(int i = 0; i < partitions.size(); i++){
        partitionList.add(partitions.get(i).partition());
    }

    Collections.sort(partitionList);

Debería funcionar como un encanto. Avíseme si hay una forma más sencilla de acceder a la lista de particiones desde el tema.

Avatar de usuario de Avinash Kumar Pandey
Avinash Kumar Pandey

Entonces, el siguiente enfoque funciona para kafka 0.10 y no utiliza ninguna API de productor o consumidor. Utiliza algunas clases de la API de Scala en Kafka, como ZkConnection y ZkUtils.

    ZkConnection zkConnection = new ZkConnection(zkConnect);
    ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
    System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
         JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());

¿Ha sido útil esta solución?