高并发websocket发送的后端限流措施
发布日期:2021-10-06 02:38:08 浏览次数:4 分类:技术文章

本文共 9153 字,大约阅读时间需要 30 分钟。

       JAVA 中常用 webSocket 进行前后端的通信,比如在适当的时候通知前端进行刷新操作。通常情况下没有问题(消息传递可靠性不做考虑),但是一旦后端突然接收到大量请求,需要向前端发送大量 socket 通知其刷新。这种情况下会给前端带去很大的压力,很有可能刷新不过来,造成前端页面卡死。

       本文通过对“向同一客户端发送的大量同类socket消息“进行过滤来进行限流操作。

核心代码

package com.ysu.ems.web;import java.io.IOException;import javax.annotation.Resource;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSONObject;import com.ysu.ems.pojo.RedisUtil;/** * 2019/08/21 * */@Componentpublic class HighConcurrencyWebSocketServer {		private static WebSocketServer websocketServer;//普通发送socket消息的方法类,代码附后	@Resource 	public void setWebSocketServer(WebSocketServer websocketServer) { 		this.websocketServer = websocketServer; 	}	private static RedisUtil redisUtil;//连接redis的工具类,代码附后	@Resource 	public void setRedisUtil(RedisUtil redisUtil) { 		this.redisUtil = redisUtil; 	}		public static void sendInfo(Object ob,String message,String userId) throws IOException  {		StringBuffer keyBuffer=new StringBuffer();//redis缓存的key值		JSONObject jSONObject=(JSONObject)ob;		keyBuffer.append("highConWebSocketKey-");//key值前缀		keyBuffer.append(jSONObject.get("type").toString());//socket的type值		keyBuffer.append("-"+userId);//socket将要送达的客户端的userId		if(!redisUtil.exists(keyBuffer.toString())){//如果缓存中不存在			//存入缓存			redisUtil.set(keyBuffer.toString(),"1",1L);//1秒后失效,value值无意义			//开定时器,1秒之后再发出去			Thread socketSender=new Thread(()->{				try {					Thread.sleep(1000);//先休眠1秒					websocketServer.sendInfo(ob, message, userId);//发送socket									} catch (Exception e) {					e.printStackTrace();				}			});			socketSender.start();		}	}}

上述代码中的 Object ob对象的值类似这样:

JSONObject jsonObject = new JSONObject();jsonObject.put("type", 6);//socket类型,和前端约定好的,不同值代表不同意义jsonObject.put("info", "这是要发送的socket解释信息");

核心思路就是:把短时间内(比如1秒内),向同一用户发送的同类socket消息过滤掉不发送,并确保该段时间内的最新消息(因为是同类的,所以最先到达的消息也等同于最新的)能够发送到前端,避免最后到达的那条信息被过滤而没被发送,休眠时间可以比缓存失效时间长一点点。

比如1秒内向 A 用户客户端发送了 100 条同样的信息要求前端刷新,这样过滤后前端就只会收到第一条消息(它和最后一条最新的消息是等价的),后面的99条冗余消息就被后端过滤掉了,前端在1秒内顶多只刷新一次(而且一定是最新的消息通知的),不会导致页面卡死。

 

连接redis

package com.ysu.ems.pojo;import java.io.Serializable;import java.util.Map;import java.util.Set;import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.core.ValueOperations;import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer;import org.springframework.stereotype.Component; /** *  * @Description: spring boot 的redis工具类   * 创建日期:2019/05/26 * @function redisTemplateInit 配置Redis防止乱码   2019/08/21 */@SuppressWarnings("unchecked")@Componentpublic class RedisUtil {	 @SuppressWarnings("rawtypes")	    @Autowired	    private RedisTemplate redisTemplate;	 	 	@Bean	    public RedisTemplate redisTemplateInit() {	        //设置序列化Key的实例化对象	        redisTemplate.setKeySerializer(new StringRedisSerializer());	        //设置序列化Value的实例化对象	        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());	        return redisTemplate;	    }	    /**	     * 批量删除对应的value	     * 	     * @param keys	     */	    public void remove(final String... keys) {	        for (String key : keys) {	            remove(key);	        }	    }	 	    /**	     * 批量删除key	     * 	     * @param pattern	     */	    public void removePattern(final String pattern) {	        Set
keys = redisTemplate.keys(pattern); if (keys.size() > 0) redisTemplate.delete(keys); } /** * 删除对应的value * * @param key */ public void remove(final String key) { if (exists(key)) { redisTemplate.delete(key); } } /** * 判断缓存中是否有对应的value * * @param key * @return */ public boolean exists(final String key) { return redisTemplate.hasKey(key); } /** * 读取缓存 * * @param key * @return */ public String get(final String key) { Object result = null; redisTemplate.setValueSerializer(new StringRedisSerializer()); ValueOperations
operations = redisTemplate.opsForValue(); result = operations.get(key); if (result == null) { return null; } return result.toString(); } /** * 写入缓存 * * @param key * @param value * @return */ public boolean set(final String key, Object value) { boolean result = false; try { ValueOperations
operations = redisTemplate.opsForValue(); operations.set(key, value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存 * * @param key * @param value * @return */ public boolean set(final String key, Object value, Long expireTime) { boolean result = false; try { ValueOperations
operations = redisTemplate.opsForValue(); operations.set(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } public boolean hmset(String key, Map
value) { boolean result = false; try { redisTemplate.opsForHash().putAll(key, value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } public Map
hmget(String key) { Map
result = null; try { result = redisTemplate.opsForHash().entries(key); } catch (Exception e) { e.printStackTrace(); } return result; }}

我用maven管理的,服务器那边redis需要你们自己配置一下,redis连接池我用spring-boot管理的。在配置文件中写一下就行

#redis缓存配置spring.redis.host=服务器ipspring.redis.port=6379#spring.redis.password=#spring.redis.database=1#spring.redis.pool.max-active=80#spring.redis.pool.max-wait=-1#spring.redis.pool.max-idle=500#spring.redis.pool.min-idle=0#spring.redis.timeout=100000

webSocket实现

文中的 websocketServer.sendInfo(ob, message, userId);就是普通的发送socket消息的实现方法

package com.ysu.ems.web;import java.io.IOException;import java.io.OutputStream;import java.net.Socket;import java.util.List;import java.util.concurrent.CopyOnWriteArraySet;import javax.annotation.Resource;import javax.websocket.EncodeException;import javax.websocket.OnClose;import javax.websocket.OnError;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.util.Date;import java.text.SimpleDateFormat;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSONObject;@ServerEndpoint("/websocket/{sid}")@Componentpublic class WebSocketServer {    private static int onlineCount = 0;    private static CopyOnWriteArraySet
webSocketSet = new CopyOnWriteArraySet
(); private Session session; private String sid=""; @OnOpen public void onOpen(Session session,@PathParam("sid") String sid) { this.session = session; webSocketSet.add(this); addOnlineCount(); this.sid=sid; } @OnClose public void onClose() { webSocketSet.remove(this); subOnlineCount(); } @OnMessage public void onMessage(String message, Session session) { //log.info("收到来自窗口"+sid+"的信息:"+message); for (WebSocketServer item : webSocketSet) { try { item.sendMessage(item.sid); } catch (IOException e) { e.printStackTrace(); } } } @OnError public void onError(Session session, Throwable error) { //log.error("发生错误"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getAsyncRemote().sendText(message); } public static void sendInfo(Object ob,String message,@PathParam("sid") String sid) throws IOException { //log.info("推送消息到窗口"+sid+",推送内容:"+message); for (WebSocketServer item : webSocketSet) { try { //这里可以设定只推送给这个sid的,为null则全部推送 if(sid==null) { if(message!=null) item.sendMessage(message); if(ob!=null) item.sendMessage(JSONObject.toJSONString(ob)); }else if(item.sid.equals(sid)){ if(message!=null) item.sendMessage(message); if(ob!=null) item.sendMessage(JSONObject.toJSONString(ob)); } } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; }}

    

转载地址:https://blog.csdn.net/Luxuff/article/details/99988457 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:VS code的WSL 下git clone
下一篇:利用 mysqlbinlog 查看 mysql 日志

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月23日 20时27分23秒