From 5e71e2ad7229f6d25abd6b19d85ec6fc49f56c66 Mon Sep 17 00:00:00 2001 From: andylo25 Date: Tue, 12 Jun 2018 17:58:34 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0block=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E4=BA=8B=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../blockchain/core/repository/MessageRepository.java | 3 --- .../com/mindata/blockchain/core/sqlite/SqliteManager.java | 6 ++++-- .../com/mindata/blockchain/socket/client/ClientStarter.java | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java index b81ba06..097c7a1 100644 --- a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java +++ b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java @@ -1,7 +1,5 @@ package com.mindata.blockchain.core.repository; -import org.springframework.transaction.annotation.Transactional; - import com.mindata.blockchain.core.model.MessageEntity; /** @@ -12,7 +10,6 @@ public interface MessageRepository extends BaseRepository { * 删除一条记录 * @param messageId messageId */ - @Transactional void deleteByMessageId(String messageId); /** diff --git a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java index ca69497..af1d790 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java +++ b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java @@ -70,11 +70,13 @@ public class SqliteManager { /** * 根据一个block执行sql - * + * 整个block一个事务 + * * @param block * block */ - private void execute(Block block) { + @Transactional + public void execute(Block block) { List instructions = block.getBlockBody().getInstructions(); //InstructionParserImpl类里面执行的是InstructionBase,需要转成InstructionBase for (Instruction instruction : instructions) { diff --git a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java index b14fc56..43acf29 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java +++ b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java @@ -54,7 +54,7 @@ public class ClientStarter { private String appId; @Value("${name}") private String name; - @Value("${singeNode}") + @Value("${singeNode:false}") private Boolean singeNode; private Logger logger = LoggerFactory.getLogger(getClass()); -- Gitee From f1dc99df60dfc5d86d548391aedf2476e6616eba Mon Sep 17 00:00:00 2001 From: andylo25 Date: Fri, 15 Jun 2018 11:16:26 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E9=87=8D=E6=9E=84connect=E8=8A=82=E7=82=B9?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0=E8=8A=82=E7=82=B9=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=9B=9E=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/client/BlockClientAioListener.java | 16 ++-- .../socket/client/ClientStarter.java | 91 ++++++++++++------- 2 files changed, 68 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java index b73f46d..33d4742 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java +++ b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java @@ -7,7 +7,8 @@ import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.core.intf.Packet; -import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; +import com.mindata.blockchain.ApplicationContextProvider; +import com.mindata.blockchain.core.event.NodesConnectedEvent; /** * client端对各个server连接的情况回调。

@@ -20,12 +21,13 @@ public class BlockClientAioListener implements ClientAioListener { @Override public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception { - if (isConnected) { - logger.info("连接成功:server地址为-" + channelContext.getServerNode()); - Aio.bindGroup(channelContext, GROUP_NAME); - } else { - logger.info("连接失败:server地址为-" + channelContext.getServerNode()); - } +// if (isConnected) { +// logger.info("连接成功:server地址为-" + channelContext.getServerNode()); +// Aio.bindGroup(channelContext, Const.GROUP_NAME); +// } else { +// logger.info("连接失败:server地址为-" + channelContext.getServerNode()); +// } + ApplicationContextProvider.publishEvent(new NodesConnectedEvent(channelContext)); } @Override diff --git a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java index 43acf29..4043821 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java +++ b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java @@ -1,19 +1,19 @@ package com.mindata.blockchain.socket.client; -import com.mindata.blockchain.common.AppId; -import com.mindata.blockchain.common.CommonUtil; -import com.mindata.blockchain.core.bean.Member; -import com.mindata.blockchain.core.bean.MemberData; -import com.mindata.blockchain.core.bean.Permission; -import com.mindata.blockchain.core.bean.PermissionData; -import com.mindata.blockchain.core.manager.PermissionManager; -import com.mindata.blockchain.socket.common.Const; -import com.mindata.blockchain.socket.packet.BlockPacket; -import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; +import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; + +import javax.annotation.Resource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -26,14 +26,18 @@ import org.tio.core.ChannelContext; import org.tio.core.Node; import org.tio.utils.lock.SetWithLock; -import javax.annotation.Resource; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.stream.Collectors; - -import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; +import com.google.common.collect.Maps; +import com.mindata.blockchain.common.AppId; +import com.mindata.blockchain.common.CommonUtil; +import com.mindata.blockchain.core.bean.Member; +import com.mindata.blockchain.core.bean.MemberData; +import com.mindata.blockchain.core.bean.Permission; +import com.mindata.blockchain.core.bean.PermissionData; +import com.mindata.blockchain.core.event.NodesConnectedEvent; +import com.mindata.blockchain.core.manager.PermissionManager; +import com.mindata.blockchain.socket.common.Const; +import com.mindata.blockchain.socket.packet.BlockPacket; +import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; /** * @author wuweifeng wrote on 2018/3/18. @@ -59,7 +63,11 @@ public class ClientStarter { private Logger logger = LoggerFactory.getLogger(getClass()); - private static Set nodes = new HashSet<>(); + private Set nodes = new HashSet<>(); + + // 节点连接状态 + private Map nodesStatus = Maps.newConcurrentMap(); + private volatile boolean isNodesReady = false; // 节点是否已准备好 /** * 从麦达区块链管理端获取已登记的各服务器ip @@ -129,14 +137,13 @@ public class ClientStarter { */ @Scheduled(fixedRate = 30000) public void heartBeat() { + if(!isNodesReady)return; logger.info("---------开始心跳包--------"); BlockPacket blockPacket = NextBlockPacketBuilder.build(); packetSender.sendGroup(blockPacket); } - @EventListener(ApplicationReadyEvent.class) - public void fetchNextBlock() throws InterruptedException { - Thread.sleep(6000); + public void onNodesReady() { logger.info("开始群发信息获取next Block"); //在这里发请求,去获取group别人的新区块 BlockPacket nextBlockPacket = NextBlockPacketBuilder.build(); @@ -181,21 +188,41 @@ public class ClientStarter { try { AioClient aioClient = new AioClient(clientGroupContext); logger.info("开始绑定" + ":" + serverNode.toString()); - ClientChannelContext clientChannelContext = aioClient.connect(serverNode, 2); - if (clientChannelContext == null) { - logger.info("绑定" + serverNode.toString() + "失败"); - return; - } - //绑group是将要连接的各个服务器节点做为一个group - Aio.bindGroup(clientChannelContext, GROUP_NAME); + aioClient.asynConnect(serverNode); } catch (Exception e) { logger.info("异常"); } } + + @EventListener(NodesConnectedEvent.class) + public void onConnected(NodesConnectedEvent connectedEvent){ + ChannelContext channelContext = connectedEvent.getSource(); + Node node = channelContext.getServerNode(); + if (channelContext.isClosed()) { + logger.info("连接" + node.toString() + "失败"); + nodesStatus.put(node.getIp(), -1); + return; + }else{ + logger.info("连接" + node.toString() + "成功"); + nodesStatus.put(node.getIp(), 1); + //绑group是将要连接的各个服务器节点做为一个group + Aio.bindGroup(channelContext, GROUP_NAME); + + int csize = Aio.getAllChannelContexts(clientGroupContext).size(); + if(csize >= pbftAgreeCount()){ + synchronized (nodesStatus) { + if(!isNodesReady){ + isNodesReady = true; + onNodesReady(); + } + } + } + } + } public int halfGroupSize() { - SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME); - return ((Set) setWithLock.getObj()).size() / 2; + SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME); + return setWithLock.getObj().size() / 2; } /** -- Gitee From 4173a30a8bb45b66c32a1e550d75fa48ba710d7c Mon Sep 17 00:00:00 2001 From: andylo25 Date: Fri, 15 Jun 2018 11:17:05 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=AE=8C=E6=88=90=E6=A0=87=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/event/NodesConnectedEvent.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java diff --git a/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java new file mode 100644 index 0000000..d269013 --- /dev/null +++ b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java @@ -0,0 +1,21 @@ +package com.mindata.blockchain.core.event; + +import org.springframework.context.ApplicationEvent; +import org.tio.core.ChannelContext; + +/** + * 节点连接完成时会触发该Event + * @author andylo25 wrote on 2018/6/15. + */ +public class NodesConnectedEvent extends ApplicationEvent { + private static final long serialVersionUID = 526755692642414178L; + + public NodesConnectedEvent(ChannelContext channelContext) { + super(channelContext); + } + + public ChannelContext getSource() { + return (ChannelContext) source; + } + +} -- Gitee