徒手撸了一个API网关,理解更透彻了,代码已上传github,自取~
发布日期:2021-10-11 21:51:32 浏览次数:7 分类:技术文章

本文共 27955 字,大约阅读时间需要 93 分钟。

点击上方 阿拉奇学Java,选择 设为星标

优质文章,及时送达!

一、背景

最近在github上看了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台????。

二、设计

2.1 技术选型

网关是所有请求的入口,所以要求有很高的吞吐量,为了实现这点可以使用请求异步化来解决。目前一般有以下两种方案:

  • Tomcat/Jetty+NIO+Servlet3

Servlet3已经支持异步,这种方案使用比较多,京东,有赞和Zuul,都用的是这种方案。

  • Netty+NIO

Netty为高并发而生,目前唯品会的网关使用这个策略,在唯品会的技术文章中在相同的情况下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己处理HTTP协议,这一块比较麻烦。

后面发现Soul网关是基于Spring WebFlux(底层Netty)的,不用太关心HTTP协议的处理,于是决定也用Spring WebFlux。

网关的第二个特点是具备可扩展性,比如Netflix Zuul有preFilters,postFilters等在不同的阶段方便处理不同的业务,基于责任链模式将请求进行链式处理即可实现。

在微服务架构下,服务都会进行多实例部署来保证高可用,请求到达网关时,网关需要根据URL找到所有可用的实例,这时就需要服务注册和发现功能,即注册中心。

现在流行的注册中心有Apache的Zookeeper和阿里的Nacos两种(consul有点小众),因为之前写RPC框架时已经用过了Zookeeper,所以这次就选择了Nacos。

2.2 需求清单

首先要明确目标,即开发一个具备哪些特性的网关,总结下后如下:

自定义路由规则

可基于version的路由规则设置,路由对象包括DEFAUL,HEADER和QUERY三种,匹配方式包括=、regex、like三种。

跨语言

HTTP协议天生跨语言

高性能

Netty本身就是一款高性能的通信框架,同时server将一些路由规则等数据缓存到JVM内存避免请求admin服务。

高可用

支持集群模式防止单节点故障,无状态。

灰度发布

灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。通过特性一可以实现。

接口鉴权

基于责任链模式,用户开发自己的鉴权插件即可。

负载均衡

支持多种负载均衡算法,如随机,轮询,加权轮询等。利用SPI机制可以根据配置进行动态加载。

2.3 架构设计

在参考了一些优秀的网关Zuul,Spring Cloud Gateway,Soul后,将项目划分为以下几个模块。

它们之间的关系如图:

网关设计

注意: 这张图与实际实现有点出入,Nacos push到本地缓存的那个环节没有实现,目前只有ship-sever定时轮询pull的过程。ship-admin从Nacos获取注册服务信息的过程,也改成了ServiceA启动时主动发生HTTP请求通知ship-admin。

2.4 表结构设计

三、编码

3.1 ship-client-spring-boot-starter

首先创建一个spring-boot-starter命名为ship-client-spring-boot-starter,不知道如何自定义starter的可以看我以前写的《开发自己的starter》。

其核心类 AutoRegisterListener 就是在项目启动时做了两件事:

1.将服务信息注册到Nacos注册中心

2.通知ship-admin服务上线了并注册下线hook。

代码如下:

