(精华)2020年9月22日 微服务 Consul工具层封装和使用
发布日期:2021-06-29 15:12:19
浏览次数:2
分类:技术文章
本文共 19812 字,大约阅读时间需要 66 分钟。
封装
Cluster文件夹
////// 负载均衡抽象实现 /// public abstract class AbstractLoadBalance : ILoadBalance { public ServiceUrl Select(IListserviceUrls) { if (serviceUrls == null || serviceUrls.Count ==0) return null; if (serviceUrls.Count == 1) return serviceUrls[0]; return DoSelect(serviceUrls); } /// /// 子类去实现 /// /// ///public abstract ServiceUrl DoSelect(IList serviceUrls); }
////// 服务负载均衡 /// public interface ILoadBalance { ////// 服务选择 /// /// ///ServiceUrl Select(IList serviceUrls); }
////// 随机负载均衡 /// 1、还可以实现加权轮询 /// public class RandomLoadBalance : AbstractLoadBalance { private readonly Random random = new Random(); public override ServiceUrl DoSelect(IListserviceUrls) { // 1、获取随机数 var index = random.Next(serviceUrls.Count); // 2、选择一个服务进行连接 return serviceUrls[index]; } }
HttpClientConsul文件夹
////// consul httpclient扩展 /// public class ConsulHttpClient { private readonly IServiceDiscovery serviceDiscovery; private readonly ILoadBalance loadBalance; private readonly IHttpClientFactory httpClientFactory; public ConsulHttpClient(IServiceDiscovery serviceDiscovery, ILoadBalance loadBalance, IHttpClientFactory httpClientFactory) { this.serviceDiscovery = serviceDiscovery; this.loadBalance = loadBalance; this.httpClientFactory = httpClientFactory; } ////// Get方法 /// ////// param name="ServiceSchme">服务名称:(http/https) /// 服务名称 /// 服务路径 /// public async Task GetAsync (string Serviceshcme, string ServiceName,string serviceLink) { // 1、获取服务 IList serviceUrls = await serviceDiscovery.Discovery(ServiceName); // 2、负载均衡服务 ServiceUrl serviceUrl = loadBalance.Select(serviceUrls); // 3、建立请求 HttpClient httpClient = httpClientFactory.CreateClient("mrico"); HttpResponseMessage response = await httpClient.GetAsync(serviceUrl.Url + serviceLink); // 3.1json转换成对象 if (response.StatusCode == HttpStatusCode.OK) { string json = await response.Content.ReadAsStringAsync(); return JsonConvert.DeserializeObject (json); } else { throw new Exception($"{ServiceName}服务调用错误"); } } }
////// HttpClientFactory conusl下的扩展 /// public static class ConsulHttpClientServiceCollectionExtensions { ////// 添加consul /// ////// /// public static IServiceCollection AddHttpClientConsul (this IServiceCollection services) where ConsulHttpClient : class { // 1、注册consul services.AddConsulDiscovery(); // 2、注册服务负载均衡 services.AddSingleton (); // 3、注册httpclient services.AddSingleton (); return services; } }
HttpClientPolly文件夹
////// HttpClient熔断降级策略选项 /// public class PollyHttpClientOptions { ////// 超时时间设置,单位为秒 /// public int TimeoutTime { set; get; } ////// 失败重试次数 /// public int RetryCount { set; get; } ////// 执行多少次异常,开启短路器(例:失败2次,开启断路器) /// public int CircuitBreakerOpenFallCount { set; get; } ////// 断路器开启的时间(例如:设置为2秒,短路器两秒后自动由开启到关闭) /// public int CircuitBreakerDownTime { set; get; } ////// 降级处理(将异常消息封装成为正常消息返回,然后进行响应处理,例如:系统正在繁忙,请稍后处理.....) /// public HttpResponseMessage httpResponseMessage { set; get; } }
////// 微服务中HttpClient熔断,降级策略扩展 /// public static class PollyHttpClientServiceCollectionExtensions { ////// Httpclient扩展方法 /// /// ioc容器 /// HttpClient 名称(针对不同的服务进行熔断,降级) /// 熔断降级配置 /// 降级处理错误的结果 ///public static IServiceCollection AddPollyHttpClient(this IServiceCollection services, string name,Action action) { // 1、创建选项配置类 PollyHttpClientOptions options = new PollyHttpClientOptions(); action(options); // 2、配置httpClient,熔断降级策略 services.AddHttpClient(name) //1.1 降级策略 .AddPolicyHandler(Policy .HandleInner ().FallbackAsync(options.httpResponseMessage, async b => { // 1、降级打印异常 Console.WriteLine($"服务{name}开始降级,异常消息:{b.Exception.Message}"); // 2、降级后的数据 Console.WriteLine($"服务{name}降级内容响应:{options.httpResponseMessage.Content.ToString()}"); await Task.CompletedTask; })) // 1.2 断路器策略 .AddPolicyHandler(Policy .Handle ().CircuitBreakerAsync(options.CircuitBreakerOpenFallCount, TimeSpan.FromSeconds(options.CircuitBreakerDownTime), (ex, ts) => { Console.WriteLine($"服务{name}断路器开启,异常消息:{ex.Exception.Message}"); Console.WriteLine($"服务{name}断路器开启时间:{ts.TotalSeconds}s"); }, () => { Console.WriteLine($"服务{name}断路器关闭"); }, () => { Console.WriteLine($"服务{name}断路器半开启(时间控制,自动开关)"); })) // 1.3 重试策略 .AddPolicyHandler(Policy .Handle () .RetryAsync(options.RetryCount) ) // 1.4 超时策略 .AddPolicyHandler(Policy.TimeoutAsync (TimeSpan.FromSeconds(options.TimeoutTime))); return services; } }
Registry文件夹
////// consul服务发现实现 /// public class ConsulServiceDiscovery : IServiceDiscovery { private readonly IConfiguration configuration; public ConsulServiceDiscovery(IConfiguration configuration) { this.configuration = configuration; } public async Task> Discovery(string serviceName) { ServiceDiscoveryConfig serviceDiscoveryConfig = configuration.GetSection("ConsulDiscovery").Get (); // 1、创建consul客户端连接 var consulClient = new ConsulClient(configuration => { //1.1 建立客户端和服务端连接 configuration.Address = new Uri(serviceDiscoveryConfig.RegistryAddress); }); // 2、consul查询服务,根据具体的服务名称查询 var queryResult = await consulClient.Catalog.Service(serviceName); // 3、将服务进行拼接 var list = new List (); foreach (var service in queryResult.Response) { list.Add(new ServiceUrl { Url = service.ServiceAddress + ":" + service.ServicePort }); } return list; } }
////// consul服务注册实现 /// public class ConsulServiceRegistry : IServiceRegistry { ////// 注册服务 /// /// public void Register(ServiceRegistryConfig serviceNode) { // 1、创建consul客户端连接 var consulClient = new ConsulClient(configuration => { //1.1 建立客户端和服务端连接 configuration.Address = new Uri(serviceNode.RegistryAddress); }); // 2、获取服务内部地址 // 3、创建consul服务注册对象 var registration = new AgentServiceRegistration() { ID = serviceNode.Id, Name = serviceNode.Name, Address = serviceNode.Address, Port = serviceNode.Port, Tags = serviceNode.Tags, Check = new AgentServiceCheck { // 3.1、consul健康检查超时间 Timeout = TimeSpan.FromSeconds(10), // 3.2、服务停止5秒后注销服务 DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5), // 3.3、consul健康检查地址 HTTP = serviceNode.HealthCheckAddress, // 3.4 consul健康检查间隔时间 Interval = TimeSpan.FromSeconds(10), } }; // 4、注册服务 consulClient.Agent.ServiceRegister(registration).Wait(); // 5、关闭连接 consulClient.Dispose(); } ////// 注销服务 /// /// public void Deregister(ServiceRegistryConfig serviceNode) { // 1、创建consul客户端连接 var consulClient = new ConsulClient(configuration => { //1.1 建立客户端和服务端连接 configuration.Address = new Uri(serviceNode.RegistryAddress); }); // 2、注销服务 consulClient.Agent.ServiceDeregister(serviceNode.Id); // 3、关闭连接 consulClient.Dispose(); } }
////// 服务发现 /// public interface IServiceDiscovery { ////// 服务发现 /// /// 服务名称 ///Task > Discovery(string serviceName); }
////// 服务注册 /// public interface IServiceRegistry { ////// 注册服务 /// void Register(ServiceRegistryConfig serviceNode); ////// 撤销服务 /// void Deregister(ServiceRegistryConfig serviceNode); }
////// 服务发现配置 /// public class ServiceDiscoveryConfig { ////// 服务注册地址 /// public string RegistryAddress { set; get; } }
////// 服务注册节点 /// public class ServiceRegistryConfig { // 服务ID public string Id { get; set; } // 服务名称 public string Name { get; set; } // 服务标签(版本) public string[] Tags { set; get; } // 服务地址(可以选填 === 默认加载启动路径) public string Address { set; get; } // 服务端口号(可以选填 === 默认加载启动路径端口) public int Port { set;get; } // 服务注册地址 public string RegistryAddress { get; set; } // 服务健康检查地址 public string HealthCheckAddress { get; set; } }
////// 服务url /// public class ServiceUrl { public string Url { set; get; } }
使用的扩展方法
////// 微服务注册发现使用扩展 /// public static class MicroServiceConsulApplicationBuilderExtensions { public static IApplicationBuilder UseConsulRegistry(this IApplicationBuilder app) { // 1、从IOC容器中获取Consul服务注册配置 var serviceNode = app.ApplicationServices.GetRequiredService>().Value; // 2、获取应用程序生命周期 var lifetime = app.ApplicationServices.GetRequiredService (); // 2.1 获取服务注册实例 var serviceRegistry = app.ApplicationServices.GetRequiredService (); // 3、获取服务地址 var features = app.Properties["server.Features"] as FeatureCollection; var address = features.Get ().Addresses.First(); var uri = new Uri(address); // 4、注册服务 serviceNode.Id = Guid.NewGuid().ToString(); serviceNode.Address = $"{uri.Scheme}://{uri.Host}"; serviceNode.Port = uri.Port; serviceNode.HealthCheckAddress = $"{uri.Scheme}://{uri.Host}:{uri.Port}{serviceNode.HealthCheckAddress}"; serviceRegistry.Register(serviceNode); // 5、服务器关闭时注销服务 lifetime.ApplicationStopping.Register(() => { serviceRegistry.Deregister(serviceNode); }); return app; } }
////// Console 注册中心扩展(加载配置) /// public static class MicroServiceConsulServiceCollectionExtensions { // consul服务注册 public static IServiceCollection AddConsulRegistry(this IServiceCollection services, IConfiguration configuration) { // 1、加载Consul服务注册配置 services.Configure(configuration.GetSection("ConsulRegistry")); // 2、注册consul注册 services.AddSingleton (); return services; } // consul服务发现 public static IServiceCollection AddConsulDiscovery(this IServiceCollection services) { // 1、加载Consul服务发现配置 // services.Configure (configuration.GetSection("ConsulDiscovery")); // 2、注册consul服务发现 services.AddSingleton (); return services; } }
服务注册
首先在配置文件中添加如下配置
"ConsulRegistry": { "Name": "TeamService", "RegistryAddress": "http://127.0.0.1:8500", "HealthCheckAddress": "/HealthCheck" }
public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // 1、注册上下文到IOC容器 services.AddDbContext(options => { options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); }); // 2、注册团队service services.AddScoped (); // 3、注册团队仓储 services.AddScoped (); // 4、添加映射 //services.AddAutoMapper(); // 5、添加服务注册条件 services.AddConsulRegistry(Configuration); services.AddControllers(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); // 1、consul服务注册 app.UseConsulRegistry(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } }
聚合层发现服务
"ConsulDiscovery": { "RegistryAddress": "http://127.0.0.1:8500" }
public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // 1、自定义异常处理(用缓存处理) var fallbackResponse = new HttpResponseMessage { Content = new StringContent("系统正繁忙,请稍后重试"),// 内容,自定义内容 StatusCode = HttpStatusCode.GatewayTimeout // 504 }; //services.AddHttpClient().AddHttpClientConsul(); #region polly配置 { /*services.AddHttpClient("mrico") // 1.1 降级(捕获异常,进行自定义处理) .AddPolicyHandler(Policy .Handle ().FallbackAsync(fallbackResponse, async b => { // 1、降级打印异常 Console.WriteLine($"开始降级,异常消息:{b.Exception.Message}"); // 2、降级后的数据 //Console.WriteLine($"降级内容响应:{}"); await Task.CompletedTask; })) // 1.2 熔断机制 .AddPolicyHandler(Policy .Handle ().CircuitBreakerAsync(3, TimeSpan.FromSeconds(10), (ex, ts) => { Console.WriteLine($"断路器开启,异常消息:{ex.Exception.Message}"); Console.WriteLine($"断路器开启时间:{ts.TotalSeconds}s"); }, () => { Console.WriteLine($"断路器重置"); }, () => { Console.WriteLine($"断路器半开启(一会开,一会关)"); })) // 1.3 失败重试 .AddPolicyHandler(Policy .Handle () .RetryAsync(3) ) //1.4、超时 .AddPolicyHandler(Policy.TimeoutAsync (TimeSpan.FromSeconds(2)));*/ } #endregion // 1.2 封装之后的调用PollyHttpClient services.AddPollyHttpClient("mrico", options => { options.TimeoutTime = 1; // 1、超时时间 options.RetryCount = 3;// 2、重试次数 options.CircuitBreakerOpenFallCount = 2;// 3、熔断器开启(多少次失败开启) options.CircuitBreakerDownTime = 100;// 4、熔断器开启时间 options.httpResponseMessage = fallbackResponse;// 5、降级处理 }) // 2、consul封装 .AddHttpClientConsul (); /*// 2、注册consul服务发现 services.AddConsulDiscovery(); // 3、注册负载均衡 services.AddSingleton ();*/ // 4、注册team服务 services.AddSingleton (); services.AddControllers(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } }
////// 服务调用实现 /// public class HttpTeamServiceClient : ITeamServiceClient { /*public readonly IServiceDiscovery serviceDiscovery; public readonly ILoadBalance loadBalance;*/ private readonly IHttpClientFactory httpClientFactory; private readonly string ServiceSchme = "https"; private readonly string ServiceName = "teamservice"; //服务名称 private readonly string ServiceLink = "/Teams"; //服务名称 //private readonly ConsulHttpClient consulHttpClient; public HttpTeamServiceClient(/*IServiceDiscovery serviceDiscovery, ILoadBalance loadBalance,*/ IHttpClientFactory httpClientFactory/*, ConsulHttpClient consulHttpClient*/) { /*this.serviceDiscovery = serviceDiscovery; this.loadBalance = loadBalance;*/ this.httpClientFactory = httpClientFactory; //this.consulHttpClient = consulHttpClient; } public async Task> GetTeams() { // 1、获取服务 /* IList serviceUrls = await serviceDiscovery.Discovery(ServiceName); // 2、负载均衡服务 ServiceUrl serviceUrl = loadBalance.Select(serviceUrls); // string name = "https://localhost:5001";*/ // 3、建立请求 for (int i =0;i < 100;i++) { try { Thread.Sleep(1000); HttpClient httpClient = httpClientFactory.CreateClient("mrico"); HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001" + ServiceLink); // 3.1json转换成对象 IList teams = null; if (response.StatusCode == HttpStatusCode.OK) { string json = await response.Content.ReadAsStringAsync(); teams = JsonConvert.DeserializeObject
>(json); } else { Console.WriteLine($"降级处理:{await response.Content.ReadAsStringAsync()}"); } } catch (Exception e) { Console.WriteLine($"异常捕获:{e.Message}"); } } // List teams = await consulHttpClient.GetAsync
>(ServiceSchme, ServiceName, ServiceLink); return null; } }
转载地址:https://codeboy.blog.csdn.net/article/details/108458350 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
初次前来,多多关照!
[***.217.46.12]2024年04月12日 18时59分45秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
UR5e机械臂运行一直阻塞在waitForServer
2019-04-29
ROS把pkg1下的某个头文件和源文件生成动态链接库供pkg2调用
2019-04-29
使用urdf_tutorial快速可视化urdf文件
2019-04-29
SQl 数据完整性(随堂博客)
2019-04-29
左连接、右连接、内连接
2019-04-29
MySQL DQL语句基础(随堂博客)
2019-04-29
MySQL基础练习
2019-04-29
利用MySQL进行数据复杂查询(1)
2019-04-29
利用MySQL进行数据复杂查询(2)
2019-04-29
MySQL 表与表之间的关系
2019-04-29
Python数据处理
2019-04-29
Java练习题(面向对象)
2019-04-29
Python 利用os和shutil复制系统文件
2019-04-29
Python 循环输出菱形字符串
2019-04-29
MySQL常见错误总结
2019-04-29
pymysql 的基础应用
2019-04-29
Html+Css实现 启橙装饰网 项目
2019-04-29
JavaScript 实现哥德巴赫猜想
2019-04-29
JavaScript DOM
2019-04-29
Python 管理程序改进——连接MYSQL
2019-04-29