JWT авторизация. Часть 2. Реализация аутентификации в drf
В предыдщей части мы познакомились с понятием и структурой JWT токенов и узнали, как закодировать токены с помощью стандартной библиотеки python. В этой части мы реализуем систему аутентификации и авторизации в django-rest-framework на базе пакета PyJWT.
PyJWT это простая библиотека для кодированяи и декодирования JWT токенов. Интерфейс использования достаточно прост. Чтобы закодировать данные нам понадобится вызвать функцию, передав полезную нагрузку в виде словаря, секретный ключ и тип хэширующей функции.
>>> import jwt
>>> encoded = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'
>>> jwt.decode(encoded, 'secret', algorithms=['HS256'])
{'some': 'payload'}
Установить PyJWT можно следующим образом:
$ pipenv install pyjwt
или в случае с pip
$ pip3 install pyjwt
Создание API
Для создания схемы авторизация и аутентификации нам понадобится всего 2 эндпоинта
url | описание |
---|---|
/login | Выдача новых токенов по username/password |
/refresh | Обновление пары токенов |
Login
Создадим serializer, который принимает на вход имейл и пароль пользователя, валидирует правильность данных и отдает пару токенов.
# authentication/serializers.py
from datetime import datetime, timedelta
from django.contrib.auth import get_user_model
from rest_framework import serializers
from django.utils.translation import gettext_lazy as _
import jwt
UserModel = get_user_model()
class LoginSerializer(serializers.Serializer):
# ==== INPUT ====
email = serializers.EmailField(required=True, write_only=True)
password = serializers.CharField(required=True, write_only=True)
# ==== OUTPUT ====
access = serializers.CharField(read_only=True)
refresh = serializers.CharField(read_only=True)
email
имеет тип EmailField
c параметрами required=True
и write_only=True
. Это значит, что базовая валидация для это поля требует правильного формата имейла. В случае его отсутствия, сериализатор сообщит об ошибке.
password
имеет тип CharField
c параметрами required=True
и write_only=True
. Базовая валидация для это поля требует текст. В случае его отсутствия, сериализатор также сообщит об ошибке.
access
и refresh
это текстовые поля, которые будут переданы на выходе, когда данные пройду валидацию и создание, поэтому они имеют аргумент read_only=True
.
Теперь необходимо реализовать метод validate
, который проверяет введенные данные.
def validate(self, attrs):
# standard validation
validated_data = super().validate(attrs)
# validate email and password
email = validated_data['email']
password = validated_data['password']
error_msg = _('email or password are incorrect')
try:
user = UserModel.objects.get(email=email)
if not user.check_password(password):
raise serializers.ValidationError(error_msg)
validated_data['user'] = user
except UserModel.DoesNotExist:
raise serializers.ValidationError(error_msg)
return validated_data
super().validate(attrs)
запускает базовую валидацию на имеющиеся поля и их формат. Возвращает свалидированные данные. Затем с помощью UserModel.objects.get(email=email)
мы получаем пользователя с указанным имейлом. Если его не существует -- сообщаем об этом. В противном случае -- проверяем, что такой юзер имеет введенный пароль. Затем с помощью validated_data['user'] = user
добавляем инстанс пользователя в свалидированные данные, чтобы передать по цепочке методу create
.
Добавим некоторые константы для токенов:
JWT_SECRET = 'my_secret' # секретное слово для подписи
JWT_ACCESS_TTL = 60 * 5 # время жизни access токена в секундах (5 мин)
JWT_REFRESH_TTL = 3600 * 24 * 7 # время жизни refresh токена в секундах (неделя)
Теперь опишем метод create()
, который будет отдавать сгенерированные токены.
def create(self, validated_data):
access_payload = {
'iss': 'backend-api',
'user_id': validated_data['user'].id,
'exp': datetime.utcnow() + timedelta(seconds=JWT_ACCESS_TTL),
'type': 'access'
}
access = jwt.encode(payload=access_payload, key=JWT_SECRET)
refresh_payload = {
'iss': 'backend-api',
'user_id': validated_data['user'].id,
'exp': datetime.utcnow() + timedelta(seconds=JWT_REFRESH_TTL),
'type': 'refresh'
}
refresh = jwt.encode(payload=refresh_payload, key=JWT_SECRET)
return {
'access': access,
'refresh': refresh
}
В полезную нагрузку включаем айди пользователя, время, когда токен станет невалидным и тип токена - access
или refresh
.
Создадим LoginVeiw
и добавим в urls
# authentication/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .serializers import LoginSerialize
class LoginView(APIView):
def post(self, request, format=None):
serializer = LoginSerializer(data=request.data)
if serializer.is_valid():
response_data = serializer.save()
return Response(response_data)
return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# authentication/urls.py
from django.urls import path, re_path
from .views import LoginView
api_urls = [
path('login/', LoginView.as_view(), name='login'),
]
urlpatterns = api_urls
Refresh
По такому же принципу опишем refresh сериализатор и view. Только теперь в методе validate мы проверям валидность refresh токена.
# authentication/serializers.py
class RefreshSerializer(serializers.Serializer):
# ==== INPUT ====
refresh_token = serializers.CharField(required=True, write_only=True)
# ==== OUTPUT ====
access = serializers.CharField(read_only=True)
refresh = serializers.CharField(read_only=True)
def validate(self, attrs):
# standard validation
validated_data = super().validate(attrs)
# validate refresh
refresh_token = validated_data['refresh_token']
try:
payload = jwt.decode(refresh_token, JWT_SECRET)
if payload['type'] != 'refresh':
error_msg = {'refresh_token': _('Token type is not refresh!')}
raise serializers.ValidationError(error_msg)
validated_data['payload'] = payload
except jwt.ExpiredSignatureError:
error_msg = {'refresh_token': _('Refresh token is expired!')}
raise serializers.ValidationError(error_msg)
except jwt.InvalidTokenError:
error_msg = {'refresh_token': _('Refresh token is invalid!')}
raise serializers.ValidationError(error_msg)
return validated_data
def create(self, validated_data):
access_payload = {
'iss': 'backend-api',
'user_id': validated_data['payload']['user_id'],
'exp': datetime.utcnow() + timedelta(seconds=JWT_ACCESS_TTL),
'type': 'access'
}
access = jwt.encode(payload=access_payload, key=JWT_SECRET)
refresh_payload = {
'iss': 'backend-api',
'user_id': validated_data['payload']['user_id'],
'exp': datetime.utcnow() + timedelta(seconds=JWT_REFRESH_TTL),
'type': 'refresh'
}
refresh = jwt.encode(payload=refresh_payload, key=JWT_SECRET)
return {
'access': access,
'refresh': refresh
}
# authentication/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .serializers import LoginSerializer, RefreshSerializer
class LoginView(APIView):
def post(self, request, format=None):
serializer = LoginSerializer(data=request.data)
if serializer.is_valid():
response_data = serializer.save()
return Response(response_data)
return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class RefreshView(APIView):
def post(self, request, format=None):
serializer = RefreshSerializer(data=request.data)
if serializer.is_valid():
response_data = serializer.save()
return Response(response_data)
return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# authentication/urls.py
from django.urls import path, re_path
from .views import LoginView, RefreshView
api_urls = [
path('login/', LoginView.as_view(), name='login'),
path('refresh/', RefreshView.as_view(), name='refresh'),
]
urlpatterns = api_urls
Теперь протестим логин:

Логин с неправильными данными:


Эндпоинт для рефреша токенов:

И в случае, если рефреш токен невалидный:
