استخدام Azure Service Bus من Java

مرحبا زملائي! لقد حدث أن تم كتابة طلبنا في حزمة جافا ، ولكن تمت استضافته في Azure. ونحاول تحقيق أقصى استفادة من خدمات إدارة مزود السحابة.



أحدها هو Azure Service Bus واليوم أود التحدث عن ميزات استخدامه في تطبيق Spring Boot العادي.



إذا كنت تريد أن تقرأ عن ميزات أشعل النار - قم بالتمرير إلى نهاية المقالة



ما هو Azure Service Bus



بضع كلمات حول Azure Service Bus هو وسيط الرسائل السحابية (استبدال السحابة لـ RabbitMQ ، ActiveMQ). يدعم قوائم الانتظار (يتم تسليم الرسالة إلى مستلم واحد) والموضوعات (آلية النشر / الاشتراك) - بمزيد من التفاصيل هنا



يتم إعلان الدعم:



  1. رسائل مرتبة - تشير الوثائق إلى أن هذا يرد أولاً يصرف أولاً ، ولكن يتم تنفيذه باستخدام مفهوم جلسات الرسائل - مجموعة من الرسائل ، وليس قائمة الانتظار بأكملها. إذا كنت بحاجة إلى ضمان ترتيب الرسائل ، فحينئذٍ تقوم بدمج الرسائل في مجموعة ، وسيتم الآن تسليم الرسائل في المجموعة كـ FIFO. لذا فإن Azure Service Bus Queue ليس FIFO - فهو يسلم رسائلك بشكل عشوائي كما يناسبها
  2. قائمة انتظار الرسائل الميتة - كل شيء بسيط هنا ، لم يتمكنوا من تسليم الرسالة بنجاح بعد محاولات N أو فترة زمنية - انتقلوا إلى DLQ
  3. التسليم المجدول - يمكنك تعيين تأخير قبل التسليم
  4. تأجيل الرسائل - لإخفاء الرسائل في قائمة الانتظار ، ولن يتم تسليم الرسالة تلقائيًا ، ولكن يمكن استردادها بواسطة المعرف. نحتاج إلى تخزين هذا المعرف في مكان ما


كيفية التكامل مع Azure Service Bus



يدعم Azure Service Bus 1.0 AMQP ، مما يعني أنه غير متوافق مع عملاء RabbitMQ. يستخدم الأرنب AMQP 0.9.1



العميل "القياسي" الوحيد الذي يمكنه العمل مع ناقل الخدمة هو Apache Qpid .



هناك ثلاث طرق لإقران تطبيق Spring Boot مع Service Bus:



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



سوف أتناول هذه الطريقة بمزيد من التفصيل وأخبرك عن ميزات استخدام

التطبيق النموذجي الموجود في المستودع الرسمي ، لذلك لا جدوى من تكرار الكود - يوجد هنا المستودع مع مثال .



لان إنها Spring Integration Messaging ، كل ذلك يعود إلى القناة ، و MessageHandler ، و MessagingGateway ، و ServiceActivator.



ثم هناك ServiceBusQueueTemplate .



إرسال الرسائل



يجب أن يكون لدينا قناة نكتب فيها الرسالة التي نريد إرسالها ، وعلى الطرف الآخر يوجد MessageHandler يرسلها إلى ناقل الخدمة.



و MessagHandler هو com.microsoft.azure.spring.integration.core.DefaultMessageHandler - وهذا هو الرابط للخدمة الخارجية.



كيف تربطه بقناة؟ - إضافة الشرح - ServiceActivator (inputChannel = OUTPUT_CHANNEL) والآن لدينا MessagHandler و الاستماع إلى OUTPUT_CHANNEL قناة .



بعد ذلك ، نحتاج إلى كتابة رسالتنا بطريقة ما إلى القناة - وهنا مرة أخرى سحر الربيع - نعلن عن MessagingGateway وربطه بالقناة بالاسم.



مقتطف من المثال :



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


هذا كل شيء: بوابة -> قناة -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter .



في الكود ، يبقى حقن بوابتنا واستدعاء طريقة الإرسال .



لقد ذكرت ServiceBusMessageConverter في سلسلة الاتصال لسبب ما - إذا كنت تريد إضافة رؤوس مخصصة (على سبيل المثال CORRELATION_ID) إلى الرسالة ، فهذا هو المكان الذي يجب نقلهم فيه من org.springframework.messaging.MessageHeaders إلى الرسالة الزرقاء.

