سيارة أجرة بدون طيار تقود البط المطاطي الأصفر حول المدينة! وحدة التحقق من المشكلات لمنصة Gym-Duckietown

يقول المحللون إنه بحلول عام 2040 ، ستقود معظم المدن الكبرى في العالم سيارات بدون سائقين . ولكن للاسترخاء على الطريق بعد 20 عامًا من الآن ، نحتاج إلى القيام ببعض العمل الجيد على خوارزميات القيادة المستقلة. للقيام بذلك ، طور معهد ماساتشوستس للتكنولوجيا منصة Duckietown ، والتي تتيح لك القيام بذلك بأقل تكلفة. في Duckietown ، تنقل الروبوتات المتنقلة منخفضة التكلفة البط المطاطي الأصفر في نموذج مصغر للمدينة. على أساس هذه المنصة ، يتم عقد أولمبياد القيادة بالذكاء الاصطناعي وإطلاق دورات في الجامعات حول استخدام تقنيات الذكاء الاصطناعي في إدارة المركبات غير المأهولة.



في هذه المقالة سأتحدث عن مشروع الدورة التدريبية الخاص بي ، والذي عملت عليه مع مختبر خوارزميات الروبوت المحمول JetBrains Research : حول مدقق المشكلة الذي كتبته لمحاكي Gym-Duckietown . سنتحدث عن نظام الاختبار وتكامل هذا النظام مع المنصات التعليمية عبر الإنترنت التي تستخدم تقنية المصنف الخارجي - على سبيل المثال ، مع منصة Stepik.org .









نبذة عن الكاتب



اسمي دانييل بلوشينكو ، وأنا طالب في السنة الأولى (الثانية) في برنامج الماجستير " البرمجة وتحليل البيانات " في كلية سان بطرسبرج HSE. في عام 2019 ، أكملت درجة البكالوريوس في الرياضيات التطبيقية وعلوم الكمبيوتر في نفس الجامعة.



منصة Duckietown



Duckietown هو مشروع بحثي عن المركبات المستقلة . أنشأ منظمو المشروع منصة تساعد على تنفيذ نهج جديد للتعلم في مجال الذكاء الاصطناعي والروبوتات. بدأ كل شيء كدورة في معهد ماساتشوستس للتكنولوجيا في عام 2016 ، لكنه انتشر تدريجيًا في جميع أنحاء العالم وعبر مستويات مختلفة من التعليم ، من المدرسة الثانوية إلى برامج الماجستير.



تتكون منصة Duckietown من جزأين. أولاً ، إنه نموذج مصغر لبيئة النقل الحضري مع الطرق والمباني وإشارات الطرق والعوائق. ثانيًا ، إنه النقل. تتلقى الروبوتات الصغيرة المتنقلة (Duckiebots) التي تدير Raspberry Pi معلومات حول العالم من حولها من خلال كاميرا وتنقل سكان المدينة - البط المطاطي الأصفر - على طول الطرق.







لقد كنت أعمل مع محاكي Duckietown. يدعيGym-Duckietown ، وهو مشروع مفتوح المصدر مكتوب بلغة Python. يضع المحاكي الروبوت الخاص بك داخل المدينة ، ويغير موقعه اعتمادًا على الخوارزمية التي تستخدمها (أو على الزر الذي ضغطت عليه) ، ويعيد رسم الصورة ويكتب الموقع الحالي للروبوت في السجلات.  



إذا كنت مهتمًا بالمحاولة ، فإنني أوصي باستنساخ المستودع بنفسك وتشغيل manual_control.py : بهذه الطريقة يمكن التحكم في الروبوت باستخدام الأسهم الموجودة على لوحة المفاتيح.



لقطة شاشة من



المحاكي يمكن استخدام المحاكي كبيئة لتنفيذ المهام. دعنا نحدد المشكلة التالية: على خريطة معينة ، تتكون من حارة واحدة ، تحتاج إلى القيادة مترًا واحدًا في خط مستقيم.







لحل المشكلة ، يمكنك استخدام الخوارزمية التالية:



for _ in range(25):
    env.step([1, 0])
    env.render()


هذا المتغير envيخزن حالة البيئة.

تأخذ طريقة الخطوة كمدخل action: قائمة من عنصرين يصفان عمل الروبوت. يحدد العنصر الأول السرعة ، والثاني - زاوية الدوران. renderتعيد الطريقة رسم الصورة ، مع مراعاة الموضع الجديد للروبوت. يتم تحديد عدد الخطوات والسرعة بشكل تجريبي: هذه هي القيم التي يحتاجها الروبوت ليقطع مترًا واحدًا بالضبط في خط مستقيم.



إذا كنت ترغب في استخدام هذا المقتطف في manual_control.py ، فقم بلصقه هنا . الكود حتى هذه النقطة مشغول بتحميل البيئة. للتبسيط ، يمكنك إعادة استخدامه ، ثم إضافة الحل أعلاه إلى المشكلة.



نظام الاختبار



