System.Threading.Channels - منتج عالي الأداء - مستهلك وغير متزامن دون تخصيص وغوص مكدس

مرحبا مجددا. منذ بعض الوقت كتبت عن أداة أخرى غير معروفة للهواة ذوي الأداء العالي - System.IO.Pipelines . في جوهرها ، فإن System.Threading.Channels قيد النظر (المشار إليها فيما يلي باسم "القنوات") مبنية على مبادئ مماثلة لخطوط الأنابيب ، وتحل نفس المشكلة - المنتج - المستهلك. ومع ذلك ، يحتوي في بعض الأحيان على واجهة برمجة تطبيقات أبسط تتناسب بشكل أنيق مع أي نوع من رموز المؤسسة. يستخدم غير متزامن بدون تخصيصات وبدون غوص مكدس حتى في الحالة غير المتزامنة! (ليس دائمًا ، ولكن غالبًا).







جدول المحتويات







المقدمة



يواجه المنتج / المستهلك مهمة المبرمجين في كثير من الأحيان وليس خلال السنوات العشر الأولى. كان لـ Edsger Dijkstra نفسه يد المساعدة في حل هذه المشكلة - فقد توصل إلى فكرة استخدام الإشارات لمزامنة الخيوط عند تنظيم العمل على أساس المنتج / المستهلك. وعلى الرغم من أن حلها في أبسط أشكاله معروف وتافه نوعًا ما ، إلا أن هذا النمط (المنتج / المستهلك) في العالم الحقيقي يمكن أن يحدث بشكل أكثر تعقيدًا. أيضا ، معايير البرمجة الحديثة تترك علاماتها ، الرمز مكتوب بطريقة أكثر بساطة ويتم تقسيمه لمزيد من إعادة الاستخدام. يتم عمل كل شيء لخفض الحد الأدنى لكتابة كود الجودة وتبسيط هذه العملية. ومساحة الاسم المعنية - System.Threading.Channels - هي خطوة أخرى نحو هذا الهدف.



منذ بعض الوقت كنت أنظر إلى أنظمة System.IO.Pipelines. تطلب الأمر عملًا أكثر يقظة وفهمًا أعمق للمسألة ، تم استخدام Span و Memory ، وللعمل الفعال كان مطلوبًا عدم استدعاء طرق واضحة (لتجنب تخصيصات الذاكرة غير الضرورية) والتفكير المستمر بالبايت. وبسبب هذا ، كانت واجهة برمجة خطوط الأنابيب غير تافهة وليست بديهية.



System.Threading.Channels يقدم للمستخدم واجهة برمجة تطبيقات أبسط للعمل معه. من الجدير بالذكر أنه على الرغم من بساطة واجهة برمجة التطبيقات ، إلا أن هذه الأداة محسنة للغاية ولن تقوم على الأرجح بتخصيص ذاكرة أثناء عملها. ربما يرجع ذلك إلى حقيقة أنه تحت الغطاء يتم استخدام ValueTask في كل مكان ، وحتى في حالة عدم التزامن الحقيقي ، يتم استخدام IValueTaskSource، الذي يعاد استخدامه لمزيد من العمليات. هذا هو بالضبط الاهتمام الكامل في تنفيذ القنوات.



يتم تعميم القنوات ، ونوع التعميم ، كما قد تتوقع ، هو النوع الذي سيتم إنتاج مثيلاته واستهلاكها. ومن المثير للاهتمام أن تنفيذ فئة القناة ، والذي يتناسب مع سطر واحد (مصدر جيثب ):



namespace System.Threading.Channels
{
    public abstract class Channel<T> : Channel<T, T> { }
}


وبالتالي ، يتم تحديد معلمات الفئة الرئيسية للقنوات بنوعين - بشكل منفصل لقنوات المنتج والمستهلك. ولكن بالنسبة للقنوات المحققة ، لا يتم استخدام هذا.

