Skip to content

Commit 8be55d5

Browse files
author
pedro
committed
feat:添加个人用户推送
1 parent 1cdfe4b commit 8be55d5

5 files changed

Lines changed: 79 additions & 4 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.github.talelin.merak.modules.message;
2+
3+
/**
4+
* websocket 模块常量
5+
*/
6+
public class MessageConsts {
7+
public static final String USER_KEY = "user";
8+
}

src/main/java/io/github/talelin/merak/modules/message/README.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ WsHandler 是 lin-cms 提供给开发者的消息推送接口,该接口有许
188188

189189
### sendMessage
190190

191-
给某个会话发送消息,有两个重载方法,如下:
191+
给某个会话或某个用户发送消息,有四个重载方法,如下:
192192

193193
```
194194
/**
@@ -208,10 +208,31 @@ void sendMessage(WebSocketSession session, String message) throws IOException;
208208
* @throws IOException 发送io异常
209209
*/
210210
void sendMessage(WebSocketSession session, TextMessage message) throws IOException;
211+
212+
/**
213+
* 发送消息
214+
*
215+
* @param userId 用户id
216+
* @param message 要发送的消息
217+
* @throws IOException 发送io异常
218+
*/
219+
void sendMessage(Long userId, TextMessage message) throws IOException;
220+
221+
/**
222+
* 发送消息
223+
*
224+
* @param userId 用户id
225+
* @param message 要发送的消息
226+
* @throws IOException 发送io异常
227+
*/
228+
void sendMessage(Long userId, String message) throws IOException;
211229
```
212230

213231
方法的第二个参数可以直接接收字符串,也可以接收 TextMessage 类型的消息。
214232

233+
当给用户发送消息时,只需要传入用户 id 即可,但一定要确保该用户通过令牌进行了
234+
连接。
235+
215236
### broadCast
216237

217238
广播,给所有连接用户发送消息,也有两个重载方法,第一个直接发送字符串,

src/main/java/io/github/talelin/merak/modules/message/WebSocketInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.nio.charset.Charset;
1919
import java.util.Map;
2020

21+
import static io.github.talelin.merak.modules.message.MessageConsts.USER_KEY;
22+
2123
@SuppressWarnings("Duplicates")
2224
public class WebSocketInterceptor implements HandshakeInterceptor {
2325
@Autowired
@@ -58,7 +60,7 @@ public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse res
5860
writeMessageToBody(response, "user is not found");
5961
return false;
6062
}
61-
attributes.put("user", user);
63+
attributes.put(USER_KEY, user);
6264
return true;
6365
}
6466
return false;

src/main/java/io/github/talelin/merak/modules/message/WsHandler.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,24 @@ public interface WsHandler {
3939
*/
4040
void sendMessage(WebSocketSession session, String message) throws IOException;
4141

42+
/**
43+
* 发送消息
44+
*
45+
* @param userId 用户id
46+
* @param message 要发送的消息
47+
* @throws IOException 发送io异常
48+
*/
49+
void sendMessage(Long userId, TextMessage message) throws IOException;
50+
51+
/**
52+
* 发送消息
53+
*
54+
* @param userId 用户id
55+
* @param message 要发送的消息
56+
* @throws IOException 发送io异常
57+
*/
58+
void sendMessage(Long userId, String message) throws IOException;
59+
4260
/**
4361
* 发送消息
4462
*

src/main/java/io/github/talelin/merak/modules/message/WsHandlerImpl.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
import org.springframework.web.socket.TextMessage;
88
import org.springframework.web.socket.WebSocketSession;
99

10+
import static io.github.talelin.merak.modules.message.MessageConsts.USER_KEY;
11+
1012
import java.io.IOException;
1113
import java.util.List;
1214
import java.util.Map;
15+
import java.util.Optional;
1316
import java.util.concurrent.CopyOnWriteArraySet;
1417
import java.util.concurrent.atomic.AtomicInteger;
1518

@@ -47,6 +50,29 @@ public void sendMessage(WebSocketSession session, String message) throws IOExcep
4750
this.sendMessage(session, new TextMessage(message));
4851
}
4952

53+
@Override
54+
public void sendMessage(Long userId, TextMessage message) throws IOException {
55+
Optional<WebSocketSession> userSession = sessions.stream().filter(session -> {
56+
if (!session.isOpen()) {
57+
return false;
58+
}
59+
Map<String, Object> attributes = session.getAttributes();
60+
if (!attributes.containsKey(USER_KEY)) {
61+
return false;
62+
}
63+
UserDO user = (UserDO) attributes.get(USER_KEY);
64+
return user.getId().equals(userId);
65+
}).findFirst();
66+
if (userSession.isPresent()) {
67+
userSession.get().sendMessage(message);
68+
}
69+
}
70+
71+
@Override
72+
public void sendMessage(Long userId, String message) throws IOException {
73+
sendMessage(userId, new TextMessage(message));
74+
}
75+
5076
@Override
5177
public void sendMessage(WebSocketSession session, TextMessage message) throws IOException {
5278
session.sendMessage(message);
@@ -82,9 +108,9 @@ public void broadCastToGroup(Long groupId, TextMessage message) throws IOExcepti
82108
if (!session.isOpen())
83109
continue;
84110
Map<String, Object> attributes = session.getAttributes();
85-
if (!attributes.containsKey("user"))
111+
if (!attributes.containsKey(USER_KEY))
86112
continue;
87-
UserDO user = (UserDO) attributes.get("user");
113+
UserDO user = (UserDO) attributes.get(USER_KEY);
88114
boolean matched = userIds.stream().anyMatch(id -> id.equals(user.getId()));
89115
if (!matched)
90116
continue;

0 commit comments

Comments
 (0)