Home > Aria Crescendo Documentation > Data Streaming > Example Data Streaming Code > Data Streaming Example Authenticator Code

Data Streaming Example Authenticator Code

Table of contents
No headers
import jwt  # to install: pip install PyJWT
import requests  # to install: pip install requests
import time
from requests.auth import AuthBase

class AriaOauth2(AuthBase):
    """
    Authenticator that retrieves access token from Aria auth service.

    There does not appear to be an off-the-shelf lib that does this at this
    time: a value that can be assigned to an http request's "auth" parameter
    and that retrieves the access token using an Oauth2 grant type request.

    The token response is cached on the instance and reused until the
    lifetime reported by the auth service has elapsed.
    """

    def __init__(self, client_id, client_secret, oauth2_url, verify=False, timeout=None):
        """
        :param client_id:  The Aria client's id
        :param client_secret:  The Aria client's secret
        :param oauth2_url:  The URL of the authentication service's token endpoint
        :param verify:  Passed to ``requests.post`` as ``verify`` for the token
            request.  Defaults to ``False`` to keep the historical behaviour.
        :param timeout:  Passed to ``requests.post`` as ``timeout`` for the
            token request.  ``None`` (the default) means no timeout.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.oauth2_url = oauth2_url
        # NOTE(review): verify=False disables TLS certificate verification on
        # the request that carries the client credentials.  Pass verify=True
        # (or a CA bundle path) outside of test environments.
        self.verify = verify
        self.timeout = timeout
        self.token = None
        self.expires = None

    def __call__(self, r):
        """
        If we don't have an access token in hand, or it has expired, get a new
        token from the auth service.  Then set the access token in the
        request's Authorization header.

        :param r:  The outgoing request; its ``headers`` mapping is modified
        :return:  The same request object, with the Authorization header set
        """
        now = time.time()
        if self.token is None or self.expires is None or now >= self.expires:
            fresh = self.get_fresh_token()
            # Work out the expiry BEFORE touching instance state, so that a
            # failure here cannot leave a token stored without an expiry.
            # "expires_in" is optional in an OAuth2 token response; when it is
            # absent the token is treated as already expired, which forces a
            # refresh on the next request instead of raising KeyError.
            expires = now + fresh.get('expires_in', 0)
            self.token = fresh
            self.expires = expires

        r.headers['Authorization'] = 'Bearer ' + self.token['access_token']
        return r

    def get_fresh_token(self):
        """
        Request a new access token for the client from the auth service.

        :return:  The parsed JSON token response (a dict containing at least
            ``access_token``)
        :raises Exception:  If the response carries an ``error`` field, or
            contains neither ``access_token`` nor ``error``
        """
        # The token used to sign the grant request itself: a JWT holding only
        # the issue time, signed (HS256) with the client's secret.
        jwt_token = jwt.encode({'iat': int(time.time())}, self.client_secret, algorithm='HS256')

        # Make the request to the auth service to get an access token for the
        # client.  The signed JWT, not the raw secret, is what gets sent in
        # the "client_secret" form field.
        resp = requests.post(
            self.oauth2_url,
            data={'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': jwt_token},
            verify=self.verify,
            # Redirects are not followed -- presumably so the credentials are
            # never re-posted to a redirect target; confirm with the service.
            allow_redirects=False,
            timeout=self.timeout,
        )

        json_resp = resp.json()

        if 'access_token' in json_resp:
            return json_resp
        elif 'error' in json_resp:
            raise Exception("OAuth failed: %s: %s" % (json_resp['error'], json_resp.get('error_description')))
        else:
            # Neither a token nor a structured error: report the raw payload.
            raise Exception("OAuth failed: %s" % (str(json_resp)))

Download .py file

Last modified


This page has no custom tags.


This page has no classifications.