в этом FastAPI Приложение, которое мы используем OAuth2PasswordRequestForm
справиться OAuth 2.0 Процесс авторизации пароля. Этот метод авторизации позволяет клиенту получить токен доступа непосредственно с сервера авторизации, отправив имя пользователя и пароль. Обычно он используется, когда клиенту доверяют, например, устройству пользователя.
OAuth2PasswordRequestForm
Преимущества:существовать /items/
путь POST В запросе мы передаем OAuth2PasswordRequestForm
Зависит от получения данных формы. Эта форма включает в себя следующие поля:
username
: имя пользователяpassword
: парольscopes
: Запрошенный объем разрешений (необязательно)client_id
: Идентификатор клиентаclient_secret
: ключ клиентаgrant_type
: Тип авторизации, вот "password"
,Указывает процесс авторизации пароляиспользовать curl
Пример отправки запроса:
curl -X 'POST' \
'http://127.0.0.1:18081/items/' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'grant_type=password&username=user_test&password=user_password&scope=test_scope&client_id=test_client_id&client_secret=test_client_secret'
Этот запрос проходит application/x-www-form-urlencoded
Введите содержимое, отправьте данные аутентификации пользователя на сервер. После получения этой информации сервер может выполнить соответствующую обработку, такую как проверка личности пользователя, проверка личности пользователя и проверка личности пользователя. токенждать。
существоватьсерверная часть,get_items
Функция получает эти данные и затем может выполнить дальнейшую обработку, например:
наконец,Функция возвращает эту информацию в словарной форме.,В основном используется для презентацийиотлаживать。существовать В практическом применении,ты можешь вернутьсяодингенерироватьиздоступжетонили связанныйизсообщение об ошибке。
from __future__ import annotations
from typing import Annotated
import uvicorn
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordRequestForm
app = FastAPI()
@app.post("/items/")
async def get_items(oauth2_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
"""
curl -X 'POST' \
'http://127.0.0.1:18081/items/' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'grant_type=password&username=user_test&password=user_password&scope=test_scope&client_id=test_client_id&client_secret=test_client_secret'
{
"grant_type": "password",
"username": "user_test",
"password": "user_password",
"scopes": [
"test_scope"
],
"client_id": "test_client_id",
"client_secret": "test_client_secret"
}
"""
print('username:', oauth2_data.username,
'\npassword:', oauth2_data.password,
'\nclient_id:', oauth2_data.client_id,
'\nclient_secret:', oauth2_data.client_secret,
'\nscopes:', oauth2_data.scopes,
'\ngrant_type:', oauth2_data.grant_type)
return dict(oauth2_data.__dict__.items())
if __name__ == '__main__':
uvicorn.run(app, host = '127.0.0.1', port = 18081)
from __future__ import annotations
import datetime
import jwt
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHMS = ["HS256", "HS384", "HS512"]
user_cache = {} # временный кэш
def check_token(token: str):
"""
исследовать token Это действительно?
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms = ALGORITHMS)
print('check_token payload:', payload)
if payload['username'] == user_cache[token]['username'] and payload['password'] == user_cache[token]['password']:
return True
else:
return False
except jwt.ExpiredSignatureError:
return False
except jwt.InvalidTokenError:
return False
@app.post('/generate_my_token')
def generate_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
"""
curl -X 'POST' \
'http://127.0.0.1:18081/generate_my_token' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'grant_type=password&username=test1&password=pwd1&scope=&client_id=string&client_secret=string'
{
"access_token":"eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0MSJ9.9GbGTL3B9rQP6nOyJSbMnzr4n9qKnibbC_W0ANsEH9do_rNUfEVbigwWoF1w2ySn",
"algorithm":"HS384"
}
один JWT Токен в основном состоит из трех частей, разделенных точками (.)
Эти три части:
Заголовок
Полезная нагрузка
Подпись
Заголовок
Header даодин JSON объект, закодированный как Base64Url Нить:
echo "eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9"|base64 -d
{"alg":"HS384","typ":"JWT"}
Полезная нагрузка
Payload 也даодин JSON объект, закодированный как Base64Url нить
echo "eyJ1c2VybmFtZSI6InN0cmluZyIsInBhc3N3b3JkIjoic3RyaW5nIiwiY2xpZW50X2lkIjoic3RyaW5nIiwiY2xpZW50X3NlY3JldCI6InN0cmluZyIsInNjb3BlcyI6W10sImdyYW50X3R5cGUiOiJwYXNzd29yZCIsImFsZ29yaXRobSI6IkhTMjU2IiwidGltZXN0YW1wIjoiMjAyNC0xMS0xOVQxMjo0ODoxMi4wMTI4NTkifQ"|base64 -d
{"username":"string","password":"string","client_id":"string","client_secret":"string","scopes":[],"grant_type":"password","algorithm":"HS256","timestamp":"2024-11-19T12:48:12.012859"}
Подпись
Подпись используется для проверки того, что сообщение не было подделано во время доставки, а также для использованиязакрытого сообщения. ключ подписи из токена, его также можно проверить JWT из Кто отправитель?
"""
print('generate_token HEADER:', request.headers.items())
# Здесь должна быть проверка пользователяипарольиз逻辑
if form_data.username == "" or form_data.password == "":
raise HTTPException(status_code = 400, detail = "Incorrect username or password")
# в соответствии симя пользователь и пароль выбирают алгоритм шифрования
v = 0
for i in form_data.username + form_data.password:
v += ord(i)
enc_alg = ALGORITHMS[v % len(ALGORITHMS)]
payload = {
'username': form_data.username,
'password': form_data.password,
'client_id': form_data.client_id,
'client_secret': form_data.client_secret,
'scopes': form_data.scopes,
'grant_type': form_data.grant_type,
'algorithm': enc_alg,
'timestamp': datetime.datetime.now().isoformat()
}
# Простой пример: ограниченность JWT
token = jwt.encode(payload, SECRET_KEY, algorithm = enc_alg)
user_cache[token] = {'username': form_data.username,
'password': form_data.password,
'login_user': token,
'client_id': form_data.client_id,
'client_secret': form_data.client_secret,
'scopes': form_data.scopes,
'grant_type': form_data.grant_type}
print(check_token(token))
return {"access_token": token, "algorithm": enc_alg}
# OAuth2PasswordBearer Попробую извлечь из запроса заголовок изAuthorization Bearer жетон
# если не найденжетонилижетон Неправильный формат,оно вернетсяодин错误响应
# -H "Authorization: Bearer eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9.xxxxxx"
oauth2_pwd_bearer = OAuth2PasswordBearer(tokenUrl = 'generate_my_token')
async def get_current_user(token: str = Depends(oauth2_pwd_bearer)):
print('get_current_user, login_user:', token)
if token not in user_cache:
raise HTTPException(status_code = 401, detail = "Invalid token")
if not check_token(token):
raise HTTPException(status_code = 401, detail = "Token check failed")
return {'login-user': user_cache[token]}
@app.get('/items')
async def read_items(request: Request, login_user: str = Depends(get_current_user)):
"""
curl -X GET http://127.0.0.1:18081/items \
-H "Authorization: Bearer eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0MSJ9.9GbGTL3B9rQP6nOyJSbMnzr4n9qKnibbC_W0ANsEH9do_rNUfEVbigwWoF1w2ySn"
"""
print('read_items HEADER:', request.headers.items())
return {'response': login_user}
if __name__ == '__main__':
uvicorn.run(app, host = '127.0.0.1', port = 18081)
/generate_my_token
)POST /generate_my_token
OAuth2PasswordRequestForm
Извлеките данные формы из запроса.check_token
get_current_user
OAuth2PasswordBearer
Выдержка из заголовка запроса изAuthorization JWT。check_token
проверять Токен./items
)GET /items
Depends(get_current_user)
Убедитесь, что действительны только JWT Только пользователи могут получить доступ.использовать ecc Ключ в качестве примера token Генерация и проверка
from __future__ import annotations
import datetime
import jwt
import uvicorn
from cryptography.hazmat.primitives import serialization
from fastapi import Depends, FastAPI,Request
from fastapi.security import OAuth2PasswordRequestForm
app = FastAPI()
# читатьзакрытый ключ
with open('oauth2_jwt_token_ecc_private.pem', 'rb') as f:
private_key = serialization.load_pem_private_key(
f.read(),
password = None,
)
with open('oauth2_jwt_token_ecc_public.pem', 'rb') as f:
public_key = serialization.load_pem_public_key(
f.read(),
)
def check_token(token: str):
"""
проверять token
"""
print('internal check token:', token)
try:
payload = jwt.decode(token, public_key, algorithms = ['ES512'])
print('check_token, payload:', payload)
return True
except jwt.exceptions.InvalidSignatureError:
print('check_token, InvalidSignatureError')
return False
except jwt.exceptions.ExpiredSignatureError:
print('check_token, ExpiredSignatureError')
return False
except jwt.exceptions.InvalidTokenError:
print('check_token, InvalidTokenError')
return False
@app.post('/generate_my_ecc_jwt_token')
async def generate_my_ecc_jwt_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
"""
Поддержка асимметричного алгоритма:
RS256 - RSA использовать SHA-256
RS384 - RSA использовать SHA-384
RS512 - RSA использовать SHA-512
ES256 - ECDSA использовать P-256 и SHA-256
ES384 - ECDSA использовать P-384 и SHA-384
ES512 - ECDSA использовать P-521 и SHA-512
PS256 - RSA SSA-PSS использовать SHA-256 и MGF1
PS384 - RSA SSA-PSS использовать SHA-384 и MGF1
PS512 - RSA SSA-PSS использовать SHA-512 и MGF1
curl -X 'POST' \
'http://127.0.0.1:18081/generate_my_ecc_jwt_token' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'grant_type=password&username=string&password=string&scope=&client_id=string&client_secret=string'
{
"access_token": "eyJhbGciOiJFUzUxMiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InN0cmluZyIsInBhc3N3b3JkIjoic3RyaW5nIiwiY2xpZW50X2lkIjoic3RyaW5nIiwiY2xpZW50X3NlY3JldCI6InN0cmluZyIsInNjb3BlcyI6W10sImdyYW50X3R5cGUiOiJwYXNzd29yZCJ9.AH62PD4t1x3oE-tJXzy3KimxX6Rvz1uY8J8EtCx92tDg6bId98BJUP-dkL_46hB8JOq4VbwJO77xEh_YOiEHJs-tAXfPgIawtSXgtraY1yRVOoEAN4f-bqdhqOeloPKIE3xH5ly1AD5LvLH7qLUxLI6Geil16Y67QuDw6N7C0kJXZe6j"
}
"""
print('generate_my_ecc_jwt_token HEADER:', request.headers.items())
# генерировать JWT
payload = {
'username': form_data.username,
'password': form_data.password,
'client_id': form_data.client_id,
'client_secret': form_data.client_secret,
'scopes': form_data.scopes,
'grant_type': form_data.grant_type,
'algorithm': 'ES512',
'timestamp': datetime.datetime.now().isoformat()
}
token = jwt.encode(payload, private_key, algorithm = 'ES512')
print(check_token(token))
return {'access_token': token}
if __name__ == '__main__':
uvicorn.run(app, host = '127.0.0.1', port = 18081)
открытый ключ
-----BEGIN PUBLIC KEY-----
MIGbMBAGByqGSM49AgEGBSuBBAAjA4GGAAQAPcXcQJsSY3z+WaFTxkxNxVn6HRat
2ofsvGDbZ06wkCWc08BNXPLQ4COPAvBE1rT0wqS4MkMmgI0F2jfWUU0BEiwAr5h8
JqEOZFpxTrm2tG/TX8j7SJH2/LFxLF9riv8jWjJLqdiuBYKBrLF2cxSjpGyku3ef
WuQBuHs81LHOSjX7Wqk=
-----END PUBLIC KEY-----
закрытый ключ
-----BEGIN PRIVATE KEY-----
MIHuAgEAMBAGByqGSM49AgEGBSuBBAAjBIHWMIHTAgEBBEIBVp+PXoGods37Go4R
UIk6LKvBVisTB+OEKJVQb37omfHihXbKrj2AaWF34AH13/r0QNcRmhOfpb/0PjHp
eCjuK8yhgYkDgYYABAA9xdxAmxJjfP5ZoVPGTE3FWfodFq3ah+y8YNtnTrCQJZzT
wE1c8tDgI48C8ETWtPTCpLgyQyaAjQXaN9ZRTQESLACvmHwmoQ5kWnFOuba0b9Nf
yPtIkfb8sXEsX2uK/yNaMkup2K4FgoGssXZzFKOkbKS7d59a5AG4ezzUsc5KNfta
qQ==
-----END PRIVATE KEY-----