تكامل الربيع - تدفقات البيانات الديناميكية

الألعاب النارية هبر! سنقوم اليوم بتحليل منطقة محددة نوعًا ما - معالجة البيانات المتدفقة باستخدام إطار عمل التكامل الربيعي وكيفية جعل هذه التدفقات في وقت التشغيل دون التهيئة الأولية في سياق التطبيق. تطبيق نموذج كامل في Gita .



المقدمة



Spring Integration هو إطار عمل تكامل مؤسسي (EIP) يستخدم آليات المراسلة تحت الغطاء بين محولات البروتوكولات / أنظمة التكامل المختلفة القائمة على قنوات الرسائل (قوائم الانتظار المشروطة). نظائرها الشهيرة هي Camel ، Mule ، Nifi.



من حالة الاختبار ، سيكون علينا - لتقديم خدمة REST يمكنها قراءة معلمات الطلب المستلمة ، والانتقال إلى قاعدة البيانات الخاصة بنا ، على سبيل المثال ، postgres ، التحديث والجلب من بيانات الجدول وفقًا للمعلمات المستلمة من المصدر ، وإرسال النتيجة مرة أخرى إلى قائمة الانتظار (طلب / response) ، وقم أيضًا بعمل مثيلات متعددة بمسارات طلب مختلفة.



بشكل تقليدي ، سيبدو مخطط تدفق البيانات كما يلي:



صورة



بعد ذلك ، سأوضح كيف يمكنك القيام بذلك ببساطة دون الرقص كثيرًا باستخدام الدف ، باستخدام IntegrationFlowContext ، مع نقاط نهاية مكون / مؤشر ترابط للتحكم. سيتم وضع جميع كود المشروع الرئيسي في المستودع ، وهنا سوف أشير إلى عدد قليل من القصاصات. حسنا ، من هو مهتم ، من فضلك ، تحت القط.



أدوات



لنبدأ مع كتلة التبعية افتراضيًا. في الأساس ، سنحتاج إلى مشاريع التمهيد الربيعي - من أجل أيديولوجية REST لإدارة التدفق / المكونات ، وتكامل الربيع - لإنشاء حالتنا بناءً على القنوات والمحولات.



ونعتقد على الفور ما نحتاجه أيضًا لإعادة إنتاج الحالة. بالإضافة إلى التبعيات الأساسية ، سنحتاج إلى مشاريع فرعية - Integral-http و Integration-jdbc و Integration-groovy (توفر محولات بيانات قابلة للتخصيص ديناميكيًا استنادًا إلى البرامج النصية Goovy). بشكل منفصل ، سأقول أنه في هذا المثال لن نستخدم محول رائع على أنه غير ضروري ، ولكننا سنوفر القدرة على تخصيصه من الخارج.



قائمة التبعية
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




مطبخ داخلي



دعنا ننتقل إلى إنشاء مكونات النظام الضرورية (الأغلفة / النماذج). سنحتاج إلى القناة والفاصوليا و httpInboundGateway والمعالج و jdbcOutboundGateway ونماذج النتائج.



  • الفول - كائن مساعد مطلوب للمحولات والخيط
  • قناة - قناة لتوصيل الرسائل من / إلى مكونات الدفق
  • httpInboundGateway - نقطة وصول http التي سنرسل إليها طلبًا بالبيانات لمزيد من المعالجة
  • معالج - نوع عام من المعالج (محولات الأخدود ، محولات مختلفة ، إلخ.)
  • jdbcOutboundGateway - محول jdbc
  • النتيجة - معالج لإرسال المعلومات إلى قناة معينة


سنحتاج إلى أغلفة لتخزين المعلمات وتهيئة مكونات الدفق بأكمله بشكل صحيح ، لذلك نقوم على الفور بإنشاء مخزن مكونات ، ونضيف. وظائف محولات JSON -> نموذج التعريف. رسم الخرائط المباشر للحقول باستخدام جاكسون والأشياء في حالتي لم يكن قابلاً للتطبيق - لدينا دراجة أخرى لبروتوكول اتصال محدد.



دعنا نقوم بذلك بشكل جيد على الفور ، باستخدام التعليقات التوضيحية :



StreamComponent - مسؤول عن تحديد الفئات كنموذج توليف لمكون دفق ولديه معلومات الخدمة - اسم المكون ونوع المكون وما إذا كان المكون متداخلاً ووصفًا ؛



SettingClass - المسؤول عن الخيارات الإضافية لمسح النموذج ، مثل مسح حقول الدرجة الفائقة وتجاهل الحقول عند تهيئة القيم ؛



SettingValue - المسؤول عن تحديد حقل الفئة على أنه قابل للتخصيص من الخارج ، مع إعدادات التسمية في JSON والوصف ومحول النوع وعلامة الحقل المطلوبة وعلامة الكائن الداخلي لأغراض إعلامية ؛



مدير تخزين المكونات



الأساليب المساعدة للعمل مع نماذج وحدات التحكم REST



نموذج أساسي - تجريد مع مجموعة من الحقول الإضافية / طرق النموذج



نماذج تكوين التدفق الحالي



Mapper JSON -> نموذج التعريف



تم إعداد الأرضية الرئيسية للعمل. لننتقل الآن إلى التنفيذ المباشر للخدمات التي ستكون مسؤولة عن دورة الحياة وتخزين وتهيئة التدفقات وسنضع على الفور فكرة أنه يمكننا موازاة تيار واحد بنفس التسمية في عدة حالات ، أي سنحتاج إلى عمل معرّفات فريدة (أدلة) لجميع مكونات التدفق ، وإلا قد تحدث تصادمات مع مكونات أخرى فردية (الفاصوليا والقنوات وما إلى ذلك) في سياق التطبيق. لكن دعنا أولاً نصنع مصممي خرائط من مكونين - http و jdbc ، أي زيادة النماذج التي تم إجراؤها سابقًا لمكونات الدفق نفسه (HttpRequestHandlerEndpointSpec و JdbcOutboundGateway).



HttpRegistry



JdbcRegistry



خدمة الإدارة المركزية ( StreamDeployingService) يؤدي وظائف تخزين العمال / غير النشطين ، ويسجل وظائف جديدة ، ويبدأ ، ويوقف ، ويزيل الخيوط بالكامل من سياق التطبيق. ميزة مهمة في الخدمة هي تقديم تبعية IntegrationFlowBuilderRegistry ، مما يساعدنا على جعل ديناميكيات التطبيق (ربما تذكر ملفات التكوين xml أو فئات DSL للكيلومترات). وفقًا لمواصفات البث ، يجب أن يبدأ دائمًا بمكون أو قناة واردة ، لذلك نأخذ هذا في الاعتبار عند تنفيذ طريقة registerStreamContext.



والمدير المساعد ( IntegrationFlowBuilderRegistry) ، الذي يؤدي وظيفة مخطط النماذج لتدفق المكونات وتهيئة التدفق نفسه باستخدام IntegrationFlowBuilder. لقد قمت أيضًا بتطبيق معالج السجل في خط أنابيب التدفق ، وخدمة لجمع مقاييس قناة التدفق (خيار قابل للتطبيق) وتنفيذ محتمل لمحولات رسائل التدفق استنادًا إلى تنفيذ Groovy (إذا أصبح هذا المثال فجأة أساسًا للبيع ، فيجب أن تتم الترجمة المسبقة للنصوص الرائعة في مرحلة تهيئة التدفق ، لأنك واجهت اختبارات الحمل في ذاكرة الوصول العشوائي وبغض النظر عن عدد النوى والقوة لديك). اعتمادًا على تكوين مراحل تسجيل النموذج والمعلمات على مستوى السجل ، سيكون نشطًا بعد كل إرسال لرسالة من مكون إلى مكون. تم تمكين المراقبة وتعطيلها بواسطة معلمة في application.yml:



monitoring:
  injectction:
    default: true


الآن لدينا جميع الآليات لتهيئة تدفقات معالجة البيانات الديناميكية ، يمكننا أيضًا كتابة مصممي الخرائط للبروتوكولات والمحولات المختلفة مثل RabbitMQ ، Kafka ، Tcp ، Ftp ، إلخ. علاوة على ذلك ، في معظم الحالات ، لا تحتاج إلى كتابة أي شيء بيدك (باستثناء ، بالطبع ، نماذج التكوين والأساليب المساعدة) - يوجد بالفعل عدد كبير إلى حد ما من المكونات في المستودع .



ستكون المرحلة النهائية هي تنفيذ وحدات التحكم للحصول على معلومات حول مكونات النظام الحالية ، وإدارة التدفقات والحصول على المقاييس.



ComponentsController - يوفر معلومات حول جميع المكونات في نموذج يمكن للمستخدم قراءته ، ومكون واحد حسب الاسم والنوع.