* Created by 2YSP on 2020/12/21*/public class AutoRegisterListener implements ApplicationListener
 {   private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);   private volatile AtomicBoolean registered = new AtomicBoolean(false);   private final ClientConfigProperties properties;   @NacosInjected   private NamingService namingService;   @Autowired   private RequestMappingHandlerMapping handlerMapping;   private final ExecutorService pool;   /*** url list to ignore*/   private static List
 ignoreUrlList = new LinkedList<>();   static {       ignoreUrlList.add("/error");   }   public AutoRegisterListener(ClientConfigProperties properties) {       if (!check(properties)) {           LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");           throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");       }       this.properties = properties;       pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());   }   /*** check the ClientConfigProperties** @param properties* @return*/   private boolean check(ClientConfigProperties properties) {       if (properties.getPort() == null| properties.getContextPath() == null              | properties.getVersion() == null| properties.getAppName() == null              | properties.getAdminUrl() == null) {           return false;       }       return true;   }   @Override   public void onApplicationEvent(ContextRefreshedEvent event) {       if (!registered.compareAndSet(false, true)) {           return;       }       doRegister();       registerShutDownHook();   }   /*** send unregister request to admin when jvm shutdown*/   private void registerShutDownHook() {       final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;       final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();       unregisterAppDTO.setAppName(properties.getAppName());       unregisterAppDTO.setVersion(properties.getVersion());       unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());       unregisterAppDTO.setPort(properties.getPort());       Runtime.getRuntime().addShutdownHook(new Thread(() -> {           OkhttpTool.doPost(url, unregisterAppDTO);           LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());       }));   }   /*** register all interface info to register center*/   private void doRegister() {       Instance instance = new Instance();       instance.setIp(IpUtil.getLocalIpAddress());       instance.setPort(properties.getPort());       instance.setEphemeral(true);       Map
 metadataMap = new HashMap<>();       metadataMap.put("version", properties.getVersion());       metadataMap.put("appName", properties.getAppName());       instance.setMetadata(metadataMap);       try {           namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);       } catch (NacosException e) {           LOGGER.error("register to nacos fail", e);           throw new ShipException(e.getErrCode(), e.getErrMsg());       }       LOGGER.info("register interface info to nacos success!");       // send register request to ship-admin       String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;       RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);       OkhttpTool.doPost(url, registerAppDTO);       LOGGER.info("register to ship-admin success!");   }   private RegisterAppDTO buildRegisterAppDTO(Instance instance) {       RegisterAppDTO registerAppDTO = new RegisterAppDTO();       registerAppDTO.setAppName(properties.getAppName());       registerAppDTO.setContextPath(properties.getContextPath());       registerAppDTO.setIp(instance.getIp());       registerAppDTO.setPort(instance.getPort());       registerAppDTO.setVersion(properties.getVersion());       return registerAppDTO;   }}

3.2 ship-server

ship-sever项目主要包括了两个部分内容, 1.请求动态路由的主流程 2.本地缓存数据和ship-admin及nacos同步,这部分在后面3.3再讲。

ship-server实现动态路由的原理是利用WebFilter拦截请求,然后将请求教给plugin chain去链式处理。

PluginFilter根据URL解析出appName,然后将启用的plugin组装成plugin chain。

public class PluginFilter implements WebFilter {   private ServerConfigProperties properties;   public PluginFilter(ServerConfigProperties properties) {       this.properties = properties;   }   @Override   public Mono
 filter(ServerWebExchange exchange, WebFilterChain chain) {       String appName = parseAppName(exchange);       if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {           throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);       }       PluginChain pluginChain = new PluginChain(properties, appName);       pluginChain.addPlugin(new DynamicRoutePlugin(properties));       pluginChain.addPlugin(new AuthPlugin(properties));       return pluginChain.execute(exchange, pluginChain);   }   private String parseAppName(ServerWebExchange exchange) {       RequestPath path = exchange.getRequest().getPath();       String appName = path.value().split("/")[1];       return appName;   }}```PluginChain继承了AbstractShipPlugin并持有所有要执行的插件。```java* @Author: Ship* @Description:* @Date: Created in 2020/12/25*/public class PluginChain extends AbstractShipPlugin {   /*** the pos point to current plugin*/   private int pos;   /*** the plugins of chain*/   private List
 plugins;   private final String appName;   public PluginChain(ServerConfigProperties properties, String appName) {       super(properties);       this.appName = appName;   }   /*** add enabled plugin to chain** @param shipPlugin*/   public void addPlugin(ShipPlugin shipPlugin) {       if (plugins == null) {           plugins = new ArrayList<>();       }       if (!PluginCache.isEnabled(appName, shipPlugin.name())) {           return;       }       plugins.add(shipPlugin);       // order by the plugin's order       plugins.sort(Comparator.comparing(ShipPlugin::order));   }   @Override   public Integer order() {       return null;   }   @Override   public String name() {       return null;   }   @Override   public Mono
 execute(ServerWebExchange exchange, PluginChain pluginChain) {       if (pos == plugins.size()) {           return exchange.getResponse().setComplete();       }       return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);   }   public String getAppName() {       return appName;   }}

