JWT авторизация. Часть 2. Реализация аутентификации в drf

В предыдщей части мы познакомились с понятием и структурой JWT токенов и узнали, как закодировать токены с помощью стандартной библиотеки python.  В этой части мы реализуем систему аутентификации и авторизации в django-rest-framework на базе пакета PyJWT.

PyJWT это простая библиотека для кодированяи и декодирования JWT токенов. Интерфейс использования достаточно прост. Чтобы закодировать данные нам понадобится вызвать функцию, передав полезную нагрузку в виде словаря, секретный ключ и тип хэширующей функции.

>>> import jwt
>>> encoded = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'

>>> jwt.decode(encoded, 'secret', algorithms=['HS256'])
{'some': 'payload'}

Установить PyJWT можно следующим образом:

$ pipenv install pyjwt

или в случае с pip

$ pip3 install pyjwt

Создание  API

Для создания схемы авторизация и аутентификации нам понадобится всего 2 эндпоинта

url описание
/login Выдача новых токенов по username/password
/refresh Обновление пары токенов

Login

Создадим serializer, который принимает на вход имейл и пароль пользователя, валидирует правильность данных и отдает пару токенов.

# authentication/serializers.py

from datetime import datetime, timedelta

from django.contrib.auth import get_user_model
from rest_framework import serializers
from django.utils.translation import gettext_lazy as _
import jwt


UserModel = get_user_model()

class LoginSerializer(serializers.Serializer):
    # ==== INPUT ====
    email = serializers.EmailField(required=True, write_only=True)
    password = serializers.CharField(required=True, write_only=True)
    # ==== OUTPUT ====
    access = serializers.CharField(read_only=True)
    refresh = serializers.CharField(read_only=True)

email имеет тип EmailField c параметрами required=True и write_only=True. Это значит, что базовая валидация для это поля требует правильного формата имейла. В случае его отсутствия, сериализатор сообщит об ошибке.

password имеет тип CharField c параметрами required=True и write_only=True. Базовая валидация для это поля требует текст. В случае его отсутствия, сериализатор также сообщит об ошибке.

access и refresh это текстовые поля, которые будут переданы на выходе, когда данные пройду валидацию и создание, поэтому они имеют аргумент read_only=True.

Теперь необходимо реализовать метод validate, который проверяет введенные данные.

    def validate(self, attrs):
        # standard validation
        validated_data = super().validate(attrs)

        # validate email and password
        email = validated_data['email']
        password = validated_data['password']
        error_msg = _('email or password are incorrect')
        try:
            user = UserModel.objects.get(email=email)
            if not user.check_password(password):
                raise serializers.ValidationError(error_msg)
            validated_data['user'] = user
        except UserModel.DoesNotExist:
            raise serializers.ValidationError(error_msg)

        return validated_data

super().validate(attrs) запускает базовую валидацию на имеющиеся поля и их формат. Возвращает свалидированные данные. Затем с помощью UserModel.objects.get(email=email) мы получаем пользователя с указанным имейлом. Если его не существует -- сообщаем об этом. В противном случае -- проверяем, что такой юзер имеет введенный пароль. Затем с помощью validated_data['user'] = user добавляем инстанс пользователя в свалидированные данные, чтобы передать по цепочке методу create.

Добавим некоторые константы для токенов:

JWT_SECRET = 'my_secret'  #   секретное слово для подписи
JWT_ACCESS_TTL = 60 * 5   # время жизни access токена в секундах (5 мин)
JWT_REFRESH_TTL = 3600 * 24 * 7 # время жизни refresh токена в секундах (неделя)

Теперь опишем метод create(), который будет отдавать сгенерированные токены.

    def create(self, validated_data):
        access_payload = {
            'iss': 'backend-api',
            'user_id': validated_data['user'].id,
            'exp': datetime.utcnow() + timedelta(seconds=JWT_ACCESS_TTL),
            'type': 'access'
        }
        access = jwt.encode(payload=access_payload, key=JWT_SECRET)

        refresh_payload = {
            'iss': 'backend-api',
            'user_id': validated_data['user'].id,
            'exp': datetime.utcnow() + timedelta(seconds=JWT_REFRESH_TTL),
            'type': 'refresh'
        }
        refresh = jwt.encode(payload=refresh_payload, key=JWT_SECRET)

        return {
            'access': access,
            'refresh': refresh
        }

