By ramu


2013-06-05 17:59:45 8 Comments

How can we create a topic in Kafka from the IDE using the API? When I run this:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

I get the error:

bash: bin/kafka-create-topic.sh: No such file or directory

I followed the developer setup exactly as described.

11 comments

@ForeverLearner 2019-02-21 09:32:04

Based on the latest kafka-client api and Kafka 2.1.1, the working version of code follows:

Import the latest kafka-clients using sbt.

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies ++= Seq("org.apache.kafka" % "kafka-clients" % "2.1.1",
"org.apache.kafka" %% "kafka" % "2.1.1")

The code for topic creation in scala:

import java.util.Arrays
import java.util.Properties

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

class CreateKafkaTopic {
  def create(): Unit = {
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")

    val localKafkaAdmin = AdminClient.create(config)

    val partitions = 3
    val replication = 1.toShort
    val topic = new NewTopic("integration-02", partitions, replication)
    val topics = Arrays.asList(topic)

    // createTopics is asynchronous: wait for the result before reporting it
    val result = localKafkaAdmin.createTopics(topics)
    result.all().get()
    println(result.values().keySet())
    localKafkaAdmin.close()
  }

}

Validate the new topic using:

./kafka-topics.sh --zookeeper 192.30.1.5:2181 --list

Hope it is helpful to someone. Reference: http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html

@Mahesh Mogal 2018-07-02 18:18:20

There is also AdminZkClient, which we can use to manage topics on the Kafka server.

String zookeeperHost = "127.0.0.1:2181";
Boolean isSecure = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSecure,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

You can refer to this link for details: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

@Dmitry Minkovsky 2017-07-15 21:37:25

As of 0.11.0.0 all you need is:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

This artifact now contains the AdminClient (org.apache.kafka.clients.admin).

AdminClient can handle many Kafka admin tasks, including topic creation:

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

AdminClient admin = AdminClient.create(config);

Map<String, String> configs = new HashMap<>();
int partitions = 1;
short replication = 1; // NewTopic takes the replication factor as a short

admin.createTopics(asList(new NewTopic("topic", partitions, replication).configs(configs)));

The output of this command is a CreateTopicsResult, which you can use to get a Future for the whole operation or for each individual topic creation:

  • to get a future for the whole operation, use CreateTopicsResult#all().
  • to get Futures for all the topics individually, use CreateTopicsResult#values().

For example:

CreateTopicsResult result = ...
KafkaFuture<Void> all = result.all();

or:

CreateTopicsResult result = ...
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
    try {
        entry.getValue().get();
        log.info("topic {} created", entry.getKey());
    } catch (InterruptedException | ExecutionException e) {
        if (Throwables.getRootCause(e) instanceof TopicExistsException) {
            log.info("topic {} existed", entry.getKey());
        }
    }
}

KafkaFuture is "a flexible future which supports call chaining and other asynchronous programming patterns," and "will eventually become a thin shim on top of Java 8's CompletableFuture."
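To make "call chaining" a bit more concrete, here is a minimal sketch that uses Java's own CompletableFuture as a stand-in for KafkaFuture, so it runs without a broker. The class name FutureChainingSketch, the helper describeOutcome, and the IllegalStateException standing in for TopicExistsException are all made up for illustration; they are not part of the Kafka API.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: CompletableFuture is used here instead of KafkaFuture.
class FutureChainingSketch {

    // Chains a success message and a failure message onto a pending "creation" future.
    static CompletableFuture<String> describeOutcome(String topic, CompletableFuture<Void> creation) {
        return creation
                .thenApply(ignored -> "topic " + topic + " created")
                .exceptionally(error -> "topic " + topic + " failed: " + rootCause(error).getMessage());
    }

    // Walks the cause chain down to the innermost exception (plain-JDK version of a root-cause lookup).
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // A creation that succeeded
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        System.out.println(describeOutcome("a", ok).join());

        // A creation that failed; IllegalStateException is a hypothetical stand-in error
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("already exists"));
        System.out.println(describeOutcome("b", failed).join());
    }
}
```

The point of the chained style is that nothing blocks until join() is called; the success and failure handling are attached up front instead of living in a try/catch around get().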

@sam 2017-09-14 09:49:34

Is there a reason why you set the replication and partitions twice, once in the constructor and once in .configs?

@Dmitry Minkovsky 2017-09-14 15:21:47

@sam not sure what you mean? Replication and partition are only specified in the NewTopic constructor

@sam 2017-09-14 15:30:05

Ah, my bad. I mistook the variables for settings put into the config, sorry for that.

@Dmitry Minkovsky 2017-09-14 15:31:25

@sam all good! Yeah the configs are the topic-specific configs you can find in the documentation

@jmhostalet 2018-07-13 06:56:39

hangs forever :-(
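If a call like get() never returns, one option is to bound the wait instead of blocking indefinitely. The sketch below is not a diagnosis of the hang, and it uses a plain java.util.concurrent.Future so it runs without a broker; KafkaFuture implements that interface, so the same timed get(...) should apply to it. The class name BoundedWaitSketch and the helper awaitWithTimeout are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: wait for a pending result, but give up after a fixed time.
class BoundedWaitSketch {

    // Returns a short status string instead of blocking forever.
    static String awaitWithTimeout(Future<?> pending, long timeoutMs) {
        try {
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // A future that is never completed, standing in for a request that gets no answer
        Future<Void> neverAnswers = new CompletableFuture<>();
        System.out.println(awaitWithTimeout(neverAnswers, 200));

        // A future that is already done
        Future<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(awaitWithTimeout(done, 200));
    }
}
```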

@its_a_paddo 2017-06-20 08:58:23

As of Kafka 0.10.1 the ZKStringSerializer mentioned by Michael is private (for Scala). You can use the factory methods createZkClient or createZkClientAndConnection in ZkUtils.

Scala example for Kafka 0.10.1:

import kafka.utils.ZkUtils

val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
  "localhost:2181", sessionTimeoutMs, connectionTimeoutMs) 

Then just create the topic as Michael suggested:

import java.util.Properties
import kafka.admin.AdminUtils

val zkUtils = new ZkUtils(zkClient, zkConnection, false)
val numPartitions = 4
val replicationFactor = 1
val topicConfig = new Properties
val topic = "my-topic"
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)

@Michael G. Noll 2014-04-29 09:00:25

In Kafka 0.8.1+ -- the latest version of Kafka as of today -- you can programmatically create a new topic via AdminUtils. The functionality of CreateTopicCommand (part of the older Kafka 0.8.0) that was mentioned in one of the previous answers to this question was moved to AdminUtils.

Scala example for Kafka 0.8.1:

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
// createTopic() will only seem to work (it will return without error).  The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
    ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

Build dependencies, using sbt as example:

libraryDependencies ++= Seq(
  "com.101tec" % "zkclient" % "0.4",
  "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri"),
  ...
)

EDIT: Added Java example for Kafka 0.9.0.0 (latest version as of Jan 2016).

Maven dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

Code:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;
    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configuration settings here
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
    zkClient.close();
  }

}

EDIT 2: Added Java example for Kafka 0.10.2.0 (latest version as of April 2017).

Maven dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
</dependency>

Code:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configuration settings here

    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
    zkClient.close();
  }

}

@mkhq 2014-05-03 15:54:16

Note that when not initializing the ZkClient with ZKStringSerializer, createTopic will return without error. The topic will exist in zookeeper and be returned when listing topics, but Kafka itself does not create the topic.

@Chee Loong Soon 2016-03-16 21:12:24

@quux00 You can easily create it using this code that I wrote: ZkClient zkClient = new ZkClient(zkServer); zkClient.setZkSerializer(ZKStringSerializer$.MODULE$); Refer to this: source code for zkClient link

@Sanket 2013-06-23 07:59:56

Which IDE are you trying this from?

Please provide the complete path. The commands below, run from a terminal, will create a topic:

  1. cd kafka/bin
  2. ./kafka-create-topic.sh --topic test --zookeeper localhost:2181

@Dmitriusan 2016-07-18 17:26:54

If you are using Kafka 0.10.0.0+, creating a topic from Java requires passing a parameter of type RackAwareMode. It's a Scala case object, and getting its instance from Java is tricky (see, for example, How do I "get" a Scala case object from Java?, although that is not applicable to our case).

Luckily, rackAwareMode is an optional parameter. Yet Java does not support optional parameters. How do we solve that? Here is a solution:

AdminUtils.createTopic(zkUtils, topic, 1, 1, 
    AdminUtils.createTopic$default$5(),
    AdminUtils.createTopic$default$6());

Use it with miguno's answer, and you are good to go.

@Hild 2013-08-23 22:49:19

From the Kafka 0.8 Producer Example: the sample below will create a topic named page_visits and also start producing, provided the auto.create.topics.enable attribute is set to true (the default) in the Kafka broker config file.

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

@Morgan Kenyon 2015-12-10 18:35:56

I'm sure you're aware that this doesn't technically create a topic via an API, as the OP asks. It just relies on the auto.create.topics.enable feature to create a topic whenever a nonexistent topic is referenced. With this method there is no way to specify topic settings other than the defaults set in the broker config files.

@Biks 2013-08-28 06:20:44

You can try the kafka.admin.CreateTopicCommand Scala class to create a topic from Java code, providing the necessary arguments.

String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
arguments[7] = "test-topic-Biks";

CreateTopicCommand.main(arguments);

NB: You should add the maven dependencies for jopt-simple-4.5 & zkclient-0.1

@Jaya Ananthram 2014-11-20 12:22:53

To create a topic through the Java API with Kafka 0.8+, try the following.

First, add the following imports:

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

Then create the ZkClient and the topic as follows:

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, "myTopic", 10, 1, new Properties());

@Gregory Patmore 2013-12-19 23:16:51

There are a few reasons your call might not work:

  1. Your Kafka cluster doesn't have enough nodes to support a replication value of 3.

  2. There is a chroot path prefix, in which case you have to append it after the ZooKeeper port.

  3. You aren't in the Kafka install directory when running the command (this is the most likely).
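The first two points can be sketched as small pre-flight checks in plain Java. This is only an illustration under stated assumptions: the class TopicPreflightSketch and its methods are hypothetical helpers, not Kafka APIs, and the broker count is assumed to be known from elsewhere. It relies on the fact that a ZooKeeper connect string carries the chroot path once, after the last host:port pair.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers; nothing here talks to Kafka or ZooKeeper.
class TopicPreflightSketch {

    // Builds "host1:2181,host2:2181/chroot"; the chroot is appended once, at the end.
    static String zkConnectString(List<String> hostPorts, String chroot) {
        String hosts = String.join(",", hostPorts);
        if (chroot == null || chroot.isEmpty()) {
            return hosts;
        }
        return hosts + (chroot.startsWith("/") ? chroot : "/" + chroot);
    }

    // A replication factor larger than the number of live brokers cannot be satisfied.
    static boolean replicationFits(int replicationFactor, int liveBrokers) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }

    public static void main(String[] args) {
        // "kafka" is an example chroot, not a default
        System.out.println(zkConnectString(Arrays.asList("zk1:2181", "zk2:2181"), "kafka"));
        // --replica 3 against a single local broker, as in the question
        System.out.println(replicationFits(3, 1));
    }
}
```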
