مخطط شرارة - التطور في الممارسة العملية

القراء الأعزاء ، يوم جيد!



في هذه المقالة ، يصف المستشار الرائد لخط أعمال Big Data Solutions في Neoflex بالتفصيل خيارات بناء واجهات متاجر ذات بنية متغيرة باستخدام Apache Spark.



كجزء من مشروع تحليل البيانات ، غالبًا ما تنشأ مهمة بناء مارت بناءً على بيانات غير منظمة.



عادةً ما تكون هذه سجلات ، أو استجابات من أنظمة مختلفة ، محفوظة بتنسيق JSON أو XML. يتم تحميل البيانات إلى Hadoop ، فأنت بحاجة إلى إنشاء عرض منها. يمكننا تنظيم الوصول إلى واجهة المتجر التي تم إنشاؤها ، على سبيل المثال ، من خلال Impala.



في هذه الحالة ، كان تخطيط واجهة المحل المستهدفة غير معروف من قبل. علاوة على ذلك ، لا يمكن إعداد المخطط مسبقًا ، لأنه يعتمد على البيانات ، ونحن نتعامل مع هذه البيانات ذات التنظيم الضعيف للغاية.



على سبيل المثال ، يتم تسجيل الإجابة التالية اليوم:



{source: "app1", error_code: ""}


وغدًا تأتي الإجابة التالية من نفس النظام:



{source: "app1", error_code: "error", description: "Network error"}


نتيجة لذلك ، يجب إضافة حقل آخر إلى واجهة المتجر - الوصف ، ولا أحد يعرف ما إذا كان سيأتي أم لا.



مهمة إنشاء سوق على مثل هذه البيانات قياسية إلى حد ما ، ولدى Spark عدد من الأدوات لذلك. يتم دعم كل من JSON و XML لتحليل البيانات الأولية ، ويتم توفير دعم schemaEvolution لمخطط غير معروف سابقًا.



للوهلة الأولى ، يبدو الحل بسيطًا. نحتاج إلى أخذ مجلد به JSON وقراءته في إطار بيانات. سيقوم Spark بإنشاء مخطط وتحويل البيانات المتداخلة إلى هياكل. بعد ذلك ، يجب حفظ كل شيء في الباركيه ، والذي يتم دعمه أيضًا في Impala ، من خلال تسجيل الواجهة في Hive metastore.



يبدو أن كل شيء بسيط.



ومع ذلك ، فليس من الواضح من الأمثلة القصيرة في التوثيق ما يجب القيام به مع عدد من المشاكل في الممارسة العملية.



يصف التوثيق أسلوبًا ليس لإنشاء واجهة محل ، ولكن لقراءة JSON أو XML في إطار بيانات.



وبالتحديد ، يتم إعطاؤه ببساطة كيفية قراءة وتحليل JSON:



df = spark.read.json(path...)


هذا يكفي لإتاحة البيانات لشركة Spark.



من الناحية العملية ، يكون السيناريو أكثر تعقيدًا من مجرد قراءة ملفات JSON من مجلد وإنشاء إطار بيانات. يبدو الموقف على النحو التالي: هناك بالفعل عرض معين ، وتأتي البيانات الجديدة كل يوم ، ويجب إضافتها إلى الواجهة ، دون أن ننسى أن المخطط قد يكون مختلفًا.



المخطط المعتاد لبناء واجهة المتجر هو كما يلي:



الخطوة 1. يتم تحميل البيانات في Hadoop ، متبوعة بإعادة التحميل يوميًا وإضافتها إلى قسم جديد. اتضح المجلد بالبيانات الأولية مقسمة حسب الأيام.



الخطوة 2.أثناء تمهيد التهيئة ، تتم قراءة هذا المجلد وتحليله بواسطة Spark. يتم حفظ إطار البيانات الناتج بتنسيق متاح للتحليل ، على سبيل المثال ، في الباركيه ، والذي يمكن بعد ذلك استيراده إلى إمبالا. يؤدي هذا إلى إنشاء عرض مستهدف مع جميع البيانات التي تراكمت حتى هذه النقطة.



الخطوة 3. يتم إنشاء تنزيل لتحديث واجهة المتجر كل يوم.

السؤال الذي يطرح نفسه هو التحميل الإضافي ، والحاجة إلى تقسيم الواجهة ، ومسألة دعم المخطط العام للعرض.



دعنا نعطي مثالا. لنفترض أنه تم تنفيذ الخطوة الأولى لبناء التخزين ، وتم تكوين تصدير ملفات JSON إلى مجلد.



لا يمثل إنشاء إطار بيانات منهم ، ثم حفظه كواجهة عرض ، مشكلة. هذه هي الخطوة الأولى التي يمكنك العثور عليها بسهولة في وثائق Spark:



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)


يبدو أن كل شيء يكون على ما يرام.



قرأنا وحللنا JSON ، ثم نحفظ إطار البيانات كباركيه ، ونقوم بتسجيله في Hive بأي طريقة مناسبة:



df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')


نحصل على عرض.



لكن في اليوم التالي ، تمت إضافة بيانات جديدة من المصدر. لدينا مجلد به JSON ، وتم إنشاء واجهة عرض بناءً على هذا المجلد. بعد تحميل الجزء التالي من البيانات من المصدر ، ينفد سوق البيانات من البيانات ليوم واحد.



سيكون الحل المنطقي هو تقسيم واجهة المتجر حسب اليوم ، مما سيسمح بإضافة قسم جديد كل يوم تالي. آلية ذلك معروفة أيضًا ، يتيح لك Spark كتابة الأقسام بشكل منفصل.



أولاً ، نقوم بتحميل التهيئة ، وحفظ البيانات كما هو موضح أعلاه ، وإضافة التقسيم فقط. يسمى هذا الإجراء تهيئة واجهة المحل ويتم تنفيذه مرة واحدة فقط:



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


في اليوم التالي ، نقوم بتحميل قسم جديد فقط:



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


كل ما تبقى هو إعادة التسجيل مع Hive لتحديث المخطط.

ومع ذلك ، هذا هو المكان الذي تنشأ فيه المشاكل.



المشكلة الأولى. عاجلاً أم آجلاً ، لا يمكن قراءة الباركيه الناتج. هذا له علاقة بكيفية اختلاف الباركيه و JSON في الحقول الفارغة.



لنفكر في حالة نموذجية. على سبيل المثال ، وصل JSON أمس:



 1: {"a": {"b": 1}},


واليوم تبدو JSON نفسها كما يلي:



 2: {"a": null}


لنفترض أن لدينا قسمين مختلفين لكل منهما صف واحد.

عندما نقرأ البيانات الأولية بالكامل ، سيكون Spark قادرًا على تحديد النوع وفهم أن "a" هو حقل من النوع "هيكل" ، مع حقل متداخل "b" من النوع INT. ولكن ، إذا تم حفظ كل قسم على حدة ، فسيتم الحصول على باركيه مع أنظمة تقسيم غير متوافقة:



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)


هذا الموقف معروف جيدًا ، لذلك تمت إضافة خيار خاص لإزالة الحقول الفارغة عند تحليل البيانات الأولية:



df = spark.read.json("...", dropFieldIfAllNull=True)


في هذه الحالة ، يتكون الباركيه من أقسام يمكن قراءتها معًا.

على الرغم من أن أولئك الذين فعلوا ذلك في الممارسة العملية سوف يضحكون بمرارة. لماذا ا؟ لأنه من المحتمل ظهور حالتين أخريين. أو ثلاثة. أو أربعة. الأول ، الذي سيظهر بشكل شبه مؤكد ، هو أن الأنواع الرقمية ستبدو مختلفة في ملفات JSON المختلفة. على سبيل المثال ، {intField: 1} و {intField: 1.1}. إذا تم العثور على مثل هذه الحقول في جزء واحد ، فإن دمج المخطط سيقرأ كل شيء بشكل صحيح ، مما يؤدي إلى النوع الأكثر دقة. ولكن إذا كان الأمر مختلفًا ، فسيكون لدى أحدهما intField: int ، والآخر intField: double.



هناك العلم التالي للتعامل مع هذا الموقف:



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


الآن لدينا مجلد توجد به الأقسام ، والذي يمكن قراءته في إطار بيانات واحد وباركيه صالح لواجهة المتجر بالكامل. نعم؟ لا.



تذكر أننا سجلنا الجدول في الخلية. الخلية ليست حساسة لحالة الأحرف في أسماء الحقول ، بينما الباركيه حساس لحالة الأحرف. لذلك ، الأقسام ذات المخططات: field1: int ، و Field1: int هي نفسها في Hive ، لكن ليس لـ Spark. تذكر كتابة أسماء الحقول بأحرف صغيرة.



بعد ذلك ، يبدو أن كل شيء على ما يرام.



ومع ذلك ، ليس كل شيء بهذه البساطة. تنشأ مشكلة ثانية معروفة أيضًا. نظرًا لأنه يتم حفظ كل قسم جديد على حدة ، ستكون ملفات خدمة Spark في مجلد القسم ، على سبيل المثال ، علامة نجاح العملية _SUCCESS. سيؤدي هذا إلى حدوث خطأ عند محاولة الباركيه. لتجنب ذلك ، تحتاج إلى إعداد التكوين عن طريق تعطيل Spark من إضافة ملفات الخدمة إلى المجلد:



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


يبدو أنه يتم الآن إضافة قسم باركيه جديد كل يوم إلى مجلد واجهة المتجر الهدف ، حيث يتم تخزين البيانات المحللة لهذا اليوم. لقد حرصنا مسبقًا على عدم وجود أقسام بها تعارض في نوع البيانات.



