¿Cómo puedo obtener el número de particiones para cualquier tema de kafka del código? He investigado muchos enlaces pero ninguno parece funcionar.
Mencionando algunos:
http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api
http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic
http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic
que parecen discusiones similares.
También hay enlaces similares en SO que no tienen una solución funcional para esto.
peter.petrov
Ir a tu kafka/bin
directorio.
Luego ejecuta esto:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
Deberías ver lo que necesitas debajo PartitionCount
.
Topic:topic_name PartitionCount:5 ReplicationFactor:1 Configs:
Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Al usar una versión en la que Zookeeper ya no depende de Kafka
kafka-topics --describe --bootstrap-server localhost:9092 --topic topic_name
-
Voté a favor porque estaba buscando una solución sin código y esto fue perfecto.
–David Kaczynski
2 de marzo de 2017 a las 15:24
-
Si desea resumir todas las particiones (incluidas las réplicas) para una expresión regular de tema, consulte mi respuesta a continuación.
– Programador pragmático
14 de enero de 2021 a las 20:29
-
no hay ningún campo “PartitionCount” para mí. cada partición tiene su propia línea pero no un campo de conteo total de particiones para ser visto.
– Bunyamin Şentürk
29 de julio de 2022 a las 8:47
Sunil Patil
En la API Producer 0.82 y la API Consumer 0.9 puedes usar algo como
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")
-
Esto supone que estoy usando KafkaConsumer para mi consumidor, pero estoy usando ConsumerConnector para él. referirse: cwiki.apache.org/confluence/display/KAFKA/…
– vish4071
17 de febrero de 2016 a las 9:10
En código java podemos usar AdminClient
para obtener sumas de partes de un tema.
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);
DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>> values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);
Marko Bonaci
Así es como lo hago:
/**
* Retrieves list of all partitions IDs of the given {@code topic}.
*
* @param topic
* @param seedBrokers List of known brokers of a Kafka cluster
* @return list of partitions or empty list if none found
*/
public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
for (BrokerInfo seed : seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<Integer> partitions = new ArrayList<>();
// find our partition's metadata
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
partitions.add(part.partitionId());
}
}
return partitions; // leave on first successful broker (every broker has this info)
} catch (Exception e) {
// try all available brokers, so just report error and go to next one
LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
throw new RuntimeError("Could not get partitions");
}
Tenga en cuenta que solo necesitaba extraer los ID de partición, pero también puede recuperar cualquier otro metadato de partición, como leader
, isr
, replicas
…
Y BrokerInfo
es solo un POJO simple que tiene host
y port
campos.
MD5
Debajo de shell cmd puede imprimir el número de particiones. Debería estar en el directorio kafka bin antes de ejecutar el cmd:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'
Tenga en cuenta que debe cambiar el nombre del tema según sus necesidades. También puede validar esto usando la condición if:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{if ($1=="16") print "valid partitions"}'
El comando cmd anterior imprime particiones válidas si el recuento es 16. Puede cambiar el recuento según sus requisitos.
pjkmgs
Usar PartitionList de KafkaConsumer
//create consumer then loop through topics
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
ArrayList<Integer> partitionList = new ArrayList<>();
System.out.println(partitions.get(0).partition());
for(int i = 0; i < partitions.size(); i++){
partitionList.add(partitions.get(i).partition());
}
Collections.sort(partitionList);
Debería funcionar como un encanto. Avíseme si hay una forma más sencilla de acceder a la lista de particiones desde el tema.
Avinash Kumar Pandey
Entonces, el siguiente enfoque funciona para kafka 0.10 y no utiliza ninguna API de productor o consumidor. Utiliza algunas clases de la API de Scala en Kafka, como ZkConnection y ZkUtils.
ZkConnection zkConnection = new ZkConnection(zkConnect);
ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());
¿Qué versión de Kafka?
– Marko Bonaci
16 de febrero de 2016 a las 17:48
vish4071, ¿qué tal aceptar la solución que terminó usando?
– Marko Bonaci
7 de octubre de 2018 a las 9:09