بالنسبة لأولئك المطلعين على خطوط الأنابيب ، سيبدو النهج العام للبدء مألوفًا. يسمى. نقوم بإنشاء فئة مركزية واحدة نسحب منها المنتجين ( ChannelWriter ) والمستهلكين ( ChannelReader ) بشكل منفصل . على الرغم من الأسماء ، تجدر الإشارة إلى أن هذا هو المنتج / المستهلك ، وليس القارئ / الكاتب من مهمة كلاسيكية أخرى متعددة الخيوط بنفس الاسم. يغير ChannelReader حالة القناة العامة (يسحب القيمة) ، التي لم تعد متاحة. هذا يعني أنه لا يقرأ ، بل يستهلك. لكننا سنتعرف على التنفيذ لاحقًا.



بداية العمل. قناة



تبدأ الخطوات الأولى مع القنوات بفئة Channel <T> المجردة وفئة Channel الثابتة ، التي تنشئ التنفيذ الأكثر ملاءمة. علاوة على ذلك ، من هذه القناة الشائعة ، يمكنك الحصول على ChannelWriter للكتابة إلى القناة و ChannelReader للاستهلاك من القناة. القناة عبارة عن مستودع للمعلومات العامة لـ ChannelWriter و ChannelReader ، لذلك يتم تخزين جميع البيانات فيها. وبالفعل ، فإن منطق تسجيلها أو استهلاكها مشتت في ChannelWriter و ChannelReader. تقليديا ، يمكن تقسيم القنوات إلى مجموعتين - غير محدودة ومحدودة. أولها أسهل في التنفيذ ، يمكنك الكتابة فيها بلا حدود (طالما سمحت الذاكرة بذلك). أما الثانية فتحدها قيمة قصوى معينة لعدد السجلات.



هذا هو المكان الذي تختلف فيه طبيعة التزامن قليلاً. في القنوات غير المقيدة ، ستكتمل عملية الكتابة دائمًا بشكل متزامن ، ولا يوجد شيء لإيقاف الكتابة إلى القناة. بالنسبة للقنوات المحدودة ، يختلف الوضع. مع السلوك القياسي (الذي يمكن استبداله) ، ستنتهي عملية الكتابة بشكل متزامن طالما هناك مساحة لمثيلات جديدة في القناة. بمجرد امتلاء القناة ، لن تنتهي عملية الكتابة حتى تصبح المساحة خالية (بعد استهلاك المستهلك للمستهلك). لذلك ، ستكون العملية هنا غير متزامنة حقًا مع تغيير الخيوط والتغييرات المرتبطة بها (أو بدون تغيير ، والتي سيتم وصفها بعد ذلك بقليل).



بالنسبة للجزء الأكبر ، فإن سلوك القراء هو نفسه - إذا كان هناك شيء في القناة ، فإن القارئ ببساطة يقرأه وينتهي في المزامنة. إذا لم يكن هناك شيء ، فإنه ينتظر أن يكتب شخص ما شيئًا.



يحتوي فئة القناة الثابتة على 4 طرق لإنشاء القنوات المذكورة أعلاه:



Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);


إذا كنت ترغب في ذلك ، يمكنك تحديد خيارات أكثر دقة لإنشاء قناة ستساعد في تحسينها لتلبية الاحتياجات المحددة. يحتوي



UnboundedChannelOptions على 3 خصائص قيمتها الافتراضية هي false:



  1. AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
  2. SingleReader — , . , ;
  3. SingleWriter — , ;


