多线程合集(二)---异步的那些事,async和await原理抛析
发布日期:2022-04-11 08:53:00 浏览次数:12 分类:博客文章

本文共 16645 字,大约阅读时间需要 55 分钟。

引言

       在c#中,异步的async和await原理,以及运行机制,可以说是老生常谈,经常在各个群里看到有在讨论这个的,而且网上看到的也只是对异步状态机的一些讲解,甚至很多人说异步状态机的时候,他们说的是在运行时去构建状态机对线程状态进行调度,实际上异步状态机是属于编译期间,通过生成dll,然后我们使用反编译工具查看,是可以看到IL构建了异步状态机,并且在运行时添加了两个特性,其中比较重要的是AsyncStateMachine特性这个特性接受的是一个type类型的参数,即指定用的是哪一个异步状态机。所以在写多线程的时候,前面第一篇主要写线程方面的一些具体的使用,以及实现自定义的一些操作,接下来的这篇可能会注重原理方面的讲解,以及结合一些代码实现自定义状态机。

Part 1

       在c#中,有的关键字的使用实际上是由对应的类去进行封装的,那例如Lock关键字,是基于Monitor的Enter和Exit两个方法进行封装的,那对应的async和await关键字也是有对应的类或者结构体或者接口去进行封装的,上篇文章中,我们写了自定义的await,可以看到实际上await关键字的限制就是必须继承ICriticalNotifyCompletion, INotifyCompletion这两个接口,然后必须实现它接口的方法,这里有个缺陷就是,await自定义是必须有实现GetResult的方法的这个方法,但是实现那两个接口是没有这个方法的,所以GetResult方法必须是自己手动去实现,返回值的话可以根据自己的情况去写,可以是泛型T  也可以是void类型,然后需要实现一个拓展方法,拓展方法返回类型是你自定义的await,拓展方法是你需要使用await关键字的具体类型;那对应的async的关键字,也是有一个结构体进行封装的AsyncTaskMethodBuilder这个结构体是一个泛型,也有一个不是泛型的,这个可以对标你的自定义await 如果你的await是有返回值的是泛型的,那这个builder也必须是泛型,对标你的返回值类型。

public CustomAwaiter(Func
obj) { Obj = obj; } private bool bIsFinesh; private Timer Timer { get; set; } public bool IsCompleted { get { return bIsFinesh; } } private SpinLock SpinLock = new SpinLock(); private string Result { get; set; } public Func
Obj { get; } public void OnCompleted(Action continuation) { Timer = new Timer(s => { var action = s as Action; var bIsEnter = false; SpinLock.TryEnter(ref bIsEnter); if (bIsEnter) { Result = Obj.Invoke(5, 10); SpinLock.Exit(false); } Thread.Sleep(5000); action?.Invoke(); bIsFinesh = true; }, continuation, 0, int.MaxValue); } public void UnsafeOnCompleted(Action continuation) { Timer = new Timer(s => { var action = s as Action; var bIsEnter = false; SpinLock.TryEnter(ref bIsEnter); if (bIsEnter) { Result = Obj.Invoke(5, 10); SpinLock.Exit(false); } action?.Invoke(); bIsFinesh = true; }, continuation, 5000, int.MaxValue); } public string GetResult() { return Result; }
public static CustomAwaiter GetAwaiter(this Func
obj) { return new CustomAwaiter(obj); }

Part 2

       在第一部分中,我们找到了async 和await对应的结构体以及接口,那我们接下来看看实际上的异步的运行方式,下面这一段代码相信大家看起来很熟悉,感觉似曾相识,实际上异步方法加上async和await关键字的时候生成的IL代码转为c#代码基本上就是这个样子的。

GetResult方法,去调用异步状态机

       可以看到我们在这里定义了一个方法GetResult,这里面去执行一个异步状态机,这里可以看看自定义状态机的代码,实现了IAsyncStateMachine这个接口,重写了MoveNext的方法和SetStateMachine的两个方法,这里着重讲解MoveNext方法,在c#异步中,都是使用MoveNext方法来进行调度,通过定义的State来判断执行那一步,结合第一段代码片段,可以看到我们刚开始的时候设置的状态是-1,然后调用了Builder的Start方法,这个方法需要传入一个状态机的参数,所以我们传入我们自定义的状态机,

public static Task
GetResult() { CustomAsyncStateMechine customAsyncStateMechine = new CustomAsyncStateMechine(); customAsyncStateMechine.builder = AsyncTaskMethodBuilder
.Create(); customAsyncStateMechine.State = -1; customAsyncStateMechine.builder.Start(ref customAsyncStateMechine); return customAsyncStateMechine.builder.Task; }