ولكن أمامنا المشكلة الثالثة. الآن المخطط العام غير معروف ، علاوة على ذلك ، في Hive ، الجدول الذي يحتوي على مخطط خاطئ ، لأن كل قسم جديد ، على الأرجح ، أدخل تشويهًا في المخطط.



تحتاج إلى إعادة تسجيل الجدول. يمكن القيام بذلك ببساطة: اقرأ باركيه واجهة المتجر مرة أخرى ، وأخذ المخطط وأنشئ DDL بناءً عليه ، والذي يعيد تسجيل المجلد في Hive كجدول خارجي ، وتحديث مخطط واجهة المتجر الهدف.



نحن نواجه مشكلة رابعة. في المرة الأولى التي سجلنا فيها الجدول ، اعتمدنا على Spark. الآن نقوم بذلك بأنفسنا ، وعليك أن تتذكر أن حقول الباركيه يمكن أن تبدأ بأحرف غير صالحة لـ Hive. على سبيل المثال ، يقوم Spark برمي الأسطر التي لا يمكنه تحليلها في حقل "corrupt_record". لا يمكن تسجيل مثل هذا الحقل مع Hive دون هروب.



بمعرفة هذا ، نحصل على المخطط:



f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


الرمز ("_corrupt_record"، "` _corrupt_record` ") +" "+ f [1] .replace (": "،" `:"). استبدل ("<"، "<` "). استبدل ("، " ، "،"). replace ("array <" "،" array <") يجعل DDL آمنًا ، أي بدلاً من:



create table tname (_field1 string, 1field string)


باستخدام أسماء الحقول مثل "_field1 ، 1field" ، يتم إنشاء DDL آمن حيث يتم إفلات أسماء الحقول: إنشاء جدول `tname` (سلسلة` _field1` ، سلسلة `1field`).



السؤال الذي يطرح نفسه: كيفية الحصول على إطار البيانات مع المخطط الكامل بشكل صحيح (في كود pf)؟ كيف أحصل على هذا pf؟ هذه هي المشكلة الخامسة. هل تعيد قراءة مخطط جميع الأقسام من المجلد الذي يحتوي على ملفات باركيه لواجهة المتجر المستهدفة؟ هذه هي الطريقة الأكثر أمانًا ولكنها الأصعب.



المخطط موجود بالفعل في الخلية. يمكنك الحصول على مخطط جديد من خلال الجمع بين مخطط الجدول بأكمله والقسم الجديد. لذلك عليك أن تأخذ مخطط الجدول من Hive ودمجه مع مخطط التقسيم الجديد. يمكن القيام بذلك عن طريق قراءة البيانات الوصفية للاختبار من Hive ، وحفظها في مجلد مؤقت ، وقراءة كلا القسمين باستخدام Spark مرة واحدة.



في الأساس ، لديك كل ما تحتاجه: مخطط الجدول الأصلي في Hive وقسم جديد. لدينا أيضا البيانات. كل ما تبقى هو الحصول على مخطط جديد يجمع بين مخطط واجهة المتجر والحقول الجديدة من القسم الذي تم إنشاؤه:



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


بعد ذلك ، نقوم بإنشاء DDL لتسجيل الجدول ، كما في المقتطف السابق.

إذا كانت السلسلة بأكملها تعمل بشكل صحيح ، أي - كان هناك حمل تهيئة ، وفي الخلية يوجد جدول تم إنشاؤه بشكل صحيح ، فسنحصل على مخطط جدول محدث.



والمشكلة الأخيرة هي أنه لا يمكنك فقط إضافة قسم إلى جدول Hive ، حيث سيتم كسره. تحتاج إلى إجبار Hive على إصلاح بنية القسم:



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


تُترجم المهمة البسيطة المتمثلة في قراءة JSON وإنشاء واجهة متجر منها إلى التغلب على عدد من الصعوبات الضمنية ، والتي يتعين عليك البحث عن حلول لها بشكل منفصل. على الرغم من أن هذه الحلول بسيطة ، إلا أنها تستغرق وقتًا طويلاً للعثور عليها.



لتنفيذ بناء الواجهة ، كان علي:



  • أضف أقسامًا إلى واجهة المتجر ، تخلص من ملفات الخدمة
  • تعامل مع الحقول الفارغة في البيانات الأصلية التي كتبتها Spark
  • يلقي أنواع بسيطة في سلسلة
  • تحويل أسماء الحقول إلى أحرف صغيرة
  • تفريغ بيانات منفصل وتسجيل الجدول في الخلية (إنشاء DDL)
  • تذكر أن تهرب من أسماء الحقول ، والتي قد لا تكون متوافقة مع الخلية
  • تعلم كيفية تحديث تسجيل الجدول في Hive


بإيجاز ، نلاحظ أن قرار بناء واجهات عرض يخفي العديد من المزالق. لذلك ، إذا ظهرت صعوبات في التنفيذ ، فمن الأفضل الاتصال بشريك متمرس لديه خبرة ناجحة.



شكرا لقراءة هذا المقال ، نأمل أن تجد المعلومات المفيدة.



All Articles