snsary.contrib.datastax

Sends batches of Readings as “mutations” to a specified DataStax Astra DB GraphQL endpoint using the Python GQL client, as a means of inserting into a Cassandra table.

GraphQL is an alternative to a conventional REST API: the server defines a typed schema, and clients send requests written as nested {} selections against it. DataStax generates the API automatically based on the keyspaces and tables that exist.

GraphQL isn’t well suited to time series data: when one request contains several insertions (“mutations”) into the same table, each must have a unique alias. The output compensates by giving each reading mutation a throwaway alias (“r0”, “r1”, and so on). See the tests for an example. To simplify building each request, the output first fetches the schema for the keyspace so that it can use the GQL client’s DSL feature.
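The aliasing can be pictured with a minimal, standard-library-only sketch. It does not use the GQL client or its DSL as the output does; it builds the request text by hand. The helper name build_mutation is hypothetical, and the insertreading field, the applied result field and the ttl option are assumptions about what the DataStax-generated API exposes for a table called reading, so check them against the schema of your own keyspace.

```python
import json

TTL = 33696000  # seconds; same value as GraphQLOutput.TTL


def build_mutation(readings, ttl=TTL):
    """Build one GraphQL mutation document with an aliased insert per reading.

    Each reading is a dict with host, sensor, metric, timestamp and value keys.
    Hypothetical helper: the field names below are assumptions, not the
    output's actual implementation.
    """
    inserts = []
    for index, reading in enumerate(readings):
        # json.dumps double-quotes strings and leaves numbers unquoted, which
        # is adequate for simple values like these.
        fields = ", ".join(f"{key}: {json.dumps(val)}" for key, val in reading.items())
        # "r0", "r1", ... are throwaway aliases that keep each insert unique.
        inserts.append(
            f"  r{index}: insertreading(value: {{ {fields} }}, "
            f"options: {{ ttl: {ttl} }}) {{ applied }}"
        )
    return "mutation {\n" + "\n".join(inserts) + "\n}"


example = [
    {"host": "raspberrypi", "sensor": "PMSx003", "metric": "pm25",
     "timestamp": "2022-01-01T00:00:00Z", "value": 1.5},
    {"host": "raspberrypi", "sensor": "PMSx003", "metric": "pm25",
     "timestamp": "2022-01-01T00:01:00Z", "value": 2.0},
]
print(build_mutation(example))
```

Without the aliases, the two insertreading fields would share a response name while carrying different arguments, which GraphQL rejects as a conflict.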

Create an instance with .from_env(), which expects the following environment variables:

  • DATASTAX_URL

  • DATASTAX_TOKEN (needs API write permission)
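As a rough illustration, the presence of both variables can be checked before creating the output. The helper missing_datastax_vars below is hypothetical and not part of snsary; only the variable names and the from_env() call come from this page.

```python
import os

REQUIRED_VARS = ("DATASTAX_URL", "DATASTAX_TOKEN")


def missing_datastax_vars(environ=None):
    """Return the names of required variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_datastax_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        # With both variables set, the output can be created with:
        #   from snsary.contrib.datastax import GraphQLOutput
        #   output = GraphQLOutput.from_env()
        print("DATASTAX_URL and DATASTAX_TOKEN are set")
```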

Setting up Astra DB

Create a DB table in Astra as follows:

CREATE TABLE reading (
    host text,
    sensor text,
    metric text,
    timestamp timestamp,
    value double,
    PRIMARY KEY ((host,sensor,metric), timestamp)
)
WITH CLUSTERING ORDER BY (timestamp DESC)
AND default_time_to_live = 33696000;

The output specifies a TTL for each insertion; this may become configurable in future. Having a default TTL for the table is optional but reduces the risk of data remaining indefinitely. Note that it’s currently not possible to specify other options like the compaction strategy. However, the DataStax docs say the default strategy is suitable for time series data.
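For reference, the TTL value used in the table definition is expressed in seconds. A quick check with the standard library shows how long that is:

```python
from datetime import timedelta

TTL_SECONDS = 33696000  # value from the table definition above

ttl = timedelta(seconds=TTL_SECONDS)
print(ttl.days)  # 390, with no seconds left over
```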

Querying the data

Example GraphQL query for data in the table:

query readings {
  data:reading (
    options: {
      pageSize: 10000, limit: 10000
    },
    filter: {
      host: { eq: "raspberrypi" },
      sensor: { eq: "PMSx003" },
      metric: { eq: "pm25--max/minute" },
      timestamp: { gt: "${__from:date:iso}", lt: "${__to:date:iso}" }
    }
  ) {
    values {
      host, sensor, metric, timestamp, value
    }
  }
}

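Note that the “${__from:date:iso}” and “${__to:date:iso}” placeholders are Grafana global variables, which Grafana replaces with the start and end of the dashboard’s time range.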
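To run the same query outside Grafana, the placeholders have to be replaced by hand. The sketch below does this for the timestamp filter line only. The helper names to_iso and fill_time_range are hypothetical, and the millisecond UTC format with a “Z” suffix is an assumption about what Grafana emits for the iso date format.

```python
from datetime import datetime, timedelta, timezone

# The timestamp filter line from the query above, with Grafana's placeholders.
TIMESTAMP_FILTER = 'timestamp: { gt: "${__from:date:iso}", lt: "${__to:date:iso}" }'


def to_iso(moment):
    """Format an aware datetime as UTC ISO 8601 with milliseconds and a Z suffix."""
    utc = moment.astimezone(timezone.utc)
    return utc.isoformat(timespec="milliseconds").replace("+00:00", "Z")


def fill_time_range(text, start, end):
    """Substitute both Grafana placeholders in text with concrete timestamps."""
    return (
        text.replace("${__from:date:iso}", to_iso(start))
        .replace("${__to:date:iso}", to_iso(end))
    )


start = datetime(2022, 1, 1, tzinfo=timezone.utc)
print(fill_time_range(TIMESTAMP_FILTER, start, start + timedelta(hours=1)))
```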

Module Contents

class snsary.contrib.datastax.GraphQLOutput(url, token)

Bases: snsary.outputs.BatchOutput

TTL = 33696000
classmethod from_env()
publish_batch(readings)