البيانات الكبيرة / الأخطاء: تحليل شفرة مصدر Apache Flink

image1.png


تعالج تطبيقات البيانات الضخمة كميات هائلة من المعلومات ، غالبًا في الوقت الفعلي. بطبيعة الحال ، يجب أن تكون هذه التطبيقات موثوقة للغاية بحيث لا يمكن لأي خطأ في الكود أن يتداخل مع معالجة البيانات. لتحقيق موثوقية عالية ، من الضروري مراقبة جودة رمز المشاريع المطورة لهذا المجال عن كثب. يتعامل محلل PVS-Studio الثابت مع هذه المشكلة. اليوم ، تم اختيار مشروع Apache Flink الذي طورته مؤسسة Apache Software Foundation ، وهي إحدى الشركات الرائدة في سوق برمجيات البيانات الضخمة ، كموضوع اختبار للمحلل.



ما هو اباتشي فلينك؟ إنه إطار مفتوح المصدر للمعالجة الموزعة لكميات كبيرة من البيانات. تم تطويره كبديل لبرنامج Hadoop MapReduce في عام 2010 في الجامعة التقنية في برلين. يعتمد إطار العمل على محرك تنفيذ موزع لتطبيقات معالجة الدُفعات والدفق. تمت كتابة هذا المحرك بلغات Java و Scala. اليوم يمكن استخدام Apache Flink في المشاريع المكتوبة باستخدام Java و Scala و Python وحتى SQL.



تحليل المشروع



بعد تنزيل الكود المصدري للمشروع ، بدأت في إنشاء المشروع باستخدام الأمر "mvn clean package -DskipTests" المحدد في الإرشادات الموجودة على GitHub . بينما كان التجميع قيد التقدم ، باستخدام الأداة المساعدة CLOC ، اكتشفت أن هناك 10838 ملف Java في المشروع ، والتي تحتوي على حوالي 1.3 مليون سطر من التعليمات البرمجية. علاوة على ذلك ، كان هناك 3833 ملفًا تجريبيًا لجافا ، وهو ما يمثل أكثر من ثلث جميع ملفات جافا. لقد لاحظت أيضًا أن المشروع يستخدم محلل الكود الثابت FindBugs والأداة المساعدة Cobertura ، التي توفر معلومات حول تغطية الكود عن طريق الاختبارات. مع وضع كل هذا في الاعتبار ، يصبح من الواضح أن مطوري Apache Flink راقبوا بعناية جودة التعليمات البرمجية وتغطية الاختبار أثناء التطوير.



بعد بناء ناجح ، فتحت المشروع في IntelliJ IDEA وأطلقت التحليل باستخدام PVS-Studio للمكوِّن الإضافي IDEA و Android Studio . تم توزيع تحذيرات جهاز التحليل على النحو التالي:



  • 183 عالية
  • 759 متوسط
  • 545 منخفض.


تم تعيين حوالي 2/3 من مشغلات محلل PVS-Studio لاختبار الملفات. بالنظر إلى هذه الحقيقة وحجم قاعدة كود المشروع ، يمكننا القول أن مطوري Apache Flink تمكنوا من الحفاظ على جودة الكود في أفضل حالاتهم.



بعد أن درست تحذيرات المحلل بمزيد من التفصيل ، اخترت أكثرها إثارة للاهتمام في رأيي. لذلك دعونا نرى ما تمكنت PVS-Studio من العثور عليه في هذا المشروع!





فقط القليل من الإهمال



V6001 توجد تعبيرات فرعية متطابقة "معالجة البيانات" على يسار ويمين عامل التشغيل '=='. CheckpointStatistics.java (229)



@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}
      
      