طريقة خاصة setCustomHeaders .



في هذه الحالة ، ستبدو بوابتك كما يلي:



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


استقبال الرسائل



حسنًا ، نعرف كيف نرسل الرسائل ، وكيف نحصل عليها الآن؟



هنا كل شيء هو نفسه - MessageProducer -> القناة -> المعالج



The MessageProducer هي com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter - هذا هو الرابط الخاص بنا إلى خدمة خارجية. في الداخل ، يوجد نفس ServiceBusQueueTemplate مع ServiceBusMessageConverter حيث يمكنك قراءة الرؤوس المخصصة ووضعها في رسالة التكامل الربيعية.



القناة مثبتة بالفعل فيها يدويًا:



@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


لكن المعالج نفسه مرتبط بالقناة عبر ServiceActivator .



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


يمكنك الحصول على الخط على الفور:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


ربما تكون قد لاحظت معلمة Checkpointer غريبة Checkpointer ، يتم استخدامها لتأكيد معالجة الرسالة يدويًا.

إذا قمت بتعيين CheckpointMode.MANUAL عند إنشاء ServiceBusQueueInboundChannelAdapter ، يجب عليك إرسال إقرار للرسالة بنفسك. إذا كنت تستخدم CheckpointMode.RECORD ، فسيتم إرسال التأكيد تلقائيًا - التفاصيل في رمز ServiceBusQueueTemplate .







ميزات الاستخدام



لذا ، فإن قائمة "مكابس" و "رقائق" التي ذهبنا بالفعل.



ReceiveMode.PEEKLOCK



يدعم Azure Service Bus وضع PEEKLOCK - يأخذ المستهلك رسالة ، ويقفل في ناقل الخدمة ، ولا يمكن لأي شخص الوصول إليه لفترة معينة (مدة التأمين) ، ولكن لا يتم حذفه منه. إذا لم يرسل المستهلك خلال الوقت المخصص تأكيدًا للمعالجة - النجاح / التخلي أو لم يمدد القفل - تعتبر الرسالة متاحة مرة أخرى وسيتم إجراء محاولة تسليم جديدة.



ومن المثير للاهتمام ، التخلي ببساطة عن إعادة تعيين القفل وتصبح الرسالة متاحة على الفور لإعادة التسليم.



ServiceBusQueueTemplate الافتراضي يخلق QueueClient وضع ReceiveMode.PEEKLOCK .



إذا حدث استثناء غير معالج في معالجنا- لن يتم إرسال أي إعلام إلى الخادم وستظل الرسالة مقفلة وستتم إعادة تسليمها قبل انتهاء المهلة.

في هذه الحالة ، سيزداد عداد التسليم ، وهذا أمر منطقي.



لا أعرف ما إذا كان هذا خطأ أو ميزة - ولكن من الملائم جدًا إجراء تأخير بين إعادة المحاولة في المواقف التي يكون فيها ذلك ضروريًا.



إذا تعذر معالجة الرسالة حتى مع إعادة المحاولة ، فمن الضروري التقاط الاستثناءات وتمييز الرسالة على أنها تمت معالجتها وإضافة منطق إضافي إلى التطبيق ، وإلا فسيتم تسليمها مرارًا وتكرارًا حتى تصل إلى حد إعادة التسليم (يتم تكوينه عند إنشاء قائمة انتظار في ناقل الخدمة )



عدد رسائل الجلب المسبق والتزامن



كما قد تكون خمنت ، فإن إعداد التزامن مسؤول عن عدد معالجات الرسائل المتوازية ، وعدد رسائل الجلب المسبق هو عدد الرسائل التي سنصل إليها في المخزن المؤقت من الخادم.



افتراضيًا ، يتم تكوين ServiceBusQueueTemplate تلقائيًا (AzureServiceBusQueueAutoConfiguration) بقيمة 1 لكلا المعلمتين ، أي بشكل افتراضي ، ستحتوي كل قائمة انتظار على مؤشر ترابط معالجة واحد ، على الرغم من أن مفهوم ناقل الخدمة مع الإفادة لكل رسالة فردية يتضمن العديد من المعالجات المتزامنة. هذا هو الأهم إذا كان لديك معالجة طلب طويلة.



