- الضغط الخلفي رائع
- لا يتوفر الضغط الخلفي إلا في المكتبات التي تطبق مواصفات التدفقات التفاعلية
- هذه المواصفات معقدة للغاية لدرجة أنك لا يجب أن تحاول تنفيذها بنفسك
سأحاول في هذا المقال إظهار ما يلي:
- الضغط الخلفي بسيط للغاية
- لتنفيذ ضغط عكسي غير متزامن ، يكفي عمل نسخة غير متزامنة من السيمافور
- إذا كان هناك تنفيذ إشارة غير متزامن ، يتم تنفيذ واجهة org.reactivestreams.Publisher في بضع عشرات من أسطر التعليمات البرمجية
الضغط المرتد هو رد فعل يضبط سرعة منتج البيانات لتتناسب مع سرعة المستهلك. في حالة عدم وجود مثل هذا الاتصال ، يمكن للمنتِج الأسرع تجاوز المخزن المؤقت للمستهلك أو ، إذا كان المخزن المؤقت بلا أبعاد ، يستنفد ذاكرة الوصول العشوائي بالكامل.
في البرمجة متعددة مؤشرات الترابط ، تم حل هذه المشكلة بواسطة Dijkstroy ، الذي اقترح آلية مزامنة جديدة - semaphore. يمكن اعتبار الإشارة بمثابة عداد إذن. من المفترض أن يطلب المنتج الإذن من السيمافور قبل القيام بعمل كثيف الموارد. إذا كانت الإشارة فارغة ، فسيتم حظر مؤشر ترابط المنتج.
لا يمكن للبرامج غير المتزامنة حظر مؤشرات الترابط ، لذلك لا يمكنها الوصول إلى إشارة فارغة للحصول على إذن (ولكن يمكنها القيام بجميع عمليات الإشارات الأخرى). يجب عليهم منع إعدامهم بطريقة أخرى. وبهذه الطريقة الأخرى ، فإنهم ببساطة يتركون الخيط العامل الذي كانوا يعملون عليه ، ولكن قبل ذلك يرتبون للعودة إلى العمل بمجرد امتلاء الإشارة.
الطريقة الأكثر أناقة لإيقاف برنامج غير متزامن واستئنافه هي هيكلته كعامل تدفق بيانات مع منافذ :
نموذج تدفق البيانات - ممثلون مع منافذ ، اتصالات موجهة بين منافذهم ، ورموز أولية. مأخوذة من: وصف منظم لممثلي Dataflow وتطبيقه
هناك منافذ الإدخال والإخراج. تستقبل منافذ الإدخال الرموز (الرسائل والإشارات) من منافذ الإخراج للجهات الفاعلة الأخرى. إذا كان منفذ الإدخال يحتوي على رموز مميزة ، وكان منفذ الإخراج مكانًا لوضع الرموز المميزة ، فإنه يعتبر نشطًا. إذا كانت جميع منافذ الممثل نشطة ، يتم إرسالها للتنفيذ. وبالتالي ، عند استئناف عمله ، يمكن لبرنامج الممثل قراءة الرموز بأمان من منافذ الإدخال والكتابة في عطلة نهاية الأسبوع. تكمن كل حكمة البرمجة غير المتزامنة في هذه الآلية البسيطة. إن تخصيص المنافذ ككائنات فرعية منفصلة للجهات الفاعلة يبسط إلى حد كبير تشفير البرامج غير المتزامنة ويسمح بزيادة تنوعها من خلال الجمع بين المنافذ من أنواع مختلفة.
يحتوي ممثل Hewitt الكلاسيكي على منفذين - أحدهما مرئي ، مع مخزن مؤقت للرسائل الواردة ، والآخر عبارة عن ثنائي مخفي يحظر عند إرسال الممثل للتنفيذ وبالتالي يمنع الممثل من إعادة التشغيل حتى نهاية الإطلاق الأولي. الإشارة غير المتزامنة المطلوبة هي تقاطع بين هذين المنفذين. مثل المخزن المؤقت للرسائل ، يمكنه تخزين العديد من الرموز المميزة ، ومثل المنفذ المخفي ، تكون هذه الرموز سوداء اللون ، أي لا يمكن تمييزها ، كما هو الحال في شبكات Petri ، وعداد الرموز يكفي لتخزينها.
في المستوى الأول من التسلسل الهرمي، لدينا فئة
AbstractActor
مع ثلاث فئات متداخلة - قاعدة Port
ومشتقاتها AsyncSemaPort
و InPort
، وكذلك مع وجود آلية لإطلاق فاعل للتنفيذ في حالة عدم وجود منافذ المحظورة. باختصار ، يبدو كالتالي:
public abstract class AbstractActor {
/** */
private int blocked = 0;
protected synchronized void restart() {
controlPort.unBlock();
}
private synchronized void incBlockCount() {
blocked++;
}
private synchronized void decBlockCount() {
blocked--;
if (blocked == 0) {
controlPort.block();
excecutor.execute(this::run);
}
}
protected abstract void turn() throws Throwable;
/** */
private void run() {
try {
turn();
restart();
} catch (Throwable throwable) {
whenError(throwable);
}
}
}
يحتوي على الحد الأدنى من مجموعة فئات المنافذ:
Port
- الفئة الأساسية لجميع المنافذ
protected class Port {
private boolean isBlocked = true;
public Port() {
incBlockCount();
}
protected synchronized void block() {
if (isBlocked) {
return;
}
isBlocked = true;
incBlockCount();
}
protected synchronized void unBlock() {
if (!isBlocked) {
return;
}
isBlocked = false;
decBlockCount();
}
}
إشارة غير متزامنة:
public class AsyncSemaPort extends Port {
private long permissions = 0;
public synchronized void release(long n) {
permissions += n;
if (permissions > 0) {
unBlock();
}
}
public synchronized void aquire(long delta) {
permissions -= delta;
if (permissions <= 0) {
//
// ,
//
block();
}
}
}
InPort
- الحد الأدنى من المخزن المؤقت لرسالة واردة واحدة:
public class InPort<T> extends Port implements OutMessagePort<T> {
private T item;
@Override
public void onNext(T item) {
this.item = item;
unBlock();
}
public synchronized T poll() {
T res = item;
item = null;
return res;
}
}
النسخة الكاملة من الطبقة
AbstractActor
يمكن الاطلاع هنا.
في المستوى التالي من التسلسل الهرمي ، لدينا ثلاث جهات فاعلة مجردة بمنافذ محددة ، ولكن مع إجراءات معالجة غير محددة:
- الفئة
AbstractProducer
هي ممثل بمنفذ واحد من نوع الإشارة غير المتزامن (ومنفذ تحكم داخلي ، موجود افتراضيًا في جميع الجهات الفاعلة). - الفصل
AbstractTransformer
عبارة عن ممثل هيويت منتظم ، مع إشارة إلى منفذ الإدخال للممثل التالي في السلسلة ، حيث يرسل الرموز المميزة المحولة. - يعتبر الفصل
AbstractConsumer
أيضًا ممثلًا عاديًا ، لكنه لا يرسل الرموز المميزة المحولة إلى أي مكان ، بينما له ارتباط بإشارة المنتج ، ويفتح هذه الإشارة بعد استيعاب رمز الإدخال. هذا يحافظ على عدد الرموز في العملية ثابتًا ولا يحدث تجاوز في المخزن المؤقت.
في المستوى الأخير ، في دليل الاختبار بالفعل ، يتم تحديد الجهات الفاعلة المحددة المستخدمة في الاختبارات :
- الفصل
ProducerActor
يولد دفقًا محدودًا من الأعداد الصحيحة. TransformerActor
يأخذ الفصل الرقم التالي من الدفق ويرسله إلى أسفل السلسلة.- فئة
ConsumerActor
- يقبل ويطبع الأرقام الناتجة
يمكننا الآن بناء سلسلة من معالجات المعالجة المتوازية غير المتزامنة على النحو التالي: المنتج - أي عدد من المحولات - المستهلك
وهكذا ، قمنا بتنفيذ ضغط عكسي ، وحتى بشكل أكثر عمومية مما هو عليه في مواصفات التدفقات التفاعلية - يمكن أن تمتد التغذية الراجعة على عدد تعسفي من سلاسل المعالجة ، وليس فقط المتجاورة ، كما في المواصفات.
لتنفيذ المواصفات ، تحتاج إلى تحديد منفذ إخراج حساس لعدد الأذونات التي تم تمريرها إليه باستخدام طريقة request () - سيكون هذا
Publisher
، ويكمل المنفذ الحالي InPort
باستدعاء هذه الطريقة - سيكون هذا Subscriber
. وهذا يعني أننا نفترض أن واجهات Publisher
وSubscriber
وصف سلوك الموانئ وليس الفاعلين. لكن بالحكم على حقيقة أنه يوجد في قائمة الواجهات أيضًا Processor
، والتي لا يمكن بأي حال من الأحوال أن تكون واجهة منفذ ، يعتبر مؤلفو المواصفات أن واجهاتهم هي واجهات للممثل. حسنًا ، يمكننا أن نجعل الجهات الفاعلة التي تنفذ كل هذه الواجهات من خلال تفويض تنفيذ وظائف الواجهة إلى المنافذ المقابلة.
من أجل التبسيط ، دعنا
Publisher
لا يمتلك المخزن المؤقت الخاص به وسوف يكتب مباشرة إلى المخزن المؤقت Subscriber
. للقيام بذلك ، تحتاج إلى شخص ما Subscriber
للاشتراك والوفاء request()
، أي أن لدينا شرطين ، وبناءً عليه ، نحتاج إلى منفذين - InPort<Subscriber>
و AsyncSemaPort
. أيا منها مناسب كقاعدة للتنفيذPublisher
أ ، نظرًا لاحتوائه على طرق غير ضرورية ، سنجعل هذه المنافذ متغيرات داخلية:
public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
protected AbstractActor.AsyncSemaPort sema;
public ReactiveOutPort(AbstractActor actor) {
subscriber = actor.new InPort<>();
sema = actor.new AsyncSemaPort();
}
}
هذه المرة ،
ReactiveOutPort
لم نقم بتعريف الفئة على أنها متداخلة ، لذا فقد احتاجت إلى معلمة منشئ ، إشارة إلى الفاعل المرفق ، لإنشاء مثيل للمنافذ المحددة على أنها فئات متداخلة. تتلخص
الطريقة
subscribe(Subscriber subscriber)
في حفظ المشترك والاتصال subscriber.onSubscribe()
:
public synchronized void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) {
throw new NullPointerException();
}
if (this.subscriber.isFull()) {
subscriber.onError(new IllegalStateException());
return;
}
this.subscriber.onNext(subscriber);
subscriber.onSubscribe(this);
}
والتي عادة ما ينتج عنها مكالمة
Publisher.request()
تتلخص في رفع السيمافور بمكالمة AsyncSemaPort.release()
:
public synchronized void request(long n) {
if (subscriber.isEmpty()) {
return; // this spec requirement
}
if (n <= 0) {
subscriber.current().onError(new IllegalArgumentException());
return;
}
sema.release(n);
}
والآن يتبقى لنا ألا ننسى خفض الإشارة باستخدام مكالمة
AsyncSemaPort.aquire()
في وقت استخدام المورد:
public synchronized void onNext(T item) {
Subscriber<? super T> subscriber = this.subscriber.current();
if (subscriber == null) {
throw new IllegalStateException();
}
sema.aquire();
subscriber.onNext(item);
}
تم تصميم مشروع AsyncSemaphore خصيصًا لهذه المقالة. تم صنعه عن قصد بأكبر حجم ممكن حتى لا يتعب القارئ. نتيجة لذلك ، يحتوي على قيود كبيرة:
-
Publisher
'Subscriber
' -
Subscriber
' 1
بالإضافة إلى ذلك ،
AsyncSemaPort
فهو ليس تناظريًا كاملاً لإشارة متزامنة - يمكن لعميل واحد فقط تنفيذ العملية aquire()
y AsyncSemaPort
(أي الممثل المرفق). لكن هذا ليس عيبًا - AsyncSemaPort
فهو يؤدي دوره جيدًا. من حيث المبدأ ، يمكنك القيام بذلك بشكل مختلف - خذها واستكملهاjava.util.concurrent.Semaphore
بواجهة اشتراك غير متزامنة (انظر AsyncSemaphore.java من مشروع DF4J ). يمكن لمثل هذه الإشارة أن تربط الممثلين وخيوط التنفيذ بأي ترتيب.
بشكل عام ، لكل نوع من أنواع التفاعل المتزامن (المنع) نظيره غير المتزامن (غير المحظور). لذلك ، في نفس مشروع DF4J هناك تنفيذ
BlockingQueue
، مدعومة بواجهة غير متزامنة. هذا يفتح إمكانية التحول خطوة بخطوة من برنامج متعدد مؤشرات الترابط إلى برنامج غير متزامن ، جزئياً استبدال الخيوط بالممثلين.