feat:增加设备属性websocket推送功能

This commit is contained in:
yanshikui 2024-12-27 17:24:37 +08:00
parent d1c6019166
commit aaff6b7478
5 changed files with 208 additions and 12 deletions

View File

@ -42,11 +42,19 @@ logging:
# Spring配置 # Spring配置
spring: spring:
rabbitmq:
host: 192.168.1.194 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: mes # RabbitMQ 服务的账号
password: mes # RabbitMQ 服务的密码
virtual-host: / # 默认虚拟主机
consumer:
batchSize: 100 # 消费者每次拉取消息的数量
# 资源信息 # 资源信息
messages: messages:
# 国际化资源文件路径 # 国际化资源文件路径
basename: i18n/messages basename: i18n/messages
profiles: profiles:
active: druid active: druid
# 文件上传 # 文件上传
servlet: servlet:
@ -63,13 +71,13 @@ spring:
# redis 配置 # redis 配置
redis: redis:
# 地址 # 地址
host: localhost host: 192.168.1.254
# 端口默认为6379 # 端口默认为6379
port: 6379 port: 6379
# 数据库索引 # 数据库索引
database: 0 database: 0
# 密码 # 密码
password: password:
# 连接超时时间 # 连接超时时间
timeout: 10s timeout: 10s
lettuce: lettuce:
@ -91,7 +99,7 @@ token:
secret: abcdefghijklmnopqrstuvwxyz secret: abcdefghijklmnopqrstuvwxyz
# 令牌有效期默认30分钟 # 令牌有效期默认30分钟
expireTime: 30 expireTime: 30
# MyBatis配置 # MyBatis配置
mybatis: mybatis:
# 搜索指定包别名 # 搜索指定包别名
@ -102,10 +110,10 @@ mybatis:
configLocation: classpath:mybatis/mybatis-config.xml configLocation: classpath:mybatis/mybatis-config.xml
# PageHelper分页插件 # PageHelper分页插件
pagehelper: pagehelper:
helperDialect: mysql helperDialect: mysql
supportMethodsArguments: true supportMethodsArguments: true
params: count=countSql params: count=countSql
# Swagger配置 # Swagger配置
swagger: swagger:
@ -116,7 +124,7 @@ swagger:
pathMapping: /dev-api pathMapping: /dev-api
# 防止XSS攻击 # 防止XSS攻击
xss: xss:
# 过滤开关 # 过滤开关
enabled: true enabled: true
# 排除链接(多个用逗号分隔) # 排除链接(多个用逗号分隔)
@ -126,7 +134,7 @@ xss:
#Mino配置 #Mino配置
minio: minio:
url: http://127.0.0.1:9000 url: http://192.168.1.254:9000
accessKey: your_key accessKey: minioadmin
secretKey: your_secret secretKey: minioadmin
bucketName: ktg-mes bucketName: lx-mes

View File

@ -60,6 +60,16 @@
<groupId>commons-collections</groupId> <groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId> <artifactId>commons-collections</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--webSocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.2.13.RELEASE</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,26 @@
package com.ktg.mes.rabbitmq;
import com.ktg.mes.websocket.MesWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 直连队列 处理器
*
* @author raft
*/
@Slf4j
@Component
public class DirectQueueHandler {
@RabbitListener(queues = "iot.original.mes")
public void directHandlerRegister_origin(String message) {
try {
// log.info("Mes:消息队列,拉取原始数据:{}", message);
MesWebSocket.pushData(message);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,46 @@
package com.ktg.mes.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitMqConfig {
/**
* 设备事件上报队列
*/
@Bean
public Queue deviceEventQueue() {
return new Queue("iot.original.mes");
}
/**
* RabbitMQ默认的topic交换机
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("amq.topic");
}
/**
* 绑定队列和交换机
* @param topicExchange
* @param deviceEventQueue
* @return
*/
@Bean
public Binding bindingDeviceEventQueue(TopicExchange topicExchange, Queue deviceEventQueue) {
return BindingBuilder.bind(deviceEventQueue).to(topicExchange).with("iot.origin"); // 替换为实际的路由键
}
}

View File

@ -0,0 +1,106 @@
package com.ktg.mes.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* mes websocket服务端 为组态大屏提供数据推送
*/
@ServerEndpoint("/websocket/mes")
@Slf4j
@Component
public class MesWebSocket {
private static Map<String, Session> sessionMap = new ConcurrentHashMap<>();
public final static String WEBSOCKET_HEARTBEAT = "heartbeat-iot";
/**
* 客户端成功连接时调用
*
* @param session 存储的session
*/
@OnOpen
public void openSession(Session session) {
log.warn("Mes WebSocket{} 已连接", session.getId());
sessionMap.put(session.getId(),session);
}
/**
* 服务端接收到消息
*
* @param session 会话每个访问对象都会有一个单独的会话
* @param message 服务端接收到的消息 格式 deviceCode:nodeCode:pointCode1,pointCode2,pointCode3
*/
@OnMessage(maxMessageSize = 10485760)
public void onMessage(Session session, String message) {
log.warn("Mes WebSocket接收到 {} 的消息 {}", session.getId(), message);
if(WEBSOCKET_HEARTBEAT.equals(message)){
sendMessage(session,"OK");
}
}
/**
* 客户端断开连接时调用
*
* @param session 存储的session
*/
@OnClose
public void onClose(Session session) {
log.warn("Mes WebSocket{} 断开连接", session.getId());
removeSessionData(session);
}
@OnError
public void onError(Session session, Throwable throwable) {
removeSessionData(session);
}
private void removeSessionData(Session session) {
String sessionId = session.getId();
sessionMap.remove(sessionId);
}
/**
* 向指定Session(用户)发送message
*
* @param session 用session进行判断
* @param message 发送的消息
*/
private static void sendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(message);
log.warn("Mes sendData {}", message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 给对应的测点session客户端集合发送测点实时数据
*
* @param message 发送消息
* @param sessionMap 获取sessionIds发送实时数据
*/
private static synchronized void sendMessageAll(String message, Map<String, Session> sessionMap) {
for (String key : sessionMap.keySet()) {
log.info("sessionId: {}", key);
sendMessage(sessionMap.get(key), message);
}
}
/**
* 检查是否有客户端监听此设备的实时数据
*
* @param message 发送消息内容
*/
public static void pushData(String message) {
if (ObjectUtils.isNotEmpty(sessionMap)) {
sendMessageAll(message, sessionMap);
}
}
}