Home > Aria Crescendo Documentation > Data Streaming > Example Data Streaming Code

Example Data Streaming Code

The following are fully functional Python 3 examples that use the Aria Data Streaming web service. You can download a .zip file containing all five of these Python 3 scripts here: 20200624sample_datastreaming_python.zip


  pip install sseclient
  pip install PyJWT
  pip install requests

from sseclient import SSEClient
from aria_oauth2 import AriaOauth2
from status_store import StreamStatusStore
from demo_args import DemoCliArgs
from demo_handlers import SseHandlers

args = DemoCliArgs()  # get command line args for this demo
status_store = StreamStatusStore()

# Make an SSE message stream
messages = SSEClient(
    args.sse_service_url,  # URL of SSE service
    # authenticator; DemoCliArgs exposes the credentials as client_id / client_secret
    auth=AriaOauth2(args.client_id, args.client_secret, args.oauth2_url),
    last_id=status_store.get_last_id(),  # if a last id was stored locally, start there
)

# Handle messages as they come in off the stream
handlers = SseHandlers()
for msg in messages:
    # Handle each message as it comes in
    handlers.handleMsg(msg)

    # Call the status store with the latest event id.  If we crash, we will restart at this point.
    # Events without an id (assumed possible, e.g. keep-alives -- TODO confirm) must not
    # erase the restart point that was already saved.
    if msg.id:
        status_store.save_last_id(msg.id)


import jwt  # to install: pip install PyJWT
import requests  # to install: pip install requests
import time
from requests.auth import AuthBase