على خلفية تعبيرات أخرى في المقابل ، فإن هذا الخطأ ليس لافتًا للنظر. عند تجاوز طريقة يساوي لفئة CheckpointStatistics ، أخطأ المبرمج في معالجة البيانات = = تعبير البيانات المجهزة ، وهو أمر لا معنى له لأنه صحيح دائمًا. وبالمثل، فإن بقية التعبير في المقابل كان لا بد من مقارنة مجال الكائن الحالي هذا وجوه هذا : processedData == that.processedData... هذا الموقف هو أحد أنماط الخطأ النموذجية الموجودة في وظائف المقارنة ، والتي تم وصفها بالتفصيل في مقالة " الشر يعيش في وظائف المقارنة ". وهكذا اتضح أن مجرد "القليل من عدم الانتباه" كسر منطق التحقق من تكافؤ كائنات فئة CheckpointStatistics .



التعبير دائما صحيح



يكون تعبير V6007 "input2.length> 0" دائمًا صحيحًا. Operator.java (283)



public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}
      
      





في هذه الطريقة ، تبين أن المحلل أكثر انتباهاً من الشخص ، والذي قرر الإبلاغ بطريقته الخاصة ، مشيرًا إلى أن التعبير input2.length> 0 سيكون دائمًا صحيحًا. والسبب هو أنه إذا كان طول مصفوفة input2 هو 0 ، فإن إدخال الشرط 2 == null || input2.length == 0 من الأول إذا كان في الطريقة سيكون صحيحًا ، وسيتم مقاطعة تنفيذ الطريقة قبل الوصول إلى السطر مع التعبير input2.length> 0 .



كل محلل الرؤية



تعبير V6007 'slotSharingGroup == null' خطأ دائمًا. StreamGraphGenerator.java (510)



private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}
      
      





أبلغ المحلل عن خطأ slotSharingGroup == null دائمًا. هذا يشير إلى أن determineSlotSharingGroup طريقة لن يعود لاغية . هل المحلل ذكي للغاية بحيث يمكنه حساب جميع القيم التي يمكن أن تعيدها هذه الطريقة؟ دعونا نتحقق بشكل أفضل من كل شيء بأنفسنا:



public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}
      
      





بالترتيب نمر بجميع عمليات الإرجاع ونرى ما يمكن أن يستعيد هذه الطريقة:



  • سيعيد الإرجاع الأول المعامل إلى التابع selectedGroup ، ولكن فقط إذا لم يكن فارغًا .
  • return for DEFAULT_SLOT_SHARING_GROUP, ;
  • return inputGroup, null. DEFAULT_SLOT_SHARING_GROUP.


وتبين أن المحلل كان قادرا حقا لحساب استحالة عودته لاغية من determineSlotSharingGroup طريقة وحذرنا حول هذا الموضوع، لافتا الى اللامعنى للتحقق slotSharingGroup == لاغية . وعلى الرغم من أن هذا الموقف ليس خاطئًا ، إلا أن هذه الحماية الإضافية للمحلل يمكن أن تكتشف خطأ في بعض الحالات الأخرى. على سبيل المثال ، عندما تحتاج إلى طريقة لإرجاع قيمة خالية في ظل ظروف معينة.



اجمعهم كلهم



تعبير V6007 " currentCount <= lastEnd" يكون دائمًا صحيحًا. CountSlidingWindowAssigner.java (75)



V6007 تعبير 'lastStart <= currentCount' صحيح دائمًا. CountSlidingWindowAssigner.java (75)



@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}
      
      





يحذر المحلل من أن التعبيرات currentCount <= lastEnd و lastStart <= currentCount صحيحة دائمًا. في الواقع ، إذا نظرت إلى حالة حلقة while ، فستجد نفس التعبيرات تمامًا. هذا يعني أن هذه التعبيرات ستكون صحيحة دائمًا داخل الحلقة ، لذلك ستتم إضافة جميع الكائنات من نوع CountWindow التي تم إنشاؤها في الحلقة إلى قائمة windows . هناك العديد من الخيارات لظهور هذا الاختيار الذي لا معنى له ، وأول ما يتبادر إلى الذهن هو إما قطعة أثرية لإعادة البناء أو طمأنة المطور. لكن يمكن أن يكون خطأ ، إذا أردت التحقق من شيء آخر ...



