From aaff6b7478a762ba66aeb24164b1d44d8ad35283 Mon Sep 17 00:00:00 2001 From: yanshikui Date: Fri, 27 Dec 2024 17:24:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=A2=9E=E5=8A=A0=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E5=B1=9E=E6=80=A7websocket=E6=8E=A8=E9=80=81=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ktg-admin/src/main/resources/application.yml | 30 +++-- ktg-mes/pom.xml | 12 +- .../ktg/mes/rabbitmq/DirectQueueHandler.java | 26 +++++ .../com/ktg/mes/rabbitmq/RabbitMqConfig.java | 46 ++++++++ .../com/ktg/mes/websocket/MesWebSocket.java | 106 ++++++++++++++++++ 5 files changed, 208 insertions(+), 12 deletions(-) create mode 100644 ktg-mes/src/main/java/com/ktg/mes/rabbitmq/DirectQueueHandler.java create mode 100644 ktg-mes/src/main/java/com/ktg/mes/rabbitmq/RabbitMqConfig.java create mode 100644 ktg-mes/src/main/java/com/ktg/mes/websocket/MesWebSocket.java diff --git a/ktg-admin/src/main/resources/application.yml b/ktg-admin/src/main/resources/application.yml index 55c8f44..50848e4 100644 --- a/ktg-admin/src/main/resources/application.yml +++ b/ktg-admin/src/main/resources/application.yml @@ -42,11 +42,19 @@ logging: # Spring配置 spring: + rabbitmq: + host: 192.168.1.194 # RabbitMQ 服务的地址 + port: 5672 # RabbitMQ 服务的端口 + username: mes # RabbitMQ 服务的账号 + password: mes # RabbitMQ 服务的密码 + virtual-host: / # 默认虚拟主机 + consumer: + batchSize: 100 # 消费者每次拉取消息的数量 # 资源信息 messages: # 国际化资源文件路径 basename: i18n/messages - profiles: + profiles: active: druid # 文件上传 servlet: @@ -63,13 +71,13 @@ spring: # redis 配置 redis: # 地址 - host: localhost + host: 192.168.1.254 # 端口,默认为6379 port: 6379 # 数据库索引 database: 0 # 密码 - password: + password: # 连接超时时间 timeout: 10s lettuce: @@ -91,7 +99,7 @@ token: secret: abcdefghijklmnopqrstuvwxyz # 令牌有效期(默认30分钟) expireTime: 30 - + # MyBatis配置 mybatis: # 搜索指定包别名 @@ -102,10 +110,10 @@ mybatis: configLocation: classpath:mybatis/mybatis-config.xml # PageHelper分页插件 -pagehelper: +pagehelper: helperDialect: mysql supportMethodsArguments: true - params: count=countSql + params: count=countSql # Swagger配置 swagger: @@ -116,7 +124,7 @@ swagger: pathMapping: /dev-api # 防止XSS攻击 -xss: +xss: # 过滤开关 enabled: true # 排除链接(多个用逗号分隔) @@ -126,7 +134,7 @@ xss: #Mino配置 minio: - url: http://127.0.0.1:9000 - accessKey: your_key - secretKey: your_secret - bucketName: ktg-mes \ No newline at end of file + url: http://192.168.1.254:9000 + accessKey: minioadmin + secretKey: minioadmin + bucketName: lx-mes diff --git a/ktg-mes/pom.xml b/ktg-mes/pom.xml index 4f1392d..bf4c791 100644 --- a/ktg-mes/pom.xml +++ b/ktg-mes/pom.xml @@ -60,6 +60,16 @@ commons-collections commons-collections + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.boot + spring-boot-starter-websocket + 2.2.13.RELEASE + - \ No newline at end of file + diff --git a/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/DirectQueueHandler.java b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/DirectQueueHandler.java new file mode 100644 index 0000000..bdb6972 --- /dev/null +++ b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/DirectQueueHandler.java @@ -0,0 +1,26 @@ +package com.ktg.mes.rabbitmq; + +import com.ktg.mes.websocket.MesWebSocket; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * 直连队列 处理器 + * + * @author raft + */ +@Slf4j +@Component +public class DirectQueueHandler { + + @RabbitListener(queues = "iot.original.mes") + public void directHandlerRegister_origin(String message) { + try { +// log.info("Mes:消息队列,拉取原始数据:{}", message); + MesWebSocket.pushData(message); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/RabbitMqConfig.java b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/RabbitMqConfig.java new file mode 100644 index 0000000..706ea87 --- /dev/null +++ b/ktg-mes/src/main/java/com/ktg/mes/rabbitmq/RabbitMqConfig.java @@ -0,0 +1,46 @@ +package com.ktg.mes.rabbitmq; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +public class RabbitMqConfig { + + /** + * 设备事件上报队列 + */ + @Bean + public Queue deviceEventQueue() { + return new Queue("iot.original.mes"); + } + + /** + * RabbitMQ默认的topic交换机 + * @return + */ + @Bean + public TopicExchange topicExchange() { + return new TopicExchange("amq.topic"); + } + + /** + * 绑定队列和交换机 + * @param topicExchange + * @param deviceEventQueue + * @return + */ + @Bean + public Binding bindingDeviceEventQueue(TopicExchange topicExchange, Queue deviceEventQueue) { + return BindingBuilder.bind(deviceEventQueue).to(topicExchange).with("iot.origin"); // 替换为实际的路由键 + } +} diff --git a/ktg-mes/src/main/java/com/ktg/mes/websocket/MesWebSocket.java b/ktg-mes/src/main/java/com/ktg/mes/websocket/MesWebSocket.java new file mode 100644 index 0000000..16d03a7 --- /dev/null +++ b/ktg-mes/src/main/java/com/ktg/mes/websocket/MesWebSocket.java @@ -0,0 +1,106 @@ +package com.ktg.mes.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * mes websocket服务端 为组态、大屏、提供数据推送 + */ +@ServerEndpoint("/websocket/mes") +@Slf4j +@Component +public class MesWebSocket { + private static Map sessionMap = new ConcurrentHashMap<>(); + public final static String WEBSOCKET_HEARTBEAT = "heartbeat-iot"; + /** + * 客户端成功连接时调用 + * + * @param session 存储的session + */ + @OnOpen + public void openSession(Session session) { + log.warn("Mes WebSocket:{} 已连接", session.getId()); + sessionMap.put(session.getId(),session); + } + + /** + * 服务端接收到消息 + * + * @param session 会话,每个访问对象都会有一个单独的会话 + * @param message 服务端接收到的消息 格式: deviceCode:nodeCode:pointCode1,pointCode2,pointCode3 + */ + @OnMessage(maxMessageSize = 10485760) + public void onMessage(Session session, String message) { + log.warn("Mes WebSocket:接收到 {} 的消息 {}", session.getId(), message); + if(WEBSOCKET_HEARTBEAT.equals(message)){ + sendMessage(session,"OK"); + } + } + + /** + * 客户端断开连接时调用 + * + * @param session 存储的session + */ + @OnClose + public void onClose(Session session) { + log.warn("Mes WebSocket:{} 断开连接", session.getId()); + removeSessionData(session); + } + + @OnError + public void onError(Session session, Throwable throwable) { + removeSessionData(session); + } + + private void removeSessionData(Session session) { + String sessionId = session.getId(); + sessionMap.remove(sessionId); + } + + /** + * 向指定Session(用户)发送message + * + * @param session 用session进行判断 + * @param message 发送的消息 + */ + private static void sendMessage(Session session, String message) { + try { + session.getBasicRemote().sendText(message); + log.warn("Mes sendData {}", message); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 给对应的测点session客户端集合发送测点实时数据 + * + * @param message 发送消息 + * @param sessionMap 获取sessionIds发送实时数据 + */ + private static synchronized void sendMessageAll(String message, Map sessionMap) { + for (String key : sessionMap.keySet()) { + log.info("sessionId: {}", key); + sendMessage(sessionMap.get(key), message); + } + } + + /** + * 检查是否有客户端监听此设备的实时数据 + * + * @param message 发送消息内容 + */ + public static void pushData(String message) { + if (ObjectUtils.isNotEmpty(sessionMap)) { + sendMessageAll(message, sessionMap); + } + } +}