ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Kafka Java Producer与kerberos

2019-10-07 06:02:28  阅读:250  来源: 互联网

标签:jaas java apache-kafka kerberos hortonworks-data-platform


在kerberosed环境中向kafka主题发送消息时收到错误.我们在hdp 2.3上有集群

我跟着这个http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

但是对于发送消息,我必须首先明确地执行kinit,然后才能将消息发送到kafka主题.
我试图通过java类编织,但这也行不通.
PFB代码:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        if (!authStatus) {
            System.out.println("Authntication fails, try something else  "  + authStatus);
        } else {
            System.out.println("Authntication successfull " + authStatus);
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();

            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);

            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");

            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");


            System.out.println("After config prop -1");

            ProducerConfig config = new ProducerConfig(props);

            System.out.println("After config prop -2 config" + config);

            Producer<String, String> producer = new Producer<String, String>(config);

            System.out.println("After config prop -3");

            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                String ip = "192.168.2" + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);

                System.out.println("After config prop -1 data" + data);

                producer.send(data);
            }
            producer.close();

        } catch (Throwable th) {
            th.printStackTrace();

        }
    }
}

Pom.xml:从hortonworks repo下载的所有依赖项.

        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.jasypt</groupId>
                <artifactId>jasypt-spring31</artifactId>
                <version>1.9.2</version>
                <scope>compile</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.1.2.3.4.0-3485</version>
            </dependency>

        </dependencies>

错误:
案例1:当我指定myuser kafka_jass.conf时

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
        at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
        at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)

MyUser_Kafka_jass.conf

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   doNotPrompt=true
   useTicketCache=true
   renewTicket=true
   principal="ctadmin/prod-dev1-dn1@PROD.COM";
   useKeyTab=true
   serviceName="kafka"
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   client=true;
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   storeKey=true
   useTicketCache=true
   serviceName="zookeeper"
   principal="ctadmin/prod-dev1-dn1@PROD.COM";
};

case2:当我指定Kafkas自己的jaas文件时

Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)

这很好,如果我在运行这个应用程序之前做了kinit,否则它将通过上面的错误.
我不能在我的生产环境中这样做,如果我们的应用程序本身有任何方法可以做到这一点,那么请帮助我.
如果您需要更多详细信息,请与我们联系.

谢谢:)

解决方法:

我不知道第一次犯了什么错误,在我再次做的事情之下,它运作正常.

首先给出所有主题的访问权限:

bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181

创建jass文件:
卡夫卡的Jaas.conf

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="ctadmin@HSCALE.COM"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/keytabs/ctadmin.keytab"
 client=true;
};

Java程序:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) {
        String topic = args[0];

        Properties props = new Properties();
        props.put("metadata.broker.list", "{Hostname}:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (int i = 0; i < 10; i++){
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
    }
}

运行申请:

java -Djava.security.auth.login.config = / home / ctadmin / kafka-jaas.conf -Djava.security.krb5.conf = / etc / krb5.conf -Djavax.security.auth.useSubjectCredsOnly = true -cp kafka -testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer

标签:jaas,java,apache-kafka,kerberos,hortonworks-data-platform
来源: https://codeday.me/bug/20191007/1864916.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有