Python API Authorization

Sample Project

Download a sample project specific to this tutorial configured with your Auth0 API Keys.

System Requirements
  • flask 0.11.1
  • python-jose 1.3.2
  • flask-cors 3.0.2
Show requirements

To restrict access to the resources served by your API, a check needs to be made to determine whether the incoming request contains valid authorization information. There are various methods for including authorization information in a request, but for integration with Auth0, your API needs to check for a valid JSON Web Token (JWT). When users log into your application, they will receive an id_token and an access_token which are both JWTs. The specific JWT that needs to be sent to your API is the access_token.

Install the Dependencies

This quickstart demonstrates how to add authorization to your Python API using Flask. Add the following dependencies to your requirements.txt:

flask
python-jose
flask-cors

Create the JWT Validation Decorator

By default, your API will be set up to use RS256 as the algorithm for signing tokens. Since RS256 works by using a private/public keypair, tokens can be verified against the public key for your Auth0 account. This public key is accessible at https://YOUR_AUTH0_DOMAIN/.well-known/jwks.json.

Add a decorator which verifies the access_token against your JWKS.

import json
from os import environ as env, path
import urllib

from functools import wraps
from flask import Flask, request, jsonify, _app_ctx_stack
from flask_cors import cross_origin
from jose import jwt

auth0_domain = 'YOUR_AUTH0_DOMAIN'
api_audience = YOUR_API_AUDIENCE

app = Flask(__name__)


# Format error response and append status code

def handle_error(error, status_code):
    resp = jsonify(error)
    resp.status_code = status_code
    return resp

def get_token_auth_header():
    """Obtains the access token from the Authorization Header
    """
    auth = request.headers.get("Authorization", None)
    if not auth:
        return handle_error({"code": "authorization_header_missing",
                             "description":
                                 "Authorization header is expected"}, 401)

    parts = auth.split()

    if parts[0].lower() != "bearer":
        return handle_error({"code": "invalid_header",
                             "description":
                                 "Authorization header must start with"
                                 "Bearer"}, 401)
    elif len(parts) == 1:
        return handle_error({"code": "invalid_header",
                             "description": "Token not found"}, 401)
    elif len(parts) > 2:
        return handle_error({"code": "invalid_header",
                             "description": "Authorization header must be"
                                            "Bearer token"}, 401)

    token = parts[1]
    return token