ترتيب حجة غير صحيح



V6029 ترتيب غير صحيح للوسيطات التي تم تمريرها إلى الأسلوب: 'hasBufferForReleasedChannel'، 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java (165) ، NettyMessageClientDecoderDelegateTest.java (166)



private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}
      
      





يؤدي افتقار Java إلى القدرة على استدعاء طريقة باستخدام معلمات محددة أحيانًا إلى مزحة قاسية مع المطورين. هذا بالضبط ما حدث عندما أشار المحلل إلى طريقة createMessageList . بالنظر إلى تعريف هذه الطريقة ، يتضح أنه يجب تمرير المعلمة hasBufferForRemovedChannel إلى الطريقة قبل المعامل hasBufferForReleasedChannel :



private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}
      
      





ومع ذلك ، عند استدعاء الطريقة ، قام المطور بخلط ترتيب هذه الوسائط ، ولهذا السبب سيتم كسر منطق طريقة createMessageList إذا اختلفت قيم الوسائط المختلطة.



أوه ، هذا النسخ واللصق



V6032 من الغريب أن يكون جسم طريقة "بحث عن فيرست "مكافئًا تمامًا لجسم طريقة أخرى" بحث عن آخر ". RocksIteratorWrapper.java (53) ، RocksIteratorWrapper.java (59)



public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}
      
      





إن أجسام أساليب البحث عن "إلى فيرست" و " تسعى إلى آخر" هي نفسها. علاوة على ذلك ، يتم استخدام كلتا الطريقتين في الكود.



شيء غير نظيف هنا! في الواقع ، إذا نظرت إلى الطرق التي يستخدمها كائن المكرر ، فسوف يتضح الخطأ الذي ساعد المحلل في العثور عليه:



public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}
      
      





اتضح أن طريقة البحث عن فئة RocksIteratorWrapper تم إنشاؤها بواسطة طريقة النسخ واللصق ، والتي تسعى إلى الحصول على نفس الفئة. ومع ذلك، لسبب ما، نسي المطور ليحل محل و مكرر الصورة seekToFirst طريقة الدعوة مع seekToLast .



الارتباك مع سلاسل التنسيق



تنسيق V6046 غير صحيح. من المتوقع وجود عدد مختلف من عناصر التنسيق. الوسائط غير المستخدمة: 1. UnsignedTypeConversionITCase.java (102)



public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}
      
      





تختلف سلاسل التنسيق الخاصة بأسلوب String.format وأدوات تسجيل Java. على عكس سلسلة التنسيق الخاصة بأسلوب String.format ، حيث يتم تحديد بدائل الوسيطة باستخدام الحرف "٪" ، تستخدم سلاسل تنسيق المُسجل مجموعة الأحرف "{}" بدلاً من ذلك. بسبب هذا الالتباس ، حدث هذا الخطأ. كسلسلة تنسيق ، يتم تمرير سلسلة إلى طريقة String.format ، والتي تم نسخها على الأرجح من مكان آخر حيث تم استخدامها في بعض المسجل. نتيجة لذلك ، لن يتم استبدال قيمة الحقل INITIALIZE_DB_MAX_RETRY في رسالة IllegalStateException . بدلاً من "{}" ، والشخص الذي يمسك أو يسجل هذا الاستثناء لن يعرف أبدًا عدد محاولات الاتصال بقاعدة البيانات التي تم إجراؤها.



توزيع غير طبيعي



V6048 يمكن تبسيط هذا التعبير. معامل "مؤشر" في العملية يساوي 0. CollectionUtil.java (76)



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}
      
      