AbstractShipPlugin实现了ShipPlugin接口,并持有ServerConfigProperties配置对象。

public abstract class AbstractShipPlugin implements ShipPlugin {   protected ServerConfigProperties properties;   public AbstractShipPlugin(ServerConfigProperties properties) {       this.properties = properties;   }}```ShipPlugin接口定义了所有插件必须实现的三个方法order(),name()和execute()。```javapublic interface ShipPlugin {   /*** lower values have higher priority** @return*/   Integer order();   /*** return current plugin name** @return*/   String name();   Mono
 execute(ServerWebExchange exchange,PluginChain pluginChain);}```DynamicRoutePlugin继承了抽象类AbstractShipPlugin,包含了动态路由的主要业务逻辑。```java* @Author: Ship* @Description:* @Date: Created in 2020/12/25*/public class DynamicRoutePlugin extends AbstractShipPlugin {   private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);   private static WebClient webClient;   private static final Gson gson = new GsonBuilder().create();   static {       HttpClient httpClient = HttpClient.create()               .tcpConfiguration(client ->                       client.doOnConnected(conn ->                               conn.addHandlerLast(new ReadTimeoutHandler(3))                                       .addHandlerLast(new WriteTimeoutHandler(3)))                               .option(ChannelOption.TCP_NODELAY, true)               );       webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))               .build();   }   public DynamicRoutePlugin(ServerConfigProperties properties) {       super(properties);   }   @Override   public Integer order() {       return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();   }   @Override   public String name() {       return ShipPluginEnum.DYNAMIC_ROUTE.getName();   }   @Override   public Mono
 execute(ServerWebExchange exchange, PluginChain pluginChain) {       String appName = pluginChain.getAppName();       ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());//        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));       // request service       String url = buildUrl(exchange, serviceInstance);       return forward(exchange, url);   }   /*** forward request to backend service** @param exchange* @param url* @return*/   private Mono
 forward(ServerWebExchange exchange, String url) {       ServerHttpRequest request = exchange.getRequest();       ServerHttpResponse response = exchange.getResponse();       HttpMethod method = request.getMethod();       WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {           headers.addAll(request.getHeaders());       });       WebClient.RequestHeadersSpec
 reqHeadersSpec;       if (requireHttpBody(method)) {           reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));       } else {           reqHeadersSpec = requestBodySpec;       }       // nio->callback->nio       return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))               .onErrorResume(ex -> {                   return Mono.defer(() -> {                       String errorResultJson = "";                       if (ex instanceof TimeoutException) {                           errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";                       } else {                           errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";                       }                       return ShipResponseUtil.doResponse(exchange, errorResultJson);                   }).then(Mono.empty());               }).flatMap(backendResponse -> {                   response.setStatusCode(backendResponse.statusCode());                   response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());                   return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));               });   }   /*** weather the http method need http body** @param method* @return*/   private boolean requireHttpBody(HttpMethod method) {       if (method.equals(HttpMethod.POST)| method.equals(HttpMethod.PUT)| method.equals(HttpMethod.PATCH)) {           return true;       }       return false;   }   private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {       ServerHttpRequest request = exchange.getRequest();       String query = request.getURI().getQuery();       String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");       String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;       if (!StringUtils.isEmpty(query)) {           url = url + "?" + query;       }       return url;   }   /*** choose an ServiceInstance according to route rule config and load balancing algorithm** @param appName* @param request* @return*/   private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {       List
 serviceInstances = ServiceCache.getAllInstances(appName);       if (CollectionUtils.isEmpty(serviceInstances)) {           LOGGER.error("service instance of {} not find", appName);           throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);       }       String version = matchAppVersion(appName, request);       if (StringUtils.isEmpty(version)) {           throw new ShipException("match app version error");       }       // filter serviceInstances by version       List
 instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());       //Select an instance based on the load balancing algorithm       LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);       ServiceInstance serviceInstance = loadBalance.chooseOne(instances);       return serviceInstance;   }   private String matchAppVersion(String appName, ServerHttpRequest request) {       List
 rules = RouteRuleCache.getRules(appName);       rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());       for (AppRuleDTO rule : rules) {           if (match(rule, request)) {               return rule.getVersion();           }       }       return null;   }   private boolean match(AppRuleDTO rule, ServerHttpRequest request) {       String matchObject = rule.getMatchObject();       String matchKey = rule.getMatchKey();       String matchRule = rule.getMatchRule();       Byte matchMethod = rule.getMatchMethod();       if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {           return true;       } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {           String param = request.getQueryParams().getFirst(matchKey);           if (!StringUtils.isEmpty(param)) {               return StringTools.match(param, matchMethod, matchRule);           }       } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {           HttpHeaders headers = request.getHeaders();           String headerValue = headers.getFirst(matchKey);           if (!StringUtils.isEmpty(headerValue)) {               return StringTools.match(headerValue, matchMethod, matchRule);           }       }       return false;   }}