def requires_auth(f):
    """Determines if the access token is valid
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = get_token_auth_header()
        jsonurl = urllib.urlopen("https://"+AUTH0_DOMAIN+"/.well-known/jwks.json")
        jwks = json.loads(jsonurl.read())
        unverified_header = jwt.get_unverified_header(token)
        rsa_key = {}
        for key in jwks["keys"]:
            if key["kid"] == unverified_header["kid"]:
                rsa_key = {
                    "kty": key["kty"],
                    "kid": key["kid"],
                    "use": key["use"],
                    "n": key["n"],
                    "e": key["e"]
                }
        if rsa_key:
            try:
                payload = jwt.decode(
                    token,
                    rsa_key,
                    algorithms=unverified_header["alg"],
                    audience=API_AUDIENCE,
                    issuer="https://"+AUTH0_DOMAIN+"/"
                )
            except jwt.ExpiredSignatureError:
                return handle_error({"code": "token_expired",
                                     "description": "token is expired"}, 401)
            except jwt.JWTClaimsError:
                return handle_error({"code": "invalid_claims",
                                     "description": "incorrect claims,"
                                                    "please check the audience and issuer"}, 401)
            except Exception:
                return handle_error({"code": "invalid_header",
                                     "description": "Unable to parse authentication"
                                                    "token."}, 400)

            _app_ctx_stack.top.current_user = payload
            return f(*args, **kwargs)
        return handle_error({"code": "invalid_header",
                             "description": "Unable to find appropriate key"}, 400)
    return decorated

Use this Decorator in your Methods

You can now use the decorator in any routes that require authentication.

# Controllers API

# This doesn't need authentication
@app.route("/ping")
@cross_origin(headers=['Content-Type', 'Authorization'])
def ping():
    return "All good. You don't need to be authenticated to call this"

# This does need authentication
@app.route("/secured/ping")
@cross_origin(headers=['Content-Type', 'Authorization'])
@requires_auth
def securedPing():
    return "All good. You only get this message if you're authenticated"

Protect individual endpoints

Individual routes can be configured to look for a particular scope in the access_token by using the following:

def requires_scope(required_scope):
    """Determines if the required scope is present in the access token
    Args:
        required_scope (str): The scope required to access the resource
    """
    token = get_token_auth_header()
    unverified_claims = jwt.get_unverified_claims(token)
    token_scopes = unverified_claims["scope"].split()
    for token_scope in token_scopes:
        if token_scope == required_scope:
            return True
    return False

Then, establish what scopes are needed in order to access the route. In this case example:scope is used:

@APP.route("/secured/private/ping")
@cross_origin(headers=["Content-Type", "Authorization"])
@cross_origin(headers=["Access-Control-Allow-Origin", "*"])
@requires_auth
def secured_private_ping():
    """A valid access token and an appropriate scope are required to access this route
    """
    if requires_scope("example:scope"):
        return "All good. You're authenticated and the access token has the appropriate scope"
    return "You don't have access to this resource"

Make a Call to your API

You can now make requests to your secure API by providing the access_token as an Authorization header in your requests.


curl --request GET \
  --url http://localhost:8000/path_to_your_api \
  --header 'authorization: Bearer YOUR_ACCESS_TOKEN_HERE'
var client = new RestClient("http://localhost:8000/path_to_your_api");
var request = new RestRequest(Method.GET);
request.AddHeader("authorization", "Bearer YOUR_ACCESS_TOKEN_HERE");
IRestResponse response = client.Execute(request);
package main

import (
	"fmt"
	"net/http"
	"io/ioutil"
)

func main() {

	url := "http://localhost:8000/path_to_your_api"

	req, _ := http.NewRequest("GET", url, nil)

	req.Header.Add("authorization", "Bearer YOUR_ACCESS_TOKEN_HERE")

	res, _ := http.DefaultClient.Do(req)

	defer res.Body.Close()
	body, _ := ioutil.ReadAll(res.Body)

	fmt.Println(res)
	fmt.Println(string(body))

}
HttpResponse<String> response = Unirest.get("http://localhost:8000/path_to_your_api")
  .header("authorization", "Bearer YOUR_ACCESS_TOKEN_HERE")
  .asString();
var settings = {
  "async": true,
  "crossDomain": true,
  "url": "http://localhost:8000/path_to_your_api",
  "method": "GET",
  "headers": {
    "authorization": "Bearer YOUR_ACCESS_TOKEN_HERE"
  }
}

$.ajax(settings).done(function (response) {
  console.log(response);
});
var request = require("request");

var options = { method: 'GET',
  url: 'http://localhost:8000/path_to_your_api',
  headers: { authorization: 'Bearer YOUR_ACCESS_TOKEN_HERE' } };

request(options, function (error, response, body) {
  if (error) throw new Error(error);

  console.log(body);
});
#import <Foundation/Foundation.h>

NSDictionary *headers = @{ @"authorization": @"Bearer YOUR_ACCESS_TOKEN_HERE" };

NSMutableURLRequest *request = [NSMutableURLRequest requestWithURL:[NSURL URLWithString:@"http://localhost:8000/path_to_your_api"]
                                                       cachePolicy:NSURLRequestUseProtocolCachePolicy
                                                   timeoutInterval:10.0];
[request setHTTPMethod:@"GET"];
[request setAllHTTPHeaderFields:headers];

NSURLSession *session = [NSURLSession sharedSession];
NSURLSessionDataTask *dataTask = [session dataTaskWithRequest:request
                                            completionHandler:^(NSData *data, NSURLResponse *response, NSError *error) {
                                                if (error) {
                                                    NSLog(@"%@", error);
                                                } else {
                                                    NSHTTPURLResponse *httpResponse = (NSHTTPURLResponse *) response;
                                                    NSLog(@"%@", httpResponse);
                                                }
                                            }];
[dataTask resume];
$curl = curl_init();

curl_setopt_array($curl, array(
  CURLOPT_PORT => "8000",
  CURLOPT_URL => "http://localhost:8000/path_to_your_api",
  CURLOPT_RETURNTRANSFER => true,
  CURLOPT_ENCODING => "",
  CURLOPT_MAXREDIRS => 10,
  CURLOPT_TIMEOUT => 30,
  CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  CURLOPT_CUSTOMREQUEST => "GET",
  CURLOPT_HTTPHEADER => array(
    "authorization: Bearer YOUR_ACCESS_TOKEN_HERE"
  ),
));

$response = curl_exec($curl);
$err = curl_error($curl);

curl_close($curl);

if ($err) {
  echo "cURL Error #:" . $err;
} else {
  echo $response;
}
import http.client

conn = http.client.HTTPConnection("localhost:8000")

headers = { 'authorization': "Bearer YOUR_ACCESS_TOKEN_HERE" }

conn.request("GET", "/path_to_your_api", headers=headers)

res = conn.getresponse()
data = res.read()

print(data.decode("utf-8"))
require 'uri'
require 'net/http'

url = URI("http://localhost:8000/path_to_your_api")

http = Net::HTTP.new(url.host, url.port)

request = Net::HTTP::Get.new(url)
request["authorization"] = 'Bearer YOUR_ACCESS_TOKEN_HERE'

response = http.request(request)
puts response.read_body
import Foundation

let headers = ["authorization": "Bearer YOUR_ACCESS_TOKEN_HERE"]

var request = NSMutableURLRequest(URL: NSURL(string: "http://localhost:8000/path_to_your_api")!,
                                        cachePolicy: .UseProtocolCachePolicy,
                                    timeoutInterval: 10.0)
request.HTTPMethod = "GET"
request.allHTTPHeaderFields = headers

let session = NSURLSession.sharedSession()
let dataTask = session.dataTaskWithRequest(request, completionHandler: { (data, response, error) -> Void in
  if (error != nil) {
    println(error)
  } else {
    let httpResponse = response as? NSHTTPURLResponse
    println(httpResponse)
  }
})

dataTask.resume()
Previous Tutorial
1. Getting Started
Use Auth0 for FREECreate free Account