В полезную нагрузку включаем айди пользователя, время, когда токен станет невалидным и тип токена - access или refresh.

Создадим LoginVeiw и добавим в urls

# authentication/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .serializers import LoginSerialize


class LoginView(APIView):

    def post(self, request, format=None):
        serializer = LoginSerializer(data=request.data)
        if serializer.is_valid():
            response_data = serializer.save()
            return Response(response_data)

        return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)

# authentication/urls.py
from django.urls import path, re_path

from .views import LoginView

api_urls = [
    path('login/', LoginView.as_view(), name='login'),
]

urlpatterns = api_urls

Refresh

По такому же принципу опишем refresh сериализатор и  view. Только теперь в методе  validate мы проверям валидность refresh токена.

# authentication/serializers.py
class RefreshSerializer(serializers.Serializer):
    # ==== INPUT ====
    refresh_token = serializers.CharField(required=True, write_only=True)
    # ==== OUTPUT ====
    access = serializers.CharField(read_only=True)
    refresh = serializers.CharField(read_only=True)

    def validate(self, attrs):
        # standard validation
        validated_data = super().validate(attrs)

        # validate refresh
        refresh_token = validated_data['refresh_token']
        try:
            payload = jwt.decode(refresh_token, JWT_SECRET)
            if payload['type'] != 'refresh':
                error_msg = {'refresh_token': _('Token type is not refresh!')}
                raise serializers.ValidationError(error_msg)
            validated_data['payload'] = payload
        except jwt.ExpiredSignatureError:
            error_msg = {'refresh_token': _('Refresh token is expired!')}
            raise serializers.ValidationError(error_msg)
        except jwt.InvalidTokenError:
            error_msg = {'refresh_token': _('Refresh token is invalid!')}
            raise serializers.ValidationError(error_msg)

        return validated_data

    def create(self, validated_data):
        access_payload = {
            'iss': 'backend-api',
            'user_id': validated_data['payload']['user_id'],
            'exp': datetime.utcnow() + timedelta(seconds=JWT_ACCESS_TTL),
            'type': 'access'
        }
        access = jwt.encode(payload=access_payload, key=JWT_SECRET)

        refresh_payload = {
            'iss': 'backend-api',
            'user_id': validated_data['payload']['user_id'],
            'exp': datetime.utcnow() + timedelta(seconds=JWT_REFRESH_TTL),
            'type': 'refresh'
        }
        refresh = jwt.encode(payload=refresh_payload, key=JWT_SECRET)

        return {
            'access': access,
            'refresh': refresh
        }

# authentication/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .serializers import LoginSerializer, RefreshSerializer


class LoginView(APIView):

    def post(self, request, format=None):
        serializer = LoginSerializer(data=request.data)
        if serializer.is_valid():
            response_data = serializer.save()
            return Response(response_data)

        return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)


class RefreshView(APIView):

    def post(self, request, format=None):
        serializer = RefreshSerializer(data=request.data)
        if serializer.is_valid():
            response_data = serializer.save()
            return Response(response_data)

        return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)

# authentication/urls.py
from django.urls import path, re_path

from .views import LoginView, RefreshView

api_urls = [
    path('login/', LoginView.as_view(), name='login'),
    path('refresh/', RefreshView.as_view(), name='refresh'),

]

urlpatterns = api_urls

Теперь протестим логин:

Login с правильными данными

Логин с неправильными данными:

Login с неправильным форматом email (myemail@email)
Неправильный пароль

Эндпоинт для рефреша токенов:

Корректный рефреш токен

И в случае, если рефреш токен невалидный:

Невалидный рефшер токен