rsylog -> kafka1 -> 日志处理job -> kafka2 -> datawrite...
Tạo vào: 8 tháng 1, 2026
Tạo vào: 8 tháng 1, 2026
rsylog -> kafka1 -> 日志处理job -> kafka2 -> datawriter -> es3/es4
如果直接由 由日志处理job 直接写到 es3/es4 是否会出现一些问题
会的,把「日志处理 job」从写 Kafka2 改成直接写 ES3/ES4,通常会引入一类很典型的系统性问题。下面按“会出什么问题 / 什么时候不明显 / 怎么规避”来讲(结合你这条链路的语义:Kafka2 + datawriter 本质上是个缓冲与落盘/写入层)。
ES 的写入能力受很多因素影响:refresh、merge、segment、磁盘、热点 shard、GC、集群重平衡等。
现在有 Kafka2 做缓冲,ES 抖一下,datawriter 慢一点就行;
改成 job 直写 ES,ES 一慢,job 就得背压/阻塞,严重时会导致:
job 直写 ES 时,一旦出现网络闪断、429/503、写超时:
Kafka2 在这里相当于一个可持久化的缓冲 + 可回放的重放点。
Kafka2 + datawriter 往往能把“offset 提交”和“下游写入”解耦成可控流程。
job 直写 ES 时,如果没有稳定的 document_id(幂等 key):
datawriter 往往会做:
Kafka2 把“处理结果”与“写入 ES 的实现细节”隔离开:
有 Kafka2 时你可以:
Kafka2 是天然削峰。没有它:
Kafka2 让“处理结果”可以被多个消费者复用(例如同时写 ES、写 HDFS、写 OLAP)。
直写 ES 之后想加一个下游,往往得回到 job 里改逻辑或再引入一层队列。
一般要同时满足这些条件,直写才不太容易踩坑:
(尽量不改架构也能做得更稳)
_id(例如 trace_id+timestamp+seq / 业务唯一 id),避免重试导致重复。如果目标是减少链路、降低成本或延迟,通常会先做:
如果你告诉我这几个信息,我可以更具体地判断“会不会出问题、会出哪些、严重程度”以及给出推荐方案(不需要你再贴很多东西):
但结论先说在前面:大多数生产环境里,直接让处理 job 写 ES,最容易出问题的就是:ES 抖动导致整条链路背压雪崩 + 重试语义导致重复/丢失 + 补数变难。
日志处理job 是通过 spark 完成的,是scala 写的一个代码,你觉得这一块有提速的空间吗?
有,而且通常Spark 这块提速空间还挺大——但要看你现在的 job 属于哪种“慢”:是 读 Kafka 慢 / 计算慢 / 写下游慢(Kafka2/ES)/ 小文件&shuffle 慢 / GC 慢。在你这条链路里,最常见的瓶颈其实是写出端和Spark 微批/序列化开销,其次才是纯计算。
下面按“最常见可提速点”给你一份 Scala Spark(含 Structured Streaming)的加速清单,基本不需要改架构就能看到收益。
症状
write、foreachBatch、sink stage提速手段
batch.size / linger.ms / buffer.memory,并发 partition 对齐(partition 数别太少)。如果你们 Spark 只是“处理很快但写很慢”,那优化 Spark 本体参数收益有限,应该优先优化 sink。
如果你用的是 Structured Streaming(Kafka source):
maxOffsetsPerTrigger 设得太小会导致频繁触发、调度开销巨大。适当调大(结合延迟目标)。Trigger.ProcessingTime("x seconds") 太短也会被调度开销拖累。foreachBatch 里不要每批 new 连接池/ES client/Producer,改成可复用或用 lazy 单例(注意线程安全)。count/collect/show 等多次 action(会导致多次计算/多次读)。Scala 里很常见的慢点:
from_json(get_json_object) 这类内置。groupByKey / 大聚合 / join 是最贵的。能提前过滤就提前过滤;能 map-side 解决别 shuffle。spark.sql.shuffle.partitions 默认 200 经常不合适:spark.serializer=org.apache.spark.serializer.KryoSerializer(并注册常用 class)state 是提速/提稳的关键点:
你只要看三处就能基本定性:
在 kafka1 -> spark日志处理job -> kafka2 -> datawriter -> es 这种架构里:
如果你愿意给我 3 个信息(发文字就行,不用贴图),我可以直接告诉你“优先动哪 3 个点最可能立刻提速”:
这个的时间 是不是把 sink 也给算进去了?
是的,这个 Duration 是把 sink(你这个 foreach at Util.scala:102)完整算进去的,而且你这张图里已经非常明确了。
我直接结合你这张 Spark UI 来“对号入座”解释一下,避免抽象说法。
在 Spark Streaming / Structured Streaming 里:
一个 Streaming Job 的 Duration = 从触发 batch 开始,到这个 batch 的 output operation(sink)全部执行完成为止
你这里的关键信息有三点:
textStreaming job from [output operation 0, batch time 15:04:30] foreach at Util.scala:102
2/260/60👉 foreach at Util.scala:102 就是 sink
👉 Spark UI 展示的这个 job,本质就是“这个 batch 的 sink 执行 job”
所以:
Duration = 这个 batch 的计算 + shuffle + sink 写出(Kafka2 / ES / 其它 IO)总耗时
因为 Spark Streaming 的模型是这样的(非常关键):
text每个 batch ├── 构建 DAG ├── 执行 transformation ├── 最后执行 output operation(foreach / write / save) └── batch 结束
Spark 不会:
sink 是这个 job 的最后一个 stage,没跑完,job 不会结束。
你 UI 里看到:
2 stages2~3s 波动这说明:
👉 Spark 本身不是瓶颈
如果 sink 慢,典型表现会是:
你这张图:
现在快 ≠ 去掉 Kafka2 也会快
原因是:
如果你改成:
Spark foreach → 直接写 ES
那么:
基于这张图的判断:
👉 当前这条 Spark job 已经“够快了”,短期内不是最值得动的地方
如果你要提速,优先级应该是:
Spark UI 里的 Streaming Job Duration 是包含 sink 的。
从当前数据看,Spark job 本身和写 Kafka2 都非常健康;
如果改成 Spark 直写 ES,Duration 会直接暴露 ES 的所有抖动风险。
如果你愿意,下一步我可以帮你对比两种架构在 Spark UI 上“会长什么样”,让你在讨论“要不要干掉 Kafka2”时有非常直观的论据。
text// 日志解析成map val mappedLogs: DStream[Map[String, Tuple2[String, Any]]] = lines.transform(rdd => { // 向 zookeeper 更新 offset, 因为 RDD 在经过转换之后就无法获取到 offsets 了, 所以在这里更新 offset, 如果计算失败,可能会丢数据 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val jsonOffsetsList = offsetRanges.sortBy(offsets => offsets.topic + offsets.partition.toString).toJson.toString() println(new java.util.Date().toString + jsonOffsetsList) lines.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) OffsetUtil.saveOffsetToHdfs(jobConf.offsetFile, jsonOffsetsList) // pinIndexMap = dynamicLogConf.indexPinSet.flatMap(indexPins => indexPins._2.map(pin => pin -> indexPins._1)) val sha1 = MessageDigest.getInstance("SHA1").digest(jsonOffsetsList.getBytes).map("%02x".format(_)).mkString rdd.map(line => (sha1, line.value())) }).flatMap(line => { // 根据读到的数据里的类型,判断出该数据来自于边缘还是二级源 // 动态修改对应的配置 val cacheLevelIndex = logConfig.cacheLevelIndex val mainKeysSplit = line._2.split(logConfig.mainKeyDelimiter) val dataType = if (mainKeysSplit.length > cacheLevelIndex) { val nodeLevel = mainKeysSplit(cacheLevelIndex) val nodeLevelSplits = nodeLevel.split(",") if (nodeLevelSplits.length < 2) { "unknown" } else { mainKeysSplit(1) = nodeLevelSplits(0) mainKeysSplit(cacheLevelIndex) = nodeLevelSplits(1) if (mainKeysSplit(cacheLevelIndex) == "2") { "internal" } else if (mainKeysSplit(cacheLevelIndex) == "1") { "external" } else { "unknown" } } } dataType match { case "external" => ReqstatLogParse.parse(mainFormatFiledsPop, valueFormatFiledsPop, line._1, mainKeysSplit, logConfig) case "internal" => ReqstatLogParse.parse(mainFormatFiledsOri, valueFormatFiledsOri, line._1, mainKeysSplit, logConfig) case _ => List.empty[Map[String, Tuple2[String, Any]]] } }).filter(log => log.nonEmpty) // 聚合 // 按照设定的维度进行聚合 val schemes = logConfig.schemes.map(schm => { val schemeDef = schm.split("#") Map("schemeName" -> schemeDef(0), "keyFields" -> schemeDef(1).split(":").toList, "statInterval" -> schemeDef(2), "destType" -> schemeDef(3)) }) val output = schemes.foreach(schm => { aggByKey(mappedLogs, schm("keyFields").asInstanceOf[List[String]], logConfig.measures, schm("schemeName").asInstanceOf[String], schm("statInterval").asInstanceOf[String].toLong * 1000, jobConf.samplingFraction, schm("destType").asInstanceOf[String], kafkaSink ) })
这样的kafkasink ,也是一定要,接收到 kafka的返回值,才会完成这个job吗?
不一定。Spark 这个 batch/job 什么时候算“完成”,取决于你这个 kafkaSink 的实现有没有“等 Kafka ack”。
在 DStream 里,一个 stage/task 完成的条件是:你在 foreachRDD / foreachPartition(或你们 kafkaSink 的写出逻辑)里的函数返回了。
也就是说:
.get() 等待(或至少 flush() 确保都发出并收到 ack)→ Spark 的完成时间就会把 Kafka ack 的等待也算进去。Kafka 的 producer.send(record) 默认返回一个 Future[RecordMetadata],不调用 get() 就不会阻塞等 ack。所以很多“kafkaSink”实现是:
send(...)flush() 或不 flush这种情况下,Spark job 完成 ≠ Kafka 已经确认落盘。
看你们 kafkaSink/KafkaSink.scala(或你们封装的类)里有没有类似:
producer.send(...).get() ✅(明确等 ack)futures.foreach(_.get()) ✅producer.flush() ✅(不如 get 严格,但能把已发送/重试跑完一轮)producer.send(...) 然后啥也没做 ❌(基本不等 ack)从你贴的 UI 看(Completed Stages 里 runJob at EsSpark.scala:107 这类),1s 结束很像是“写出逻辑很轻/异步化”,但仅凭截图不能 100% 下结论,关键还是看 sink 实现。
你这里在 transform 里就做了:
scalalines.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) OffsetUtil.saveOffsetToHdfs(...)
而且你自己注释也写了:“如果计算失败, 可能会丢数据”——这不是“可能”,而是只要后面解析/聚合/写 Kafka 任何一步失败,就会出现已提交 offset 但数据没成功写出的丢失。
如果你们希望至少做到 at-least-once(不丢但可能重复),通常要改成:
要做到 effectively-once,还需要配合幂等写(例如下游用稳定 key 作为 _id 或 Kafka 侧开启幂等等)。
“这样的 kafkasink,也是一定要接收到 kafka 的返回值,才会完成这个 job 吗?”
不一定。
kafkaSink 在每个 partition 写完时“阻塞等待 ack”(future.get / futures.get / flush 等),Spark 才会把等待 Kafka 返回值算进 job 完成。如果你把 kafkaSink 那段(大概 20~40 行)贴出来(特别是 foreachPartition 里怎么 send 的),我可以直接告诉你:你们现在是 fire-and-forget、flush-only 还是 严格等 ack,以及对应的“会不会丢 / 会不会重复 / 吞吐会怎样”。
rsyslog -> kafka1 -> reqstat_job(处理日志) -> kafka2 -> datawriter(写入日志) -> es3/es4 -> req-stat 看板
这个东西的学名叫什么? 数据流程? 处理链路?
这个问题问得很好,其实你们这套东西在数据工程/日志平台领域是一个“非常标准、但叫法因场景而异”的架构。我给你一个从“最正式”到“最口语”的命名谱系,方便你在技术文档 / 架构评审 / 对外沟通时选用。
这是一个「实时日志处理数据管道(Real-time Log Processing Pipeline)」
也可以更具体地说是
「基于 Kafka 的实时日志 ETL / 流式数据处理链路」
这是最标准、最中性的学名,几乎所有数据平台都接受。
你的例子完整表达可以是:
实时日志数据处理管道(Real-time Log Data Pipeline)
强调点:
(你已经用得很准确了 👍)
👉 你这条就是:
rsyslog → Kafka → Spark → Kafka → ES 的实时日志处理链路
或者更完整:
基于 Kafka + Spark Streaming 的流式日志处理管道
强调:
如果你站在“业务/平台”视角讲:
👉 在对非大数据背景的人讲时非常好用。
你这套系统可以拆成 5 层:
日志采集层(Log Collection Layer)
消息缓冲 / 解耦层(Messaging / Buffer Layer)
流式处理层(Stream Processing Layer)
存储与索引层(Storage & Indexing Layer)
应用与可视化层(Serving / Visualization Layer)
这一写法在架构评审里非常加分。
你可以直接复制用的版本:
这是一个基于 Kafka 的实时日志流式处理管道,用于对 rsyslog 采集的日志进行实时解析、聚合,并最终写入 Elasticsearch 供 req-stat 看板查询与展示。
如果是你们团队内部 / 文档 / 评审,我建议统一叫:
实时日志处理链路(Real-time Log Processing Pipeline)
如果你愿意,我也可以帮你把这条链路画成一张标准架构图(层次 + 组件 + 数据流向),那在评审或汇报时会非常好用。