class AriaOauth2(AuthBase):
    """
    Authenticator that retrieves access token from Aria auth service.

    There does not appear to be an off-the-shelf lib that does this at this time: a value that can be
    assigned to an http request's "auth" parameter that retrieves the access token using an Oauth2
    grant type request.
    """

    def __init__(self, client_id, client_secret, oauth2_url):
        """
        :param client_id:  The Aria client's id
        :param client_secret:  The Aria client's secret
        :param oauth2_url:  The URL of the authentication service's token endpoint
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.oauth2_url = oauth2_url
        self.token = None    # last token response (a dict) from the auth service
        self.expires = None  # time.time() value after which self.token is treated as expired

    def __call__(self, r):
        """
        If we don't have an access token in hand, or it has expired, get a new token from the auth service.
        Then set the access token in the request's Authorization header.

        :param r:  The outgoing request to authorize
        :return:  The same request, with its Authorization header set
        """
        # "now" is captured before the token request, so the computed expiry errs on the early side.
        now = time.time()
        if not self.token or now > self.expires:
            self.token = self.get_fresh_token()
            self.expires = now + self.token['expires_in']

        r.headers['Authorization'] = 'Bearer ' + self.token['access_token']
        return r

    def get_fresh_token(self):
        """
        Request a new access token for the client from the auth service.

        :return:  The decoded JSON token response (contains at least 'access_token')
        :raises Exception:  If the response does not contain an access token
        """
        # The token used to sign the grant request itself
        jwt_token = jwt.encode({'iat': int(time.time())}, self.client_secret, algorithm='HS256')

        # Make the request to the auth service to get an access token for the client
        # NOTE(review): verify=False disables TLS certificate verification while client
        # credentials are being sent -- acceptable for a demo only; enable verification in production.
        resp = requests.post(
            self.oauth2_url,
            data={'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': jwt_token},
            verify=False,
            allow_redirects=False,
        )

        json_resp = resp.json()

        if 'access_token' in json_resp:
            return json_resp
        elif 'error' in json_resp:
            raise Exception("OAuth failed: %s: %s" % (json_resp['error'], json_resp.get('error_description')))
        else:
            # Neither a token nor a structured error: report the whole response body
            raise Exception("OAuth failed: %s" % (str(json_resp)))


import argparse

class DemoCliArgs(object):
    """
    Command line arguments for this demo.  These inputs are required to set up an SSE client stream:
        * URL of an SSE service
        * URL of an OAuth2 service (for authentication)
        * Aria client ID (for authentication)
        * Aria client secret (for authentication)
    """

    def __init__(self, argv=None):
        """
        Parse the demo's command line arguments.

        :param argv:  Optional list of argument strings to parse; when None (the default),
                      argparse reads the process's command line.
        """
        # Define the command line argument parser for this example code
        parser = argparse.ArgumentParser()
        parser.add_argument('-e', '--sse',      help='URL of a Server-Sent Events source')
        parser.add_argument('-o', '--oauth',    help='URL of an OAuth2 token service')
        parser.add_argument('-c', '--clientid', help='Aria client ID')
        parser.add_argument('-s', '--secret',   help='Secret for the Aria client')

        # Get argument values from the command line
        args = parser.parse_args(argv)
        self.sse_service_url = args.sse
        self.oauth2_url      = args.oauth
        self.client_id       = args.clientid
        self.client_secret   = args.secret

        # Aliases: the demo's main script reads the credentials under these names
        self.aria_client_id     = self.client_id
        self.aria_client_secret = self.client_secret


# Handlers for the stream events

def handle_create(data):
    """Print a 'create' event.

    The data part of a 'create' event has two lines: ref of the created entity, then its data.
    Only the first newline is treated as the separator, so a payload that itself contains
    newlines is printed whole, and a payload with no second line prints empty data instead
    of raising IndexError.

    :param data:  The event's data string
    """
    ref, _, body = data.partition("\n")
    print("CREATE to: %s\ndata: %s" % (ref, body))

def handle_update(data):
    """Print an 'update' event.

    The data part of an 'update' event has two lines: ref of the updated entity, then its data.
    Only the first newline is treated as the separator, so a payload that itself contains
    newlines is printed whole, and a payload with no second line prints empty data instead
    of raising IndexError.

    :param data:  The event's data string
    """
    ref, _, body = data.partition("\n")
    print("UPDATE to: %s\ndata: %s" % (ref, body))

def handle_delete(data):
    """Print a 'delete' event; its data part is a single line, the ref of the deleted entity."""
    message = "DELETE {}".format(data)
    print(message)

def handle_load(data):
    """Print a 'load' event.

    The data part of a 'load' event has two lines: ref of the created entity, then its data.
    Only the first newline is treated as the separator, so a payload that itself contains
    newlines is printed whole, and a payload with no second line prints empty data instead
    of raising IndexError.

    :param data:  The event's data string
    """
    ref, _, body = data.partition("\n")
    print("LOAD to: %s\ndata: %s" % (ref, body))

def handle_heartbeat(data):
    """Ignore a heartbeat; these messages appear periodically just to keep the stream connection alive."""
    return None

class SseHandlers(object):
    """Dispatches stream messages to the handler registered for their event type."""

    def __init__(self):
        # Map event type to handler
        self.event_handlers = {
            "create": handle_create,
            "update": handle_update,
            "delete": handle_delete,
            "load": handle_load,
            "message": handle_heartbeat,
        }

    def handleMsg(self, msg):
        """
        Call the handler registered for the message's event type with the message's data.

        :param msg:  A stream message exposing 'event' (the event type) and 'data' attributes
        """
        # Get the handler for the event type.  Call that handler with the event's data
        handler = self.event_handlers.get(msg.event)
        if handler is None:
            # Unknown event type: skip it rather than crash the stream loop by calling None
            return
        handler(msg.data)


class StreamStatusStore(object):
    """
    A dummy store for a stream's last event id.

    The purpose of this is to save the stream's last event id so that in the event that this app crashes,
    the app knows where to restart.  A real implementation should periodically write the value to some kind of
    persistent store, such as a file system, memcache or database.  On restart the implementation should reread
    the value.
    """

    def __init__(self):
        # Held in memory only; None means no event id has been saved yet
        self.saved_last_id = None

    def save_last_id(self, last_id):
        """Remember last_id as the most recent event id seen on the stream."""
        self.saved_last_id = last_id

    def get_last_id(self):
        """Return the most recently saved event id, or None if none has been saved."""
        return self.saved_last_id


Data Streaming Overview
Data Stream Client & Server-Sent Events (SSE)
Consuming Aria Data Streams
Subscribing to Objects in the Data Streams
Data Streaming Authentication
Data Streaming Logical Object Model
Data Streaming JSON Representation
Example Data Streaming Code
Data Streaming Frequently Asked Questions

Last modified


This page has no custom tags.


This page has no classifications.