(精华)2021年02月19日 .NET Core 多线程底层详解(自旋锁)

tech2024-12-15  25

一:自旋锁的概念

自旋锁:自旋锁(Spinlock)是最简单的线程锁,基于原子操作实现,它使用一个数值来表示锁是否已经被获取,0表示未被获取,1表示已经获取,获取锁时会先使用原子操作设置数值为1,然后检查修改前的值是否为0,如果为0则代表获取成功,否则继续重试直到成功为止,释放锁时会设置数值为0,其他正在获取锁的线程会在下一次重试时成功获取,使用原子操作的原因是,它可以保证多个线程同时把数值0修改到1时,只有一个线程可以观察到修改前的值为0,其他线程观察到修改前的值为1

.NET 可以使用以下的类实现自旋锁:

System.Threading.Thread.SpinWait System.Threading.SpinWait System.Threading.SpinLock

使用自旋锁有个需要注意的问题,自旋锁保护的代码应该在非常短的时间内执行完毕,如果代码长时间运行则其他需要获取锁的线程会不断重试并占用逻辑核心,影响其他线程运行,此外,如果 CPU 只有一个逻辑核心,自旋锁在获取失败时应该立刻调用 Thread.Yield 函数提示操作系统切换到其他线程,因为一个逻辑核心同一时间只能运行一个线程,在切换线程之前其他线程没有机会运行,也就是切换线程之前自旋锁没有机会被释放

二:自旋锁的使用