يحتوي BoundedChannelOptions على نفس الخصائص 3 و 2 أخرى في الأعلى



  1. AllowSynchronousContinuations - نفس الشيء ؛
  2. SingleReader هو نفسه ؛
  3. SingleWriter هو نفسه ؛
  4. السعة - عدد السجلات الموضوعة في القناة. هذه المعلمة هي أيضًا معلمة مُنشئ ؛
  5. FullMode - تعداد BoundedChannelFullMode ، الذي يحتوي على 4 خيارات ، يحدد السلوك عند محاولة الكتابة إلى قناة ممتلئة:

    • انتظر - ينتظر مساحة حرة لإكمال العملية غير المتزامنة
    • DropNewest - العنصر الذي تتم كتابته يستبدل أحدث عنصر موجود ، ينتهي بشكل متزامن
    • DropOldest - عنصر قابل للتسجيل يستبدل أقدم النهايات الموجودة بشكل متزامن
    • DropWrite - العنصر المكتوب غير مكتوب ، ينتهي بشكل متزامن




اعتمادا على المعلمات التي تم تمريرها ودعا الأسلوب، واحدة من 3 تطبيقات سيتم إنشاء: SingleConsumerUnboundedChannel ، UnboundedChannel ، BoundedChannel . لكن هذا ليس مهمًا جدًا ، لأننا سنستخدم القناة من خلال الفئة الأساسية Channel <TWrite، TRead>.



لديها 2 خصائص:



  • ChannelReader <TRead> Reader {get؛ مجموعة محمية }}
  • ChannelWriter <TWrite> كاتب {get؛ مجموعة محمية }}


وأيضًا ، 2 عامل صب ضمني إلى ChannelReader <TRead> و ChannelWriter <TWrite>.



مثال على البدء في القنوات:



Channel<int> channel = Channel.CreateUnbounded<int>();
//  
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader; 
// 
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;


يتم تخزين البيانات في قائمة الانتظار. بالنسبة إلى 3 أنواع ، يتم استخدام 3 قوائم انتظار مختلفة - ConcurrentQueue <T> و Deque <T> و SingleProducerSingleConsumerQueue <T>. في هذه المرحلة ، بدا لي أنني عفا عليها الزمن وفاتتني مجموعة من أبسط المجموعات الجديدة. لكنني سارع إلى الانزعاج - فهي ليست للجميع. تم وضع علامة داخلية ، لذا لن يعمل استخدامها. ولكن إذا كنت في حاجة إليها فجأة للبيع، يمكنك العثور عليها هنا (SingleProducerConsumerQueue) و هنا (صف مزدوج الذيل) . تنفيذ هذا الأخير بسيط للغاية. أنصحك بالتعرف ، يمكن تعلمه بسرعة كبيرة.



لذا ، دعنا نبدأ الدراسة مباشرة ChannelReader و ChannelWriter ، بالإضافة إلى تفاصيل التنفيذ المثيرة للاهتمام. كلهم يتلخصون في عدم التزامن دون تخصيص الذاكرة باستخدام IValueTaskSource.



ChannelReader - المستهلك



عند طلب كائن عميل ، يتم إرجاع أحد تطبيقات الفئة ChannelReader المجردة <T>. مرة أخرى ، على عكس خطوط الأنابيب ، فإن واجهات برمجة التطبيقات بسيطة وهناك طرق قليلة. ما عليك سوى معرفة قائمة الطرق لفهم كيفية استخدام هذا عمليًا.



طرق:



  1. {get؛ Virtual property get-only إكمال المهام {get؛ }

    كائن من نوع Task ينتهي عند إغلاق القناة ؛
  2. الملكية الافتراضية فقط للحصول على عدد {get؛ }

    هنا يجب التأكيد على أن العدد الحالي للكائنات القابلة للقراءة تم إرجاعه ؛
  3. CanCount {get؛ خاصية الملكية الافتراضية فقط }

    يشير إلى ما إذا كانت خاصية Count متوفرة ؛
  4. bool TryRead(out T item)

    . bool, , . out ( null, );
  5. ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)

    ValueTask true, , . ValueTask false, ( );
  6. ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)

    . , . .



    , TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .

    , , - .




ChannelWriter - منتج