StreamController - يوفر إدارة التدفق الكامل ، أي تهيئة نماذج JSON الجديدة ، بدء وإيقاف وحذف وإصدار المقاييس حسب المعرف.



المنتج النهائي



نرفع التطبيق الناتج ووصف حالة الاختبار بتنسيق JSON.



عينة دفق البيانات
البرنامج النصي لتهيئة قاعدة البيانات:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


هام: يتم استخدام معلمة الترتيب للتهيئة المتسلسلة للمكونات في سياق التدفق ، أي نظرًا لأن المكونات يتم إنشاؤها وفقًا لهذه المعلمة ، سيتم إعادة إنتاج معالجة الرسالة الواردة. (يتم دائمًا سرد القنوات وسلال التخزين أولاً في القائمة). وللصالح - تحتاج إلى القيام بمعالجة الرسم البياني وستختفي الحاجة إلى هذه المعلمة من تلقاء نفسها.



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





الاختبار:



1) نقوم بتهيئة دفق جديد باستخدام طريقة



POST / دفق / نشر ، حيث سيكون JSON في نص الطلب.



استجابة لذلك ، سيتعين على النظام الإرسال إذا كان كل شيء صحيحًا ، وإلا ستظهر رسالة خطأ:



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2) بدأنا البدء باستخدام الطريقة:



GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start ، حيث نشير إلى معرف الدفق الأولي في وقت سابق.



استجابة لذلك ، سيتعين على النظام الإرسال إذا كان كل شيء صحيحًا ، وإلا ستظهر رسالة خطأ:



{
    "status": "SUCCESS", -  
}


3) استدعاء دفق بواسطة معرف في النظام؟ كيف وماذا وأين - في مخطط نموذج HttpRegistry ، كتبت الشرط



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


حيث يتم أخذ معلمة مسار http-inbound في الاعتبار ، وإذا لم يتم تحديدها بشكل صريح في تكوين المكون ، يتم تجاهلها ويتم تعيين مسار استدعاء النظام. في حالتنا ، سيكون:



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call - حيث يوجد معرف الدفق ، مع نص الطلب:



{
    "accountId": 1
}


ردًا على ذلك ، سوف نتلقى ، إذا كانت مراحل معالجة الطلب قد عملت بشكل صحيح ، فسوف نتلقى هيكلًا ثابتًا لسجلات الجداول account_data و account_info.



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


إن خصوصية محول JdbcOutboundGateway هي أنه إذا قمت بتحديد معلمة التحديث - الاستعلام ، يتم تسجيل معالج إضافي ، والذي يقوم أولاً بتحديث البيانات ، وبعد ذلك فقط يتم إحضاره بواسطة معلمة الاستعلام.



إذا حددت نفس المسارات يدويًا ، فسيتم إلغاء إمكانية تشغيل المكونات باستخدام HttpInboundGateway كنقطة وصول إلى دفق في عدة حالات لأن النظام لن يسمح بتسجيل مسار مشابه.



4) دعنا نلقي نظرة على المقاييس باستخدام طريقة GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / المقاييس



محتوى الاستجابة
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




خاتمة



وهكذا ، تم توضيح كيف ، بعد قضاء وقت وجهد أكثر قليلاً ، في كتابة طلب للتكامل مع أنظمة مختلفة بدلاً من كتابة معالجات يدوية إضافية (خطوط الأنابيب) في كل مرة في تطبيقك للتكامل مع أنظمة أخرى ، 200-500 سطر من التعليمات البرمجية لكل منها.



في المثال الحالي ، يمكنك موازاة عمل نفس نوع سلاسل العمليات لعدة مثيلات عن طريق معرفات فريدة ، وتجنب الاصطدامات في السياق العام للتطبيق بين تبعيات الخيط (صناديق ، قنوات ، إلخ).



بالإضافة إلى ذلك ، يمكنك تطوير المشروع:



  • حفظ التدفقات إلى قاعدة البيانات ؛
  • تقديم الدعم لجميع مكونات التكامل التي يوفرها لنا مجتمع الربيع والربيع التكامل ؛
  • جعل العمال الذين يؤدون العمل مع الخيوط على جدول زمني ؛
  • إنشاء واجهة مستخدم سليمة لتهيئة التدفقات باستخدام "مكعبات الماوس والمكونات" الشرطية (بالمناسبة ، تم توضيح المثال جزئيًا لمشروع github.com/spring-cloud/spring-cloud-dataflow-ui ).


ومرة أخرى سوف أقوم بتكرار الرابط إلى المستودع .



All Articles