From e02f580ba4537dc674ea27602285aa7ad2da1086 Mon Sep 17 00:00:00 2001 From: Easy <1091927336@qq.com> Date: Thu, 9 Dec 2021 17:06:16 +0800 Subject: [PATCH 01/14] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=BC=94=E7=A4=BA?= =?UTF-8?q?=E5=9C=B0=E5=9D=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index a74bcc4b..79e8ce34 100644 --- a/README.md +++ b/README.md @@ -49,11 +49,11 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor > 大家不要恶意链接,谢谢! -| 管理 | 说明 | 其他 | -| ---- | ---- |---- | -| 123.57.69.210:1883 | mqtt端口 |用户名:smqtt 密码:smqtt | -| 123.57.69.210:8999 | mqtt over websocket |用户名:smqtt 密码:smqtt | -| http://123.57.69.210:60000/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt | +| 管理 | 说明 | 其他 | +|----------------------------------------| ---- |---- | +| 113.90.145.99:18886 | mqtt端口 |用户名:smqtt 密码:smqtt | +| 113.90.145.99:18888 | mqtt over websocket |用户名:smqtt 密码:smqtt | +| http://113.90.145.99:18887/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt | ## 启动方式 -- Gitee From 760ab2aa75d136b6e77e7e843de0e688cc36e021 Mon Sep 17 00:00:00 2001 From: luxurong Date: Thu, 16 Dec 2021 21:09:46 +0800 Subject: [PATCH 02/14] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/test/java/ClusterNode1.java | 2 +- .../src/test/java/topic/TopicTest.java | 53 +++++++++++++++++++ .../quickmsg/common/channel/MqttChannel.java | 4 +- .../github/quickmsg/common/enums/Event.java | 7 ++- .../core/spi/DefaultProtocolAdaptor.java | 8 +-- 5 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 smqtt-bootstrap/src/test/java/topic/TopicTest.java diff --git a/smqtt-bootstrap/src/test/java/ClusterNode1.java b/smqtt-bootstrap/src/test/java/ClusterNode1.java index cf2d7159..d0cd268d 100644 --- a/smqtt-bootstrap/src/test/java/ClusterNode1.java +++ b/smqtt-bootstrap/src/test/java/ClusterNode1.java @@ -9,7 +9,7 @@ public class ClusterNode1 { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = Bootstrap.builder() - .rootLevel(Level.INFO) + .rootLevel(Level.ERROR) .websocketConfig( BootstrapConfig.WebsocketConfig .builder() diff --git a/smqtt-bootstrap/src/test/java/topic/TopicTest.java b/smqtt-bootstrap/src/test/java/topic/TopicTest.java new file mode 100644 index 00000000..d67f144a --- /dev/null +++ b/smqtt-bootstrap/src/test/java/topic/TopicTest.java @@ -0,0 +1,53 @@ +package topic; + +import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.topic.SubscribeTopic; +import io.github.quickmsg.common.topic.TopicRegistry; +import io.github.quickmsg.core.spi.DefaultTopicRegistry; +import io.netty.handler.codec.mqtt.MqttQoS; + +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.*; + +/** + * @author luxurong + */ +public class TopicTest { + + static ExecutorService service = Executors.newFixedThreadPool(100); + + private static TopicRegistry topicRegistry = new DefaultTopicRegistry(); + + private static Map channelMap = new ConcurrentHashMap<>(); + + public static void main(String[] args) { + CountDownLatch count = new CountDownLatch(500000); + for(int i=0;i<500000;i++){ + service.execute(()->{ + int index = new Random().nextInt(1000); + MqttChannel mqttChannel =channelMap.computeIfAbsent(index,in->{ + MqttChannel mqttChannel1=new MqttChannel(); + mqttChannel1.setTopics(new CopyOnWriteArraySet<>()); + return mqttChannel1; + }); + SubscribeTopic subscribeTopic=new SubscribeTopic(String.valueOf(index), MqttQoS.AT_MOST_ONCE,mqttChannel); + topicRegistry.registrySubscribeTopic(subscribeTopic); + topicRegistry.getAllTopics(); + count.countDown(); + }); + } + try { + count.await(); + Map> topics = topicRegistry.getAllTopics(); + System.out.println(topics); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index 93d16b10..ff1f6c95 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -188,7 +188,7 @@ public class MqttChannel { */ public Mono write(MqttMessage mqttMessage, boolean retry) { // http本地mock - if (this.getIsMock()) { + if (this.getIsMock() && !this.active()) { return Mono.empty(); } else { return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry, replyMqttMessageMap); @@ -346,7 +346,7 @@ public class MqttChannel { @Override public String toString() { return "MqttChannel{" + - " address='" + this.connection.address().toString() + '\'' + +// " address='" + this.connection.address().toString() + '\'' + ", clientIdentifier='" + clientIdentifier + '\'' + ", status=" + status + ", keepalive=" + keepalive + diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java b/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java index b94259bf..94229d0f 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java @@ -79,8 +79,8 @@ public enum Event { /** * body * - * @param mqttChannel {@link MqttChannel } - * @param body {@link Object } + * @param mqttChannel {@link MqttChannel } + * @param body {@link Object } * @return ByteBuf */ public abstract ByteBuf writeBody(MqttChannel mqttChannel, Object body); @@ -92,6 +92,9 @@ public enum Event { message , System.currentTimeMillis(), Boolean.FALSE), receiveContext); + if (message instanceof MqttPublishMessage) { + ((MqttPublishMessage) message).release(); + } } } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java index 7f0235b6..4007bacd 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java @@ -45,11 +45,11 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { @Override public void chooseProtocol(MqttChannel mqttChannel, SmqttMessage smqttMessage, ReceiveContext receiveContext) { MqttMessage mqttMessage = smqttMessage.getMessage(); - log.info(" 【{}】【{}】 【{}】", - Thread.currentThread().getName(), - mqttMessage.fixedHeader().messageType(), - mqttChannel); if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) { + log.info(" 【{}】【{}】 【{}】", + Thread.currentThread().getName(), + mqttMessage.fixedHeader().messageType(), + mqttChannel); Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType())) .ifPresent(protocol -> protocol .doParseProtocol(smqttMessage, mqttChannel) -- Gitee From e85a52bb26a7ce612bd946d5bfbef60b2fb8a86f Mon Sep 17 00:00:00 2001 From: luxurong Date: Mon, 20 Dec 2021 19:51:54 +0800 Subject: [PATCH 03/14] doc --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a4b2823d..cab247c8 100644 --- a/README.md +++ b/README.md @@ -216,7 +216,20 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1 [config.yaml](config/config.yaml) -5. 启动springboot服务服务即可 +4. 启动springboot服务服务即可 +5. 如果引入的是spring-boot-starter-parent的管理包,如果启动报错,则需要添加以下依赖 +```xml + + io.projectreactor + reactor-core + 3.4.9 + + + io.projectreactor.netty + reactor-netty + 1.0.10 + +``` ## 官网地址 -- Gitee From 4cd9ce4168351b2a7799cd3f628e1735bec3525d Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 21 Dec 2021 18:10:02 +0800 Subject: [PATCH 04/14] json --- .../common/message/HeapMqttMessage.java | 2 +- .../quickmsg/common/message/JsonMap.java | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index c9d3b19e..ac27b4a7 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -43,7 +43,7 @@ public class HeapMqttMessage { private Object getJsonObject(String body) { if (body.startsWith("{") && body.endsWith("}")) { - return JacksonUtil.json2Bean(body, Map.class); + return JacksonUtil.json2Bean(body, JsonMap.class); } else { return body; } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java new file mode 100644 index 00000000..4fb3e3a6 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java @@ -0,0 +1,20 @@ +package io.github.quickmsg.common.message; + +import io.github.quickmsg.common.utils.JacksonUtil; + +import java.util.HashMap; + +/** + * @author luxurong + */ +public class JsonMap extends HashMap { + + public JsonMap(int initialCapacity) { + super(initialCapacity); + } + + @Override + public String toString() { + return JacksonUtil.map2Json(this); + } +} -- Gitee From 60383604ebaff6eb33b889b7b0d5ec2fe53075b7 Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 21 Dec 2021 20:17:51 +0800 Subject: [PATCH 05/14] json --- smqtt-bootstrap/src/test/java/ClusterNode1.java | 2 +- .../java/io/github/quickmsg/common/message/JsonMap.java | 5 +++++ .../java/io/github/quickmsg/common/utils/JacksonUtil.java | 1 + .../java/io/github/quickmsg/source/mqtt/MqttSourceBean.java | 6 ++++-- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/smqtt-bootstrap/src/test/java/ClusterNode1.java b/smqtt-bootstrap/src/test/java/ClusterNode1.java index d0cd268d..cf2d7159 100644 --- a/smqtt-bootstrap/src/test/java/ClusterNode1.java +++ b/smqtt-bootstrap/src/test/java/ClusterNode1.java @@ -9,7 +9,7 @@ public class ClusterNode1 { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = Bootstrap.builder() - .rootLevel(Level.ERROR) + .rootLevel(Level.INFO) .websocketConfig( BootstrapConfig.WebsocketConfig .builder() diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java index 4fb3e3a6..345b75e0 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/JsonMap.java @@ -9,6 +9,11 @@ import java.util.HashMap; */ public class JsonMap extends HashMap { + public JsonMap() { + super(); + } + + public JsonMap(int initialCapacity) { super(initialCapacity); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java index cb8a5c34..3ba292d3 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java @@ -27,6 +27,7 @@ public class JacksonUtil { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true); + } public static String bean2Json(Object data) { diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java index 6416db1b..34286940 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java @@ -7,6 +7,7 @@ import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder; import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; import io.github.quickmsg.common.rule.source.Source; import io.github.quickmsg.common.rule.source.SourceBean; +import io.github.quickmsg.common.utils.JacksonUtil; import io.netty.util.internal.StringUtil; import lombok.extern.slf4j.Slf4j; @@ -91,12 +92,13 @@ public class MqttSourceBean implements SourceBean { @Override public void transmit(Map object) { String topic = (String) object.get("topic"); - String msg = (String) object.get("msg"); + Object msg =object.get("msg"); + String bytes = msg instanceof Map ? JacksonUtil.map2Json((Map) msg): msg.toString(); Boolean retain = (Boolean) object.get("retain"); Integer qos = Optional.ofNullable((Integer)object.get("qos")).orElse(0); client.publishWith() .topic(topic) - .payload(msg.getBytes()) + .payload(bytes.getBytes()) .qos(Objects.requireNonNull(MqttQos.fromCode(qos))) .retain(retain) .send() -- Gitee From 92f8001b29735c4d8ebe60f8856de363d6731c67 Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 21 Dec 2021 21:38:46 +0800 Subject: [PATCH 06/14] ack --- .../io/github/quickmsg/common/ack/AbsAck.java | 67 ++++++++++++++++++ .../io/github/quickmsg/common/ack/Ack.java | 23 ++++++ .../quickmsg/common/ack/AckManager.java | 16 +++++ .../github/quickmsg/common/ack/RetryAck.java | 20 ++++++ .../quickmsg/common/ack/TimeAckManager.java | 36 ++++++++++ .../quickmsg/common/channel/MqttChannel.java | 70 +++++++++++-------- .../common/context/ReceiveContext.java | 9 +++ .../core/mqtt/AbstractReceiveContext.java | 5 ++ .../quickmsg/core/mqtt/MqttReceiver.java | 2 +- .../core/protocol/CommonProtocol.java | 32 ++++++--- .../core/protocol/PublishAckProtocol.java | 15 ++-- .../core/websocket/WebSocketMqttReceiver.java | 2 +- 12 files changed, 252 insertions(+), 45 deletions(-) create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java new file mode 100644 index 00000000..bb5b461b --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java @@ -0,0 +1,67 @@ +package io.github.quickmsg.common.ack; + +import io.netty.util.Timeout; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +/** + * @author luxurong + */ +@Slf4j +public abstract class AbsAck implements Ack { + + private final int maxRetrySize; + + private int count = 1; + + private volatile boolean died = false; + + private final Runnable runnable; + + private final AckManager ackManager; + + private final int period; + + protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager) { + this.maxRetrySize = maxRetrySize; + this.period = period; + this.runnable = runnable; + this.ackManager = ackManager; + } + + @Override + public void run(Timeout timeout) throws Exception { + if (++count <= maxRetrySize+1 && !died ) { + try { + runnable.run(); + ackManager.addAck(this); + } catch (Exception e) { + log.error("Ack error ", e); + } + + } + } + + @Override + public void stop() { + died = true; + ackManager.deleteAck(getId()); + } + + + @Override + public void start() { + this.ackManager.addAck(this); + } + + @Override + public int getTimed() { + return this.period * this.count; + } + + @Override + public TimeUnit getTimeUnit() { + return TimeUnit.SECONDS; + } +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java new file mode 100644 index 00000000..822157c0 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java @@ -0,0 +1,23 @@ +package io.github.quickmsg.common.ack; + +import io.netty.util.TimerTask; + +import java.util.concurrent.TimeUnit; + +/** + * @author luxurong + */ +public interface Ack extends TimerTask { + + int getTimed(); + + TimeUnit getTimeUnit(); + + long getId(); + + void start(); + + void stop(); + + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java new file mode 100644 index 00000000..12a5f956 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java @@ -0,0 +1,16 @@ +package io.github.quickmsg.common.ack; + +/** + * @author luxurong + */ +public interface AckManager { + + void addAck(Ack ack); + + Ack getAck(Long id); + + void deleteAck(Long id); + + + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java new file mode 100644 index 00000000..4ef22809 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java @@ -0,0 +1,20 @@ +package io.github.quickmsg.common.ack; + +/** + * @author luxurong + */ + +public class RetryAck extends AbsAck { + + private final long id; + + public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager) { + super(maxRetrySize, period, runnable, ackManager); + this.id = id; + } + + @Override + public long getId() { + return id; + } +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java new file mode 100644 index 00000000..c1739a5d --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java @@ -0,0 +1,36 @@ +package io.github.quickmsg.common.ack; + +import io.netty.util.HashedWheelTimer; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @author luxurong + */ +public class TimeAckManager extends HashedWheelTimer implements AckManager { + + private final Map ackMap = new ConcurrentHashMap<>(); + + public TimeAckManager(long tickDuration, TimeUnit unit, int ticksPerWheel) { + super( tickDuration, unit, ticksPerWheel); + } + + @Override + public void addAck(Ack ack) { + ackMap.put(ack.getId(),ack); + this.newTimeout(ack,ack.getTimed(),ack.getTimeUnit()); + } + + @Override + public Ack getAck(Long id) { + return ackMap.get(id); + } + + @Override + public void deleteAck(Long id) { + ackMap.remove(id); + } + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index ff1f6c95..9a6ef40b 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -1,9 +1,11 @@ package io.github.quickmsg.common.channel; import com.fasterxml.jackson.annotation.JsonIgnore; +import io.github.quickmsg.common.ack.Ack; +import io.github.quickmsg.common.ack.RetryAck; +import io.github.quickmsg.common.ack.TimeAckManager; import io.github.quickmsg.common.enums.ChannelStatus; import io.github.quickmsg.common.topic.SubscribeTopic; -import io.github.quickmsg.common.utils.MessageUtils; import io.netty.handler.codec.mqtt.*; import lombok.Builder; import lombok.Data; @@ -74,6 +76,8 @@ public class MqttChannel { private Disposable closeDisposable; + private TimeAckManager timeAckManager; + public void disposableClose() { if (closeDisposable != null && !closeDisposable.isDisposed()) { closeDisposable.dispose(); @@ -86,7 +90,7 @@ public class MqttChannel { } - public static MqttChannel init(Connection connection) { + public static MqttChannel init(Connection connection, TimeAckManager timeAckManager) { MqttChannel mqttChannel = new MqttChannel(); mqttChannel.setTopics(new CopyOnWriteArraySet<>()); mqttChannel.setAtomicInteger(new AtomicInteger(0)); @@ -97,6 +101,7 @@ public class MqttChannel { mqttChannel.setConnection(connection); mqttChannel.setStatus(ChannelStatus.INIT); mqttChannel.setAddress(connection.address().toString()); + mqttChannel.setTimeAckManager(timeAckManager); return mqttChannel; } @@ -179,6 +184,11 @@ public class MqttChannel { } + public long generateId(MqttMessageType type, Integer messageId) { + return Long.parseLong(connection.channel().id().asLongText()) << 5 | (long) type.value() << 4 | messageId; + } + + /** * 写入消息 * @@ -191,7 +201,7 @@ public class MqttChannel { if (this.getIsMock() && !this.active()) { return Mono.empty(); } else { - return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry, replyMqttMessageMap); + return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry); } } @@ -251,7 +261,7 @@ public class MqttChannel { public static MqttMessageSink MQTT_SINK = new MqttMessageSink(); - public Mono sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry, Map> replyMqttMessageMap) { + public Mono sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry) { if (log.isDebugEnabled()) { log.debug("write channel {} message {}", mqttChannel.getConnection(), mqttMessage); } @@ -260,7 +270,10 @@ public class MqttChannel { Increase the reference count of bytebuf, and the reference count of retrybytebuf is 2 mqttChannel.write() method releases a reference count. */ - return mqttChannel.write(Mono.just(mqttMessage)).then(offerReply(getReplyMqttMessage(mqttMessage), mqttChannel, getMessageId(mqttMessage), replyMqttMessageMap)); + Runnable runnable = () -> mqttChannel.write(Mono.just(mqttMessage)).subscribe(); + Ack ack = new RetryAck(mqttChannel.generateId(mqttMessage.fixedHeader().messageType(), getMessageId(mqttMessage)), 5, 5, runnable, mqttChannel.getTimeAckManager()); + ack.start(); + return mqttChannel.write(Mono.just(mqttMessage)).then(); } else { return mqttChannel.write(Mono.just(mqttMessage)); } @@ -318,38 +331,39 @@ public class MqttChannel { } - /** - * Set resend action - * - * @param message {@link MqttMessage} - * @param mqttChannel {@link MqttChannel} - * @param messageId messageId - * @param replyMqttMessageMap 重试缓存 - * @return 空操作符 - */ - public Mono offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map> replyMqttMessageMap) { - return Mono.fromRunnable(() -> - replyMqttMessageMap.computeIfAbsent(message.fixedHeader().messageType(), mqttMessageType -> new ConcurrentHashMap<>(8)).put(messageId, - mqttChannel.write(Mono.fromCallable(() -> getDupMessage(message))) - .delaySubscription(Duration.ofSeconds(5)) - .repeat(10,mqttChannel::isActive) - .doOnError(error -> { - MessageUtils.safeRelease(message); - log.error("offerReply", error); - }) - .doOnCancel(() -> MessageUtils.safeRelease(message)) - .subscribe())); - } +// /** +// * Set resend action +// * +// * @param message {@link MqttMessage} +// * @param mqttChannel {@link MqttChannel} +// * @param messageId messageId +// * @param replyMqttMessageMap 重试缓存 +// * @return 空操作符 +// */ +// public Mono offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map> replyMqttMessageMap) { +// return Mono.fromRunnable(() -> +// replyMqttMessageMap.computeIfAbsent(message.fixedHeader().messageType(), mqttMessageType -> new ConcurrentHashMap<>(8)).put(messageId, +// mqttChannel.write(Mono.fromCallable(() -> getDupMessage(message))) +// .delaySubscription(Duration.ofSeconds(5)) +// .repeat(10,mqttChannel::isActive) +// .doOnError(error -> { +// MessageUtils.safeRelease(message); +// log.error("offerReply", error); +// }) +// .doOnCancel(() -> MessageUtils.safeRelease(message)) +// .subscribe())); +// } } @Override public String toString() { return "MqttChannel{" + -// " address='" + this.connection.address().toString() + '\'' + + " address='" + this.connection.address().toString() + '\'' + ", clientIdentifier='" + clientIdentifier + '\'' + ", status=" + status + ", keepalive=" + keepalive + ", username='" + username + '}'; } + } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java index e351c197..21a062f5 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java @@ -1,5 +1,6 @@ package io.github.quickmsg.common.context; +import io.github.quickmsg.common.ack.TimeAckManager; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.channel.traffic.TrafficHandlerLoader; @@ -99,5 +100,13 @@ public interface ReceiveContext extends BiConsumer implements private final TrafficHandlerLoader trafficHandlerLoader; + private final TimeAckManager timeAckManager; + public AbstractReceiveContext(T configuration, Transport transport) { AbstractConfiguration abstractConfiguration = castConfiguration(configuration); @@ -94,6 +98,7 @@ public abstract class AbstractReceiveContext implements this.messageRegistry.startUp(abstractConfiguration.getEnvironmentMap()); this.metricManager = metricManager(abstractConfiguration.getMeterConfig()); Optional.ofNullable(abstractConfiguration.getSourceDefinitions()).ifPresent(sourceDefinitions -> sourceDefinitions.forEach(SourceManager::loadSource)); + this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,512); } private TrafficHandlerLoader trafficHandlerLoader() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java index 49d07140..f302dae0 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java @@ -44,7 +44,7 @@ public class MqttReceiver extends AbstractSslHandler implements Receiver { .addHandler(MqttEncoder.INSTANCE) .addHandler(new MqttDecoder(mqttConfiguration.getMessageMaxSize())) .addHandler(receiveContext.getTrafficHandlerLoader().get()); - receiveContext.apply(MqttChannel.init(connection)); + receiveContext.apply(MqttChannel.init(connection,receiveContext.getTimeAckManager())); }); } } \ No newline at end of file diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java index a092240f..237135ab 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java @@ -1,5 +1,6 @@ package io.github.quickmsg.core.protocol; +import io.github.quickmsg.common.ack.Ack; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.enums.ChannelStatus; @@ -24,6 +25,7 @@ import reactor.util.context.ContextView; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -48,6 +50,7 @@ public class CommonProtocol implements Protocol { @Override public Mono parseProtocol(SmqttMessage smqttMessage, MqttChannel mqttChannel, ContextView contextView) { + ReceiveContext receiveContext = contextView.get(ReceiveContext.class); MqttMessage message = smqttMessage.getMessage(); switch (message.fixedHeader().messageType()) { case PINGREQ: @@ -64,8 +67,10 @@ public class CommonProtocol implements Protocol { case PUBREC: MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); int messageId = messageIdVariableHeader.messageId(); - return mqttChannel.cancelRetry(MqttMessageType.PUBLISH, messageId) - .then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true)); + return Mono.fromRunnable(() -> { + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBLISH, messageId))) + .ifPresent(Ack::stop); + }).then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true)); case PUBREL: MqttMessageIdVariableHeader relMessageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); int id = relMessageIdVariableHeader.messageId(); @@ -76,25 +81,30 @@ public class CommonProtocol implements Protocol { */ return mqttChannel.removeQos2Msg(id) .map(msg -> { - ReceiveContext receiveContext = contextView.get(ReceiveContext.class); TopicRegistry topicRegistry = receiveContext.getTopicRegistry(); MessageRegistry messageRegistry = receiveContext.getMessageRegistry(); Set subscribeTopics = topicRegistry.getSubscribesByTopic(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel()); return Mono.when( - subscribeTopics.stream() - .filter(subscribeTopic -> filterOfflineSession(subscribeTopic.getMqttChannel(), messageRegistry, MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), subscribeTopic.getMqttChannel().generateMessageId()))) - .map(subscribeTopic -> subscribeTopic.getMqttChannel() - .write(MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), - subscribeTopic.getMqttChannel().generateMessageId()), subscribeTopic.getQoS().value() > 0) - ).collect(Collectors.toList())) - .then(mqttChannel.cancelRetry(MqttMessageType.PUBREC, id)) + subscribeTopics.stream() + .filter(subscribeTopic -> filterOfflineSession(subscribeTopic.getMqttChannel(), messageRegistry, MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), subscribeTopic.getMqttChannel().generateMessageId()))) + .map(subscribeTopic -> subscribeTopic.getMqttChannel() + .write(MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), + subscribeTopic.getMqttChannel().generateMessageId()), subscribeTopic.getQoS().value() > 0) + ).collect(Collectors.toList())) + .then(Mono.fromRunnable(() -> { + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBREC, id))) + .ifPresent(Ack::stop); + })) .then(mqttChannel.write(MqttMessageBuilder.buildPublishComp(id), false)); }).orElseGet(() -> mqttChannel.write(MqttMessageBuilder.buildPublishComp(id), false)); case PUBCOMP: MqttMessageIdVariableHeader messageIdVariableHeader1 = (MqttMessageIdVariableHeader) message.variableHeader(); int compId = messageIdVariableHeader1.messageId(); - return mqttChannel.cancelRetry(MqttMessageType.PUBREL, compId); + return Mono.fromRunnable(() -> { + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBREL, compId))) + .ifPresent(Ack::stop); + }); case PINGRESP: default: return Mono.empty(); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java index cae7f290..8d7e6b9d 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java @@ -1,6 +1,8 @@ package io.github.quickmsg.core.protocol; +import io.github.quickmsg.common.ack.Ack; import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.message.SmqttMessage; import io.github.quickmsg.common.protocol.Protocol; import io.netty.handler.codec.mqtt.MqttConnectMessage; @@ -12,6 +14,7 @@ import reactor.util.context.ContextView; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * @author luxurong @@ -28,10 +31,14 @@ public class PublishAckProtocol implements Protocol { @Override public Mono parseProtocol(SmqttMessage smqttMessage, MqttChannel mqttChannel, ContextView contextView) { - MqttPubAckMessage message = smqttMessage.getMessage(); - MqttMessageIdVariableHeader idVariableHeader = message.variableHeader(); - int messageId = idVariableHeader.messageId(); - return mqttChannel.cancelRetry(MqttMessageType.PUBLISH,messageId); + return Mono.fromRunnable(()->{ + ReceiveContext receiveContext = contextView.get(ReceiveContext.class); + MqttPubAckMessage message = smqttMessage.getMessage(); + MqttMessageIdVariableHeader idVariableHeader = message.variableHeader(); + int messageId = idVariableHeader.messageId(); + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBLISH,messageId))) + .ifPresent(Ack::stop); + }); } @Override diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java index 3f7da321..5f350a3c 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java @@ -53,7 +53,7 @@ public class WebSocketMqttReceiver extends AbstractSslHandler implements Receive .addHandlerLast(new ByteBufToWebSocketFrameEncoder()) .addHandlerLast(new MqttDecoder(mqttConfiguration.getMessageMaxSize())) .addHandlerLast(MqttEncoder.INSTANCE); - receiveContext.apply(MqttChannel.init(connection)); + receiveContext.apply(MqttChannel.init(connection, receiveContext.getTimeAckManager())); }); } -- Gitee From 8e3afc1c5c0197477ab3d4fc4f3c75dff407108f Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 21 Dec 2021 21:54:19 +0800 Subject: [PATCH 07/14] ack --- .../src/main/java/io/github/quickmsg/common/ack/AbsAck.java | 2 ++ .../java/io/github/quickmsg/common/channel/MqttChannel.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java index bb5b461b..f6d35a53 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java @@ -34,6 +34,7 @@ public abstract class AbsAck implements Ack { public void run(Timeout timeout) throws Exception { if (++count <= maxRetrySize+1 && !died ) { try { + log.info("task retry send ..........."); runnable.run(); ackManager.addAck(this); } catch (Exception e) { @@ -46,6 +47,7 @@ public abstract class AbsAck implements Ack { @Override public void stop() { died = true; + log.info("retry task stop ..........."); ackManager.deleteAck(getId()); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index 9a6ef40b..8553f278 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -185,7 +185,7 @@ public class MqttChannel { public long generateId(MqttMessageType type, Integer messageId) { - return Long.parseLong(connection.channel().id().asLongText()) << 5 | (long) type.value() << 4 | messageId; + return (long) connection.channel().hashCode() << 5 | (long) type.value() << 4 | messageId; } -- Gitee From 405881b236e5c932a06f3937a0046add56b88601 Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 21 Dec 2021 23:38:13 +0800 Subject: [PATCH 08/14] ack --- .../java/io/github/quickmsg/common/ack/AbsAck.java | 13 ++++++++++++- .../io/github/quickmsg/common/ack/RetryAck.java | 7 +++++-- .../github/quickmsg/common/channel/MqttChannel.java | 13 ++++++++++++- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java index f6d35a53..0922f439 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java @@ -4,6 +4,7 @@ import io.netty.util.Timeout; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * @author luxurong @@ -23,11 +24,17 @@ public abstract class AbsAck implements Ack { private final int period; - protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager) { + + private final Consumer consumer; + + + + protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Consumer consumer) { this.maxRetrySize = maxRetrySize; this.period = period; this.runnable = runnable; this.ackManager = ackManager; + this.consumer= consumer; } @Override @@ -35,6 +42,7 @@ public abstract class AbsAck implements Ack { if (++count <= maxRetrySize+1 && !died ) { try { log.info("task retry send ..........."); + consumer.accept(false); runnable.run(); ackManager.addAck(this); } catch (Exception e) { @@ -42,6 +50,9 @@ public abstract class AbsAck implements Ack { } } + else { + consumer.accept(true); + } } @Override diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java index 4ef22809..7f07eda9 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java @@ -1,5 +1,7 @@ package io.github.quickmsg.common.ack; +import java.util.function.Consumer; + /** * @author luxurong */ @@ -8,8 +10,9 @@ public class RetryAck extends AbsAck { private final long id; - public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager) { - super(maxRetrySize, period, runnable, ackManager); + + public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Consumer consumer) { + super(maxRetrySize, period, runnable, ackManager,consumer); this.id = id; } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index 8553f278..f82f81df 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -6,6 +6,7 @@ import io.github.quickmsg.common.ack.RetryAck; import io.github.quickmsg.common.ack.TimeAckManager; import io.github.quickmsg.common.enums.ChannelStatus; import io.github.quickmsg.common.topic.SubscribeTopic; +import io.github.quickmsg.common.utils.MessageUtils; import io.netty.handler.codec.mqtt.*; import lombok.Builder; import lombok.Data; @@ -271,7 +272,17 @@ public class MqttChannel { mqttChannel.write() method releases a reference count. */ Runnable runnable = () -> mqttChannel.write(Mono.just(mqttMessage)).subscribe(); - Ack ack = new RetryAck(mqttChannel.generateId(mqttMessage.fixedHeader().messageType(), getMessageId(mqttMessage)), 5, 5, runnable, mqttChannel.getTimeAckManager()); + Consumer consumer = bool -> { + if (bool) { + MessageUtils.safeRelease(mqttMessage); + } else { + if (mqttMessage instanceof MqttPublishMessage) { + ((MqttPublishMessage) mqttMessage).retain(); + } + } + }; + Ack ack = new RetryAck(mqttChannel.generateId(mqttMessage.fixedHeader().messageType(), getMessageId(mqttMessage)), + 5, 5, runnable, mqttChannel.getTimeAckManager(), consumer); ack.start(); return mqttChannel.write(Mono.just(mqttMessage)).then(); } else { -- Gitee From e2bf158a82d8383c6a94e4aabcf35a8fd2c17bf9 Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 21 Dec 2021 23:49:54 +0800 Subject: [PATCH 09/14] ack --- .../io/github/quickmsg/common/ack/AbsAck.java | 9 ++++----- .../github/quickmsg/common/ack/RetryAck.java | 4 +--- .../quickmsg/common/channel/MqttChannel.java | 19 ++++++------------- 3 files changed, 11 insertions(+), 21 deletions(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java index 0922f439..63c5fa5a 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java @@ -25,16 +25,16 @@ public abstract class AbsAck implements Ack { private final int period; - private final Consumer consumer; + private final Runnable cleaner; - protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Consumer consumer) { + protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable cleaner) { this.maxRetrySize = maxRetrySize; this.period = period; this.runnable = runnable; this.ackManager = ackManager; - this.consumer= consumer; + this.cleaner= cleaner; } @Override @@ -42,7 +42,6 @@ public abstract class AbsAck implements Ack { if (++count <= maxRetrySize+1 && !died ) { try { log.info("task retry send ..........."); - consumer.accept(false); runnable.run(); ackManager.addAck(this); } catch (Exception e) { @@ -51,7 +50,7 @@ public abstract class AbsAck implements Ack { } else { - consumer.accept(true); + cleaner.run(); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java index 7f07eda9..ef4ca1c8 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java @@ -1,7 +1,5 @@ package io.github.quickmsg.common.ack; -import java.util.function.Consumer; - /** * @author luxurong */ @@ -11,7 +9,7 @@ public class RetryAck extends AbsAck { private final long id; - public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Consumer consumer) { + public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable consumer) { super(maxRetrySize, period, runnable, ackManager,consumer); this.id = id; } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index f82f81df..c06cc08c 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -271,18 +271,12 @@ public class MqttChannel { Increase the reference count of bytebuf, and the reference count of retrybytebuf is 2 mqttChannel.write() method releases a reference count. */ - Runnable runnable = () -> mqttChannel.write(Mono.just(mqttMessage)).subscribe(); - Consumer consumer = bool -> { - if (bool) { - MessageUtils.safeRelease(mqttMessage); - } else { - if (mqttMessage instanceof MqttPublishMessage) { - ((MqttPublishMessage) mqttMessage).retain(); - } - } - }; - Ack ack = new RetryAck(mqttChannel.generateId(mqttMessage.fixedHeader().messageType(), getMessageId(mqttMessage)), - 5, 5, runnable, mqttChannel.getTimeAckManager(), consumer); + MqttMessage reply = getReplyMqttMessage(mqttMessage); + + Runnable runnable = () -> mqttChannel.write(Mono.just(reply)).subscribe(); + Runnable cleaner = () -> MessageUtils.safeRelease(reply);; + Ack ack = new RetryAck(mqttChannel.generateId(reply.fixedHeader().messageType(), getMessageId(reply)), + 5, 5, runnable, mqttChannel.getTimeAckManager(), cleaner); ack.start(); return mqttChannel.write(Mono.just(mqttMessage)).then(); } else { @@ -308,7 +302,6 @@ public class MqttChannel { } else { return mqttMessage; } - } -- Gitee From e733bef26118c15d42c7cc104bcaa07774832958 Mon Sep 17 00:00:00 2001 From: Easy <1091927336@qq.com> Date: Wed, 22 Dec 2021 09:05:55 +0800 Subject: [PATCH 10/14] json --- .../quickmsg/common/message/HeapMqttMessage.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index ac27b4a7..eb5d1a5c 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -1,6 +1,7 @@ package io.github.quickmsg.common.message; import io.github.quickmsg.common.utils.JacksonUtil; +import io.netty.util.internal.StringUtil; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -44,8 +45,17 @@ public class HeapMqttMessage { private Object getJsonObject(String body) { if (body.startsWith("{") && body.endsWith("}")) { return JacksonUtil.json2Bean(body, JsonMap.class); + } else if (body.startsWith("[") && body.endsWith("]")) { + return JacksonUtil.json2List(body, JsonMap.class); } else { - return body; + if (StringUtil.isNullOrEmpty(body)) { + return body; + } + + if (body.startsWith("\"") && body.endsWith("\"")) { + return body; + } + return "\"" + body + "\""; } } } -- Gitee From 5f9d79f32f6afb344d6fe2f86d304cf305c8b7a9 Mon Sep 17 00:00:00 2001 From: luxurong Date: Thu, 23 Dec 2021 00:00:05 +0800 Subject: [PATCH 11/14] spring auth --- .../quickmsg/common/config/BootstrapConfig.java | 9 +++++++++ .../quickmsg/common/config/ConnectModel.java | 9 +++++++++ .../java/io/github/quickmsg/core/Bootstrap.java | 11 ++++++++--- .../quickmsg/starter/AutoMqttConfiguration.java | 16 ++++++++++++++-- 4 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java index 806bb646..7bb7ccfe 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java @@ -2,6 +2,7 @@ package io.github.quickmsg.common.config; import ch.qos.logback.classic.Level; import com.fasterxml.jackson.annotation.JsonProperty; +import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.metric.MeterType; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; @@ -110,6 +111,9 @@ public class BootstrapConfig { @NoArgsConstructor @AllArgsConstructor public static class TcpConfig { + + @Builder.Default + private ConnectModel connectModel = ConnectModel.UNIQUE; /** * 端口 */ @@ -186,6 +190,11 @@ public class BootstrapConfig { */ Map childOptions; + /** + * PasswordAuthentication + */ + PasswordAuthentication authentication; + } @Data diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java new file mode 100644 index 00000000..2a703fde --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java @@ -0,0 +1,9 @@ +package io.github.quickmsg.common.config; + +/** + * @author luxurong + */ +public enum ConnectModel { + UNIQUE, + KICK +} diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index 00e2047a..a8acf036 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -5,7 +5,6 @@ import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; -import io.github.quickmsg.common.spi.DynamicLoader; import io.github.quickmsg.common.transport.Transport; import io.github.quickmsg.common.utils.BannerUtils; import io.github.quickmsg.common.utils.LoggerLevel; @@ -68,8 +67,14 @@ public class Bootstrap { private MqttConfiguration initMqttConfiguration() { MqttConfiguration mqttConfiguration = defaultConfiguration(); - if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) { - mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword())); + if (tcpConfig.getAuthentication() != null) { + mqttConfiguration.setReactivePasswordAuth(tcpConfig.getAuthentication()); + } else { + if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) { + mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword())); + } else { + mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> true); + } } Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort); Optional.ofNullable(tcpConfig.getLowWaterMark()).ifPresent(mqttConfiguration::setLowWaterMark); diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java index 2186cce6..60132949 100644 --- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java +++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java @@ -1,6 +1,8 @@ package io.github.quickmsg.starter; import ch.qos.logback.classic.Level; +import io.github.quickmsg.common.auth.PasswordAuthentication; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.utils.IPUtils; import io.github.quickmsg.core.Bootstrap; import lombok.extern.slf4j.Slf4j; @@ -15,7 +17,7 @@ import org.springframework.context.annotation.Configuration; @Slf4j @Configuration @EnableConfigurationProperties(SpringBootstrapConfig.class) -public class AutoMqttConfiguration { +public class AutoMqttConfiguration { /** @@ -25,7 +27,8 @@ public class AutoMqttConfiguration { * @return {@link Bootstrap} */ @Bean - public Bootstrap startServer(@Autowired SpringBootstrapConfig springBootstrapConfig) { + public Bootstrap startServer(@Autowired SpringBootstrapConfig springBootstrapConfig, @Autowired(required = false) PasswordAuthentication authentication) { + check(springBootstrapConfig,authentication); return Bootstrap.builder() .rootLevel(Level.toLevel(springBootstrapConfig.getLogLevel())) .tcpConfig(springBootstrapConfig.getTcp()) @@ -42,6 +45,15 @@ public class AutoMqttConfiguration { .doOnSuccess(this::printUiUrl).block(); } + private void check(SpringBootstrapConfig springBootstrapConfig, PasswordAuthentication authentication) { + if(springBootstrapConfig.getTcp().getConnectModel() == null){ + springBootstrapConfig.getTcp().setConnectModel(ConnectModel.UNIQUE); + } + if(authentication !=null){ + springBootstrapConfig.getTcp().setAuthentication(authentication); + } + } + public void printUiUrl(Bootstrap bootstrap) { String start = "\n-------------------------------------------------------------\n\t"; start += String.format("Smqtt mqtt connect url %s:%s \n\t", IPUtils.getIP(), bootstrap.getTcpConfig().getPort()); -- Gitee From 5ba3b06f2ac732752550812016cefa65ee0f12d2 Mon Sep 17 00:00:00 2001 From: luxurong Date: Thu, 23 Dec 2021 21:59:20 +0800 Subject: [PATCH 12/14] connect model --- .../quickmsg/common/config/Configuration.java | 2 ++ .../java/io/github/quickmsg/core/Bootstrap.java | 1 + .../quickmsg/core/http/HttpConfiguration.java | 6 ++++++ .../quickmsg/core/mqtt/MqttConfiguration.java | 3 +++ .../quickmsg/core/protocol/ConnectProtocol.java | 14 ++++++++++---- 5 files changed, 22 insertions(+), 4 deletions(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java index a7a27725..ca994d2b 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java @@ -5,6 +5,8 @@ package io.github.quickmsg.common.config; */ public interface Configuration { + ConnectModel getConnectModel(); + /** * netty boss线程数 diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index a8acf036..304178d8 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -76,6 +76,7 @@ public class Bootstrap { mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> true); } } + Optional.ofNullable(tcpConfig.getConnectModel()).ifPresent(mqttConfiguration::setConnectModel); Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort); Optional.ofNullable(tcpConfig.getLowWaterMark()).ifPresent(mqttConfiguration::setLowWaterMark); Optional.ofNullable(tcpConfig.getHighWaterMark()).ifPresent(mqttConfiguration::setHighWaterMark); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java index 9ded36e2..12fc2b00 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java @@ -2,6 +2,7 @@ package io.github.quickmsg.core.http; import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.config.Configuration; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.config.SslContext; import lombok.Data; @@ -37,6 +38,11 @@ public class HttpConfiguration implements Configuration { private BootstrapConfig.MeterConfig meterConfig; + @Override + public ConnectModel getConnectModel() { + return null; + } + @Override public Integer getBusinessThreadSize() { return 0; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java index 1cd1552c..0ac9299c 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java @@ -3,6 +3,7 @@ package io.github.quickmsg.core.mqtt; import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.config.AbstractConfiguration; import io.github.quickmsg.common.config.BootstrapConfig; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; @@ -34,6 +35,8 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private SslContext sslContext; + private ConnectModel connectModel; + private PasswordAuthentication reactivePasswordAuth = (u, p, c) -> true; private Integer bossThreadSize = Runtime.getRuntime().availableProcessors(); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index cf17efd5..3790b8a4 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -3,6 +3,7 @@ package io.github.quickmsg.core.protocol; import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.enums.ChannelStatus; import io.github.quickmsg.common.enums.Event; @@ -64,10 +65,15 @@ public class ConnectProtocol implements Protocol { MetricManager metricManager = mqttReceiveContext.getMetricManager(); PasswordAuthentication passwordAuthentication = mqttReceiveContext.getPasswordAuthentication(); /*check clientIdentifier exist*/ - if (channelRegistry.exists(clientIdentifier)) { - return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), - false).then(mqttChannel.close()); + if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { + if (channelRegistry.exists(clientIdentifier)) { + return mqttChannel.write( + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + false).then(mqttChannel.close()); + } + } else { + MqttChannel oldMqttChannel = channelRegistry.get(clientIdentifier); + oldMqttChannel.close().subscribe(); } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() -- Gitee From b4c5cd6b6c49fb09909b18799ad9dff1a7d803a0 Mon Sep 17 00:00:00 2001 From: luxurong Date: Thu, 23 Dec 2021 22:05:22 +0800 Subject: [PATCH 13/14] connect model --- config/config.yaml | 5 +++-- .../io/github/quickmsg/core/protocol/ConnectProtocol.java | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 3823b048..ef1618b3 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,6 +1,7 @@ smqtt: logLevel: INFO # 系统日志 tcp: # tcp配置 + connectModel: UNIQUE # UNIQUE 唯一 KICK 互踢 port: 1883 # mqtt端口号 username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 @@ -21,7 +22,7 @@ smqtt: key: /user/server.key # 指定ssl文件 默认系统生成 crt: /user/server.crt # 指定ssl文件 默认系统生成 http: # http相关配置 端口固定60000 - enable: true # 开关 + enable: false # 开关 accessLog: true # http访问日志 ssl: # ssl配置 enable: false @@ -30,7 +31,7 @@ smqtt: username: smqtt # 访问用户名 password: smqtt # 访问密码 ws: # websocket配置 - enable: true # 开关 + enable: false # 开关 port: 8999 # 端口 path: /mqtt # ws 的访问path mqtt.js请设置此选项 cluster: # 集群配置 diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 3790b8a4..6d17c31a 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -72,8 +72,8 @@ public class ConnectProtocol implements Protocol { false).then(mqttChannel.close()); } } else { - MqttChannel oldMqttChannel = channelRegistry.get(clientIdentifier); - oldMqttChannel.close().subscribe(); + Optional.ofNullable( channelRegistry.get(clientIdentifier)) + .ifPresent(ch->ch.close().subscribe()); } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() -- Gitee From ef129a8857749a5c7f171f13986ef4ff7dfe89aa Mon Sep 17 00:00:00 2001 From: luxurong Date: Thu, 23 Dec 2021 22:07:58 +0800 Subject: [PATCH 14/14] version --- pom.xml | 2 +- smqtt-bootstrap/pom.xml | 10 +++++----- smqtt-common/pom.xml | 2 +- smqtt-core/pom.xml | 10 +++++----- smqtt-metric/pom.xml | 2 +- smqtt-metric/smqtt-metric-influxdb/pom.xml | 4 ++-- smqtt-metric/smqtt-metric-prometheus/pom.xml | 4 ++-- smqtt-persistent/pom.xml | 2 +- smqtt-persistent/smqtt-persistent-db/pom.xml | 6 +++--- smqtt-persistent/smqtt-persistent-redis/pom.xml | 6 +++--- smqtt-registry/pom.xml | 2 +- smqtt-registry/smqtt-registry-ignite/pom.xml | 2 +- smqtt-registry/smqtt-registry-scube/pom.xml | 4 ++-- smqtt-rule/pom.xml | 2 +- smqtt-rule/smqtt-rule-dsl/pom.xml | 6 +++--- smqtt-rule/smqtt-rule-engine/pom.xml | 16 ++++++++-------- smqtt-rule/smqtt-rule-source/pom.xml | 4 ++-- .../smqtt-rule-source-db/pom.xml | 4 ++-- .../smqtt-rule-source-http/pom.xml | 6 +++--- .../smqtt-rule-source-kafka/pom.xml | 4 ++-- .../smqtt-rule-source-mqtt/pom.xml | 4 ++-- .../smqtt-rule-source-rabbitmq/pom.xml | 4 ++-- .../smqtt-rule-source-rocketmq/pom.xml | 4 ++-- smqtt-spring-boot-starter/pom.xml | 8 ++++---- smqtt-ui/pom.xml | 2 +- 25 files changed, 60 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index 24644d0f..2cfcc5b2 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 io.github.quickmsg smqtt - 1.1.0 + 1.1.1 smqtt-common smqtt-core diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml index 7128c560..9515123b 100644 --- a/smqtt-bootstrap/pom.xml +++ b/smqtt-bootstrap/pom.xml @@ -7,10 +7,10 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-bootstrap - 1.1.0 + 1.1.1 smqtt-bootstrap http://www.example.com @@ -45,17 +45,17 @@ io.github.quickmsg smqtt-core - 1.1.0 + 1.1.1 smqtt-registry-scube io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-ui io.github.quickmsg - 1.1.0 + 1.1.1 diff --git a/smqtt-common/pom.xml b/smqtt-common/pom.xml index d00dca93..57e3ca3a 100644 --- a/smqtt-common/pom.xml +++ b/smqtt-common/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 jar diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml index fc094296..779e030f 100644 --- a/smqtt-core/pom.xml +++ b/smqtt-core/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-core @@ -14,22 +14,22 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-dsl - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-metric-influxdb - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-metric-prometheus - 1.1.0 + 1.1.1 diff --git a/smqtt-metric/pom.xml b/smqtt-metric/pom.xml index 4bb32c80..cde0fe2a 100644 --- a/smqtt-metric/pom.xml +++ b/smqtt-metric/pom.xml @@ -8,7 +8,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-metric diff --git a/smqtt-metric/smqtt-metric-influxdb/pom.xml b/smqtt-metric/smqtt-metric-influxdb/pom.xml index 74e7dc5a..c16cf20b 100644 --- a/smqtt-metric/smqtt-metric-influxdb/pom.xml +++ b/smqtt-metric/smqtt-metric-influxdb/pom.xml @@ -5,7 +5,7 @@ smqtt-metric io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 @@ -15,7 +15,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 diff --git a/smqtt-metric/smqtt-metric-prometheus/pom.xml b/smqtt-metric/smqtt-metric-prometheus/pom.xml index 3b84a9a5..86aad8e4 100644 --- a/smqtt-metric/smqtt-metric-prometheus/pom.xml +++ b/smqtt-metric/smqtt-metric-prometheus/pom.xml @@ -5,7 +5,7 @@ smqtt-metric io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 @@ -19,7 +19,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 diff --git a/smqtt-persistent/pom.xml b/smqtt-persistent/pom.xml index 1d30ce8d..13c67c73 100644 --- a/smqtt-persistent/pom.xml +++ b/smqtt-persistent/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 pom diff --git a/smqtt-persistent/smqtt-persistent-db/pom.xml b/smqtt-persistent/smqtt-persistent-db/pom.xml index 370e12fd..025a1325 100644 --- a/smqtt-persistent/smqtt-persistent-db/pom.xml +++ b/smqtt-persistent/smqtt-persistent-db/pom.xml @@ -5,12 +5,12 @@ smqtt-persistent io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-persistent-db - 1.1.0 + 1.1.1 3.14.11 @@ -20,7 +20,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 compile diff --git a/smqtt-persistent/smqtt-persistent-redis/pom.xml b/smqtt-persistent/smqtt-persistent-redis/pom.xml index 022d243f..9124bac6 100644 --- a/smqtt-persistent/smqtt-persistent-redis/pom.xml +++ b/smqtt-persistent/smqtt-persistent-redis/pom.xml @@ -5,12 +5,12 @@ smqtt-persistent io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-persistent-redis - 1.1.0 + 1.1.1 3.15.6 @@ -20,7 +20,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 compile diff --git a/smqtt-registry/pom.xml b/smqtt-registry/pom.xml index fcec21ed..a58f471e 100644 --- a/smqtt-registry/pom.xml +++ b/smqtt-registry/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 pom diff --git a/smqtt-registry/smqtt-registry-ignite/pom.xml b/smqtt-registry/smqtt-registry-ignite/pom.xml index 4742dca3..c35895ec 100644 --- a/smqtt-registry/smqtt-registry-ignite/pom.xml +++ b/smqtt-registry/smqtt-registry-ignite/pom.xml @@ -5,7 +5,7 @@ smqtt-registry io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 diff --git a/smqtt-registry/smqtt-registry-scube/pom.xml b/smqtt-registry/smqtt-registry-scube/pom.xml index ea2d4677..0dbe4f4e 100644 --- a/smqtt-registry/smqtt-registry-scube/pom.xml +++ b/smqtt-registry/smqtt-registry-scube/pom.xml @@ -5,7 +5,7 @@ smqtt-registry io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-registry-scube @@ -50,7 +50,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided diff --git a/smqtt-rule/pom.xml b/smqtt-rule/pom.xml index da4262ac..a1a6380c 100644 --- a/smqtt-rule/pom.xml +++ b/smqtt-rule/pom.xml @@ -7,7 +7,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-rule diff --git a/smqtt-rule/smqtt-rule-dsl/pom.xml b/smqtt-rule/smqtt-rule-dsl/pom.xml index 06d143ac..d663114c 100644 --- a/smqtt-rule/smqtt-rule-dsl/pom.xml +++ b/smqtt-rule/smqtt-rule-dsl/pom.xml @@ -5,7 +5,7 @@ smqtt-rule io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 @@ -15,13 +15,13 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided smqtt-rule-engine io.github.quickmsg - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-engine/pom.xml b/smqtt-rule/smqtt-rule-engine/pom.xml index 82bb16e9..fadf42c2 100644 --- a/smqtt-rule/smqtt-rule-engine/pom.xml +++ b/smqtt-rule/smqtt-rule-engine/pom.xml @@ -7,7 +7,7 @@ smqtt-rule io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-rule-engine @@ -18,44 +18,44 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided io.github.quickmsg smqtt-rule-source-kafka - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-http - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-rocketmq - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-rabbitmq - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-db - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-mqtt - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-source/pom.xml b/smqtt-rule/smqtt-rule-source/pom.xml index 6c46a35e..1153ff46 100644 --- a/smqtt-rule/smqtt-rule-source/pom.xml +++ b/smqtt-rule/smqtt-rule-source/pom.xml @@ -8,7 +8,7 @@ smqtt-rule io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-rule-source @@ -33,7 +33,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml index 84389977..868a9933 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-db - 1.1.0 + 1.1.1 3.14.11 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml index 2dd03f0f..8895ebb3 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml @@ -6,7 +6,7 @@ io.github.quickmsg smqtt-rule-source-http - 1.1.0 + 1.1.1 smqtt-rule-source-http @@ -15,14 +15,14 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml index 243882f9..78e6ccb4 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml @@ -5,12 +5,12 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-rule-source-kafka - 1.1.0 + 1.1.1 https://github.com/quickmsg/smqtt diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml index 197f89d7..b35e4461 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-mqtt - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml index a63c83a6..92921f22 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-rabbitmq - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml index f03aa491..17161390 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-rocketmq - 1.1.0 + 1.1.1 diff --git a/smqtt-spring-boot-starter/pom.xml b/smqtt-spring-boot-starter/pom.xml index 56f05fac..9f2a69b2 100644 --- a/smqtt-spring-boot-starter/pom.xml +++ b/smqtt-spring-boot-starter/pom.xml @@ -7,7 +7,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-spring-boot-starter @@ -34,17 +34,17 @@ io.github.quickmsg smqtt-core - 1.1.0 + 1.1.1 smqtt-registry-scube io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-ui io.github.quickmsg - 1.1.0 + 1.1.1 io.projectreactor.netty diff --git a/smqtt-ui/pom.xml b/smqtt-ui/pom.xml index 54513642..1beb6381 100644 --- a/smqtt-ui/pom.xml +++ b/smqtt-ui/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-ui -- Gitee