class Program { static void Main(string[] args) { var count = 0; var taskList = new Task[10]; Stopwatch sp = new Stopwatch(); sp.Start(); // 不要意外复制。每个实例都是独立的。 SpinLock _spinLock = new SpinLock(); for (int i = 0; i < taskList.Length; i++) { taskList[i] = Task.Run(() => { bool _lock = false; for (int j = 0; j < 10_000_000; j++) { _spinLock.Enter(ref _lock); count++; _spinLock.Exit(); _lock = false; } }); } sp.Stop(); Task.WaitAll(taskList); Console.WriteLine($"完成! 耗时:{sp.ElapsedTicks}"); Console.WriteLine($"结果:{count}"); } }

自旋锁的简单用法

public static class SpinlockSample { private static int _lock = 0; private static int _counterA = 0; private static int _counterB = 0; public static void IncrementCounter() { var spinwait = new SpinWait(); // 获取锁 while (Interlocked.Exchange(ref _lock, 1) != 0) { // 汇编指令pause Thread.SpinWait(1); spinwait.SpinOnce(); // 一定次数以内,核心大于1,Thread.SpinWait // 超过一定次数,核心等于1,交替使用Thread.Sleep(0)和Thread.Yield方法 // 再超过一定次数,Thread.Sleep(1) // Sleep(0)实际上调用SleepEx系统函数 // Yield()调用SwitchToThread的系统函数 } // 锁保护区域 { _counterA++; _counterB++; } // 释放锁 Interlocked.Exchange(ref _lock, 0); } } using System; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Sample5_4_spinlock { class Program { private static int _TaskNum = 3; private static Task[] _Tasks; private static StringBuilder _StrBlder; private const int RUN_LOOP = 50; private static SpinLock m_spinlock; private static void Work1(int TaskID) { int i = 0; string log = ""; bool lockToken = false; while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} =====\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { lockToken = false; m_spinlock.Enter(ref lockToken); _StrBlder.Append(log); } finally { if (lockToken) m_spinlock.Exit(false); } } } private static void Work2(int TaskID) { int i = 0; string log = ""; bool lockToken = false; while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} *****\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { lockToken = false; m_spinlock.Enter(ref lockToken); _StrBlder.Append(log); } finally { if (lockToken) m_spinlock.Exit(false); } } } private static void Work3(int TaskID) { int i = 0; string log = ""; bool lockToken = false; while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} ~~~~~\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { lockToken = false; m_spinlock.Enter(ref lockToken); _StrBlder.Append(log); } finally { if (lockToken) m_spinlock.Exit(false); } } } static void Main(string[] args) { _Tasks = new Task[_TaskNum]; _StrBlder = new StringBuilder(); m_spinlock = new SpinLock(); _Tasks[0] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work1(taskid); }, 0); _Tasks[1] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work2(taskid); }, 1); _Tasks[2] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work3(taskid); }, 2); var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("=========================================================="); Console.WriteLine("All Phase is completed"); Console.WriteLine("=========================================================="); Console.WriteLine(_StrBlder); }); try { finalTask.Wait(); } catch (AggregateException aex) { Console.WriteLine("Task failed And Canceled" + aex.ToString()); } finally { } Console.ReadLine(); } } }

在每个任务的finally块中,会调用SpinLock的release,否则在SpinLock.Enter中,程序会在循环中不断的尝试获得锁,造成死锁。一旦获得了锁,ref 的 LockToken会被变成 true。

超时的用法:SpinLock 同样也提供了超时机制供开发使用。一个程序示例,worker 1会造成超时,work 2 和work 3则在程序中捕获超时产生的异常,终止运行。

using System; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Sample5_5_spinlock_timeout { class Program { private static int _TaskNum = 3; private static Task[] _Tasks; private static StringBuilder _StrBlder; private const int RUN_LOOP = 50; private static SpinLock m_spinlock; private static void Work1(int TaskID) { int i = 0; string log = ""; bool lockToken = false; while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} =====\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { lockToken = false; m_spinlock.TryEnter(2000, ref lockToken); if (!lockToken) { Console.WriteLine("Work1 TIMEOUT!! Will throw Exception"); throw new TimeoutException("Work1 TIMEOUT!!"); } System.Threading.Thread.Sleep(5000); _StrBlder.Append(log); } finally { if (lockToken) m_spinlock.Exit(false); } } } private static void Work2(int TaskID) { int i = 0; string log = ""; bool lockToken = false; while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} *****\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { lockToken = false; m_spinlock.TryEnter(2000, ref lockToken); if (!lockToken) { Console.WriteLine("Work2 TIMEOUT!! Will throw Exception"); throw new TimeoutException("Work2 TIMEOUT!!"); } _StrBlder.Append(log); } finally { if (lockToken) m_spinlock.Exit(false); } } } private static void Work3(int TaskID) { int i = 0; string log = ""; bool lockToken = false; while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} ~~~~~\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { lockToken = false; m_spinlock.TryEnter(2000, ref lockToken); if (!lockToken) { Console.WriteLine("Work3 TIMEOUT!! Will throw Exception"); throw new TimeoutException("Work3 TIMEOUT!!"); } _StrBlder.Append(log); } finally { if (lockToken) m_spinlock.Exit(false); } } } static void Main(string[] args) { _Tasks = new Task[_TaskNum]; _StrBlder = new StringBuilder(); m_spinlock = new SpinLock(); _Tasks[0] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work1(taskid); }, 0); _Tasks[1] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work2(taskid); }, 1); _Tasks[2] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work3(taskid); }, 2); var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("=========================================================="); Console.WriteLine("All Phase is completed"); Console.WriteLine("=========================================================="); Console.WriteLine(_StrBlder); }); try { finalTask.Wait(); } catch (AggregateException aex) { Console.WriteLine("Task failed And Canceled" + aex.ToString()); } finally { } Console.ReadLine(); } } }

操作 SpinWait 的另一种方式 – 实例化 SpinWait。

SpinWait 提供了两个方法和两个只读属性。

方法:

SpinWait.Reset() : 重置自旋计数器,将计数器置 0。效果就好像没调用过SpinOnce一样。 SpinWait.Once() : 执行一次自旋。当SpinWait自旋达到一定次数后,如果有必要当前线程会让出底层的时间片并触发上下文切换。 属性:

SpinWait.Count:方法执行单次自旋的次数。 SpinWait.NextSpinWillYield:一个bool值,表示下一次通过SpinOnce方法自旋是否会让出底层线程的时间片并发生上下文切换。

如果需要等待某个条件满足的时间很短,而且不希望发生上下文切换,基于自旋的【等待】是一种很好的解决方案。

SpinWait : 自旋等待 SpinUntil : 等待某个条件发生 如果发生了长时间的自旋,SpinWait会让出底层的时间片,并触发上下文切换。因为长时间的自旋会阻塞优先级更高的线程。当一个线程自旋时,它会将一个内核放入到一个繁忙的循环中,而且它不会让出处理器时间片的剩余部分。SpinWait的智能逻辑中会在自旋达到足够长的时间时停止自旋并让出处理器。当然可以考虑调用Thread.Sleep()方法,它会让出处理器时间,但开销比较大。

示例程序:这里通过使用SpinWait 来控制3个Task的执行顺序。

using System; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Sample5_6_spinwait { class Program { private static int _TaskNum = 3; private static Task[] _Tasks; private static StringBuilder _StrBlder; private const int RUN_LOOP = 10; private static bool m_IsWork2Start = false; private static bool m_IsWork3Start = false; private static void Work1(int TaskID) { int i = 0; string log = ""; while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} =====\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { _StrBlder.Append(log); } finally { m_IsWork2Start = true; } } } private static void Work2(int TaskID) { int i = 0; string log = ""; System.Threading.SpinWait.SpinUntil(() => m_IsWork2Start); while ((i < RUN_LOOP) && (m_IsWork2Start)) { log = String.Format("Time: {0} Task : #{1} Value: {2} *****\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { _StrBlder.Append(log); } finally { m_IsWork3Start = true; } } } private static void Work3(int TaskID) { int i = 0; string log = ""; System.Threading.SpinWait.SpinUntil(() => m_IsWork3Start); while (i < RUN_LOOP) { log = String.Format("Time: {0} Task : #{1} Value: {2} ~~~~~\n", DateTime.Now.TimeOfDay, TaskID, i); i++; try { _StrBlder.Append(log); } finally { } } } static void Main(string[] args) { _Tasks = new Task[_TaskNum]; _StrBlder = new StringBuilder(); _Tasks[0] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work1(taskid); }, 0); _Tasks[1] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work2(taskid); }, 1); _Tasks[2] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work3(taskid); }, 2); var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("=========================================================="); Console.WriteLine("All Phase is completed"); Console.WriteLine("=========================================================="); Console.WriteLine(_StrBlder); }); try { finalTask.Wait(); } catch (AggregateException aex) { Console.WriteLine("Task failed And Canceled" + aex.ToString()); } finally { } Console.ReadLine(); } } }

总结:

自旋锁可用于叶级锁, 在这种情况Monitor下, 通过使用、大小或由于垃圾回收压力而隐含的对象分配的成本非常高。 旋转锁定有助于避免阻塞;但是, 如果你预计会有大量的阻塞, 则可能由于旋转过多而无法使用自旋锁。 当锁的粒度较大且数值较大 (例如, 链接列表中的每个节点都有一个锁) 以及锁保留时间始终极短时, 旋转可能非常有利。 通常, 在持有自旋锁时, 应避免使用以下任何操作:

1.堵塞

2.调用自身可能会阻止的任何内容,

3.同时保留多个自旋锁,

4.进行动态调度的调用 (interface 和虚方法),

5.对任何代码进行静态调度调用, 而不是任何代码, 或

6.分配内存。

SpinLock仅应在确定这样做后使用才能改善应用程序的性能。 出于性能方面的考虑, 还SpinLock必须注意, 是值类型。 出于此原因, 必须注意不要意外复制SpinLock实例, 因为两个实例 (原始和副本) 将完全独立, 这可能会导致应用程序出现错误的行为。 如果必须传递实例, 则它应按引用而不是按值传递。 SpinLock

不要在只读SpinLock字段中存储实例。

最新回复(0)