أود أن أكون قادرًا على التحقق من هذه المهام تلقائيًا: تنفيذ خوارزمية التحكم في الروبوت ومحاكاة الرحلة والإبلاغ عما إذا كانت الخوارزمية المقترحة تؤدي المهمة بشكل صحيح. سيجعل نظام الاختبار هذا من الممكن استخدام البيئة أثناء التحضير للمسابقات في إدارة النقل المستقل ، وكذلك للأغراض التعليمية: لإصدار مجموعة من المشكلات للطلاب والتحقق من حلولهم تلقائيًا. لقد شاركت في تطويره أثناء العمل في مشروع الدورة التدريبية وسأخبرك أدناه بما حصلت عليه.



يبدو تسلسل الخطوات عند فحص الحل كما يلي:







يمكنك إضافة مهام إلى نظام الاختبار بنفسك أو الرجوع إلى المهام التي قمت بها. كل مهمة لها منشئ شرط - فئة تولد حالة البيئة. يحتوي على اسم الخريطة وموضع بداية الروبوت داخل خلية البداية. يتم أيضًا تعيين إحداثيات الهدف: في هذه الحالة ، تكون نقطة على بعد متر واحد من موضع البداية.



class Ride1MTaskGenerator(TaskGenerator):
    def __init__(self, args):
        super().__init__(args)

    def generate_task(self):
        env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
        env = env_loader(
            map_name="straight_road",
            position_on_initial_road_tile=PositionOnInitialRoadTile(
                x_coefficient=0.5,
                z_coefficient=0.5,
                angle=0,
            ),
        )
        self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
        self.generated_task['env'] = env
        env.render()
        return self.generated_task


هنا TrackingDuckietownEnvو CVTaskEnvهناك فئات المجمع التي تستخدم لتخزين المعلومات عن الرحلة لمزيد من التحليل. 



class TrackingDuckietownEnv:
    def __init__(self, **kwargs):
        self.__wrapped = DuckietownEnv(**kwargs)
    def step(self, action):
        obs, reward, done, misc = self.__wrapped.step(action)
        message = misc['Simulator']['msg']
        if 'invalid pose' in message.lower():
            raise InvalidPoseException(message)
        for t in self.trackers:
            t.track(self)
        return obs, reward, done, misc


تقوم أجهزة التتبع بجمع معلومات حول الحالة الحالية ، مثل موقع الروبوت.



CVTaskEnvيتم استخدامه إذا كان الحل مطلوبًا باستخدام معلومات فقط من الكاميرا ("رؤية الكمبيوتر") ، وليس وظائف المحاكي: على سبيل المثال ، إذا كنت بحاجة إلى معرفة مدى بُعد الروبوت عن مركز الشريط أو مكان أقرب كائن مرئي. يمكن لوظائف CVTaskEnvمحاكي الاستدعاء تبسيط حل المشكلة ، وتحد الفئة من استدعاء أساليب المحاكي. يتم استخدامه عند عرض العلم is_cv_task



class CVTaskEnv:
    def __init__(self, **kwargs):
        self.__wrapped = TrackingDuckietownEnv(**kwargs)

    def __getattr__(self, item):
        if item in self.__wrapped.overriden_methods:
            return self.__wrapped.__getattribute__(item)
        ALLOWED_FOR_CV_TASKS = [
            'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
            'road_tile_size', 'trip_statistics'
        ]
        if item in ALLOWED_FOR_CV_TASKS:
            return self.__wrapped.__getattr__(item)
        else:
            raise AttributeError(item + " call is not allowed in CV tasks")


بعد اكتمال تنفيذ القرار - بشرط ألا يتم مقاطعته بسبب انتهاء المهلة - يتم تمرير معلومات الرحلة عبر سلسلة من المدققات. إذا نجحت جميع أجهزة الداما ، فسيتم حل المشكلة بشكل صحيح. بخلاف ذلك ، يتم عرض حكم توضيحي - على سبيل المثال ، تحطم روبوت ، أو انحرف عن الطريق ، وما إلى ذلك.



إليك أحد المدققات القياسية. يتحقق من أن الروبوت قد عاد إلى نقطة البداية في نهاية الرحلة. يمكن أن يكون هذا مفيدًا ، على سبيل المثال ، في مهمة تحتاج إلى القيادة إلى نقطة معينة ثم العودة مرة أخرى.



class SameInitialAndFinalCoordinatesChecker(Checker):
    def __init__(self, maximum_deviation=0.1, **kwargs):
        super().__init__(**kwargs)
        self.maximum_deviation = maximum_deviation

    def check(self, generated_task, trackers, **kwargs):
        trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
        trip_data = trip_statistics.trip_data
        if len(trip_data) == 0:
            return True
        initial_coordinates = trip_data[0].position.coordinates
        final_coordinates = trip_data[-1].position.coordinates
        return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation


يمكن استخدام نظام الاختبار هذا في الوضع اليدوي ، أي بدء الاختبار يدويًا ، ثم دراسة الحكم بصريًا. إذا أردنا ، على سبيل المثال ، إطلاق دورة تدريبية عبر الإنترنت حول النقل المستقل على موقع Stepik.org ، فسنحتاج إلى التكامل مع النظام الأساسي. سيتم مناقشة هذا في الجزء التالي من المقالة.