كل شيء مشابه للمستهلك ، لذلك دعونا نلقي نظرة على الأساليب على الفور:



  1. منطقي الطريقة الافتراضية TryComplete (استثناء؟ خطأ = فارغ)

    محاولات وضع علامة للقناة على أنها كاملة ، أي تبين أنه لن تتم كتابة المزيد من البيانات إليها. كمعلمة اختيارية ، يمكنك طرح استثناء تسبب في إنهاء القناة. إرجاع صحيح إذا كان قادرًا على الاكتمال ، وإلا كان خاطئًا (إذا كانت القناة قد اكتملت بالفعل أو لا تدعم الإكمال) ؛
  2. bool أسلوب مجردة TryWrite (عنصر T)

    محاولات لكتابة قيمة إلى القناة. إرجاع صحيح إذا كان ناجحًا وخطأًا إذا لم يكن كذلك
  3. طريقة مجردة ValueTask <bool> WaitToWriteAsync (CancellationToken cancellationToken = افتراضي)

    إرجاع ValueTask بقيمة حقيقية ستكتمل عند وجود مساحة للكتابة على القناة. ستكون القيمة خاطئة إذا لم تعد الكتابة إلى القناة مسموحًا بها ؛
  4. الطريقة الافتراضية ValueTask WriteAsync (عنصر T ، CancellationToken cancellationToken = افتراضي)

    يكتب بشكل غير متزامن مع القناة. على سبيل المثال ، إذا كانت القناة ممتلئة ، فستكون العملية غير متزامنة حقًا وستكتمل فقط بعد تحرير مساحة لهذا السجل ؛
  5. أسلوب الفراغ مكتمل (استثناء؟ خطأ = فارغ)

    فقط يحاول وضع علامة على القناة على أنها مكتملة باستخدام TryComplete ، وفي حالة الفشل يطرح استثناء.


مثال صغير على ما سبق (لبدء تجاربك الخاصة بسهولة):



Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();

//      ,        
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;

//     
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
//  ,     ,   ,  
writer.Complete();

//         
int valueFromChannel = await reader.ReadAsync();


الآن دعنا ننتقل إلى الجزء الأكثر إثارة للاهتمام.



التزامن بدون تخصيص



في عملية كتابة ودراسة الكود ، أدركت أنه لا يوجد شيء مثير للاهتمام تقريبًا في تنفيذ كل هذه العمليات. بشكل عام ، يمكنك وصفه بهذه الطريقة - تجنب الأقفال غير الضرورية باستخدام مجموعات تنافسية والاستخدام الوفير لـ ValueTask ، وهي بنية توفر الذاكرة. ومع ذلك ، فإنني أسارع إلى تذكيرك بأنه لا يستحق استبدال سريع للخوض في جميع الملفات الموجودة على جهاز الكمبيوتر الخاص بك واستبدال كل المهام بـ ValueTask. من المنطقي فقط في الحالات التي تكتمل فيها العملية بشكل متزامن في معظم الحالات. بعد كل شيء ، كما نتذكر ، مع عدم التزامن ، من الممكن تغيير الخيط ، مما يعني أن المكدس لن يكون كما كان من قبل. على أي حال ، يعرف المحترف الحقيقي في مجال الإنتاجية - لا تقم بالتحسين قبل ظهور المشاكل.



شيء واحد جيد هو أنني لن أسجل نفسي كمحترف ، وبالتالي فقد حان الوقت لمعرفة سر كتابة التعليمات البرمجية غير المتزامنة بدون تخصيصات للذاكرة ، والتي تبدو للوهلة الأولى جيدة جدًا بحيث لا يمكن تصديقها. لكن هذا يحدث.



واجهة IValueTaskSource



لنبدأ رحلتنا من البداية - بنية ValueTask ، التي تمت إضافتها في .net core 2.0 وتعديلها في 2.1. داخل هذا الهيكل ، يوجد حقل _obj كائن صعب. من السهل التخمين ، بناءً على الاسم المفسر ذاتيًا ، أن أحد الأشياء الثلاثة التي يمكن إخفاؤها في هذا المجال - خالية أو مهمة / مهمة <T> أو IValueTaskSource. في الواقع ، ينبع هذا من الطرق التي يتم بها إنشاء ValueTask.



