I am trying to use the Kafka 0.8 Log4j appender, but I cannot get it to work.
I want my application to send its logs directly to Kafka via the Log4j appender.

Here is my log4j.properties.
I could not find a suitable encoder, so I just configured it to use the default encoder
(i.e., I commented out that line).

log4j.rootLogger=INFO, stdout, KAFKA

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
log4j.appender.KAFKA.BrokerList=hnode01:9092
log4j.appender.KAFKA.Topic=DKTestEvent

#log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder

And this is my sample application:

import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.PropertyConfigurator;

public class HelloWorld {

        static Logger logger = Logger.getLogger(HelloWorld.class.getName());

        public static void main(String[] args) {
            PropertyConfigurator.configure(args[0]);

            logger.info("Entering application.");
            logger.debug("Debugging!.");
            logger.info("Exiting application.");
        }
}

I used Maven to build it.
I included kafka_2.8.2-0.8.0 and log4j_1.2.17 in my pom.xml.
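For reference, a minimal sketch of what the corresponding pom.xml dependencies might look like (the versions are taken from the question; the exact group/artifact IDs are my assumption for these releases):

```xml
<dependencies>
    <!-- Kafka 0.8.0 built for Scala 2.8.2, as stated in the question -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.8.2</artifactId>
        <version>0.8.0</version>
    </dependency>
    <!-- Log4j 1.2.17, as stated in the question -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
```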

And I am getting these errors:

INFO [main] (Logging.scala:67) - Verifying properties
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092
INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
INFO [main] (HelloWorld.java:14) - Entering application.
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent)
.
.
.
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent)
.
.
.
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent)
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
java.lang.StackOverflowError
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
    at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
    at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
    at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.error(Category.java:322)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Utils$.swallow(Utils.scala:189)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.info(Category.java:666)
    at kafka.utils.Logging$class.info(Logging.scala:67)
    at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:187)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
.
.
.

I keep getting the errors above continuously unless I terminate the program.

If I am missing something, please let me know.

Original author: style95 | 2014-02-26

3 Comments

  1. (5 votes)

    I think Jonas has identified the problem: the Kafka producer's own logging is itself sent to the Kafka appender, causing an infinite loop and the eventual stack overflow (no pun intended).
    You can route all of Kafka's logging to another appender. The following sends it to stdout:

    log4j.logger.kafka=INFO, stdout
    

    So you should end up with the following in your log4j.properties:

    log4j.rootLogger=INFO, stdout, KAFKA
    log4j.logger.kafka=INFO, stdout
    log4j.logger.HelloWorld=INFO, KAFKA
    
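    One caveat worth noting (my own addition, based on how log4j's logger additivity works, not part of the original answer): events logged under the `kafka` logger will still propagate up to the root logger, and therefore still reach the KAFKA appender, unless additivity is disabled. A sketch that should fully break the feedback loop:

    ```properties
    log4j.rootLogger=INFO, stdout, KAFKA
    log4j.logger.kafka=INFO, stdout
    # stop kafka.* events from also propagating to the root logger's KAFKA appender
    log4j.additivity.kafka=false
    ```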

    Original author: Naveen Warusavithana

  2. (2 votes)

    I have been able to produce events through log4j to Kafka 0.8.2.2. Here is my log4j configuration:

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
    
    <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    
       <appender name="console" class="org.apache.log4j.ConsoleAppender">
          <param name="Target" value="System.out" />
          <layout class="org.apache.log4j.PatternLayout">
             <param name="ConversionPattern" value="%-5p %c{1} - %m%n" />
          </layout>
       </appender>
       <appender name="fileAppender" class="org.apache.log4j.RollingFileAppender">
          <param name="Threshold" value="INFO" />
          <param name="MaxBackupIndex" value="100" />
          <param name="File" value="/tmp/agna-LogFile.log" />
          <layout class="org.apache.log4j.PatternLayout">
             <param name="ConversionPattern" value="%d  %-5p  [%c{1}] %m %n" />
          </layout>
       </appender>
       <appender name="kafkaAppender" class="kafka.producer.KafkaLog4jAppender">
          <param name="Topic" value="kafkatopic" />
          <param name="BrokerList" value="localhost:9092" />
          <param name="syncSend" value="true" />
          <layout class="org.apache.log4j.PatternLayout">
             <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" />
          </layout>
       </appender>
       <logger name="org.apache.kafka">
          <level value="error" />
          <appender-ref ref="console" />
       </logger>
       <logger name="com.example.kafkaLogger">
          <level value="debug" />
          <appender-ref ref="kafkaAppender" />
       </logger>
       <root>
          <priority value="debug" />
          <appender-ref ref="console" />
       </root>
    </log4j:configuration>
    

    Here is the source code:

    package com.example;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.KafkaProducer;
    
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class JsonProducer {
        static Logger defaultLogger = LoggerFactory.getLogger(JsonProducer.class);
        static Logger kafkaLogger = LoggerFactory.getLogger("com.example.kafkaLogger");
    
        public static void main(String args[]) {
    
            JsonProducer obj = new JsonProducer();
    
            String str = obj.getJsonObjAsString();
    
            //Use the logger
            kafkaLogger.info(str);
    
            try {
                //Construct and send message
                obj.constructAndSendMessage();
            } catch (InterruptedException e) {
                defaultLogger.error("Caught interrupted exception " + e);
            } catch (ExecutionException e) {
                defaultLogger.error("Caught execution exception " + e);
            }   
        }
    
        private String getJsonObjAsString() {
            JSONObject obj = new JSONObject();
            obj.put("name", "John");
            obj.put("age", new Integer(55));
            obj.put("address", "123 MainSt, Palatine, IL");
    
            JSONArray list = new JSONArray();
            list.add("msg 1");
            list.add("msg 2");
            list.add("msg 3");
    
            obj.put("messages", list);
    
            return obj.toJSONString();
        }
    
        private void constructAndSendMessage() throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            boolean sync = false;
            String topic = "kafkatopic";
            String key = "mykey";
            String value = "myvalue1 mayvalue2 myvalue3";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);
            if (sync) {
                producer.send(producerRecord).get();
            } else {
                producer.send(producerRecord);
            }
            producer.close();
        }
    }
    

    The whole project is available at the following link:

    https://github.com/ypant/kafka-json-producer.git

    Original author: Yagna Pant

  3. (1 vote)

    Try setting the appender to be asynchronous, like this:
    log4j.appender.KAFKA.ProducerType=async

    It seems reasonable that you end up in an infinite loop, because the Kafka producer does its own logging through log4j.

    Original author: Jonas Bergström