التكامل مع منصات الإنترنت



لمهام الاختبار ، غالبًا ما يتم استخدام تقنية المصنف الخارجي ، والتي تم تطويرها بواسطة منصة edX .



عند استخدام المصنف الخارجي ، لا تقوم المنصة التعليمية بفحص المهام من تلقاء نفسها ، ولكنها تنشئ قائمة انتظار من الحزم التي يتم إرسالها إلى جهاز آخر. يتم تنفيذ وظيفة توصيل قائمة الانتظار في مشروع xqueue-watcher . يقوم مراقب Xqueue بجلب الطرود ثم يتم اختبارها بواسطة المدقق (الذي يقوم عادةً بأشياء غير تافهة أكثر من مقارنات النص / الأرقام). بعد ذلك ، يتم إرسال حكم التحقق مرة أخرى إلى جانب المنصة التعليمية.



دعونا نفكر بمزيد من التفصيل في لحظة الاتصال بقائمة الانتظار. بعد أن توفر المنصة التعليمية بيانات الاتصال ، يجب إضافتها إليهاملفات التكوين ، وفي طريقة الدرجات ، قم بتنفيذ إطلاق التحقق مباشرة. تعليمات أكثر تفصيلا ويمكن الاطلاع هنا و هنا .



يستدعي Xqueue-watcher نقطة النهاية get_submission ، والتي ستسترجع الحزمة من قائمة الانتظار إن أمكن. بعد ذلك ، تذهب للاختبار. يقوم مراقب xqueue بعد ذلك باستدعاء put_result لإرجاع الحكم.



يمكنك بدء مشاهدة xqueue مثل هذا:



make requirements && python -m xqueue_watcher -d conf.d/


لنفترض أننا نريد استخدام تقنية المصنف الخارجي ، لكن لا نريد إجراء الدورة التدريبية على منصة عبر الإنترنت. يتم تنفيذ Xqueue-watcher على افتراض وجود بعض تخزين الملفات حيث يتم تحميل الملفات ذات الحلول (تحتوي الأنظمة الأساسية على مثل هذا التخزين). يمكننا تعديل Xqueue بحيث لا تكون هناك حاجة لتخزين الملفات ، ويمكن تشغيل هذه الأنظمة بشكل عام ، حتى على الكمبيوتر المحمول الخاص بنا.



تحتاج أولاً إلى معرفة كيفية الحفاظ على قائمة انتظار الطرود نفسها. يتم توفير وظيفة قائمة الانتظار من خلال مشروع xqueue .





الصورة مأخوذة من الوثائق .



يمكنك تشغيله على النحو التالي:



apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address


قد تحتاج إلى إنشاء ملف ~ / edx / edx.log



افتراضيًا ، لا يمنح xqueue xqueue-watcher محتويات الحزم ، أي الملفات التي تحتوي على حل للمشكلة ، ولكن روابط لهذه الملفات في تخزين الملفات. حتى لا تعتمد على تخزين الملفات ، يمكنك إرسال الملفات نفسها وتخزينها على نفس الجهاز الذي يعمل منه xqueue-watcher. 



إليك كيفية تعديل التعليمات البرمجية المصدر لتحقيق ذلك: تم استبدال



تنفيذ طريقة _upload في lms_interface.py بهذه الطريقة :



def _upload(file_to_upload, path, name):
    '''
    Upload file using the provided keyname.
    Returns:
        URL to access uploaded file
    '''
    full_path = os.path.join(path, name)
    return default_storage.save(full_path, file_to_upload)


إذا لم يتم توصيل تخزين الملفات ، فستقوم هذه الطريقة بحفظ الملف مع الحل إلى المسار $ queue_name / $ file_to_upload_hash.



في تنفيذ get_sumbission في ملف ext_interface.py ، اكتب بدلاً من هذا السطر :



xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
    with open(xqueue_files[xqueue_file], 'r') as f:
        xqueue_files[xqueue_file] = f.readlines()


لن ننقل الروابط (المسارات) إلى الملفات ، بل ننقل محتوياتها.



يتم تنفيذ كل حل في حاوية عامل إرساء "لمرة واحدة" بموارد محدودة ، أي ، يتم إنشاء حاوية منفصلة لتنفيذ كل حل ، والذي يتم حذفه بعد انتهاء الاختبار. لإنشاء مثل هذه الحاويات وتنفيذ الأوامر فيها ، يتم استخدام portainer-api (في الواقع ، كغلاف فوق Docker API).



النتيجة



في هذا المقال ، تحدثت عن كيفية إنشاء نظام الاختبار ومهام النقل المستقل ، بالإضافة إلى تكامل هذا النظام مع المنصات التعليمية عبر الإنترنت التي تستخدم تقنية المصنف الخارجي. آمل أن يتم إطلاق دورة تدريبية باستخدام هذا النظام قريبًا ، وسيكون الجزء المتعلق بالتكامل مع الأنظمة الأساسية عبر الإنترنت مفيدًا لأولئك الذين يرغبون في إنشاء دورة تدريبية خاصة بهم دون اتصال بالإنترنت أو عبر الإنترنت.



All Articles