Llegar error al enviar el mensaje a kafka tema en kerberosed ambiente. Tenemos grupo de hdp 2.3

He seguido este http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

Pero para el envío de mensajes, tengo que hacer kinit explícitamente en primer lugar, a continuación, sólo soy capaz de enviar un mensaje a kafka tema.
Traté de hacer, tejer a través de la clase de java, pero que también no funciona.
PFB código:

package com.ct.test.kafka;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static void main(String[] args) {
String principalName = "ctadmin";
String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);
if (!authStatus) {
System.out.println("Authntication fails, try something else  "  + authStatus);
} else {
System.out.println("Authntication successfull " + authStatus);
}
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
System.setProperty("sun.security.krb5.debug", "true");
try {
long events = Long.parseLong("3");
Random rnd = new Random();
Properties props = new Properties();
System.out.println("After broker list- " + args[0]);
props.put("metadata.broker.list", args[0]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("security.protocol", "PLAINTEXTSASL");
//props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");
System.out.println("After config prop -1");
ProducerConfig config = new ProducerConfig(props);
System.out.println("After config prop -2 config" + config);
Producer<String, String> producer = new Producer<String, String>(config);
System.out.println("After config prop -3");
for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
Date runtime = new Date();
String ip = "192.168.2" + rnd.nextInt(255);
String msg = runtime + " www.example.com, " + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);
System.out.println("After config prop -1 data" + data);
producer.send(data);
}
producer.close();
} catch (Throwable th) {
th.printStackTrace();
}
}
}

Pom.xml : Todas dependencia descargado de hortonworks repo.

        <dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.2.3.4.0-3485</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.2.3.4.0-3485</version>
</dependency>
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt-spring31</artifactId>
<version>1.9.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1.2.3.4.0-3485</version>
</dependency>
</dependencies>

De Error :
Case1 : cuando me especificar myuser kafka_jass.conf

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.[email protected]
java.lang.SecurityException: Configuration Error:
Line 6: expected [controlFlag]
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
Line 6: expected [controlFlag]
at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)

MyUser_Kafka_jass.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
renewTicket=true
principal="ctadmin/[email protected]";
useKeyTab=true
serviceName="kafka"
keyTab="/etc/security/keytabs/ctadmin.keytab"
client=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/ctadmin.keytab"
storeKey=true
useTicketCache=true
serviceName="zookeeper"
principal="ctadmin/[email protected]";
};

case2 : Cuando me especificar Kafka propio jaas archivo

Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)

Esto funciona bien, si hago kinit antes de ejecutar esta aplicación, de lo contrario será a través de error anterior.
Yo no puedo hacer esto en mi entorno de producción, si hay alguna forma de hacer esto por nuestra propia aplicación, por favor que me ayude.
Por favor, hágamelo saber si usted necesita más detalles.

Gracias:)

OriginalEl autor Kalpesh | 2016-03-11

2 Comentarios

  1. 1

    No sé qué error hizo la primera vez, por debajo de las cosas que lo hice de nuevo, y funciona bien.

    Primero dar a todos el acceso a tema:

    bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181

    crear jass archivo:
    kafka-jaas.conf

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    useTicketCache=true
    principal="[email protected]"
    useKeyTab=true
    serviceName="kafka"
    keyTab="/etc/security/keytabs/ctadmin.keytab"
    client=true;
    };

    Programa Java:

    package com.ct.test.kafka;
    import java.util.Date;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    public class KafkaProducer {
    public static void main(String[] args) {
    String topic = args[0];
    Properties props = new Properties();
    props.put("metadata.broker.list", "{Hostname}:6667");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("security.protocol", "PLAINTEXTSASL");
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);
    for (int i = 0; i < 10; i++){
    producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
    }
    }
    }

    Ejecutar la aplicación:

    java -Djava.de seguridad.auth.de inicio de sesión.config=/home/ctadmin/kafka-jaas.conf -Djava.de seguridad.krb5.conf=/etc/krb5.conf -Djavax.de seguridad.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.ct.prueba.kafka.KafkaProducer

    Llego a continuación Excepción ERROR Utils$:106 - fetching topic metadata for topics [Set(test3)] from broker [ArrayBuffer(id:0,host:D-9539.mydomain.com,port:6667)] failed kafka.common.KafkaException: fetching topic metadata for topics [Set(test3)] from broker [ArrayBuffer(id:0,host:D-9539.mydomain.com,port:6667)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    Estoy luchando para escribir un Java productor de cliente para conectarse con Kafka productor, que es compatible con Kerberos. Estoy recibiendo KrbException: Identificador no coincide con el valor esperado (906) error.

    OriginalEl autor Kalpesh

  2. 1

    El error está en un punto y coma que tiene en su jaas archivo como se puede ver en esta pieza de salida:

    Line 6: expected [controlFlag]

    Esta línea no puede tener el carácter de punto y coma:

    principal="ctadmin/[email protected]";

    que sólo puede existir en la última línea:

    OriginalEl autor victorgp

Dejar respuesta

Please enter your comment!
Please enter your name here