【RPC系列】5、向Zookeeper上注册服务(用netty、zk手写RPC第二步)
发布日期:2021-06-24 15:28:53 浏览次数:2 分类:技术文章

本文共 5817 字,大约阅读时间需要 19 分钟。

上一篇使用自定义xsd定义了发布、订阅的标签

这一篇就是用标签的解析并想zk上注册服务
废话不多说,直接上注册中心的代码

package com.kaer.rpc.netty.register.zk;import java.util.Iterator;import java.util.concurrent.ConcurrentMap;import org.apache.curator.framework.CuratorFramework;...import org.slf4j.LoggerFactory;import com.kaer.rpc.demo.config.ProviderConfig;/** * Zookeeper注册中心处理类 *  * @author kaer * */public class ZkRegistryCenter {
public static final Logger logger = LoggerFactory.getLogger(ZkRegistryCenter.class);日志打印 private static String basePath = "/kaer-rpc";//zk是以目录的形式创建节点 private static CuratorFramework client;//Curator操作zk的客户端 private final ConcurrentMap
services = Maps.newConcurrentMap();//服务的集合,暂时只发布一个服务,后面可扩展 private static final ConnectionStateListener connectionStateListener;//监听器对zk的状态实时处理,暂不实现 static {
connectionStateListener = new ConnectionStateListener() {
@Override public void stateChanged(CuratorFramework client, ConnectionState newState) {
// 状态的变化处理 logger.info("zkClient state change to " + newState); } }; } public static void start(String zookeeperUrl) {
//序列化工具 //初始化连接客户端,即连接到了本地zk的127.0.0.1:2181 client = CuratorFrameworkFactory.newClient(zookeeperUrl, new ExponentialBackoffRetry(1000, 3)); client.getConnectionStateListenable().addListener(ZkRegistryCenter.connectionStateListener); client.start();//zk客户端启动 try {
//zk是以目录的形式存储数据,而且顶向下一节一节实现的 if((Stat) client.checkExists().forPath(basePath) == null) {
((ACLBackgroundPathAndBytesable
) client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT)).forPath(basePath); } } catch (Exception e) {
e.printStackTrace(); } logger.info("start service register......"); } //注册服务,这个发生的时间是在容器的bean加载,还记得之前自定义的标签么,有对provider标签的解析类ProviderBean public static void registerService(String serviceId, String alias, byte[] info) {
//创建目录节点,类似kaer-rpc/{api},数据目录为kaer-rpc/{api}/data String nodePath = ZKPaths.makePath(basePath, serviceId); String fullPath = ZKPaths.makePath(nodePath, "data"); //存放目录数据 try {
Stat stat = (Stat) client.checkExists().forPath(nodePath); if(stat == null) {
((ACLBackgroundPathAndBytesable
) client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT)).forPath(nodePath); ((ACLBackgroundPathAndBytesable
) client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT)).forPath(fullPath); }else {
stat = (Stat) client.checkExists().forPath(fullPath); if(stat == null) {
((ACLBackgroundPathAndBytesable
) client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT)).forPath(fullPath); } } client.setData().forPath(fullPath, info);//这里的info使用Json序列化的,具体要见ProviderConfig } catch (Exception e) {
e.printStackTrace(); } } //删除服务,暂不实现 public void unRegisterService(String serviceId) {
String path = basePath + "serviceId"; try {
((ChildrenDeletable) client.delete().guaranteed()).forPath(path); } catch (Exception e) {
e.printStackTrace(); logger.error("Could not unregister instance: s%", serviceId); } } public void close() {
ExceptionAccumulator accumulator = new ExceptionAccumulator(); Iterator
arg2 = this.services.values().iterator(); while (arg2.hasNext()) {
ProviderConfig service = (ProviderConfig) arg2.next(); unRegisterService(service.getApi()); } client.getConnectionStateListenable().removeListener(ZkRegistryCenter.connectionStateListener); CloseableUtils.closeQuietly(client); accumulator.propagate(); }}

由上一篇中分别对三个自定义标签进行了解析处理

下面是注册服务的处理逻辑

package com.kaer.rpc.demo.config;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.alibaba.fastjson.JSON;import com.kaer.rpc.netty.register.zk.ZkRegistryCenter;/** * 服务提供者配置 *  * @author kaer * */public class ProviderConfig {
public static Logger logger = LoggerFactory.getLogger(ProviderConfig.class); private String api; private String mapper; private String alias; //发布,假设模拟一个服务 protected void doExport() {
logger.info("生产者信息:端口[" + api + "],映射[" + mapper + "],别名[" + alias + "]。"); RpcProviderConfig rpcProviderConfig = new RpcProviderConfig(); rpcProviderConfig.setApi(api); rpcProviderConfig.setMapper(mapper); rpcProviderConfig.setAlias(alias); rpcProviderConfig.setHost("127.0.0.1"); rpcProviderConfig.setPort(9999); //注册生产者// long count = RedisRegistryCenter.registryProvider(api, alias, JSON.toJSONString(rpcProviderConfig)); ZkRegistryCenter.registerService(api, alias, JSON.toJSONBytes(rpcProviderConfig)); logger.info("注册生产者:{} {} {}", api, alias, null); } public String getApi() {
return api; } public void setApi(String api) {
this.api = api; } public String getMapper() {
return mapper; } public void setMapper(String mapper) {
this.mapper = mapper; } public String getAlias() {
return alias; } public void setAlias(String alias) {
this.alias = alias; }}

消费者的配置(ConsumerConfig)类似提供者的这个配置,暂不实现。

/** * 实现服务提供的依赖注入 * @author kaer * */public class ProviderBean extends ProviderConfig implements ApplicationContextAware {
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 发布 doExport(); }}

其中有需要依赖的包(Curator对ZK的支持):

org.apache.curator
curator-framework
4.0.0

本地启动zookeeper

测试类如下:

public class ApiTest {
@SuppressWarnings("resource") public static void main(String[] args) throws UnknownHostException, IOException {
String[] configs = {
"rpc/test.xml" }; new ClassPathXmlApplicationContext(configs); }}

执行完了,通过可视化界面去看看是否注册成功(当然也可以用命令去操作,zkCli.exe)。

这里用到了zkUI的一个工具,需要的在附件中获取。
在这里插入图片描述
可以登陆进去可以看到有代码中创建的目录了
在这里插入图片描述
一层一层点进去,可以看到最终注册上去的服务的信息,在实际开发中,应获取应用本机的ip和端口,再附加服务的唯一性数据,订阅就是个获取并解析然后再进行netty通讯的步骤
在这里插入图片描述
本文结束,后面使用consul进行服务注册玩玩


附件:

转载地址:https://blog.csdn.net/weixin_33602978/article/details/105292948 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:【Python问题】解决pip install xxx出现Cannot open ...\venv\Scripts\pip-script.py问题
下一篇:【RPC系列】4、自定义xml标签(用netty、zk手写RPC第一步)

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年04月20日 03时27分07秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章