كما تؤكد الشركة المصنعة ، يجب استخدام هذا الهيكل بشكل واضح فقط - مع الكلمة الرئيسية المنتظرة. أي أنه لا يجب عليك التقدم بالانتظار عدة مرات على نفس قيمة ValueTask ، واستخدام المجمعات ، وإضافة العديد من الاستمرارات ، وما إلى ذلك. أيضًا ، لا يجب أن تحصل على النتيجة من ValueTask أكثر من مرة. وهذا مرتبط بدقة بما نحاول فهمه - بإعادة استخدام كل هذه الأشياء دون تخصيص ذاكرة.



لقد ذكرت بالفعل واجهة IValueTaskSource . هو الذي يساعد على حفظ الذاكرة. يتم ذلك عن طريق إعادة استخدام IValueTaskSource نفسها عدة مرات للعديد من المهام. ولكن على وجه التحديد بسبب إعادة الاستخدام هذه ، لا توجد طريقة للعمل مع ValueTask.



حتى IValueTaskSource. تحتوي هذه الواجهة على 3 طرق ، من خلال تنفيذها ستوفر الذاكرة والوقت بنجاح لتخصيص تلك البايتات الثمينة.



  1. GetResult - يتم استدعاؤه مرة واحدة ، عندما تكون في آلة الحالة ، والتي تم تشكيلها في وقت التشغيل للطرق غير المتزامنة ، تكون النتيجة مطلوبة. يحتوي ValueTask على طريقة GetResult ، والتي تستدعي طريقة الواجهة التي تحمل الاسم نفسه ، والتي ، كما نتذكر ، يمكن تخزينها في حقل _obj.
  2. GetStatus - يتم استدعاؤه بواسطة جهاز الحالة لتحديد حالة العملية. أيضا عبر ValueTask.
  3. عند الانتهاء - مرة أخرى ، تستدعي آلة الحالة لإضافة متابعة للمهمة التي لم تكتمل في ذلك الوقت.


ولكن على الرغم من الواجهة البسيطة ، سيتطلب التنفيذ بعض المهارة. وهنا يمكننا أن نتذكر ما بدأنا به - القنوات . يستخدم هذا التطبيق فئة AsyncOperationوهو تنفيذ IValueTaskSource. هذه الفئة مخفية خلف معدّل الوصول الداخلي. لكن هذا لا يتوقف لفهم الآليات الأساسية. السؤال هو ، لماذا لا نعطي تطبيق IValueTaskSource للجماهير؟ السبب الأول (من أجل المتعة) هو عندما يكون هناك مطرقة في اليدين ، المسامير في كل مكان ، عندما يكون تنفيذ IValueTaskSource في متناول اليد ، هناك عمل أمي مع الذاكرة في كل مكان. السبب الثاني (الأكثر معقولية) هو أنه في حين أن الواجهة بسيطة ومتنوعة ، فإن التنفيذ الحقيقي هو الأمثل عند استخدام الفروق الدقيقة في التطبيق. وربما لهذا السبب ، من الممكن العثور على تطبيقات في أجزاء مختلفة من .net الرائع والقوي ، مثل AsyncOperation تحت غطاء القنوات ، AsyncIOOperation داخل واجهة المقبس الجديدة ، إلخ.

ومع ذلك ، في الإنصاف ، لا يزال هناك تنفيذ واحد مشترك -ManualResetValueTaskSourceCore . لكن هذا بعيد جدًا عن موضوع المقالة.



قارن التبادل



طريقة شائعة تمامًا لفئة شائعة تتجنب النفقات العامة بدائية المزامنة الكلاسيكية. أعتقد أن معظمهم على دراية بها ، ولكن لا يزال الأمر يستحق الوصف في 3 كلمات ، لأن هذا البناء يستخدم كثيرًا في AsyncOperation.