لسوء الحظ ، لا يمكن ضبط هذه الإعدادات من خلال تكوين التطبيق (application.yml / application.properties) ولا يمكن ضبطها إلا في الكود. ولكن حتى من خلال الكود ، لن تتمكن من تعيين إعدادات مختلفة لقوائم انتظار مختلفة.



لذلك ، إذا كنت بحاجة إلى إجراء إعدادات مختلفة ، فسيتعين عليك إنشاء العديد من وحدات برامج ServiceBusQueueTemplate لكل ServiceBusQueueInboundChannelAdapter



كومبليتابليفيوتشر داخل حافلة الخدمة اللازوردية java sdk



خدمة اللازوردية حافلة جافا SDK يتم تنفيذ نفسها حول CompletableFuture و CachedThreadPool المنفذ - MessagingFactory.INTERNAL_THREAD_POOL لذا كن حذرا مع جميع أنواع الفاصوليا موضوع المحلية



الرسائل المطلوبة



نستخدم ناقل الخدمة كقائمة انتظار وظيفية - تعتمد بعض الوظائف على بعضها البعض وبالتالي يجب تنفيذها بالترتيب الذي تم إنشاؤه بها.



كما ذكرت أعلاه ، تستخدم القمصان مفهوم جلسات الرسائل - عندما يتم تجميع الرسائل في جلسة عن طريق المفتاح (يتم نقلها في العنوان) ، تكون الجلسة موجودة طالما أن هناك رسالة واحدة على الأقل مع مفتاح الجلسة - بالتفصيل في وثائق

ناقل الخدمة يضمن تسليم الرسائل داخل هذه المجموعة بترتيب الإضافة إلى الخادم (أي بالترتيب الذي كتبه خادم ناقل الخدمة إلى المستودع).



من الجدير بالذكر أيضًا ما إذا كنت قد أنشأت قائمة انتظار ممكّنة للجلسات - وهذا يعني أنه يجب أن تحتوي جميع الرسائل على رأس مع مفتاح جلسة.



على الفور ، كنا سعداء جدًا بإمكانية ناقل الخدمة لصف الرسائل في قائمة انتظار FIFO - وإن كان ذلك لمجموعة من الرسائل.



لكن بعد فترة ، بدأنا نلاحظ المشاكل:



  • بدأت بعض الرسائل في الوصول إلى عدد لا نهائي من المرات
  • تباطأت معالجة قائمة الانتظار
  • في إحصائيات ناقل الخدمة ، يتم تمييز نصف الطلبات على أنها فاشلة ، وتظهر الطلبات الفاشلة حتى في قائمة انتظار فارغة عند الخمول


بالنظر إلى كود sdk ، اكتشفنا خصوصية العمل مع الجلسات:



  1. يلتقط المستهلك الجلسة ويبدأ في قراءة جميع الرسائل المتاحة فيها
  2. معالجة عدد الجلسات التي تساوي معامل التزامن في نفس الوقت
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


نتيجة لذلك ، تخلوا عن ميزة حافلة الخدمة هذه وكتبوا دراجة ، وحافلة الخدمة تعمل كمحرك.



بمجرد إلغاء قائمة الانتظار الممكّنة للجلسات ، اختفت الأخطاء في الإحصائيات ؛ الطلب إلى ناقل الخدمة.



في حزمة JMS + Qpid - هذه الوظيفة غير متاحة.



المشاكل المحتملة مع أحجام قائمة الانتظار أكبر من 1G



لم نلتقي بعد ، لكن سمعت أنها تبدأ في العمل بشكل غير مستقر إذا كان حجم قائمة الانتظار أكبر من 1G.



إذا صادفت هذا أو العكس ، فكل شيء يعمل - اكتب في التعليقات.



مشاكل طلبات التعقب



لا يمكن لوكيل رؤى تطبيق azure القياسي تتبع إرسال الرسائل كرسائل تبعية ورسائل واردة كطلبات.



كان علي إضافة بعض التعليمات البرمجية.



النتيجة



إذا كنت بحاجة إلى قائمة انتظار مهمة مع معالجة رسائل طويلة ولا تحتاج إلى قائمة انتظار ، فيمكنك استخدام.



إذا كانت معالجة الرسائل سريعة - استخدم Azure Event Hub - كافكا العادي ، فإن العميل القياسي يعمل بشكل جيد.



All Articles