datetime:2020/5/13 10:33
author:nzb

登录相关(基于jwt token登录;单个用户表或多个用户表)

  • urls.py

      path("login/", CustomLoginJSONWebToken.as_view()),
    
  • settings.py

# 验证中间件
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    # 'corsheaders.middleware.CorsPostCsrfMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'middlewares.ValidTokenMiddleware',     # 换台设备需重新登录
]

# drf框架的配置信息
REST_FRAMEWORK = {
    # 默认分页
    'DEFAULT_PAGINATION_CLASS': 'utils.PagePagination',
    'DATETIME_FORMAT': '%Y-%m-%d %H:%M:%S',
    # 自定义token内含值
    'JWT_PAYLOAD_HANDLER': 'utils.custom_payload_handler',
    # 异常处理
    'EXCEPTION_HANDLER': 'utils.exception_handler.exception_handler',       # 见01-重写异常处理手柄
    # 用户登陆认证方式
    'DEFAULT_AUTHENTICATION_CLASSES': (
    #单个用户表配置
        # 'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
        # 'rest_framework.authentication.SessionAuthentication',
        # 'rest_framework.authentication.BasicAuthentication',
    # 多个用户表配置
    'authentication.CustomAuthenticate',  # 自定义 JSON Token Authentication

    ),
    # 获取用户的secret_key
    # 没用,需要也是放到下面的 JWT_AUTH 中,不设置是获取上面的 SECRET_KEY 用于签名加密(不能泄露)
    # 'JWT_GET_USER_SECRET_KEY': 'utils.jwt_get_user_secret',

}
# jwt载荷中的有效期设置(from rest_framework_jwt)
token_time =  datetime.timedelta(days=365)
JWT_AUTH = {
    'JWT_EXPIRATION_DELTA': token_time,             # 有效期设置
    'JWT_REFRESH_EXPIRATION_DELTA': token_time,     # 刷新有效期
    'JWT_RESPONSE_PAYLOAD_HANDLER': '.utils.custom_jwt_response_payload_handler',   # 自定义返回认证通过后的数据
}
  • utils.py
from django.contrib.auth import authenticate, get_user_model
from rest_framework_jwt.utils import jwt_encode_handler
from rest_framework_jwt.settings import api_settings


def custom_payload_handler(user):
    '''自定义token内含值
    :param user:  user auth model
    :return: 计算token的基本信息
    '''

    # jwt token payload的基本信息: user_id 用户主键id, 用户first_name, 用户电话号码 phone
    payload = {
        'user_id': user.pk,
        'username': user.username,
        'exp': datetime.datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA,
        # 'phone': user.phone
        'user_secret': str(uuid.uuid4())    # uuid
    }

    if api_settings.JWT_ALLOW_REFRESH:
        payload['orig_iat'] = timegm(
            datetime.datetime.utcnow().utctimetuple()
        )

    if api_settings.JWT_AUDIENCE is not None:
        payload['aud'] = api_settings.JWT_AUDIENCE

    if api_settings.JWT_ISSUER is not None:
        payload['iss'] = api_settings.JWT_ISSUER

    return payload

def generate_user_token(user):
    """生成用户token"""
    user_model = get_user_model()

    payload = custom_payload_handler(user)
    token = jwt_encode_handler(payload)

    return token

def custom_jwt_response_payload_handler(token, user=None):
    """
    自定义jwt认证成功返回的数据
    :token  返回的jwt
    :user   当前登录的用户信息[对象]
    :request 当前本次客户端提交过来的数据

    Example:
        return {
            'token': token,
            'user': UserSerializer(user, context={'request': request}).data
        }
    """
    data = {
        'code': 10000,
        'results': {
            'token': token,
            'id': user.id,
            'username': user.username,
            ....
        }
    }
    return data

def jwt_get_secret_key(payload=None):
    """获取用户的secret_key(例如uuid)
    For enhanced security you may want to use a secret key based on user.

    This way you have an option to logout only this user if:
        - token is compromised
        - password is changed
        - etc.
    """
    return user.user_secret