في الأدبيات السائدة ، تسمى هذه الوظيفة مقارنة ومبادلة (CAS). في .net ، يتوفر في فئة Interlocked .



التوقيع كالتالي:



public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;


هناك أيضًا حمولة زائدة مع int ، طويل ، عائم ، مزدوج ، IntPtr ، كائن.



الأسلوب نفسه ذري ، أي أنه يتم تنفيذه دون انقطاع. مقارنة القيمتين ، وإذا كانتا متساويتين ، يتم تعيين القيمة الجديدة للمتغير. يحلون المشكلة عندما تحتاج إلى التحقق من قيمة المتغير وتغيير المتغير اعتمادًا عليه.



لنفترض أنك تريد زيادة متغير إذا كانت قيمته أقل من 10.



ثم هناك سلسلتان.



تيار 1 تيار 2
للتحقق من قيمة متغير لشرط (أي إذا كان أقل من 10) ، والذي يعمل -
بين التحقق من وتغيير قيمة يعين قيمة متغيرة لا تفي بشرط (على سبيل المثال ، 15)
يغير القيمة ، على الرغم من أنه لا يجب ذلك ، لأن الشرط لم يعد مستوفى -




عند استخدام هذه الطريقة ، يمكنك إما تغيير القيمة التي تريدها بالضبط ، أو عدم تغييرها ، أثناء الحصول على القيمة الفعلية للمتغير.



location1 هو المتغير الذي نريد تغيير قيمته. تتم مقارنتها بالمقارنة ، في حالة المساواة ، يتم كتابة القيمة في الموقع 1. إذا نجحت العملية ، فستُرجع الطريقة القيمة السابقة للمتغير location1. إذا لم يكن الأمر كذلك ، سيتم إرجاع القيمة الفعلية للموقع 1.

بشكل أعمق ، هناك تعليمات بلغة التجميع ، cmpxchg ، تقوم بذلك. هي التي تستخدم تحت غطاء محرك السيارة.



غوص المكدس



أثناء النظر إلى كل هذا الرمز ، صادفت إشارات إلى "Stack Dive" أكثر من مرة. هذا شيء رائع ومثير للاهتمام وهو في الواقع غير مرغوب فيه للغاية. خلاصة القول هي أنه مع التنفيذ المتزامن للاستمرارات ، يمكننا نفاد موارد المكدس.



لنفترض أن لدينا 10000 مهمة في الأسلوب



//code1
await ...
//code2


لنفترض أن المهمة الأولى تكمل التنفيذ وبالتالي تحرر استمرار المهمة الثانية ، التي نبدأ على الفور في تنفيذها بشكل متزامن في هذا الخيط ، أي أخذ قطعة من المكدس مع إطار هذا الاستمرارية. وبدورها ، فإن هذا الاستمرار سيؤدي إلى إلغاء استمرار المهمة الثالثة ، التي نبدأ أيضًا في تنفيذها على الفور. إلخ. إذا لم يكن هناك المزيد من الانتظار في التكملة أو شيء ما يسقط المكدس بطريقة أو بأخرى ، فسوف نستهلك مساحة المكدس على طول الطريق. ما يمكن أن يسبب StackOverflow وتعطل التطبيق. في مراجعة التعليمات البرمجية الخاصة بي ، سأذكر كيف يحارب AsyncOperation هذا.



AsyncOperation كتطبيق IValueTaskSource



كود المصدر .



داخل AsyncOperation ، يوجد حقل _continuation type من نوع Action <كائن>. يتم استخدام الحقل للاستمرار أو صدق أو لا تصدق. ولكن ، كما هو الحال غالبًا في التعليمات البرمجية الحديثة جدًا ، تتحمل الحقول مسؤوليات إضافية (مثل جامع البيانات المهملة والجزء الأخير في مرجع جدول الطريقة). حقل _continuation من نفس السلسلة. هناك قيمتان خاصتان يمكن تخزينهما في هذا المجال ، إلى جانب الاستمرارية نفسها وقيمة فارغة. s_availableSentinel و s_completedSentinel . تشير هذه الحقول إلى أن العملية متاحة ومكتملة ، على التوالي. وهي متاحة لإعادة الاستخدام فقط لعملية غير متزامنة تمامًا.



