サーバーアプリ+API:Pythonによるcronジョブの実装

サーバー+APIアーキテクチャシナリオ」のドキュメントを参照してください。

Pythonでのサーバープロセス実装で使用する全ソースコードは、こちらのGitHubリポジトリでご覧いただけます。

アクセストークン取得する

HTTP要求をAuth0の/oauth/token APIエンドポイントに行うため、jsonurllib、およびurllib2のライブラリーを使用します。

以下が実装例です。

def main():
  import json, urllib, urllib2, httplib

  # Configuration Values
  domain = "{yourDomain}" # Your Auth0 Domain
  api_identifier = "API_IDENTIFIER" # API Identifier of your API
  client_id = "{yourClientId}" # Client ID of your Machine-to-Machine Application
  client_secret = "{yourClientSecret}" # Client Secret of your Machine to Machine Application
  api_url = "http://localhost:8080/timesheets/upload"
  grant_type = "client_credentials" # OAuth 2.0 flow to use

  # Get an access token from Auth0
  base_url = "https://{domain}".format(domain=domain)
  data = urllib.urlencode({'client_id': client_id,
                            'client_secret': client_secret,
                            'audience': api_identifier,
                            'grant_type': grant_type})
  req = urllib2.Request(base_url + "/oauth/token", data, headers={"Accept": "application/x-www-form-urlencoded"})
  response = urllib2.urlopen(req)
  resp_body = response.read()
  oauth = json.loads(resp_body)
  access_token = oauth['access_token']

# Standard boilerplate to call the main() function.
if __name__ == '__main__':
  main()

Was this helpful?

/

テストするには、コードを変更してaccess_token変数を表示し、python cron.pyを使用してプロセスを実行します。

APIを呼び出す

以下の手順で実装を行います。

  • タイムシートデータを含むJSONオブジェクトを構築し、timesheet変数に割り当てます。

  • urllib2.Requestを使用して、API URLとtimesheet変数コンテンツを要求本文に追加します。

  • Authorizationヘッダーを要求に追加します。

  • Content-Typeヘッダーをapplication/jsonに設定します。

  • urllib2.urlopenを使用してAPIを呼び出し、いくつかのエラー処理を追加します。json.loadsを使用して応答を取得し、コンソールに表示します。

以下が実装例です(簡潔にするため一部のコードを省略しています)。

def main():
  # import libraries - code omitted

  # Configuration Values - code omitted

  # Get an Access Token from Auth0 - code omitted

  #Post new timesheet to API
  timesheet = {'user_id': '007',
                          'date': '2017-05-10T17:40:20.095Z',
                          'project': 'StoreZero',
                          'hours': 5}
  req = urllib2.Request(api_url, data = json.dumps(timesheet))
  req.add_header('Authorization', 'Bearer ' + access_token)
  req.add_header('Content-Type', 'application/json')

  try:
    response = urllib2.urlopen(req)
    res = json.loads(response.read())
    print 'Created timesheet ' + str(res['id']) + ' for employee ' + str(res['user_id'])
  except urllib2.HTTPError, e:
    print 'HTTPError = ' + str(e.code) + ' ' + str(e.reason)
  except urllib2.URLError, e:
    print 'URLError = ' + str(e.reason)
  except httplib.HTTPException, e:
    print 'HTTPException'
  except Exception, e:
    print 'Generic Exception' + str(e)

# Standard boilerplate to call the main() function - code omitted

Was this helpful?

/

これをテストするには、APIが稼働しており、python cron.pyを使用してプロセスを実行していることを確認してください。

以上で完了です。