深圳市路桥建设集团有限公司招标采购网站百度地图怎么搜街景
深圳市路桥建设集团有限公司招标采购网站,百度地图怎么搜街景,广告设计公司入选合作库评分细则,企业信息查询网站查询在分布式系统中#xff0c;消息队列是实现系统解耦、异步通信和流量削峰的核心组件#xff0c;而 Apache Kafka 凭借其高吞吐量、高可靠性、可扩展性等优势#xff0c;成为当前最主流的消息中间件之一。无论是日志收集、实时数据处理#xff0c;还是微服务间通信#xff0…在分布式系统中消息队列是实现系统解耦、异步通信和流量削峰的核心组件而 Apache Kafka 凭借其高吞吐量、高可靠性、可扩展性等优势成为当前最主流的消息中间件之一。无论是日志收集、实时数据处理还是微服务间通信Kafka 都扮演着至关重要的角色。本文将从 Kafka 的核心概念出发手把手带大家完成从环境搭建、配置优化到消息生产消费的全流程实战帮助新手快速上手 Kafka。一、Kafka 核心概念速览在开始实操前我们需要先理清 Kafka 的核心术语这是理解后续操作的基础。Kafka 的架构相对简洁主要包含以下核心组件1. 生产者Producer消息的发送者负责将业务数据封装成消息并发送到 Kafka 集群。生产者可以通过配置确认机制ACK来保证消息的可靠性同时支持批量发送以提高吞吐量。2. 消费者Consumer消息的接收者从 Kafka 集群中拉取消息并进行业务处理。消费者通常以**消费者组Consumer Group**的形式工作同一消费者组内的消费者共同消费一个主题的消息避免重复消费不同消费者组则可以独立消费同一主题的消息。3. 主题Topic消息的分类容器生产者将消息发送到指定主题消费者从指定主题拉取消息。主题是逻辑上的概念物理上会被划分为多个分区Partition分区是 Kafka 实现高吞吐量和并行处理的核心。4. 分区Partition每个主题可以包含多个分区分区内的消息按发送顺序存储为有序的日志Log并以偏移量Offset标记消息的位置。分区的数量决定了主题的并行处理能力通常建议根据业务吞吐量设置合理的分区数如与消费者组内的消费者数量匹配。5. 副本Replica为保证分区的高可用性Kafka 会为每个分区创建多个副本。副本分为首领副本Leader和跟随者副本Follower生产者和消费者仅与首领副本交互跟随者副本负责同步首领副本的数据当首领副本故障时跟随者副本会通过选举机制成为新的首领。6. 集群Cluster由多个 Kafka 服务器Broker组成每个 Broker 是一个独立的服务节点负责存储分区数据和处理客户端请求。集群通过 ZooKeeper旧版本或 KRaft新版本进行元数据管理、首领选举和负载均衡。核心逻辑总结生产者将消息发送到 Topic 的 Partition消费者组从 Partition 拉取消息Partition 的副本机制保证高可用集群通过协调组件实现分布式管理。二、Kafka 环境搭建与配置Linux 系统Kafka 运行依赖 Java 环境因此首先需要安装 JDK推荐使用 JDK 11 及以上版本。本文以 Kafka 3.6.0 版本支持 KRaft 模式无需依赖 ZooKeeper为例进行讲解。1. 安装前准备配置 Java 环境下载 JDK从 Oracle 官网或 OpenJDK 官网下载 JDK 11例如openjdk-11-jdk_x64_linux.tar.gz。解压并配置环境变量# 解压到指定目录tar -zxvf openjdk-11-jdk_x64_linux.tar.gz -C /usr/local/配置环境变量编辑 /etc/profile 文件echo “export JAVA_HOME/usr/local/openjdk-11” /etc/profileecho “export PATH$JAVA_HOME/bin:$PATH” /etc/profile生效环境变量source /etc/profile验证安装java -version2. 安装并配置 Kafka下载 Kafka从 Kafka 官网https://kafka.apache.org/downloads下载二进制包例如kafka_2.13-3.6.0.tgz2.13 是 Scala 版本3.6.0 是 Kafka 版本。解压 Kafkatar -zxvf kafka_2.13-3.6.0.tgz -C /usr/local/ cd /usr/local/kafka_2.13-3.6.0配置 KRaft 模式重点Kafka 3.0 推荐使用 KRaft 模式替代 ZooKeeper简化集群部署。核心配置文件为config/kraft/server.properties关键配置项如下# 节点唯一 ID集群中每个 Broker 需不同取值范围 0-2147483647node.id1监听地址PLAINTEXT 为无加密协议端口默认 9092listenersPLAINTEXT://:9092广告地址客户端实际连接的地址若为远程访问需配置服务器 IPadvertised.listenersPLAINTEXT://192.168.1.100:9092日志存储目录分区数据和元数据的存储路径log.dirs/usr/local/kafka_2.13-3.6.0/logs集群 ID需先通过命令生成cluster.idabc12345-xxxx-xxxx-xxxx-xxxxxxxxx分区副本数默认 1生产环境建议设置 2-3 保证高可用default.replication.factor2主题默认分区数默认 1根据业务吞吐量调整num.partitions3生成集群 ID 并初始化集群# 生成集群 ID记录输出的 ID配置到 server.properties 中./bin/kafka-storage.sh random-uuid初始化存储目录使用上述生成的集群 ID./bin/kafka-storage.sh format -t 集群ID -c config/kraft/server.properties3. 启动与停止 Kafka 服务启动 Kafka# 前台启动便于查看日志适合调试./bin/kafka-server-start.sh config/kraft/server.properties后台启动生产环境推荐./bin/kafka-server-start.sh -daemon config/kraft/server.properties停止 Kafka./bin/kafka-server-stop.sh验证服务状态通过端口监听确认 Kafka 是否启动成功默认端口 9092netstat -tuln | grep 9092三、Kafka 核心操作主题、生产者与消费者Kafka 提供了命令行工具用于管理主题、测试生产消费同时也支持通过 Java、Python 等语言的客户端进行开发。本节先讲解命令行操作再介绍 Java 客户端的实战代码。1. 主题管理命令行主题是消息的载体所有生产消费操作都围绕主题展开常见操作如下# 1. 创建主题指定分区数 3副本数 2./bin/kafka-topics.sh --create --topic test_topic --partitions3--replication-factor2--bootstrap-server192.168.1.100:9092# 2. 查看所有主题./bin/kafka-topics.sh --list --bootstrap-server192.168.1.100:9092# 3. 查看主题详情分区、副本分布等./bin/kafka-topics.sh --describe --topic test_topic --bootstrap-server192.168.1.100:9092# 4. 修改主题例如增加分区数注意分区数只能增加不能减少./bin/kafka-topics.sh --alter --topic test_topic --partitions5--bootstrap-server192.168.1.100:9092# 5. 删除主题需确保 server.properties 中 delete.topic.enabletrue./bin/kafka-topics.sh --delete --topic test_topic --bootstrap-server192.168.1.100:90922. 命令行生产消费测试通过 Kafka 自带的命令行工具可以快速测试主题的生产消费功能启动消费者监听 test_topic 主题./bin/kafka-console-consumer.sh --topic test_topic --bootstrap-server 192.168.1.100:9092 --from-beginning参数说明--from-beginning表示从主题的第一条消息开始消费。启动生产者向 test_topic 主题发送消息./bin/kafka-console-producer.sh --topic test_topic --bootstrap-server 192.168.1.100:9092输入任意文本并回车即可在消费者终端看到对应的消息说明生产消费流程正常。3. Java 客户端实战生产消费消息实际开发中我们更多通过 Kafka 客户端 API 实现生产消费。Kafka 官方提供了 Java 客户端以下是基于 Kafka 3.6.0 的实战代码。步骤 1引入依赖MavendependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.6.0/version/dependency步骤 2生产者代码实现发送消息生产者核心配置包括集群地址、消息序列化方式、ACK 确认机制等代码如下importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;publicclassKafkaProducerDemo{// Kafka 集群地址privatestaticfinalStringBOOTSTRAP_SERVERS192.168.1.100:9092;// 主题名称privatestaticfinalStringTOPICtest_topic;publicstaticvoidmain(String[]args){// 1. 配置生产者参数PropertiespropsnewProperties();// 集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);// 消息键的序列化方式键用于分区分配相同键的消息会发送到同一分区props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 消息值的序列化方式props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// ACK 确认机制1首领副本接收成功即返回all所有同步副本接收成功才返回可靠性最高props.put(ProducerConfig.ACKS_CONFIG,all);// 重试次数消息发送失败时的重试次数props.put(ProducerConfig.RETRIES_CONFIG,3);// 批量发送大小当消息达到 16KB 时批量发送props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 批量发送延迟10ms 内未达到批量大小也会发送props.put(ProducerConfig.LINGER_MS_CONFIG,10);// 2. 创建生产者实例KafkaProducerString,StringproducernewKafkaProducer(props);// 3. 发送消息同步发送便于获取发送结果异步发送可通过回调函数处理结果try{for(inti0;i10;i){// 构建消息参数主题、消息键、消息值ProducerRecordString,Stringgt;recordnewProducerRecord(TOPIC,key-i,Kafka message - i);// 同步发送消息producer.send(record).get();System.out.println(Sent message: i);}}catch(Exceptione){e.printStackTrace();}finally{// 关闭生产者释放资源producer.close();}}}步骤 3消费者代码实现拉取消息消费者核心配置包括集群地址、消息反序列化方式、消费者组 ID、偏移量提交方式等代码如下importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassKafkaConsumerDemo{privatestaticfinalStringBOOTSTRAP_SERVERS192.168.1.100:9092;privatestaticfinalStringTOPICtest_topic;// 消费者组 ID同一组内消费者共同消费主题组 ID 必须唯一privatestaticfinalStringGROUP_IDtest_consumer_group;publicstaticvoidmain(String[]args){// 1. 配置消费者参数PropertiespropsnewProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);// 消费者组 IDprops.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);// 消息键的反序列化方式props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息值的反序列化方式props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 偏移量提交方式true自动提交false手动提交更灵活控制消息消费可靠性props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 自动提交偏移量的间隔时间仅当自动提交开启时生效props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 消费者启动时的偏移量位置earliest从最开始消费latest从最新消息开始消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,earliest);// 2. 创建消费者实例KafkaConsumerlt;String,Stringgt;consumernewKafkaConsumer(props);// 3. 订阅主题可订阅单个或多个主题此处订阅 test_topicconsumer.subscribe(Collections.singletonList(TOPIC));// 4. 循环拉取消息消费者是长轮询模型需持续拉取try{while(true){// 拉取消息超时时间 100ms若没有消息则返回空ConsumerRecordsString,Stringrecordsconsumer.poll(Duration.ofMillis(100));// 遍历处理消息for(ConsumerRecordString,Stringrecord:records){System.out.printf(Received message: topic%s, partition%d, offset%d, key%s, value%s%n,record.topic(),record.partition(),record.offset(),record.key(),record.value());}// 手动提交偏移量确保消息处理完成后再提交避免消息丢失consumer.commitSync();}}catch(Exceptione){e.printStackTrace();}finally{// 关闭消费者自动提交最终的偏移量consumer.close();}}}四、常见问题与优化技巧1. 常见问题排查生产者无法发送消息检查 Kafka 服务是否正常运行确认advertised.listeners配置的地址是否可被生产者访问检查主题是否存在。消费者无法拉取消息确认消费者组 ID 配置正确检查AUTO_OFFSET_RESET_CONFIG配置若设置为 latest需确保有新消息发送检查主题权限是否允许消费。消息重复消费消费者手动提交偏移量时若消息处理完成前程序崩溃会导致偏移量未提交重启后重复消费。解决方案通过业务唯一 ID 实现幂等性处理或使用 Kafka 的事务机制。消息丢失生产者 ACK 机制设置为 1 或 0 时若首领副本故障可能导致消息丢失消费者自动提交偏移量时若消息未处理完成就提交偏移量程序崩溃会导致消息丢失。解决方案生产者 ACK 设为 all消费者使用手动提交偏移量。2. 性能优化技巧主题优化根据业务吞吐量设置合理的分区数建议每个分区的吞吐量控制在 1000-5000 条/秒生产环境副本数设置为 2-3平衡可用性和性能。生产者优化开启批量发送调整BATCH_SIZE_CONFIG和LINGER_MS_CONFIG使用异步发送减少等待时间合理设置消息压缩方式COMPRESSION_TYPE_CONFIG推荐 gzip。消费者优化消费者组内的消费者数量与主题分区数保持一致避免资源浪费或分区分配不均增大拉取消息的批量大小MAX_POLL_RECORDS_CONFIG避免在消费线程中执行耗时操作可通过线程池异步处理消息。服务器优化选择高性能的磁盘如 SSD存储日志调整 JVM 堆大小建议 4-8G关闭磁盘缓存刷盘的同步机制通过log.flush.interval.messages配置异步刷盘。五、总结本文从 Kafka 的核心概念出发完整覆盖了环境搭建、主题管理、消息生产消费的全流程同时提供了 Java 客户端实战代码和常见问题解决方案。Kafka 的核心优势在于高吞吐量和高可靠性掌握其核心原理如分区、副本、消费者组是灵活运用 Kafka 的关键。后续可以进一步学习 Kafka 的高级特性如事务机制、流处理Kafka Streams、连接器Kafka Connect等以满足更复杂的业务场景如实时数据清洗、跨系统数据同步。希望本文能帮助大家快速入门 Kafka并在实际项目中发挥其价值。