# 分页
class PagePagination(LimitOffsetPagination):
    """分页"""
    # page_size = 1
    limit_query_param = 'limit'
    offset_query_param = 'offset'
    max_limit = 20

    def get_paginated_response(self, data):
        ret = dict([
            ('code', 10000,
            ('errMsg', ''),
            ('count', self.count),
            ('previous', self.get_previous_link()),
            ('next', self.get_next_link()),
        ])
        if isinstance(data, dict):
            ret.update(**data)
        else:
            ret.update({
                'results': data
            })
        return Response(OrderedDict(ret))
  • views.py
from rest_framework_jwt.views import JSONWebTokenAPIView
from rest_framework_jwt.settings import api_settings

from utils import custom_jwt_response_payload_handler

class CustomLoginJSONWebToken(JSONWebTokenAPIView):
    """
    自定义登录
    """
    serializer_class = CustomJSONWebTokenSerializer

    def post(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)

        if serializer.is_valid():
            user = serializer.object.get('user') or request.user
            token = serializer.object.get('token')
            # 可自定义返回认证成功后的数据,settings中的JWT_AUTH中的JWT_RESPONSE_PAYLOAD_HANDLER设置
            # 这里还可以写需要的相应逻辑
            response_data = custom_jwt_response_payload_handler(token, user, request)
            response = Response(response_data)
            if api_settings.JWT_AUTH_COOKIE:
                expiration = (datetime.utcnow() +
                              api_settings.JWT_EXPIRATION_DELTA)
                response.set_cookie(api_settings.JWT_AUTH_COOKIE,
                                    token,
                                    expires=expiration,
                                    httponly=True)
            return response

        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
  • serializers.py
from rest_framework_jwt.serializers import JSONWebTokenSerializer
from django.contrib.auth import authenticate, get_user_model
from utils import generate_user_token

class CustomJSONWebTokenSerializer(JSONWebTokenSerializer):
    """自定义账号密码登录序列号"""

    def validate(self, attrs):
        # 密码账户验证
        username = attrs.get(self.username_field)
        password = attrs.get('password', None)

        credentials = {
            self.username_field: username,
            'password': password
        }
        if all(credentials.values()):
            user = authenticate(**credentials)

            if user:
                if not user.is_active:
                    msg = {'code': 10996, 'errMsg': '用户被冻结'}
                    raise serializers.ValidationError(msg)

                # payload = jwt_payload_handler(user)

                return {
                    # 'token': jwt_encode_handler(payload),
                    'token': generate_user_token(user),
                    'user': user
                }
            else:
                # 若认证失败
                msg = {'code': 10991, 'errMsg': '用户名和密码错误'}
                raise serializers.ValidationError(msg)
        else:
            msg = {'code': 10997, 'errMsg': 'username和password为必填字段'}
            raise serializers.ValidationError(msg)


from rest_framework_jwt.serializers import VerifyJSONWebTokenSerializer

class CustomVerifyJSONWebTokenSerializer(VerifyJSONWebTokenSerializer):
    """
    多个用户表时自定义验证
    中间件中使用
    """
    def _check_user(self, payload):
        # print("-------自定义验证的payload:", payload)
        username = jwt_get_username_from_payload(payload)

        if not username:
            msg = _('Invalid payload.')
            raise serializers.ValidationError(msg)

        # Make sure user exists
        token_type = payload.get("type", None)
        if token_type == "manager":
            try:
            # 表1
                user = UserManager.objects.get_by_natural_key(username)
            except UserManager.DoesNotExist:
                msg = _("User doesn't exist.")
                raise serializers.ValidationError(msg)
            if not user.is_active:
                msg = _('User account is disabled.')
                raise serializers.ValidationError(msg)
        else:
            try:
            # 表2
                user = UserStuInfo.objects.filter(id=payload.get("user_id", None), sno=payload.get("sno", None), is_del=False).first()
            except UserStuInfo.DoesNotExist:
                msg = _("User doesn't exist.")
                raise serializers.ValidationError(msg)

        return user
  • authentication.py(多个用户表时的登录验证)
from rest_framework_jwt.authentication import JSONWebTokenAuthentication, jwt_get_username_from_payload


class CustomAuthenticate(JSONWebTokenAuthentication):
    """多个用户表自定义设置登录选项"""

    def authenticate_credentials(self, payload):
        """
        Returns an active user that matches the payload's user id and email.
        """
        # User = get_user_model()
        username = jwt_get_username_from_payload(payload)

        # if

        if not username:
            msg = _('Invalid payload.')
            raise exceptions.AuthenticationFailed(msg)
        token_type = payload.get("type", None)
        if token_type == "manager":
            try:
                # 举例用户表1
                user = UserManager.objects.get_by_natural_key(username)
            except user.DoesNotExist:
                msg = _('Invalid signature.')
                raise exceptions.AuthenticationFailed(msg)
            if not user.is_active:
                msg = _('User account is disabled.')
                raise exceptions.AuthenticationFailed(msg)
        else:
            try:
                # 举例用户表2
                user = UserStuInfo.objects.filter(id=payload.get("user_id", None), sno=payload.get("sno", None), is_del=False).first()
            except user.DoesNotExist:
                msg = _('Invalid signature.')
                raise exceptions.AuthenticationFailed(msg)
        return user
  • middleware.py
from uuid import uuid4
import json

from django.http import HttpResponse
from jwt import InvalidSignatureError
from rest_framework.exceptions import ValidationError
from django.utils.deprecation import MiddlewareMixin
from rest_framework_jwt.utils import jwt_decode_handler

from authentication import CustomVerifyJSONWebTokenSerializer

# 1.每次登录 response 处理 记录 jwt
# 2.每次请求判断 jwt是否与表中相等(相当于用户异设备登录获取了新的jwt)  不等 就修改uuid


class ValidTokenMiddleware(MiddlewareMixin):

    def process_request(self, request):
        # 用于处理 所有带 jwt 的请求
        jwt_token = request.META.get('HTTP_AUTHORIZATION', None)
        if jwt_token is not None and jwt_token != '':
            data = {
                'token': request.META['HTTP_AUTHORIZATION'].split(' ')[1],
            }
            try:
                # valid_data = VerifyJSONWebTokenSerializer().validate(data)        # 原来的
                valid_data = CustomVerifyJSONWebTokenSerializer().validate(data)    # 多用户自定义后的
                user = valid_data['user']
                # if user:
                    # print("------------请求时用户的uuid:{0}".format(user.user_secret))
            except (InvalidSignatureError, ValidationError):
                # 找不到用户
                data = json.dumps({"code": 10000, "errMsg": "用户未登录"})
                return HttpResponse(data, content_type='application/json', status=400)
            # if user.user_jwt != data['token']:
             # 解析token,这里面就有获取用户的user_secret,~~所以需要重写jwt_get_secret_key(不需要)~~
            decode_token = jwt_decode_handler(data['token'])       
            # print("------------请求时带的token:{0}".format(decode_token))
            if not user:
                data = json.dumps({"code": 10000, "errMsg": "用户未登录"})
                return HttpResponse(data, content_type='application/json', status=400)
            elif str(user.user_secret) != decode_token.get("user_secret"):  
                user.user_secret = uuid4()
                user.save()
                data = json.dumps({"code": 10000, "errMsg": "用户未登录"})
                return HttpResponse(data, content_type='application/json', status=400)

    def process_response(self, request, response):
        # 仅用于处理 login请求
        # print("----------", request.META['PATH_INFO'])

        MANAGER_LOGIN_PATH = "/api/teacher/login/"
        STUDENT_LOGIN_PATH = "/api/student/login/"

        if request.META['PATH_INFO'] in (MANAGER_LOGIN_PATH, STUDENT_LOGIN_PATH):
            try:
                rep_data = response.data
            except AttributeError as e:
                print("报错信息:", e.args)
            results = rep_data.get('results', None)
            if results:
                # valid_data = VerifyJSONWebTokenSerializer().validate(results)         # 原来的
                valid_data = CustomVerifyJSONWebTokenSerializer().validate(results)     # 多用户自定义后的
                user = valid_data['user']
                # user.user_jwt = rep_data['results']['token']
                decode_token = jwt_decode_handler(rep_data['results']['token'])
                # print("--------登录后的token:", decode_token)
                user.user_secret = decode_token.get("user_secret")
                user.save()
                return response
            else:
                return response
        else:
            return response

results matching ""

    No results matching ""