3.3 数据同步

app数据同步

后台服务(如订单服务)启动时,只将服务名,版本,ip地址和端口号注册到了Nacos,并没有实例的权重和启用的插件信息怎么办?

一般在线的实例权重和插件列表都是在管理界面配置,然后动态生效的,所以需要ship-admin定时更新实例的权重和插件信息到注册中心。

对应代码ship-admin的NacosSyncListener

* @Author: Ship* @Description:* @Date: Created in 2020/12/30*/@Configurationpublic class NacosSyncListener implements ApplicationListener
 {   private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);   private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,           new ShipThreadFactory("nacos-sync", true).create());   @NacosInjected   private NamingService namingService;   @Value("${nacos.discovery.server-addr}")   private String baseUrl;   @Resource   private AppService appService;   @Override   public void onApplicationEvent(ContextRefreshedEvent event) {       if (event.getApplicationContext().getParent() != null) {           return;       }       String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;       scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);   }   class NacosSyncTask implements Runnable {       private NamingService namingService;       private String url;       private AppService appService;       private Gson gson = new GsonBuilder().create();       public NacosSyncTask(NamingService namingService, String url, AppService appService) {           this.namingService = namingService;           this.url = url;           this.appService = appService;       }       /*** Regular update weight,enabled plugins to nacos instance*/       @Override       public void run() {           try {               // get all app names               ListView
 services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);               if (CollectionUtils.isEmpty(services.getData())) {                   return;               }               List
 appNames = services.getData();               List
 appInfos = appService.getAppInfos(appNames);               for (AppInfoDTO appInfo : appInfos) {                   if (CollectionUtils.isEmpty(appInfo.getInstances())) {                       continue;                   }                   for (ServiceInstance instance : appInfo.getInstances()) {                       Map
 queryMap = buildQueryMap(appInfo, instance);                       String resp = OkhttpTool.doPut(url, queryMap, "");                       LOGGER.debug("response :{}", resp);                   }               }           } catch (Exception e) {               LOGGER.error("nacos sync task error", e);           }       }       private Map
 buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {           Map
 map = new HashMap<>();           map.put("serviceName", appInfo.getAppName());           map.put("groupName", NacosConstants.APP_GROUP_NAME);           map.put("ip", instance.getIp());           map.put("port", instance.getPort());           map.put("weight", instance.getWeight().doubleValue());           NacosMetadata metadata = new NacosMetadata();           metadata.setAppName(appInfo.getAppName());           metadata.setVersion(instance.getVersion());           metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));           map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));           map.put("ephemeral", true);           return map;       }   }}

ship-server再定时从Nacos拉取app数据更新到本地Map缓存。

