SpringBoot 使用 WebSocket 监控本地日志

1. 简介

  • WebSocket 协议是基于 TCP 的一种新的网络协议,它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端,这样就可以实现从客户端发送消息到服务器,而服务器又可以转发消息到客户端,这样就能够实现客户端之间的交互
  • 对于 WebSocket 的开发,Spring 也提供了良好的支持,目前很多浏览器已经实现了 WebSocket 协议,但是依旧存在着很多浏览器没有实现该协议,为了兼容那些没有实现该协议的浏览器,往往还需要通过 STOMP 协议来完成这些兼容

Ajax

WebSocket

2. 添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

3. SpringBoot 整合 WebSocket

  • 配置开启 WebSocket
1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@EnableWebSocket
public class WebSocketConfig {

/**
* 自动注册 @ServerEndpoint 注解声明的端点
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
  • 定义一个端点服务类,让 Spring 创建 WebSocket 的服务端点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@ServerEndpoint("/websocket/{sid}")
@Component
public class WebSocketServer {

private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

private static final AtomicInteger onlineCount = new AtomicInteger(0);
private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

private Session session;
private String sid;

/**
* 客户端打开 WebSocket 服务端点调用方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
this.sid = sid;
if (webSocketMap.containsKey(sid)) {
webSocketMap.remove(sid);
webSocketMap.put(sid, this);
} else {
webSocketMap.put(sid, this);
onlineCount.getAndIncrement();
}
log.info("开启连接: {}, 当前连接数: {}", sid, onlineCount);
sendMessage("连接成功");
}

/**
* 客户端关闭 WebSocket 服务端点调用方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(sid)) {
webSocketMap.remove(sid);
onlineCount.getAndDecrement();
}
log.info("关闭连接: {}, 当前连接数: {}", sid, onlineCount);
}

/**
* 客户端发送消息,WebSocket 服务端点调用方法
*/
@OnMessage
public void onMessage(Session session, String message) {
log.info("接收客户端消息: {}, {}", sid, message);
if ("START".equals(message)) {
new FileMonitorTask(session, "E:\\1.txt", 1000L).start();
}
}

/**
* 客户端请求 WebSocket 服务端点发生异常调用方法
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("连接错误: {}", sid, error);
}

/**
* 服务端主动推送数据到客户端
*/
public void sendMessage(String message) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("发送消息失败: {}, {}", sid, message);
}
}
}
  • 文件监听线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class FileMonitorTask extends Thread {

private static final Logger log = LoggerFactory.getLogger(FileMonitorTask.class);

private final Session session;
private final String filepath;
private final Long interval;

private boolean isRunning = true;
private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 100);
private final CharBuffer charBuffer = CharBuffer.allocate(1024 * 50);
private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

public FileMonitorTask(Session session, String filepath, Long interval) {
this.session = session;
this.filepath = filepath;
this.interval = interval;
}

@Override
public void run() {
File file = new File(filepath);
FileChannel channel;
try {
channel = new FileInputStream(file).getChannel();
channel.position(channel.size());
} catch (IOException e) {
log.error("can't monitor file: {}", filepath, e);
closeSession();
return;
}
long lastModified = file.lastModified();
while (isRunning) {
long now = file.lastModified();
if (now != lastModified) {
log.info("monitoring file: {}, {}", Thread.currentThread().getName(), filepath);
String message = getNewContent(channel);
if (message != null) {
sendMessage(message);
}
lastModified = now;
}

try {
TimeUnit.MILLISECONDS.sleep(interval);
} catch (InterruptedException e) {
log.error("sleep interrupted", e);
closeSession();
}

isRunning = session.isOpen();
}
}

private void closeSession() {
try {
session.close();
} catch (IOException e) {
log.error("close session error", e);
}
}

private void sendMessage(String message) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("send message error", e);
}
}

private String getNewContent(FileChannel channel) {
try {
byteBuffer.clear();
charBuffer.clear();
int length = channel.read(byteBuffer);
if (length != -1) {
byteBuffer.flip();
decoder.decode(byteBuffer, charBuffer, true);
charBuffer.flip();
return charBuffer.toString();
} else {
channel.position(channel.size());
}
} catch (Exception e) {
log.error("getNewContent error", e);
closeSession();
}
return null;
}
}

参考