تقسم طريقة التقسيم العناصر من مجموعة العناصر إلى مقاطع متعددة ، ثم تعيد تلك المقاطع. ومع ذلك ، نظرًا للخطأ الذي أشار إليه المحلل ، فلن يحدث فصل. سيكون التعبير المستخدم لتحديد مؤشر رقم المقطع ٪ numBuckets دائمًا 0 ، لأن الفهرس دائمًا هو 0. اعتقدت في الأصل أن رمز هذه الطريقة قد تمت إعادة بنائه ، ونتيجة لذلك نسوا إضافة زيادة في متغير الفهرس في الحلقة for . لكن بالنظر إلى الالتزامحيث تمت إضافة هذه الطريقة ، اتضح أن هذا الخطأ جاء مع هذه الطريقة. نسخة مصححة من الكود:



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}
      
      





نوع غير متوافق



V6066 نوع العنصر الذي تم تمريره كوسيطة غير متوافق مع نوع المجموعة: String ، ListStateDescriptor <NextTransactionalIdHint>. FlinkKafkaProducer.java (1083)



public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}
      
      





سيكون التعبير الذي يشير إليه المحلل دائمًا خاطئًا ، مما يعني أن استدعاء طريقة migrateNextTransactionalIdHindState لن يحدث أبدًا. كيف حدث أن يبحث شخص ما عن عنصر من نوع مختلف تمامًا في مجموعة من النوع Set <String> - ListStateDescriptor <FlinkKafkaProducer.NextTransactionalIdHint> ؟ بدون مساعدة المحلل ، كان مثل هذا الخطأ ، على الأرجح ، قد عاش في الكود لفترة طويلة جدًا ، لأنه لا يلفت الأنظار ومن المستحيل العثور عليه بدون فحص شامل لهذه الطريقة.



تغيير متغير غير ذري



V6074 تعديل غير ذري لمتغير متطاير. فحص "currentNumAcknowledgedSubtasks". PendingCheckpointStats.java (131)



boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}
      
      





بالإضافة إلى 3 تحذيرات أخرى للمحلل بنفس الطريقة:



  • V6074 تعديل غير ذري لمتغير متطاير. افحص "currentStateSize". PendingCheckpointStats.java (134)
  • V6074 تعديل غير ذري لمتغير متطاير. فحص "currentProcessedData". PendingCheckpointStats.java (138)
  • V6074 تعديل غير ذري لمتغير متطاير. فحص "currentPersistedData". PendingCheckpointStats.java (143)


اقترح المحلل أن ما يصل إلى 4 مجالات متطايرة في الطريقة تتغير غير الذرية. والمحلل ، كما هو الحال دائمًا ، يتضح أنه صحيح ، لأن عمليتي ++ و + = هي في الواقع سلسلة من عدة عمليات قراءة - تعديل - كتابة. كما تعلم ، فإن القيمة المتغيرة للحقل تكون مرئية لجميع مؤشرات الترابط ، مما يعني أنه قد يتم فقد جزء من تغييرات الحقل بسبب حالة السباق. يمكنك قراءة المزيد من المعلومات التفصيلية حول هذا في وصف التشخيص.



استنتاج



في مشاريع البيانات الضخمة ، تعد الموثوقية أحد المتطلبات الرئيسية ؛ لذلك ، يجب مراقبة جودة الكود فيها عن كثب. تمت مساعدة مطوري Apache Flink في ذلك من خلال العديد من الأدوات ، وقد كتبوا أيضًا عددًا كبيرًا من الاختبارات. ومع ذلك ، حتى في ظل هذه الظروف ، كان محلل PVS-Studio قادرًا على اكتشاف الأخطاء. من المستحيل التخلص من الأخطاء تمامًا ، ولكن استخدام العديد من أدوات تحليل التعليمات البرمجية الثابتة بشكل منتظم سيسمح لك بالاقتراب من هذا النموذج المثالي. نعم ، بشكل منتظم. فقط مع الاستخدام المنتظم يظهر التحليل الساكن فعاليته ، والتي تم وصفها بمزيد من التفصيل في هذه المقالة .





إذا كنت ترغب في مشاركة هذه المقالة مع جمهور يتحدث الإنجليزية ، فيرجى استخدام رابط الترجمة: فاليري كوماروف. البيانات الكبيرة / بيانات الأخطاء: تحليل شفرة مصدر Apache Flink .



All Articles