* @Author: Ship* @Description: sync data to local cache* @Date: Created in 2020/12/25*/@Configurationpublic class DataSyncTaskListener implements ApplicationListener
 {   private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,           new ShipThreadFactory("service-sync", true).create());   @NacosInjected   private NamingService namingService;   @Autowired   private ServerConfigProperties properties;   @Override   public void onApplicationEvent(ContextRefreshedEvent event) {       if (event.getApplicationContext().getParent() != null) {           return;       }       scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)               , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);       WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());       websocketSyncCacheServer.start();   }   class DataSyncTask implements Runnable {       private NamingService namingService;       public DataSyncTask(NamingService namingService) {           this.namingService = namingService;       }       @Override       public void run() {           try {               // get all app names               ListView
 services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);               if (CollectionUtils.isEmpty(services.getData())) {                   return;               }               List
 appNames = services.getData();               // get all instances               for (String appName : appNames) {                   List
 instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);                   if (CollectionUtils.isEmpty(instanceList)) {                       continue;                   }                   ServiceCache.add(appName, buildServiceInstances(instanceList));                   List
 pluginNames = getEnabledPlugins(instanceList);                   PluginCache.add(appName, pluginNames);               }               ServiceCache.removeExpired(appNames);               PluginCache.removeExpired(appNames);           } catch (NacosException e) {               e.printStackTrace();           }       }       private List
 getEnabledPlugins(List
 instanceList) {           Instance instance = instanceList.get(0);           Map
 metadata = instance.getMetadata();           // plugins: DynamicRoute,Auth           String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());           return Arrays.stream(plugins.split(",")).collect(Collectors.toList());       }       private List
 buildServiceInstances(List
 instanceList) {           List
 list = new LinkedList<>();           instanceList.forEach(instance -> {               Map
 metadata = instance.getMetadata();               ServiceInstance serviceInstance = new ServiceInstance();               serviceInstance.setAppName(metadata.get("appName"));               serviceInstance.setIp(instance.getIp());               serviceInstance.setPort(instance.getPort());               serviceInstance.setVersion(metadata.get("version"));               serviceInstance.setWeight((int) instance.getWeight());               list.add(serviceInstance);           });           return list;       }   }}

路由规则数据同步

同时,如果用户在管理后台更新了路由规则,ship-admin需要推送规则数据到ship-server,这里参考了soul网关的做法利用websocket在第一次建立连接后进行全量同步,此后路由规则发生变更就只作增量同步。

服务端WebsocketSyncCacheServer:

* @Author: Ship* @Description:* @Date: Created in 2020/12/28*/public class WebsocketSyncCacheServer extends WebSocketServer {   private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);   private Gson gson = new GsonBuilder().create();   private MessageHandler messageHandler;   public WebsocketSyncCacheServer(Integer port) {       super(new InetSocketAddress(port));       this.messageHandler = new MessageHandler();   }   @Override   public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {       LOGGER.info("server is open");   }   @Override   public void onClose(WebSocket webSocket, int i, String s, boolean b) {       LOGGER.info("websocket server close...");   }   @Override   public void onMessage(WebSocket webSocket, String message) {       LOGGER.info("websocket server receive message:\n[{}]", message);       this.messageHandler.handler(message);   }   @Override   public void onError(WebSocket webSocket, Exception e) {   }   @Override   public void onStart() {       LOGGER.info("websocket server start...");   }   class MessageHandler {       public void handler(String message) {           RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);           if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {               return;           }           Map
> map = operationDTO.getRuleList()                   .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));           if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())                  | OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {               RouteRuleCache.add(map);           } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {               RouteRuleCache.remove(map);           }       }   }}

客户端WebsocketSyncCacheClient:

* @Author: Ship* @Description:* @Date: Created in 2020/12/28*/@Componentpublic class WebsocketSyncCacheClient {   private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);   private WebSocketClient client;   private RuleService ruleService;   private Gson gson = new GsonBuilder().create();   public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,RuleService ruleService) {       if (StringUtils.isEmpty(serverWebSocketUrl)) {           throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);       }       this.ruleService = ruleService;       ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,               new ShipThreadFactory("websocket-connect", true).create());       try {           client = new WebSocketClient(new URI(serverWebSocketUrl)) {               @Override               public void onOpen(ServerHandshake serverHandshake) {                   LOGGER.info("client is open");                   List
 list = ruleService.getEnabledRule();                   String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));                   send(msg);               }               @Override               public void onMessage(String s) {               }               @Override               public void onClose(int i, String s, boolean b) {               }               @Override               public void onError(Exception e) {                   LOGGER.error("websocket client error", e);               }           };           client.connectBlocking();           //使用调度线程池进行断线重连,30秒进行一次           executor.scheduleAtFixedRate(() -> {               if (client != null && client.isClosed()) {                   try {                       client.reconnectBlocking();                   } catch (InterruptedException e) {                       LOGGER.error("reconnect server fail", e);                   }               }           }, 10, 30, TimeUnit.SECONDS);       } catch (Exception e) {           LOGGER.error("websocket sync cache exception", e);           throw new ShipException(e.getMessage());       }   }   public 
 void send(T t) {       while (!client.getReadyState().equals(ReadyState.OPEN)) {           LOGGER.debug("connecting ...please wait");       }       client.send(gson.toJson(t));   }}

四、测试

4.1动态路由测试

1、本地启动nacos ,sh startup.sh -m standalone

2、启动ship-admin

3、本地启动两个ship-example实例。

实例1配置:

ship: http:   app-name: order   version: gray_1.0   context-path: /order   port: 8081   admin-url: 127.0.0.1:9001 server: port: 8081 nacos: discovery:   server-addr: 127.0.0.1:8848

实例2配置:

ship: http:   app-name: order   version: prod_1.0   context-path: /order   port: 8082   admin-url: 127.0.0.1:9001 server: port: 8082 nacos: discovery:   server-addr: 127.0.0.1:8848

4、在数据库添加路由规则配置,该规则表示当http header 中的name=ship时请求路由到gray_1.0版本的节点。

5、启动ship-server,看到以下日志时则可以进行测试了。

2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message: [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]

6、用Postman请求http://localhost:9000/order/user/add,POST方式,header设置name=ship,可以看到只有实例1有日志显示。

==========add user,version:gray_1.0

4.2性能压测

压测环境:

  • MacBook Pro 13英寸

  • 处理器 2.3 GHz 四核Intel Core i7

  • 内存 16 GB 3733 MHz LPDDR4X

  • 后端节点个数一个

  • 压测工具:wrk

  • 压测结果:20个线程,500个连接数,吞吐量大概每秒9400个请求。

压测结果

五、总结

千里之行始于足下,开始以为写一个网关会很难,但当你实际开始行动时就会发现其实没那么难,所以迈出第一步很重要。过程中也遇到了很多问题,还在github上给soul和nacos这两个开源项目提了两个issue,后来发现是自己的问题,尴尬????。

本文代码已全部上传到 https://github.com/2YSP/ship-gate

来源 | https://www.cnblogs.com/2YSP/p/14223892.html

回复「进群」即可进入无广告技术交流群。同时送上250本电子书+学习视频作为见面 有你想看的精彩 【收藏】神器 Nginx 的学习手册总结零散的 MySQL 基础知识教你玩转 统一异常处理VS Code真的会一统江湖吗?牛啊!全球当下最厉害的 14 位程序员炸了!炸了!微信十周年炸裂更新(附安卓版)Springboot + Vue + shiro 实现前后端分离、权限控制JetBrains遭美国调查!微服务架构 - Gateway网关限流目前5000+ 人已关注加入我们             爱点赞的人,运气都不会太差

转载地址:https://blog.csdn.net/qq_36189144/article/details/113750210 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:一文搞定 Spring Bean 的创建全过程!
下一篇:【收藏】神器 Nginx 的学习手册

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年04月16日 13时15分14秒