做网站的网络公司税收优惠,画册设计是什么,网站建设-好发信息网,网页制作教程pdf半夜被电话叫醒#xff0c;消息积压了200万条#xff0c;消费者根本追不上。
这种场景搞过Kafka的应该都经历过#xff0c;整理一下踩过的坑和解决方案。
坑一#xff1a;消息积压
现象
监控告警#xff1a;topic-order的lag超过100万。
# 查看消费者lag
kafka-consumer-g…半夜被电话叫醒消息积压了200万条消费者根本追不上。这种场景搞过Kafka的应该都经历过整理一下踩过的坑和解决方案。坑一消息积压现象监控告警topic-order的lag超过100万。# 查看消费者lagkafka-consumer-groups.sh --bootstrap-server localhost:9092\--describe --group order-consumer GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG order-consumer topic-order0123456723456781111111order-consumer topic-order1123456823456791111111order-consumer topic-order2123456923456801111111三个分区每个积压100多万加起来300多万。排查过程1. 先看生产速度# 查看topic的写入速度kafka-run-class.sh kafka.tools.GetOffsetShell\--broker-list localhost:9092\--topic topic-order --time -1# 隔10秒再执行一次算差值# 发现每秒写入约5000条2. 再看消费速度消费者日志显示处理一条消息要200ms算下来每秒只能处理5条。问题找到了消费太慢。解决方案方案一增加消费者实例Kafka的分区数决定了最大并行度。3个分区最多3个消费者并行。# 先增加分区注意分区只能增不能减kafka-topics.sh --bootstrap-server localhost:9092\--alter --topic topic-order --partitions12然后部署12个消费者实例。方案二批量消费// 原来一条一条处理KafkaListener(topicstopic-order)publicvoidconsume(Stringmessage){processOrder(message);// 200ms}// 优化后批量处理KafkaListener(topicstopic-order)publicvoidconsumeBatch(ListStringmessages){// 攒一批再处理减少IO次数batchProcessOrders(messages);// 批量写库}配置调整spring:kafka:consumer:max-poll-records:500# 一次拉取500条listener:type:batch# 批量模式方案三异步处理KafkaListener(topicstopic-order)publicvoidconsume(Stringmessage){// 扔到线程池异步处理executor.submit(()-processOrder(message));}但要注意异步处理需要手动管理offset提交不然可能丢消息。效果优化后消费速度从5条/秒提升到3000条/秒积压2小时内消化完。坑二消息丢失现象业务反馈有订单没收到但生产端日志显示发送成功了。排查1. 生产端配置props.put(acks,1);// 问题在这acks1表示leader收到就返回成功但如果leader挂了、follower还没同步消息就丢了。2. 消费端配置props.put(enable.auto.commit,true);props.put(auto.commit.interval.ms,1000);自动提交offset如果消费处理到一半程序挂了offset已经提交了这条消息就丢了。解决方案生产端// acksall所有ISR副本都写入才算成功props.put(acks,all);// 重试次数props.put(retries,3);// 开启幂等性props.put(enable.idempotence,true);消费端// 关闭自动提交props.put(enable.auto.commit,false);// 手动提交KafkaListener(topicstopic-order)publicvoidconsume(ConsumerRecordString,Stringrecord,Acknowledgmentack){try{processOrder(record.value());ack.acknowledge();// 处理成功才提交}catch(Exceptione){// 处理失败不提交会重新消费log.error(处理失败,e);}}Broker端# 最小ISR副本数 min.insync.replicas2 # 不允许非ISR副本选举为leader unclean.leader.election.enablefalse坑三重复消费现象同一条消息被处理了两次导致订单重复扣款。原因消费者处理完消息还没来得及提交offset就挂了。重启后从上次提交的offset开始消费这条消息又被消费一次。Kafka是at-least-once语义不保证exactly-once。解决方案业务幂等publicvoidprocessOrder(Stringmessage){OrderorderJSON.parseObject(message,Order.class);// 先查是否已处理过if(orderService.exists(order.getOrderId())){log.info(订单已处理过跳过: {},order.getOrderId());return;}// 处理订单orderService.process(order);}Redis去重publicvoidprocessOrder(Stringmessage){StringmsgIdextractMsgId(message);// Redis SETNX已存在返回falsebooleanisNewredis.setIfAbsent(kafka:processed:msgId,1,24,TimeUnit.HOURS);if(!isNew){log.info(消息已处理过: {},msgId);return;}// 处理业务doProcess(message);}数据库唯一约束-- 用唯一约束兜底CREATEUNIQUEINDEXuk_order_idONorders(order_id);坑四消费者频繁Rebalance现象日志里频繁出现Revoking previously assigned partitions Rebalance triggered消费者不停地Rebalance效率极低。原因1. 心跳超时// 默认10秒没心跳就认为消费者挂了session.timeout.ms10000如果处理一条消息超过10秒就会被踢出消费组。2. poll间隔太长// 默认5分钟内必须调用pollmax.poll.interval.ms300000处理500条消息花了6分钟超时了。解决方案// 增加session超时时间props.put(session.timeout.ms,30000);props.put(heartbeat.interval.ms,10000);// 增加poll间隔props.put(max.poll.interval.ms,600000);// 减少单次拉取数量props.put(max.poll.records,100);核心原则确保在max.poll.interval.ms内能处理完max.poll.records条消息。坑五顺序消费需求同一个用户的操作必须按顺序处理。问题默认情况下消息分散到不同分区不同分区的消费顺序无法保证。解决方案指定分区key// 用userId作为key相同userId的消息会落到同一分区kafkaTemplate.send(topic-order,userId,message);单分区方案不推荐除非量很小// 只用一个分区保证全局顺序kafkaTemplate.send(topic-order,0,null,message);注意事项同一分区内保证顺序但重试可能打乱顺序设置max.in.flight.requests.per.connection1保证严格顺序props.put(max.in.flight.requests.per.connection,1);性能调优参数生产者# 批量发送攒够16K或等1ms就发 batch.size16384 linger.ms1 # 发送缓冲区 buffer.memory33554432 # 压缩推荐lz4 compression.typelz4消费者# 单次拉取大小 fetch.min.bytes1 fetch.max.bytes52428800 fetch.max.wait.ms500 # 单次poll记录数 max.poll.records500Broker# 日志保留 log.retention.hours168 log.retention.bytes1073741824 # 分区数根据消费者数量设置 num.partitions12 # 副本 default.replication.factor3 min.insync.replicas2监控指标这几个指标必须监控指标含义报警阈值ConsumerLag消费延迟根据业务定MessagesInPerSec写入速度突增报警BytesInPerSec流量接近带宽报警UnderReplicatedPartitions副本不足的分区0报警OfflinePartitionsCount离线分区0报警集群运维我们的Kafka集群分布在两个机房之前两边网络不通很麻烦。后来用星空组网把两个机房组到一个网络里Kafka的跨机房复制配置简单多了。总结Kafka踩坑清单问题原因解决方案消息积压消费慢加分区、批量消费、异步处理消息丢失acks配置不当acksall、手动提交重复消费at-least-once语义业务幂等、去重频繁Rebalance超时配置不当调整超时参数顺序问题多分区并行指定分区keyKafka本身很稳定大多数问题都是配置和使用不当导致的。有Kafka相关问题欢迎评论区讨论~