كما تنفذ AsyncOperation IThreadPoolWorkItemباستخدام طريقة واحدة - Void Execute () => SetCompletionAndInvokeContinuation (). تقوم طريقة SetCompletionAndInvokeContinuation بالاستمرار. وتسمى هذه الطريقة إما مباشرة في كود AsyncOperation ، أو من خلال التنفيذ المذكور. بعد كل شيء ، يمكن طرح أنواع تنفيذ IThreadPoolWorkItem في تجمع مؤشر الترابط مثل ThreadPool.UnsafeQueueUserWorkItem (هذا ، المفضل المحلية: خطأ).



سيتم تنفيذ الأسلوب Execute بواسطة تجمع مؤشرات الترابط.



إن التنفيذ نفسه للاستمرار تافه للغاية.



يتم نسخ استمرار _continuation إلى متغير محلي ، ويتم كتابة s_completedSentinel في مكانه- كائن دمية اصطناعي (أو خفير ، لا أعرف كيف أتحدث إلي في خطابنا) ، مما يشير إلى اكتمال المهمة. ثم يتم ببساطة تنفيذ نسخة محلية من الاستمرار الحقيقي. باستخدام ExecutionContext ، يتم نشر هذه الإجراءات في السياق. لا يوجد سر هنا. يمكن استدعاء هذا الرمز إما مباشرة من قبل الفئة - ببساطة عن طريق استدعاء الطريقة التي تتضمن هذه الإجراءات ، أو من خلال واجهة IThreadPoolWorkItem في تجمع مؤشرات الترابط. يمكنك الآن تخمين كيفية عمل الوظيفة مع تنفيذ عمليات الاستمرارية بشكل متزامن.



الطريقة الأولى لواجهة IValueTaskSource هي GetResult ( github ).



الأمر بسيط:



  1. _currentId.

    _currentId — , . . ;
  2. _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
  3. _result.

    _result TrySetResult .


طريقة TrySetResult ( جيثب ).



الطريقة تافهة. - يقوم بتخزين المعلمة المقبولة في _result وإكمال الإشارات ، أي أنه يطلق على أسلوب SignalCompleteion ، وهو أمر مثير للاهتمام.



طريقة SignalCompletion ( جيثب ).



تستخدم هذه الطريقة كل ما تحدثنا عنه في البداية.



في البداية ، إذا _continuation == null ، نكتب دمية s_completedSentinel.



علاوة على ذلك ، يمكن تقسيم الطريقة إلى 4 كتل. يجب أن أقول على الفور لبساطة فهم المخطط ، الكتلة الرابعة هي مجرد تنفيذ متزامن للاستمرار. أي ، التنفيذ البسيط للاستمرار من خلال الطريقة ، كما وصفت في الفقرة حول IThreadPoolWorkItem.



  1. _schedulingContext == null, .. ( if).

    _runContinuationsAsynchronously == true, , — ( if).

    IThreadPoolWorkItem . AsyncOperation . .

    , if ( , ), , 2 3 , — .. 4 ;
  2. _schedulingContext is SynchronizationContext, ( if).

    _runContinuationsAsynchronously = true. . , , . , . 2 , :

    sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
    


    . , , ( , ), 4 — ;
  3. , 2 . .

    , _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . .
  4. — . , .


الطريقة الثانية لواجهة IValueTaskSource هي GetStatus ( github )

تمامًا مثل حمار سانت بطرسبرغ.



إذا _continuation! = _CompletedSentinel، ثم العودة ValueTaskSourceStatus.Pending

