该项目可用于将 Java、Spark 和 Hadoop 项目的运行日志转发到 Kafka/飞书
- 工程中的日志通过该 SDK 收集并发送到 Kafka,再由 Logstash 写入 ES,最后在 Kibana 中按项目建立日志索引,分别查看
- 日志直接输出飞书
--jars libs/kafka-clients-1.0.1.3.0.0.0-1634.jar \
--files spark-log4j.properties \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=spark-log4j.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=spark-log4j.properties" \
- 详见Spark和Flink官网
- 接入日志方式类似,详见下文中logback和log4j
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: console + rolling file + Kafka appender.
     scan/scanPeriod make logback re-read this file every 3 seconds. -->
<configuration scan="true" scanPeriod="3 seconds">

  <!-- Console output -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%logger{32}] %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Kafka output: the threshold filter lets only WARN and above through -->
  <appender name="KAFKA" class="io.github.duhanmin.router.log.kafka.logback.KafkaLogbackAppender">
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
      <level>WARN</level>
    </filter>
    <syncSend>false</syncSend>
    <brokerList>ip:port</brokerList>
    <appName>hotwheels-api</appName>
    <topic>HOTWHEELS-REALTIME_LOG</topic>
  </appender>

  <!-- File output -->
  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>logFile.log</file>
    <!-- The %d pattern goes down to minutes, so a new archive is started every
         minute; the .zip suffix makes logback compress each archive. -->
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>logFile.%d{yyyy-MM-dd_HH-mm}.log.zip</fileNamePattern>
    </rollingPolicy>
    <!-- File appenders expect an encoder; a bare <layout> is the legacy form. -->
    <encoder>
      <pattern>%d{HH:mm:ss,SSS} [%thread] %-5level %logger{32} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Root logger: INFO and above go to all three appenders -->
  <root level="info">
    <appender-ref ref="STDOUT"/>
    <appender-ref ref="FILE"/>
    <appender-ref ref="KAFKA"/>
  </root>
</configuration>
# Root logger: INFO and above, routed to the console and Kafka appenders
log4j.rootLogger=INFO,console,KAFKA

# Kafka appender (Threshold=WARN: only WARN and above are sent to Kafka)
# NOTE(review): the logback example in this document uses the package prefix
# io.github.duhanmin.router.log.kafka - confirm this class name against the
# published artifact.
log4j.appender.KAFKA=com.router.log.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.Threshold=WARN
log4j.appender.KAFKA.brokerList=ip:port
log4j.appender.KAFKA.topic=topic
log4j.appender.KAFKA.appName=name
log4j.appender.KAFKA.compressionType=none
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d (%t) [%p - %l] %m%n

# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
可以将日志打到飞书
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: console output plus the custom Feishu appender.
     NOTE(review): "packages" should name the package that contains the
     FeiShuLog4j2Appender plugin - confirm that "logger.FileAppender" is right. -->
<Configuration status="INFO" monitorInterval="30" packages="logger.FileAppender">
  <Appenders>
    <!-- Console output -->
    <Console name="Console" target="SYSTEM_OUT">
      <!-- Log line format -->
      <PatternLayout pattern="%highlight{[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [%l] %m%n}"/>
    </Console>

    <!-- Custom appender that posts log events to a Feishu bot webhook.
         Replace YOUR-WEBHOOK-TOKEN with the token of your own bot; a real
         webhook token is a credential and must not be published or committed. -->
    <FeiShuLog4j2Appender name="FeiShuLog4j2Appender"
                          appName="FeiShuLog4j2Appender"
                          syncSend="true"
                          url="https://open.feishu.cn/open-apis/bot/v2/hook/YOUR-WEBHOOK-TOKEN">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
    </FeiShuLog4j2Appender>
  </Appenders>

  <Loggers>
    <!-- Root logger: INFO and above go to both the console and Feishu -->
    <Root level="INFO">
      <AppenderRef ref="Console"/>
      <AppenderRef ref="FeiShuLog4j2Appender"/>
    </Root>
  </Loggers>
</Configuration>
下载源
<!-- Maven repository that hosts the SDK artifact (snapshots enabled, checked on every build) -->
<repositories>
  <repository>
    <id>maven-repo-master</id>
    <url>https://raw.github.com/duhanmin/mvn-repo/master/</url>
    <snapshots>
      <enabled>true</enabled>
      <updatePolicy>always</updatePolicy>
    </snapshots>
  </repository>
</repositories>
包
<!-- SDK dependency coordinates -->
<dependency>
  <groupId>com.duhanmin</groupId>
  <artifactId>log-router</artifactId>
  <version>0.1.1</version>
</dependency>
日志样例
{
"fqnOfCategoryClass": "org.apache.commons.logging.impl.SLF4JLocationAwareLog",
"eventId": "07d643ef-d86a-4c37-9612-28b2b81936e3",
"address": "192.168.2.155",
"level": "ERROR",
"eventChannel": "topic",
"message": "错误来了",
"categoryName": "2019年06月03日 16:02:47,327 (Executor task launch worker for task 3) [ERROR - test.rocketmq.rocketmqT1ドル.process(rocketmqT.java:56)] 错误来了",
"threadName": "Executor task launch worker for task 3",
"timeStamp": "1559548967327",
"throwableInfo": "java.lang.ArithmeticException: / by zero at test.rocketmq.rocketmqT1ドル.process(rocketmqT.java:54) at test.rocketmq.rocketmqT1ドル.process(rocketmqT.java:49) at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch1ドル.apply(ForeachSink.scala:53) at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch1ドル.apply(ForeachSink.scala:49) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition1ドル$$anonfun$apply29ドル.apply(RDD.scala:935) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition1ドル$$anonfun$apply29ドル.apply(RDD.scala:935) at org.apache.spark.SparkContext$$anonfun$runJob5ドル.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob5ドル.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ",
"eventTime": 1559548967341,
"host_name": "DESKTOP-V0FGT1M"
}

logstash配置
# Consume the JSON log events from Kafka.
input {
  kafka {
    # Comma-separated broker list, given as a single string.
    bootstrap_servers => "ip1:port,ip2:port"
    group_id => "es-transfer"
    topics => ["HOTWHEELS-REALTIME_LOG"]
    consumer_threads => 5
    decorate_events => true
    codec => "json"
  }
}

# Build the index name from eventChannel. Elasticsearch rejects index names
# that contain uppercase characters, so the name is lowercased first.
# The sample event suggests eventChannel carries the Kafka topic name
# (e.g. HOTWHEELS-REALTIME_LOG) - confirm against the SDK.
# The name is kept under @metadata so the stored eventChannel field is left
# untouched and the helper field is not indexed.
filter {
  mutate {
    add_field => { "[@metadata][index]" => "%{eventChannel}" }
  }
  # Separate mutate block: add_field is applied only after a mutate block's
  # own operations have run, so lowercase must happen in a later block.
  mutate {
    lowercase => ["[@metadata][index]"]
  }
}

# Write each event to the per-channel Elasticsearch index.
output {
  elasticsearch {
    hosts => ["ip1:port","ip2:port"]
    codec => "json"
    index => "%{[@metadata][index]}"
  }
}
https://duhanmin.github.io/2020/04/01/%E9%80%9A%E7%94%A8%E6%97%A5%E5%BF%97%E6%94%B6%E9%9B%86SDK/