发布于2023-05-20 21:13 阅读(671) 评论(0) 点赞(6) 收藏(4)
好文推荐:
2.5万字详解23种设计模式
基于Netty搭建websocket集群实现服务器消息推送
2.5万字讲解DDD领域驱动设计
延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。
小编已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现kafka延时队列了,代码中有详细注释,完整代码已经给大家整理好了,领取方式放在了文章末。
1,订单超时自动取消:用户下单后,如果在指定时间内未完成支付,系统会自动取消订单,释放库存。
2,定时推送:比如消息通知,用户预约某个服务,系统会在服务开始前一定时间发送提醒短信。
3,定时任务:将需要定时执行的任务放入延时队列中,等到指定的时间到达时再进行执行,例如生成报表、统计数据等操作。
4,限时抢购:将限时抢购的结束时间放入延时队列中,当时间到达时自动下架商品。
…
1.1 优点:
①Redis的延迟队列是基于Redis的sorted set实现的,性能较高。
②Redis的延迟队列可以通过TTL设置过期时间,灵活性较高。
③简单易用,适用于小型系统。
④性能较高,支持高并发。
1.2 缺点:
①可靠性相对较低,可能会丢失消息,就算redis最高级别的持久化也是有可能丢一条的,每次请求都做aof,但是aof是异步的,所以不保证这一条操作能被持久化。
②而且Redis持久化的特性也导致其在数据量较大时,存储和查询效率逐渐降低,此时会需要对其进行分片和负载均衡。
③Redis的延迟队列需要手动实现消息重试机制,更严谨的消息队列需要数据库兜底。
1.3 应用场景:
①适用于较小规模的系统,实时性要求较高的场景。
②适用于轻量级的任务调度和消息通知场景,适合短期延迟任务,不适合长期任务,例如订单超时未支付等。
2.1 优点:
①Kafka的优点在于其高并发、高吞吐量和可扩展性强,同时支持分片。
②可靠性高,支持分布式和消息持久化。
③消费者可以随时回溯消费。
④支持多个消费者并行消费、消费者组等机制。
2.2 缺点:
①没有原生的延迟队列功能,需要使用topic和消费者组来实现,实现延迟队列需要额外的开发工作。
②消费者需要主动拉取数据,可能会导致延迟,精度不是特别高。
在此案例中代码已经实现了,直接拿来使用就可以了。
2.3 应用场景:
适用于大规模的数据处理,实时性要求较高的,高吞吐量的消息处理场景。
3.1 优点:
①RabbitMQ的延迟队列是通过RabbitMQ的插件实现的,易于部署和使用。
②RabbitMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③支持消息持久化和分布式。
④支持优先级队列和死信队列。
⑤提供了丰富的插件和工具。
3.2 缺点:
①RabbitMQ的延迟队列性能较低,不适用于高吞吐量的场景。
②性能较低,不适合高并发场景。
③实现延迟队列需要额外的配置,但是配置就很简单了。
3.3应用场景:
适用于中小型的任务调度和消息通知,对可靠性要求高的场景。
4.1 优点:
①RocketMQ的延迟队列是RocketMQ原生支持的,易于使用和部署。
②RocketMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③高性能和高吞吐量,支持分布式和消息持久化。
④RocketMQ使用简单,性能好,并且支持延迟队列功能。
4.2 缺点:
①RocketMQ的延迟队列不支持动态添加或删除队列。
②RocketMQ的延迟队列需要保证消息的顺序,可能会导致消息延迟。
③在节点崩溃后,RocketMQ有可能发生消息丢失。
4.3 应用场景:
①适用于大规模的数据处理,对性能和吞吐量要求较高的场景。
②适合于任务量较大、需要延迟消息和定时消息的场景。例如电商平台、社交软件等。
③适用于分布式任务调度和高可靠性消息通知场景。
基于以上四种实现延时队列的分析来,选择对应的技术方案的基础上呢,不同公司的mq的基础设施不同,如果只有Kafka,也没必要引入RabbitMQ和RocketMq来实现,引入新的组件也会顺便带来新的问题。
网上搜Kafka实现延时队列有很多文章,很多文章说使用Kafka内部的时间轮,支持延时操作,但这是Kafka自己内部使用的,时间轮只是一个工具类,用户无法将其作为延迟队列来使用。
Kafka延时队列的最佳实践,使用Kafka消费者的暂停和恢复机制来实现。
以下代码只列出了核心实现,完整代码已经给大家整理好了,可以关注【微信公众号】微信搜索【老板来一杯java】,然后【加群】直接获取源码。
源码目录:
该类封装了一个线程安全的KafkaConsumer,因为原生的 KafkaConsumer是不支持线程共享的,直接使用会报错:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
package com.wdyin.kafka.delay; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.Set; /** * Kafka同步消费者 * @author WDYin * @date 2023/4/14 **/ public class KafkaSyncConsumer<K, V> extends KafkaConsumer<K, V> { KafkaSyncConsumer(Properties properties) { super(properties); } @Override public synchronized ConsumerRecords<K, V> poll(Duration timeout) { return super.poll(timeout); } @Override public synchronized Set<TopicPartition> paused() { return super.paused(); } synchronized void pauseAndSeek(TopicPartition partition, long offset) { super.pause(Collections.singletonList(partition)); super.seek(partition, offset); } @Override public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) { super.commitSync(offsets); } synchronized void resume(TopicPartition topicPartition) { super.resume(Collections.singleton(topicPartition)); } @Override public synchronized void commitSync(){ super.commitSync(); } }
定义一个Kafka延期队列,包含的内容:KafkaDelayQueue,其中有延迟队列配置,主题,消费组,延迟时间,目标主题,KafkaSyncConsumer,ApplicationContext,poll线程池,delay线程池等等
package com.wdyin.kafka.delay; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import java.time.Duration; import java.util.Collections; import java.util.concurrent.ThreadPoolExecutor; /** * kafka延时队列 * * @Author WDYin * @Date 2022/7/2 **/ @Slf4j @Getter @Setter class KafkaDelayQueue<K, V> { private String topic; private String group; private Integer delayTime; private String targetTopic; private KafkaDelayConfig kafkaDelayConfig; private KafkaSyncConsumer<K, V> kafkaSyncConsumer; private ApplicationContext applicationContext; private ThreadPoolTaskScheduler threadPoolPollTaskScheduler; private ThreadPoolTaskScheduler threadPoolDelayTaskScheduler; void send() { try { kafkaSyncConsumer.subscribe(Collections.singletonList(topic)); this.threadPoolPollTaskScheduler .scheduleWithFixedDelay(pollTask(), kafkaDelayConfig.getPollInterval()); } catch (Exception e) { log.error("KafkaDelayQueue subscribe error", e); } } private KafkaPollTask<K, V> pollTask(){ return new KafkaPollTask<>(this, Duration.ofMillis(kafkaDelayConfig.getPollTimeout()), delayTime, applicationContext); } KafkaDelayTask<K, V> delayTask(TopicPartition partition){ return new KafkaDelayTask<>(kafkaSyncConsumer, partition); } @Slf4j private static class KafkaPollTask<K, V> implements Runnable { private KafkaDelayQueue<K, V> kafkaDelayQueue; private Duration timeout; private Integer delayTime; private ApplicationContext applicationContext; KafkaPollTask(KafkaDelayQueue<K, V> kafkaDelayQueue, Duration timeout, Integer delayTime, ApplicationContext applicationContext) { this.kafkaDelayQueue = kafkaDelayQueue; this.timeout = timeout; this.applicationContext = applicationContext; this.delayTime = delayTime; } @Override public void run() { try { ConsumerRecords<K, V> records = kafkaDelayQueue.getKafkaSyncConsumer().poll(timeout); applicationContext.publishEvent(new KafkaPollEvent<>(records, delayTime, kafkaDelayQueue)); } catch (Exception e) { log.error("KafkaDelayQueue consumer fail", e); } } } @Slf4j private static class KafkaDelayTask<K, V> implements Runnable { private KafkaSyncConsumer<K, V> kafkaSyncConsumer; private TopicPartition partition; private KafkaDelayTask(KafkaSyncConsumer<K, V> kafkaSyncConsumer, TopicPartition partition) { this.kafkaSyncConsumer = kafkaSyncConsumer; this.partition = partition; } @Override public void run() { try { kafkaSyncConsumer.resume(partition); } catch (Exception e) { log.error("KafkaDelayQueue resume failed", e); } } } }
Kafka延期队列的工厂,用于及其管理延迟队列
package com.wdyin.kafka.delay; import lombok.Data; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.context.ApplicationContext; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import java.util.Properties; /** * 延时队列工厂 * @author WDYin * @date 2023/4/17 **/ @Data public class KafkaDelayQueueFactory { private KafkaDelayConfig kafkaDelayConfig; private Properties properties; private ApplicationContext applicationContext; private Integer concurrency; public KafkaDelayQueueFactory(Properties properties, KafkaDelayConfig kafkaDelayConfig) { Assert.notNull(properties, "properties cannot null"); Assert.notNull(kafkaDelayConfig.getDelayThreadPool(), "delayThreadPool cannot null"); Assert.notNull(kafkaDelayConfig.getPollThreadPool(), "pollThreadPool cannot null"); Assert.notNull(kafkaDelayConfig.getPollInterval(), "pollInterval cannot null"); Assert.notNull(kafkaDelayConfig.getPollTimeout(), "timeout cannot null"); this.properties = properties; this.kafkaDelayConfig = kafkaDelayConfig; } public void listener(String topic, String group, Integer delayTime, String targetTopic) { if (StringUtils.isEmpty(topic)) { throw new RuntimeException("topic cannot empty"); } if (StringUtils.isEmpty(group)) { throw new RuntimeException("group cannot empty"); } if (StringUtils.isEmpty(delayTime)) { throw new RuntimeException("delayTime cannot empty"); } if (StringUtils.isEmpty(targetTopic)) { throw new RuntimeException("targetTopic cannot empty"); } KafkaSyncConsumer<String, String> kafkaSyncConsumer = createKafkaSyncConsumer(group); KafkaDelayQueue<String, String> kafkaDelayQueue = createKafkaDelayQueue(topic, group, delayTime, targetTopic, kafkaSyncConsumer); kafkaDelayQueue.send(); } private KafkaDelayQueue<String, String> createKafkaDelayQueue(String topic, String group, Integer delayTime, String targetTopic, KafkaSyncConsumer<String, String> kafkaSyncConsumer) { KafkaDelayQueue<String, String> kafkaDelayQueue = new KafkaDelayQueue<>(kafkaSyncConsumer, kafkaDelayConfig); Assert.notNull(applicationContext, "kafkaDelayQueue need applicationContext"); kafkaDelayQueue.setApplicationContext(applicationContext); kafkaDelayQueue.setDelayTime(delayTime); kafkaDelayQueue.setTopic(topic); kafkaDelayQueue.setGroup(group); kafkaDelayQueue.setTargetTopic(targetTopic); return kafkaDelayQueue; } private KafkaSyncConsumer<String, String> createKafkaSyncConsumer(String group) { properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); return new KafkaSyncConsumer<>(properties); } }
package com.wdyin.kafka.delay; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.springframework.context.ApplicationListener; import org.springframework.kafka.core.KafkaTemplate; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.*; /** * 延时队列监听 * @Author : WDYin * @Date : 2021/5/7 * @Desc : */ @Slf4j public class KafkaPollListener<K, V> implements ApplicationListener<KafkaPollEvent<K, V>> { private KafkaTemplate kafkaTemplate; public KafkaPollListener(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @Override public void onApplicationEvent(KafkaPollEvent<K, V> event) { ConsumerRecords<K, V> records = (ConsumerRecords<K, V>) event.getSource(); Integer delayTime = event.getDelayTime(); KafkaDelayQueue<K, V> kafkaDelayQueue = event.getKafkaDelayQueue(); KafkaSyncConsumer<K, V> kafkaSyncConsumer = kafkaDelayQueue.getKafkaSyncConsumer(); //1.获取poll到的有消息的分区 Set<TopicPartition> partitions = records.partitions(); //2.存储需要commit的消息,提高效率批量提交 Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>(); //3.遍历有消息的分区 partitions.forEach((partition) -> { List<ConsumerRecord<K, V>> consumerRecords = records.records(partition); //4.遍历分区里面的消息 for (ConsumerRecord<K, V> record : consumerRecords) { //5.获取消息创建时间 long startTime = (record.timestamp() / 1000) * 1000; long endTime = startTime + delayTime; //6.不符合条件的分区暂停消费 long now = System.currentTimeMillis(); if (endTime > now) { kafkaSyncConsumer.pauseAndSeek(partition, record.offset()); //7.使用 schedule()执行定时任务 kafkaDelayQueue.getThreadPoolPollTaskScheduler().schedule(kafkaDelayQueue.delayTask(partition), new Date(endTime)); //无需继续消费该分区下的其他消息,直接消费其他分区 break; } log.info("{}: partition:{}, offset:{}, key:{}, value:{}, messageDate:{}, nowDate:{}, messageDate:{}, nowDate:{}", Thread.currentThread().getName() + "#" + Thread.currentThread().getId(), record.topic() + "-" + record.partition(), record.offset(), record.key(), record.value(), LocalDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()), LocalDateTime.now(), startTime, Instant.now().getEpochSecond()); //发送目标主题 kafkaTemplate.send(kafkaDelayQueue.getTargetTopic(), record.value()); //更新需要commit的消息 commitMap.put(partition, new OffsetAndMetadata(record.offset() + 1)); } }); //8.批量提交,提高效率,commitSync耗时几百毫秒 if (!commitMap.isEmpty()) { kafkaSyncConsumer.commitSync(commitMap); } } }
package com.wdyin.kafka.config; import com.wdyin.kafka.delay.KafkaDelayConfig; import com.wdyin.kafka.delay.KafkaDelayQueueFactory; import com.wdyin.kafka.delay.KafkaPollListener; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * @author WDYin * @date 2023/4/21 **/ @Configuration @Slf4j public class KafkaConfig { @Value("${spring.kafka.consumer.bootstrap-servers}") private String bootstrapServers; @Resource private ApplicationContext applicationContext; /** * 消费者参数配置 * @param bootstrapServers * @param isAutoSubmit * @return */ private Map<String, Object> consumerProps(String bootstrapServers, Boolean isAutoSubmit) { Map<String, Object> props = new HashMap<>(); //kafka broker地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //取消自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoSubmit); //一次拉取消息数量,可根据实际情况自行调整 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); //序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } /** * spring生产者参数配置 * @param bootstrapServer * @return */ private HashMap<String, Object> producerProps(String bootstrapServer) { HashMap<String, Object> configProps = new HashMap<>(); //broke地址 configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); //序列化 configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //幂等发送给broker configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); //生产者时将多少数据累积到一个批次中,设置为0的目的提高实时性 configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "0"); return configProps; } /** * 用于spring的@listener注解进行消费,并非用于延时队列 * @return */ @Bean("kafkaContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps(bootstrapServers, Boolean.FALSE))); //线程数为1 factory.setConcurrency(1); //poll超时时间 factory.getContainerProperties().setPollTimeout(1500L); //手动立即提交offset factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } /** * spring kafkaTemplate注册 * @return */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps(bootstrapServers))); } /** * 延时队列-Kafka同步消费者配置 * @return */ public Properties kafkaSyncConsumerProperties() { // Consumer的配置 Properties properties = new Properties(); // 服务地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // 关闭offset自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 消费者offset自动提交到Kafka的频率(以毫秒为单位) properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "15000"); // KEY的反序列化器类 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // VALUE的反序列化器类 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); return properties; } /** * 延时队列-注册延时队列工厂 * @return */ @Bean public KafkaDelayQueueFactory kafkaDelayQueueFactory() { KafkaDelayConfig kafkaDelayConfig = new KafkaDelayConfig(); kafkaDelayConfig.setPollThreadPool(1); kafkaDelayConfig.setPollTimeout(50); kafkaDelayConfig.setPollInterval(50); kafkaDelayConfig.setDelayThreadPool(10); KafkaDelayQueueFactory kafkaDelayQueueFactory = new KafkaDelayQueueFactory(kafkaSyncConsumerProperties(), kafkaDelayConfig); kafkaDelayQueueFactory.setApplicationContext(applicationContext); return kafkaDelayQueueFactory; } /** * 延时队列-注册消费者poll监听器 * @param kafkaTemplate * @return */ @Bean public KafkaPollListener kafkaPollListener(KafkaTemplate kafkaTemplate) { return new KafkaPollListener<>(kafkaTemplate); } }
一个延时主题对应一个延迟时间,后续有新的延迟任务只需要在此注册延迟任务的监听即可!
import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * @author WDYin * @date 2023/4/18 **/ @Component public class KafkaDelayApplication { @Resource private KafkaDelayQueueFactory kafkaDelayQueueFactory; @PostConstruct public void init() { //延迟30秒 kafkaDelayQueueFactory.listener("delay-30-second-topic", "delay-30-second-group", 1 * 30 * 1000, "delay-60-second-target-topic"); //延迟60秒 kafkaDelayQueueFactory.listener("delay-60-second-topic", "delay-60-second-group", 1 * 60 * 1000, "delay-60-second-target-topic"); //延迟30分钟 kafkaDelayQueueFactory.listener("delay-30-minute-topic", "delay-30-minute-group", 30 * 60 * 1000, "delay-30-minute-target-topic"); } }
好文推荐:
2.5万字详解23种设计模式
微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送----netty是yyds
2.5万字讲解DDD领域驱动设计
作者:门路弄土了吗
链接:http://www.phpheidong.com/blog/article/546053/1eecb8f39612d147b649/
来源:php黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 php黑洞网 All Rights Reserved 版权所有,并保留所有权利。 京ICP备18063182号-4
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!