أعلم ، أعلم ، ربما تفكر في "ماذا ، مرة أخرى؟!"
نعم ، لقد كتبوا بالفعل في Habré عدة مرات عن إطار FastAPI . لكنني أقترح النظر في هذه الأداة بمزيد من التفصيل قليلاً وكتابة واجهة برمجة التطبيقات الخاصة بـ Habr المصغر الخاص بك بدون كارما وتصنيفات ، ولكن
مخطط قاعدة البيانات وعمليات الترحيل
بادئ ذي بدء ، باستخدام لغة تعبير SQLAlchemy ، سنصف مخطط قاعدة البيانات. لنقم بإنشاء ملف Models / users.py :
import sqlalchemy
from sqlalchemy.dialects.postgresql import UUID
metadata = sqlalchemy.MetaData()
users_table = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),
sqlalchemy.Column("name", sqlalchemy.String(100)),
sqlalchemy.Column("hashed_password", sqlalchemy.String()),
sqlalchemy.Column(
"is_active",
sqlalchemy.Boolean(),
server_default=sqlalchemy.sql.expression.true(),
nullable=False,
),
)
tokens_table = sqlalchemy.Table(
"tokens",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column(
"token",
UUID(as_uuid=False),
server_default=sqlalchemy.text("uuid_generate_v4()"),
unique=True,
nullable=False,
index=True,
),
sqlalchemy.Column("expires", sqlalchemy.DateTime()),
sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
)
و نماذج / ملف posts.py :
import sqlalchemy
from .users import users_table
metadata = sqlalchemy.MetaData()
posts_table = sqlalchemy.Table(
"posts",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),
sqlalchemy.Column("created_at", sqlalchemy.DateTime()),
sqlalchemy.Column("title", sqlalchemy.String(100)),
sqlalchemy.Column("content", sqlalchemy.Text()),
)
لأتمتة عمليات ترحيل قاعدة البيانات ، قم بتثبيت alembic :
$ pip install alembic
لتهيئة Alembic ، قم بتشغيل:
$ alembic init migrations
وهذا الأمر يخلق في الدليل الحالي alembic.ini ملف و الهجرات الدليل يتضمن ما يلي:
- إصدارات الدليل حيث سيتم تخزين ملفات الهجرة
- env.py النصي الذي يدير عندما يتم استدعاء الإنبيق
- ملف script.py.mako يحتوي على نموذج لعمليات الترحيل الجديدة.
سنشير إلى عنوان url لقاعدة البيانات الخاصة بنا ، لهذا ، في ملف alembic.ini ، أضف السطر:
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s
يسمح لنا تنسيق ٪ (variable_name) s بتعيين قيم مختلفة للمتغيرات اعتمادًا على البيئة ، وتجاوزها في ملف env.py مثل هذا:
from os import environ
from alembic import context
from app.models import posts, users
# Alembic Config
# alembic.ini
config = context.config
section = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))
fileConfig(config.config_file_name)
target_metadata = [users.metadata, posts.metadata]
هنا نحصل على قيم DB_USER و DB_PASS و DB_NAME و DB_HOST من متغيرات البيئة. بالإضافة إلى ذلك ، يحدد ملف env.py البيانات الوصفية لقاعدة البيانات الخاصة بنا في سمة target_metadata ، والتي بدونها لن يتمكن Alembic من تحديد التغييرات التي يجب إجراؤها في قاعدة البيانات.
كل شيء جاهز ويمكننا إنشاء عمليات الترحيل وتحديث قاعدة البيانات:
$ alembic revision --autogenerate -m "Added required tables"
$ alembic upgrade head
نقوم بتشغيل التطبيق وربط قاعدة البيانات
لنقم بإنشاء ملف main.py :
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
وقم بتشغيل التطبيق عن طريق تشغيل الأمر:
$ uvicorn main:app --reload
دعونا نتأكد من أن كل شيء يعمل كما ينبغي. افتح http://127.0.0.1:8000/ في المتصفح وشاهد
{"Hello": "World"}
للاتصال بقاعدة البيانات ، سنستخدم وحدة قواعد البيانات ، والتي تتيح لنا تنفيذ الاستعلامات بشكل غير متزامن.
دعونا تكوين و بدء التشغيل و shutdhown أحداث خدمتنا، حيث الاتصال والانفصال من قاعدة البيانات سوف يحدث. دعنا نعدل ملف main.py :
from os import environ
import databases
#
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
# database,
database = databases.Database(SQLALCHEMY_DATABASE_URL)
app = FastAPI()
@app.on_event("startup")
async def startup():
#
await database.connect()
@app.on_event("shutdown")
async def shutdown():
#
await database.disconnect()
@app.get("/")
async def read_root():
# ,
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.order_by(desc(posts_table.c.created_at))
)
return await database.fetch_all(query)
نفتح http://127.0.0.1:8000/ وإذا رأينا قائمة فارغة [] في الرد ، فحينئذٍ سارت الأمور على ما يرام ويمكننا المضي قدمًا.
طلب واستجابة التحقق من الصحة
سنقوم بتنفيذ إمكانية تسجيل المستخدم. للقيام بذلك ، نحتاج إلى التحقق من صحة طلبات واستجابات HTTP. لحل هذه المشكلة ، سوف نستخدم مكتبة pydantic :
pip install pydantic
أنشئ ملف schemas / users.py وأضف نموذجًا مسؤولاً عن التحقق من صحة نص الطلب:
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
""" sign-up """
email: EmailStr
name: str
password: str
لاحظ أن أنواع الحقول محددة باستخدام نوع التعليق التوضيحي. بالإضافة إلى أنواع البيانات المضمنة مثل int و str ، يقدم pydantic عددًا كبيرًا من الأنواع لتوفير التحقق الإضافي. على سبيل المثال ، يتحقق نوع EmailStr من أن القيمة المستلمة هي بريد إلكتروني صالح. لاستخدام نوع EmailStr ، تحتاج إلى تثبيت وحدة التحقق من البريد الإلكتروني :
pip install email-validator
يجب أن يحتوي نص الاستجابة على الحقول الخاصة به ، على سبيل المثال id و access_token ، لذلك دعونا نضيف النماذج المسؤولة عن إنشاء الاستجابة لملف schemas / users.py :
from typing import Optional
from pydantic import UUID4, BaseModel, EmailStr, Field, validator
class UserCreate(BaseModel):
""" sign-up """
email: EmailStr
name: str
password: str
class UserBase(BaseModel):
""" """
id: int
email: EmailStr
name: str
class TokenBase(BaseModel):
token: UUID4 = Field(..., alias="access_token")
expires: datetime
token_type: Optional[str] = "bearer"
class Config:
allow_population_by_field_name = True
@validator("token")
def hexlify_token(cls, value):
""" UUID hex """
return value.hex
class User(UserBase):
""" """
token: TokenBase = {}
لكل حقل في النموذج ، يمكنك كتابة مدقق مخصص . على سبيل المثال ، يحول hexlify_token قيمة UUID إلى سلسلة سداسية عشرية. تجدر الإشارة إلى أنه يمكنك استخدام فئة Field عندما تحتاج إلى تجاوز السلوك الافتراضي لحقل النموذج. على سبيل المثال ، الرمز المميز: UUID4 = الحقل (... ، الاسم المستعار = "access_token") يعيّن الاسم المستعار access_token لحقل الرمز المميز . للإشارة إلى أن الحقل مطلوب ، يتم تمرير قيمة خاصة - ... ( علامة القطع ) كمعامل أول .
أضف ملف utils / users.py ، حيث سننشئ الطرق المطلوبة لكتابة مستخدم إلى قاعدة البيانات:
import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_
from app.models.database import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema
def get_random_string(length=12):
""" , """
return "".join(random.choice(string.ascii_letters) for _ in range(length))
def hash_password(password: str, salt: str = None):
""" """
if salt is None:
salt = get_random_string()
enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
return enc.hex()
def validate_password(password: str, hashed_password: str):
""" , """
salt, hashed = hashed_password.split("$")
return hash_password(password, salt) == hashed
async def get_user_by_email(email: str):
""" """
query = users_table.select().where(users_table.c.email == email)
return await database.fetch_one(query)
async def get_user_by_token(token: str):
""" """
query = tokens_table.join(users_table).select().where(
and_(
tokens_table.c.token == token,
tokens_table.c.expires > datetime.now()
)
)
return await database.fetch_one(query)
async def create_user_token(user_id: int):
""" user_id """
query = (
tokens_table.insert()
.values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
.returning(tokens_table.c.token, tokens_table.c.expires)
)
return await database.fetch_one(query)
async def create_user(user: user_schema.UserCreate):
""" """
salt = get_random_string()
hashed_password = hash_password(user.password, salt)
query = users_table.insert().values(
email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
)
user_id = await database.execute(query)
token = await create_user_token(user_id)
token_dict = {"token": token["token"], "expires": token["expires"]}
return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}
قم بإنشاء موجهات ملف / users.py وأضف مسار تسجيل ، مما يشير إلى أنه يتوقع نموذج CreateUser في الطلب ويعيد نموذج مستخدم :
from fastapi import APIRouter
from app.schemas import users
from app.utils import users as users_utils
router = APIRouter()
@router.post("/sign-up", response_model=users.User)
async def create_user(user: users.UserCreate):
db_user = await users_utils.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return await users_utils.create_user(user=user)
يبقى فقط توصيل المسارات من ملف أجهزة التوجيه / users.py . للقيام بذلك ، أضف الأسطر التالية إلى main.py :
from app.routers import users
app.include_router(users.router)
المصادقة والتحكم في الوصول
الآن بعد أن أصبح لدينا مستخدمون في قاعدة البيانات الخاصة بنا ، نحن جاهزون لإعداد مصادقة التطبيق. دعنا نضيف نقطة نهاية تأخذ اسم مستخدم وكلمة مرور وترجع رمزًا مميزًا. قم بتحديث ملف أجهزة التوجيه / users.py لإضافة :
from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm
@router.post("/auth", response_model=users.TokenBase)
async def auth(form_data: OAuth2PasswordRequestForm = Depends()):
user = await users_utils.get_user_by_email(email=form_data.username)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
if not users_utils.validate_password(
password=form_data.password, hashed_password=user["hashed_password"]
):
raise HTTPException(status_code=400, detail="Incorrect email or password")
return await users_utils.create_user_token(user_id=user["id"])
في الوقت نفسه ، لا نحتاج إلى وصف نموذج الطلب بأنفسنا ، يوفر Fastapi فئة تبعية خاصة OAuth2PasswordRequestForm ، مما يجعل المسار يتوقع حقلين اسم مستخدم وكلمة مرور.
لتقييد الوصول إلى مسارات معينة للمستخدمين غير المصادق عليهم ، سنكتب طريقة التبعية. سيتحقق من أن الرمز المقدم ينتمي إلى المستخدم النشط ويعيد تفاصيل المستخدم. سيتيح لنا ذلك استخدام معلومات المستخدم على جميع المسارات التي تتطلب المصادقة. دعونا خلق تيلس / ملف dependecies.py :
from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = await users_utils.get_user_by_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user["is_active"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
return user
يرجى ملاحظة أن التبعية قد تعتمد بدورها على تبعية أخرى. على سبيل المثال ، OAuth2PasswordBearer هي تبعية توضح لـ FastAPI أن المسار الحالي يتطلب مصادقة.
للتحقق من أن كل شيء يعمل كما هو متوقع ، أضف المسار / users / me ، الذي يعرض تفاصيل المستخدم الحالي. أضف الخطوط إلى أجهزة التوجيه / users.py :
from app.utils.dependencies import get_current_user
@router.get("/users/me", response_model=users.UserBase)
async def read_users_me(current_user: users.User = Depends(get_current_user)):
return current_user
الآن لدينا المسار / users / me ، والذي لا يستطيع الوصول إليه سوى المستخدمين المصادق عليهم.
كل شيء جاهز لإضافة إمكانية المستخدمين أخيرًا لإنشاء المنشورات وتحريرها:
utils / posts.py
from datetime import datetime
from app.models.database import database
from app.models.posts import posts_table
from app.models.users import users_table
from app.schemas import posts as post_schema
from sqlalchemy import desc, func, select
async def create_post(post: post_schema.PostModel, user):
query = (
posts_table.insert()
.values(
title=post.title,
content=post.content,
created_at=datetime.now(),
user_id=user["id"],
)
.returning(
posts_table.c.id,
posts_table.c.title,
posts_table.c.content,
posts_table.c.created_at,
)
)
post = await database.fetch_one(query)
# Convert to dict and add user_name key to it
post = dict(zip(post, post.values()))
post["user_name"] = user["name"]
return post
async def get_post(post_id: int):
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.where(posts_table.c.id == post_id)
)
return await database.fetch_one(query)
async def get_posts(page: int):
max_per_page = 10
offset1 = (page - 1) * max_per_page
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.order_by(desc(posts_table.c.created_at))
.limit(max_per_page)
.offset(offset1)
)
return await database.fetch_all(query)
async def get_posts_count():
query = select([func.count()]).select_from(posts_table)
return await database.fetch_val(query)
async def update_post(post_id: int, post: post_schema.PostModel):
query = (
posts_table.update()
.where(posts_table.c.id == post_id)
.values(title=post.title, content=post.content)
)
return await database.execute(query)
أجهزة التوجيه / posts.py
from app.schemas.posts import PostDetailsModel, PostModel
from app.schemas.users import User
from app.utils import posts as post_utils
from app.utils.dependencies import get_current_user
from fastapi import APIRouter, Depends, HTTPException, status
router = APIRouter()
@router.post("/posts", response_model=PostDetailsModel, status_code=201)
async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):
post = await post_utils.create_post(post, current_user)
return post
@router.get("/posts")
async def get_posts(page: int = 1):
total_cout = await post_utils.get_posts_count()
posts = await post_utils.get_posts(page)
return {"total_count": total_cout, "results": posts}
@router.get("/posts/{post_id}", response_model=PostDetailsModel)
async def get_post(post_id: int):
return await post_utils.get_post(post_id)
@router.put("/posts/{post_id}", response_model=PostDetailsModel)
async def update_post(
post_id: int, post_data: PostModel, current_user=Depends(get_current_user)
):
post = await post_utils.get_post(post_id)
if post["user_id"] != current_user["id"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to modify this post",
)
await post_utils.update_post(post_id=post_id, post=post_data)
return await post_utils.get_post(post_id)
دعنا نربط مسارات جديدة عن طريق إضافة main.py
from app.routers import posts
app.include_router(posts.router)
اختبارات
سنكتب الاختبارات في pytest :
$ pip install pytest
لاختبار نقاط النهاية ، يوفر FastAPI أداة خاصة TestClient .
لنكتب اختبار نقطة نهاية لا يتطلب اتصالاً بقاعدة البيانات:
from app.main import app
from fastapi.testclient import TestClient
client = TestClient(app)
def test_health_check():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"Hello": "World"}
كما ترى ، كل شيء بسيط للغاية. تحتاج إلى تهيئة TestClient واستخدامه لاختبار طلبات HTTP.
لاختبار باقي نقاط النهاية ، تحتاج إلى إنشاء قاعدة بيانات اختبار. لنقوم بتحرير ملف main.py ، مضيفين تكوين قاعدة الاختبار إليه:
from os import environ
import databases
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
TESTING = environ.get("TESTING")
if TESTING:
#
DB_NAME = "async-blogs-temp-for-test"
TEST_SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)
else:
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
database = databases.Database(SQLALCHEMY_DATABASE_URL)
ما زلنا نستخدم قاعدة بيانات "المدونات غير المتزامنة" لتطبيقنا . ولكن إذا تم تعيين قيمة متغير البيئة TESTING ، فسيتم استخدام قاعدة البيانات "async-blogs-temp-for-test" .
لإنشاء قاعدة بيانات "async-blogs-temp-for-test" تلقائيًا عند تشغيل الاختبارات وحذفها بعد تنفيذها ، دعنا ننشئ عنصرًا ثابتًا في ملف الاختبارات / conftest.py :
import os
import pytest
# `os.environ`,
os.environ['TESTING'] = 'True'
from alembic import command
from alembic.config import Config
from app.models import database
from sqlalchemy_utils import create_database, drop_database
@pytest.fixture(scope="module")
def temp_db():
create_database(database.TEST_SQLALCHEMY_DATABASE_URL) #
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) # alembic
command.upgrade(alembic_cfg, "head") #
try:
yield database.TEST_SQLALCHEMY_DATABASE_URL
finally:
drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) #
لإنشاء قاعدة البيانات وحذفها ، سنستخدم مكتبة sqlalchemy_utils .
باستخدام temp_db fixture في الاختبارات ، يمكننا اختبار جميع نقاط النهاية لتطبيقنا:
def test_sign_up(temp_db):
request_data = {
"email": "vader@deathstar.com",
"name": "Darth Vader",
"password": "rainbow"
}
with TestClient(app) as client:
response = client.post("/sign-up", json=request_data)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
assert response.json()["token"]["expires"] is not None
assert response.json()["token"]["access_token"] is not None
الاختبارات / test_posts.py
import asyncio
from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient
def test_create_post(temp_db):
user = UserCreate(
email="vader@deathstar.com",
name="Darth",
password="rainbow"
)
request_data = {
"title": "42",
"content": "Don't panic!"
}
with TestClient(app) as client:
# Create user and use his token to add new post
loop = asyncio.get_event_loop()
user_db = loop.run_until_complete(create_user(user))
response = client.post(
"/posts",
json=request_data,
headers={"Authorization": f"Bearer {user_db['token']['token']}"}
)
assert response.status_code == 201
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Don't panic!"
def test_create_post_forbidden_without_token(temp_db):
request_data = {
"title": "42",
"content": "Don't panic!"
}
with TestClient(app) as client:
response = client.post("/posts", json=request_data)
assert response.status_code == 401
def test_posts_list(temp_db):
with TestClient(app) as client:
response = client.get("/posts")
assert response.status_code == 200
assert response.json()["total_count"] == 1
assert response.json()["results"][0]["id"] == 1
assert response.json()["results"][0]["title"] == "42"
assert response.json()["results"][0]["content"] == "Don't panic!"
def test_post_detail(temp_db):
post_id = 1
with TestClient(app) as client:
response = client.get(f"/posts/{post_id}")
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Don't panic!"
def test_update_post(temp_db):
post_id = 1
request_data = {
"title": "42",
"content": "Life? Don't talk to me about life."
}
with TestClient(app) as client:
# Create user token to add new post
loop = asyncio.get_event_loop()
token = loop.run_until_complete(create_user_token(user_id=1))
response = client.put(
f"/posts/{post_id}",
json=request_data,
headers={"Authorization": f"Bearer {token['token']}"}
)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Life? Don't talk to me about life."
def test_update_post_forbidden_without_token(temp_db):
post_id = 1
request_data = {
"title": "42",
"content": "Life? Don't talk to me about life."
}
with TestClient(app) as client:
response = client.put(f"/posts/{post_id}", json=request_data)
assert response.status_code == 401
الاختبارات / test_users.py
import asyncio
import pytest
from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient
def test_sign_up(temp_db):
request_data = {
"email": "vader@deathstar.com",
"name": "Darth",
"password": "rainbow"
}
with TestClient(app) as client:
response = client.post("/sign-up", json=request_data)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
assert response.json()["token"]["expires"] is not None
assert response.json()["token"]["token"] is not None
def test_login(temp_db):
request_data = {"username": "vader@deathstar.com", "password": "rainbow"}
with TestClient(app) as client:
response = client.post("/auth", data=request_data)
assert response.status_code == 200
assert response.json()["token_type"] == "bearer"
assert response.json()["expires"] is not None
assert response.json()["access_token"] is not None
def test_login_with_invalid_password(temp_db):
request_data = {"username": "vader@deathstar.com", "password": "unicorn"}
with TestClient(app) as client:
response = client.post("/auth", data=request_data)
assert response.status_code == 400
assert response.json()["detail"] == "Incorrect email or password"
def test_user_detail(temp_db):
with TestClient(app) as client:
# Create user token to see user info
loop = asyncio.get_event_loop()
token = loop.run_until_complete(create_user_token(user_id=1))
response = client.get(
"/users/me",
headers={"Authorization": f"Bearer {token['token']}"}
)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
def test_user_detail_forbidden_without_token(temp_db):
with TestClient(app) as client:
response = client.get("/users/me")
assert response.status_code == 401
@pytest.mark.freeze_time("2015-10-21")
def test_user_detail_forbidden_with_expired_token(temp_db, freezer):
user = UserCreate(
email="sidious@deathstar.com",
name="Palpatine",
password="unicorn"
)
with TestClient(app) as client:
# Create user and use expired token
loop = asyncio.get_event_loop()
user_db = loop.run_until_complete(create_user(user))
freezer.move_to("'2015-11-10'")
response = client.get(
"/users/me",
headers={"Authorization": f"Bearer {user_db['token']['token']}"}
)
assert response.status_code == 401
مصادر PS
هذا كل شيء ، يمكن عرض مستودع المصدر من المنشور على GitHub .