用github 帐号登录之tornado 实现

用github 帐号登录之tornado 实现,主要面向开发者的可以使用这个第三方登录。在gist 上发现的,直接拿来,简单修改一下。

用github 帐号登录之tornado 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# coding: utf-8
#
# Copyright (c) Alexandr Emelin. BSD license.
# All rights reserved.
#
"""
class GithubAuthHandler(BaseHandler, auth.GithubMixin):

    x_site_token = 'application'

    @tornado.web.asynchronous
    def get(self):
        redirect_uri = "{0}://{1}{2}".format(
            self.request.protocol,
            self.request.host,
            self.reverse_url("auth_github")
        )
        params = {
            'redirect_uri': redirect_uri,
            'client_id':    self.opts['github_client_id'],
            'state':        self.x_site_token
        }

        code = self.get_argument('code', None)

        # Seek the authorization
        if code:
            # For security reason, the state value (cross-site token) will be
            # retrieved from the query string.
            params.update({
                'client_secret': self.opts['github_client_secret'],
                'success_callback': self._on_auth,
                'error_callback': self._on_error,
                'code':  code,
                'state': self.get_argument('state', None)
            })
            self.get_authenticated_user(**params)
            return

        # Redirect for user authentication
        self.get_authenticated_user(**params)

    @coroutine
    def _on_auth(self, user, access_token=None):
        if not user:
            raise tornado.web.HTTPError(500, "Github auth failed")
        user_data, error = yield storage.get_or_create_user(
            self.db,
            user['email']
        )
        if error:
            raise tornado.web.HTTPError(500, "Auth failed")
        self.set_secure_cookie("user", tornado.escape.json_encode(user_data))
        self.redirect(self.reverse_url("main"))

    def _on_error(self, code, body=None, error=None):
        if body:
            logging.error(body)
        if error:
            logging.error(error)
        raise tornado.web.HTTPError(500, "Github auth failed")
"""

from tornado.escape import json_decode
import codecs
import json
import re

try:
    from urllib import urlencode
except ImportError:
    # python 3
    from urllib.parse import urlencode
    
from tornado import httpclient
from tornado.auth import OAuth2Mixin


class GithubMixin(OAuth2Mixin):
    """GitHub OAuth2 Authentication

    To authenticate with GitHub, first register your application at
    https://github.com/settings/applications/new to get the client ID and
    secret.
    """

    _API_BASE_HEADERS = {
        'Accept': 'application/json',
        'User-Agent': 'Tornado OAuth'
    }
    _OAUTH_ACCESS_TOKEN_URL = 'https://github.com/login/oauth/access_token'
    _OAUTH_AUTHORIZE_URL = 'https://github.com/login/oauth/authorize'
    _OAUTH_USER_URL = 'https://api.github.com/user?access_token='

    def get_authenticated_user(self, redirect_uri, client_id, state,
                               client_secret=None, code=None,
                               success_callback=None,
                               error_callback=None):
        """ Fetches the authenticated user

        :param redirect_uri: the redirect URI
        :param client_id: the client ID
        :param state: the unguessable random string to protect against
                      cross-site request forgery attacks
        :param client_secret: the client secret
        :param code: the response code from the server
        :param success_callback: the success callback used when fetching
                                 the access token succeeds
        :param error_callback: the callback used when fetching the access
                               token fails
        """
        if code:
            self._fetch_access_token(
                code,
                success_callback,
                error_callback,
                redirect_uri,
                client_id,
                client_secret,
                state
            )

            return

        params = {
            'redirect_uri': redirect_uri,
            'client_id':    client_id,
            'extra_params': {
                'state': state
            }
        }

        self.authorize_redirect(**params)

    def _fetch_access_token(self, code, success_callback, error_callback,
                            redirect_uri, client_id, client_secret, state):
        """ Fetches the access token.

        :param code: the response code from the server
        :param success_callback: the success callback used when fetching
                                 the access token succeeds
        :param error_callback: the callback used when fetching the access
                               token fails
        :param redirect_uri: the redirect URI
        :param client_id: the client ID
        :param client_secret: the client secret
        :param state: the unguessable random string to protect against
                      cross-site request forgery attacks
        :return:
        """
        if not (client_secret and success_callback and error_callback):
            raise ValueError(
                'The client secret or any callbacks are undefined.'
            )

        params = {
            'code':          code,
            'redirect_url':  redirect_uri,
            'client_id':     client_id,
            'client_secret': client_secret,
            'state':         state
        }

        http = httpclient.AsyncHTTPClient()

        callback_sharing_data = {}

        def use_error_callback(response, decoded_body):
            data = {
                'code': response.code,
                'body': decoded_body
            }

            if response.error:
                data['error'] = response.error

            error_callback(**data)

        def decode_response_body(response):
            """ Decodes the JSON-format response body

            :param response: the response object
            :type response: tornado.httpclient.HTTPResponse

            :return: the decoded data
            """
            # Fix GitHub response.
            if response.body:
                body = codecs.decode(response.body, 'ascii')
                body = re.sub('"', '\"', body)
                body = re.sub("'", '"', body)
                body = json.loads(body)
            else:
                body = ''

            if response.error:
                use_error_callback(response, body)

                return None

            return body

        def on_authenticate(response):
            """ The callback handling the authentication

            :param response: the response object
            :type response: tornado.httpclient.HTTPResponse
            """
            body = decode_response_body(response)

            if not body:
                return

            if 'access_token' not in body:
                use_error_callback(response, body)

                return

            callback_sharing_data['access_token'] = body['access_token']

            http.fetch(
                '{}{}'.format(
                    self._OAUTH_USER_URL, callback_sharing_data['access_token']
                ),
                on_fetching_user_information,
                headers=self._API_BASE_HEADERS
            )

        def on_fetching_user_information(response):
            """ The callback handling the data after fetching the user info

            :param response: the response object
            :type response: tornado.httpclient.HTTPResponse
            """
            # Fix GitHub response.
            user = decode_response_body(response)

            if not user:
                return

            success_callback(user, callback_sharing_data['access_token'])

        # Request the access token.
        http.fetch(
            self._OAUTH_ACCESS_TOKEN_URL,
            on_authenticate,
            method='POST',
            body=urlencode(params),
            headers=self._API_BASE_HEADERS
        )

本文网址: https://pylist.com/topic/82.html 转摘请注明来源

Suggested Topics

ssdb 全文搜索的实现

ssdb 作为key-value 数据库,底层没有提供全文搜索的功能,只能在应用层作检索。...

用 misaka 实现 gfm Markdown 格式

misaka 是python 世界性能最好的 markdown 解析库,gfm (GitHub Flavored Markdown)是 github 扩展的格式。可以通过下面的方式用 misaka 实现 gfm 解析...

Leave a Comment