需要执行的异步方法

public async static Task
Tests() { return await Task.Run(() => { return "hELLO"; }); }

 

自定义异步状态机

public class CustomAsyncStateMechine : IAsyncStateMachine    {        public AsyncTaskMethodBuilder
builder; public TaskAwaiter
awaiter; public int State; public void MoveNext() { TaskAwaiter
taskAwaiter=default; int num = State; CustomAsyncStateMechine state; string Result = string.Empty; switch (num) { case -1: taskAwaiter = Program.Tests().GetAwaiter(); if (!taskAwaiter.IsCompleted) { num = State = 0; awaiter = taskAwaiter; state = this; builder.AwaitUnsafeOnCompleted(ref taskAwaiter, ref state); return; } break; case 0: taskAwaiter = awaiter; awaiter = default(TaskAwaiter
); num = State = -1; break; } Result = taskAwaiter.GetResult(); builder.SetResult(Result); } public void SetStateMachine(IAsyncStateMachine stateMachine) { } }

 

Start方法

       可以在下面的代码段看到Start的方法代码,在我们调用了这个方法之后会构建一个用于切换线程上下文的对象,然后调用线程上下文的方法去进行一些操作,这里看一下,这个方法调用了状态机的MoveNext方法,这是第一次执行MoveNext的方法,可以看到我们第一次执行MoveNext方法的时候我们去获取了一下Tests的GetAwaiter,获取的时候实际上这个Tests方法已经执行了,然后我们去判断是否完成,如果没有完成,我们需要去进行下一步操作,在全局变量定义一个Awaiter,需要将Tests的Awaiter保存起来,然后切换State的状态推进到下一步,然后我们调用了Builder的AwaitUnsafeOnCompleted这个方法,