إذا خطأ == لاغية، ثم العودة ValueTaskSourceStatus.Succeeded

إذا _error.SourceException هو OperationCanceledException، ثم العودة ValueTaskSourceStatus.Canceled

حسنا، منذ تولي كثيرا هنا، والعودة ValueTaskSourceStatus.Faulted



الثالث والأخير ، ولكن الطريقة الأكثر تعقيدًا لواجهة IValueTaskSource هي OnCompleted ( github ).



تضيف الطريقة متابعة يتم تنفيذها عند الانتهاء.



يلتقط ExecutionContext و SynchronizationContext حسب الحاجة.



بعد ذلك ، يتم استخدام Interlocked.CompareExchange ، الموضح أعلاه ، لحفظ الاستمرارية في الحقل ، ومقارنتها بالصفر. كتذكير ، تقوم CompareExchange بإرجاع القيمة الفعلية للمتغير.



إذا تم حفظ الاستمرار ، فسيتم إرجاع القيمة التي كانت في المتغير قبل التحديث ، أي ، فارغة. وهذا يعني أن العملية لم تكتمل وقت تسجيل الاستمرارية. والشخص الذي أكمله بنفسه سيكتشفها (كما رأينا أعلاه). وليس من المنطقي بالنسبة لنا القيام بأي إجراءات إضافية. وهنا ينتهي عمل الطريقة.



إذا لم ينجح حفظ القيمة ، أي أنه تم إرجاع شيء آخر غير null من CompareExchange. في هذه الحالة ، تمكن شخص ما من وضع القيمة بشكل أسرع منا. أي أنه حدث موقف من حالتين - إما أنجزت المهمة بشكل أسرع مما وصلنا إليه هنا ، أو كانت هناك محاولة لكتابة أكثر من متابعة واحدة ، وهو أمر لا يمكن القيام به.



وبالتالي ، نتحقق من القيمة المرتجعة ، سواء كانت تساوي s_completedSentinel - سيتم كتابتها إذا تم إكمالها.



  • إذا لم يكن هذا s_completedSentinel ، فإننا لم نستخدم وفقًا للخطة - حاولوا إضافة أكثر من متابعة واحدة. هذا هو ، الذي تم تدوينه بالفعل ، والآخر الذي نكتبه. وهذا وضع استثنائي.
  • s_completedSentinel, , , . , _runContinuationsAsynchronously = false.

    , , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.

    لتجنب هذا الموقف ، يجب تشغيل الاستمرارية بشكل غير متزامن مهما كان. يتم تنفيذه وفقًا لنفس المخططات مثل أول 3 كتل في طريقة SignalCompleteion - فقط في تجمع أو في سياق أو من خلال مصنع وجدول


وإليك مثال للتواصل المتزامن:



class Program
    {
        static async Task Main(string[] args)
        {
            Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
            {
                AllowSynchronousContinuations = true
            });

            ChannelWriter<int> writer = unboundedChannel;
            ChannelReader<int> reader = unboundedChannel;

            Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");

            var writerTask = Task.Run(async () =>
            {
                Thread.Sleep(500);
                int objectToWriteInChannel = 555;
                Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
                await writer.WriteAsync(objectToWriteInChannel);
                Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
            });

            //Blocked here because there are no items in channel
            int valueFromChannel = await reader.ReadAsync();
            Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");

            await writerTask;

            Console.Read();
        }
    }


الإخراج:



الرئيسي ، قبل الانتظار. معرف

مؤشر الترابط : 1 تم إنشاء مؤشر ترابط للكتابة بتأخير ، قبل انتظار الكتابة. معرف

مؤشر الترابط : 4 الرئيسي ، بعد الانتظار (سيتم معالجته بواسطة مؤشر الترابط الذي تم إنشاؤه للكتابة). معرف الخيط: 4

تم إنشاء خيط للكتابة مع التأخير ، بعد انتظار الكتابة. معرف الخيط: 4



All Articles