diff --git a/ktg-admin/src/main/resources/application.yml b/ktg-admin/src/main/resources/application.yml
index 55c8f44..50848e4 100644
--- a/ktg-admin/src/main/resources/application.yml
+++ b/ktg-admin/src/main/resources/application.yml
@@ -42,11 +42,19 @@ logging:
# Spring配置
spring:
+ rabbitmq:
+ host: 192.168.1.194 # RabbitMQ 服务的地址
+ port: 5672 # RabbitMQ 服务的端口
+ username: mes # RabbitMQ 服务的账号
+ password: mes # RabbitMQ 服务的密码
+ virtual-host: / # 默认虚拟主机
+ consumer:
+ batchSize: 100 # 消费者每次拉取消息的数量
# 资源信息
messages:
# 国际化资源文件路径
basename: i18n/messages
- profiles:
+ profiles:
active: druid
# 文件上传
servlet:
@@ -63,13 +71,13 @@ spring:
# redis 配置
redis:
# 地址
- host: localhost
+ host: 192.168.1.254
# 端口,默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
- password:
+ password:
# 连接超时时间
timeout: 10s
lettuce:
@@ -91,7 +99,7 @@ token:
secret: abcdefghijklmnopqrstuvwxyz
# 令牌有效期(默认30分钟)
expireTime: 30
-
+
# MyBatis配置
mybatis:
# 搜索指定包别名
@@ -102,10 +110,10 @@ mybatis:
configLocation: classpath:mybatis/mybatis-config.xml
# PageHelper分页插件
-pagehelper:
+pagehelper:
helperDialect: mysql
supportMethodsArguments: true
- params: count=countSql
+ params: count=countSql
# Swagger配置
swagger:
@@ -116,7 +124,7 @@ swagger:
pathMapping: /dev-api
# 防止XSS攻击
-xss:
+xss:
# 过滤开关
enabled: true
# 排除链接(多个用逗号分隔)
@@ -126,7 +134,7 @@ xss:
#Mino配置
minio:
- url: http://127.0.0.1:9000
- accessKey: your_key
- secretKey: your_secret
- bucketName: ktg-mes
\ No newline at end of file
+ url: http://192.168.1.254:9000
+ accessKey: minioadmin
+ secretKey: minioadmin
+ bucketName: lx-mes
diff --git a/ktg-mes/pom.xml b/ktg-mes/pom.xml
index 4f1392d..bf4c791 100644
--- a/ktg-mes/pom.xml
+++ b/ktg-mes/pom.xml
@@ -60,6 +60,16 @@
commons-collections
commons-collections
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
+
+ org.springframework.boot
+ spring-boot-starter-websocket
+ 2.2.13.RELEASE
+
-
\ No newline at end of file
+
diff --git a/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/DirectQueueHandler.java b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/DirectQueueHandler.java
new file mode 100644
index 0000000..bdb6972
--- /dev/null
+++ b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/DirectQueueHandler.java
@@ -0,0 +1,26 @@
+package com.ktg.mes.rabbitmq;
+
+import com.ktg.mes.websocket.MesWebSocket;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * 直连队列 处理器
+ *
+ * @author raft
+ */
+@Slf4j
+@Component
+public class DirectQueueHandler {
+
+ @RabbitListener(queues = "iot.original.mes")
+ public void directHandlerRegister_origin(String message) {
+ try {
+// log.info("Mes:消息队列,拉取原始数据:{}", message);
+ MesWebSocket.pushData(message);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/RabbitMqConfig.java b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/RabbitMqConfig.java
new file mode 100644
index 0000000..706ea87
--- /dev/null
+++ b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/RabbitMqConfig.java
@@ -0,0 +1,46 @@
+package com.ktg.mes.rabbitmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Slf4j
+@Configuration
+public class RabbitMqConfig {
+
+ /**
+ * 设备事件上报队列
+ */
+ @Bean
+ public Queue deviceEventQueue() {
+ return new Queue("iot.original.mes");
+ }
+
+ /**
+ * RabbitMQ默认的topic交换机
+ * @return
+ */
+ @Bean
+ public TopicExchange topicExchange() {
+ return new TopicExchange("amq.topic");
+ }
+
+ /**
+ * 绑定队列和交换机
+ * @param topicExchange
+ * @param deviceEventQueue
+ * @return
+ */
+ @Bean
+ public Binding bindingDeviceEventQueue(TopicExchange topicExchange, Queue deviceEventQueue) {
+ return BindingBuilder.bind(deviceEventQueue).to(topicExchange).with("iot.origin"); // 替换为实际的路由键
+ }
+}
diff --git a/ktg-mes/src/main/java/com/ktg/mes/websocket/MesWebSocket.java b/ktg-mes/src/main/java/com/ktg/mes/websocket/MesWebSocket.java
new file mode 100644
index 0000000..16d03a7
--- /dev/null
+++ b/ktg-mes/src/main/java/com/ktg/mes/websocket/MesWebSocket.java
@@ -0,0 +1,106 @@
+package com.ktg.mes.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * mes websocket服务端 为组态、大屏、提供数据推送
+ */
+@ServerEndpoint("/websocket/mes")
+@Slf4j
+@Component
+public class MesWebSocket {
+ private static Map sessionMap = new ConcurrentHashMap<>();
+ public final static String WEBSOCKET_HEARTBEAT = "heartbeat-iot";
+ /**
+ * 客户端成功连接时调用
+ *
+ * @param session 存储的session
+ */
+ @OnOpen
+ public void openSession(Session session) {
+ log.warn("Mes WebSocket:{} 已连接", session.getId());
+ sessionMap.put(session.getId(),session);
+ }
+
+ /**
+ * 服务端接收到消息
+ *
+ * @param session 会话,每个访问对象都会有一个单独的会话
+ * @param message 服务端接收到的消息 格式: deviceCode:nodeCode:pointCode1,pointCode2,pointCode3
+ */
+ @OnMessage(maxMessageSize = 10485760)
+ public void onMessage(Session session, String message) {
+ log.warn("Mes WebSocket:接收到 {} 的消息 {}", session.getId(), message);
+ if(WEBSOCKET_HEARTBEAT.equals(message)){
+ sendMessage(session,"OK");
+ }
+ }
+
+ /**
+ * 客户端断开连接时调用
+ *
+ * @param session 存储的session
+ */
+ @OnClose
+ public void onClose(Session session) {
+ log.warn("Mes WebSocket:{} 断开连接", session.getId());
+ removeSessionData(session);
+ }
+
+ @OnError
+ public void onError(Session session, Throwable throwable) {
+ removeSessionData(session);
+ }
+
+ private void removeSessionData(Session session) {
+ String sessionId = session.getId();
+ sessionMap.remove(sessionId);
+ }
+
+ /**
+ * 向指定Session(用户)发送message
+ *
+ * @param session 用session进行判断
+ * @param message 发送的消息
+ */
+ private static void sendMessage(Session session, String message) {
+ try {
+ session.getBasicRemote().sendText(message);
+ log.warn("Mes sendData {}", message);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 给对应的测点session客户端集合发送测点实时数据
+ *
+ * @param message 发送消息
+ * @param sessionMap 获取sessionIds发送实时数据
+ */
+ private static synchronized void sendMessageAll(String message, Map sessionMap) {
+ for (String key : sessionMap.keySet()) {
+ log.info("sessionId: {}", key);
+ sendMessage(sessionMap.get(key), message);
+ }
+ }
+
+ /**
+ * 检查是否有客户端监听此设备的实时数据
+ *
+ * @param message 发送消息内容
+ */
+ public static void pushData(String message) {
+ if (ObjectUtils.isNotEmpty(sessionMap)) {
+ sendMessageAll(message, sessionMap);
+ }
+ }
+}