public void Start
(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { // See comment on AsyncMethodBuilderCore.Start // AsyncMethodBuilderCore.Start(ref stateMachine); if (stateMachine == null) throw new ArgumentNullException("stateMachine"); Contract.EndContractBlock(); // Run the MoveNext method within a copy-on-write ExecutionContext scope. // This allows us to undo any ExecutionContext changes made in MoveNext, // so that they won't "leak" out of the first await. ExecutionContextSwitcher ecs = default(ExecutionContextSwitcher); RuntimeHelpers.PrepareConstrainedRegions(); try { ExecutionContext.EstablishCopyOnWriteScope(ref ecs); stateMachine.MoveNext(); } finally { ecs.Undo(); } }

AwaitUnsafeOnCompleted方法

       可以看到这个方法内部有调用了一个GetCompletionAction方法

public void AwaitUnsafeOnCompleted
( ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine { try { AsyncMethodBuilderCore.MoveNextRunner runnerToInitialize = null; var continuation = m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runnerToInitialize); Contract.Assert(continuation != null, "GetCompletionAction should always return a valid action."); // If this is our first await, such that we've not yet boxed the state machine, do so now. if (m_coreState.m_stateMachine == null) { // Force the Task to be initialized prior to the first suspending await so // that the original stack-based builder has a reference to the right Task. var builtTask = this.Task; // Box the state machine, then tell the boxed instance to call back into its own builder, // so we can cache the boxed reference. Contract.Assert(!Object.ReferenceEquals((object)stateMachine, (object)stateMachine), "Expected an unboxed state machine reference"); m_coreState.PostBoxInitialization(stateMachine, runnerToInitialize, builtTask); } awaiter.UnsafeOnCompleted(continuation); } catch (Exception e) { AsyncMethodBuilderCore.ThrowAsync(e, targetContext: null); } }

 

GetCompletionAction方法

       这里我们着重看一下runner.run方法可以看到Run方法里面不管是怎么去进行操作,最后都是要去执行MoveNext方法,接下来看一下上面的AwaitUnsafeOnCompleted方法,还记得上一篇文章中,我卖了一个关子,询问大家OnCompleted和UnsafeOnCompleted方法里面的Action是哪一个方法,现在已经很明了了,这个Action执行的是状态机的MoveNext方法,它是在Task完成之后,去执行OnCompleted和UnSafeOnCompleted方法的,这里为了方便大家理解,需要结合上一篇文章中自定义任务调度TaskScheduler去给大家演示,最好是希望阅读文章后去下载最新的代码进行调试就会很明白,

internal Action GetCompletionAction(Task taskForTracing, ref MoveNextRunner runnerToInitialize)        {            Contract.Assert(m_defaultContextAction == null || m_stateMachine != null,                "Expected non-null m_stateMachine on non-null m_defaultContextAction");             // Alert a listening debugger that we can't make forward progress unless it slips threads.            // If we don't do this, and a method that uses "await foo;" is invoked through funceval,            // we could end up hooking up a callback to push forward the async method's state machine,            // the debugger would then abort the funceval after it takes too long, and then continuing            // execution could result in another callback being hooked up.  At that point we have            // multiple callbacks registered to push the state machine, which could result in bad behavior.            Debugger.NotifyOfCrossThreadDependency();             // The builder needs to flow ExecutionContext, so capture it.            var capturedContext = ExecutionContext.FastCapture(); // ok to use FastCapture as we haven't made any permission demands/asserts             // If the ExecutionContext is the default context, try to use a cached delegate, creating one if necessary.            Action action;            MoveNextRunner runner;            if (capturedContext != null && capturedContext.IsPreAllocatedDefault)            {                // Get the cached delegate, and if it's non-null, return it.                action = m_defaultContextAction;                if (action != null)                {                    Contract.Assert(m_stateMachine != null, "If the delegate was set, the state machine should have been as well.");                    return action;                }                 // There wasn't a cached delegate, so create one and cache it.                // The delegate won't be usable until we set the MoveNextRunner's target state machine.                runner = new MoveNextRunner(capturedContext, m_stateMachine);                 action = new Action(runner.Run);                if (taskForTracing != null)                {                    m_defaultContextAction = action = OutputAsyncCausalityEvents(taskForTracing, action);                }                else                {                    m_defaultContextAction = action;                }            }            // Otherwise, create an Action that flows this context.  The context may be null.            // The delegate won't be usable until we set the MoveNextRunner's target state machine.            else            {                runner = new MoveNextRunner(capturedContext, m_stateMachine);                action = new Action(runner.Run);                 if (taskForTracing != null)                {                    action = OutputAsyncCausalityEvents(taskForTracing, action);                }                 // NOTE: If capturedContext is null, we could create the Action to point directly                // to m_stateMachine.MoveNext.  However, that follows a much more expensive                // delegate creation path.            }             if (m_stateMachine == null)                runnerToInitialize = runner;             return action;        }
internal void Run()            {                Contract.Assert(m_stateMachine != null, "The state machine must have been set before calling Run.");                 if (m_context != null)                {                    try                    {                        // Get the callback, lazily initializing it as necessary                        ContextCallback callback = s_invokeMoveNext;                        if (callback == null) { s_invokeMoveNext = callback = InvokeMoveNext; }                         // Use the context and callback to invoke m_stateMachine.MoveNext.                        ExecutionContext.Run(m_context, callback, m_stateMachine, preserveSyncCtx: true);                    }                    finally { m_context.Dispose(); }                }                else                {                    m_stateMachine.MoveNext();                }            }             /// Cached delegate used with ExecutionContext.Run.            [SecurityCritical]            private static ContextCallback s_invokeMoveNext; // lazily-initialized due to SecurityCritical attribution             /// Invokes the MoveNext method on the supplied IAsyncStateMachine.            /// The IAsyncStateMachine machine instance.            [SecurityCritical] // necessary for ContextCallback in CoreCLR            private static void InvokeMoveNext(object stateMachine)            {                ((IAsyncStateMachine)stateMachine).MoveNext();            }

CustomScheduler 和CustomAwaiter 以及自定义状态机的结合使用,

foreach (var item in Enumerable.Range(0, 1))                {                   await Task.Run(async () =>                    {                        var i = item;                        var ts = new Func
((s, b) => { return Guid.NewGuid().ToString(); }); //var t= await ts; var tash = new TaskCustomScheduler(); var factory = new TaskFactory(tash); await factory.StartNew(async () => { var state = new CustomAsyncStateMechines(); state.State = -1; state.awaiter = ts.GetAwaiter(); state.builder = AsyncTaskMethodBuilder
.Create(); state.builder.Start(ref state); var result = await state.builder.Task; Console.WriteLine(result); }); }); }

       在上一篇文章中,我们讲解了自定义调度的几个比较重要的方法,我们在使用factory去进行指定了调度器之后,调用了StartNew方法,去执行一段代码,这里的是,实际上在StartNew执行之前,他会先咋自定义任务调度里面添加Task,他会走到QueueTask将Task添加到自己定义的任务池里面去,然后再去RunWork,去通过ThreadPool去执行Task,TryExecuteTask是抽象类提供且内部实现的一个方法,是去执行Task,然后Task执行结束后,我们把它从任务调度池里面移除,那Task结束之后,就会走到自定义Awaiter里面UnSafeOnCompleted方法里面去,然后在这里面再去写执行完成的回调,将状态机向前推进,然后在Movenext方法里面,我们在去获取awaiter的结果,这里就是刚开始所说的就是自定义Awaiter需要自己写的GetResult方法,然后获取到结果之后,我们需要将结果赋值到Task中,就需要调用builder的SetResult方法,实际上对于Task的异常处理也是有SetException方法去进行设置异常的,就需要在MoveNext方法中添加Try Catch  然后捕获之后去迪奥用SetException方法设置异常,这就是async和await异步执行的相关过程,对于内部更深层次的,我目前也是一知半解,但是大体意思都是知道。

 

public class TaskCustomScheduler : TaskScheduler    {        private SpinLock SpinLock = new SpinLock();        public TaskCustomScheduler()        {        }        private ConcurrentQueue
Tasks = new ConcurrentQueue
(); protected override IEnumerable
GetScheduledTasks() { return Tasks.ToList(); } protected override void QueueTask(Task task) { Tasks.Enqueue(task); RunWork(); } protected override bool TryDequeue(Task task) { return Tasks.TryDequeue(out task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return TryExecuteTask(task); } private void RunWork() { ThreadPool.UnsafeQueueUserWorkItem(_ => { try { foreach (var item in Tasks) { var task = item; var isEnter = false; SpinLock.TryEnter(ref isEnter); TryExecuteTask(task); if (isEnter) { Tasks.TryDequeue(out task); SpinLock.Exit(false); } } } finally { } }, null); } }

 

Part 2

       c#中,实际上所有的Task都是基于ThreadPoolScheduler去进行运行的,这个类开发者是没有办法去new的,但是在TaskScheduler中有一个属性Default实际上它返回的就是这个类,然后Task的时候都是运行在这个类上面,由这个类去进行调度,至于有的人说异步多线程,有的时候异步是多线程有的时候不是多线程,在这里,可以肯定的是async和await的异步是多线程的,但是对于一些类提供的Begin开头的异步,这种的 ,我的观点是,不是多线程的,如果我说的不对的话,希望各位大佬能够进行指正,代码的话,我会放在Gitee里面去,家里的网络上不去Github。抱歉,

总结

       多线程方面的文章就讲解到这里,后续可能会出一些,winform方面自绘或者Net Core自定义配置结合Options进行的自定义,敬请各位大佬进行关注,如果对文章或者代码有不懂的地方,可以看自己所在的群里有没有叫四川观察的,那基本上就是我了,或者加QQ群6406277,找我也可以,在这里,谢谢大家的支持,以后会多发表开发方面的知识,大家一起学习,一起进步。  ditee地址  :https://gitee.com/cxd199645/Thread

 

转载地址:https://www.cnblogs.com/1996-Chinese-Chen/p/15594498.html 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:多线程同时new一个类的实例,该类会被初始化多次么?
下一篇:多线程合集(一)---信号量,锁,以及并发编程,自定义任务调度和awaiter

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月13日 12时31分12秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

linux服务器怎么添加站点,如何增加站点或虚拟主机及文件说明 2019-04-21
linux系统输入指令,Linux系统基础 - 基本操作命令 2019-04-21
linux设备管理命令,Linux命令(设备管理).doc 2019-04-21
linux 中文utf-8转gbk编码,Linux平台下 GBK编码转UTF-8编码 2019-04-21
linux安装软件在boot,在Linux系统上安装Spring boot应用的教程详解 2019-04-21
linux进入用户user1主目录,Linux系统命令提示符为[user1@localhost root]当前用户所在目录为( )... 2019-04-21
取消linux自动登录,linuxdeepin 如何取消自动登录啊? 2019-04-21
linux线程存储,Linux系统编程手册:线程:线程安全和每线程存储 2019-04-21
linux批处理模式,巧用linux-top的批处理模式 2019-04-21
linux信号量机制例题,第二章 信号量机制及几个经典例题 2019-04-21
linux ba 模拟,在你的 Python 游戏中模拟引力 | Linux 中国 2019-04-21
c语言表达式3649的值是,535个C语言经典实例目录.doc 2019-04-21
c语言Wndproc未定义,小弟我用c语言写了一个windows窗口,为什么有提示未定义的变量类型... 2019-04-21
c语言中malloc数组,如何在C中对malloc()数组进行一行赋值? 2019-04-21
c语言调存储过程,写留言板–调用存储过程出问题 2019-04-21
c语言编程max,C语言编程题及答案.doc 2019-04-21
android测试页面,自动执行界面测试 | Android 开发者 | Android Developers 2019-04-21
android 图片点击变色,Android开发实现ListView点击item改变颜色功能示例 2019-04-21
android增删改查布局,Android之父_增删改查 2019-04-21
vowifi android开关,如何配置VoLTE, ViLTE and VoWifi(IMS config for VoLTE, ViLTE and VoWifi) 2019-04-21