Cassandra and IoT queries: Are they a good match?

Cassandra is widely regarded as an industry-standard database for storing Big Data (Volume, Velocity, and Variety), and an IoT platform is a canonical example of such a use case. We present benchmarking results for some typical IoT queries we encountered while designing ORION, our IoT platform. The aim is to understand when, and perhaps more importantly how, Cassandra can play a useful role in this use case.

Authors: Arbab Khalil, Affan Syed

Introduction

At AN10, we are developing a complete Internet of Things ecosystem, building the whole value chain of IoT products and services: from intelligent sensors and gateways at the edge, to our industrial IoT platform ORION, to high-performance applications and solution suites for specific industries.

The core of our product and service offering is ORION, a Big Data capable, highly scalable, and full-featured IoT platform. An IoT platform provides IoT device management, scalable data management and analytics, and robust APIs on top. The idea is for the platform to do the heavy lifting and give applications IoT capabilities through API calls. Building this platform has required reviewing, testing, and adapting quite a few data stacks, tools, and technologies. As an open company, we want to share our experience with these technologies in a series of blog posts. Each post will be evidence-based and show real-world results.

The data-analytics stack for ORION

One of the basic requirements of an IoT platform is that it must scale to receive data from millions of devices every minute. As such, we decided to use a big data stack that matches, at first cut, our need to scale: we went with the SMACK (Spark, Mesos, Akka, Cassandra, Kafka) stack [1]. In short, this stack allows horizontal scalability in every layer. Cassandra scales big data storage at high write speeds, Spark provides a distributed analytics engine, Mesos provides cluster and resource monitoring and management, while Kafka and Akka enable a queue-based microservices architecture for building features around a real-time data pipeline.

In Cassandra, the table schema has to be optimized (denormalized) for the type of queries we expect the platform to execute. This is a well-known tradeoff (footnote: there are a host of wonderful articles about how to build schemas in Cassandra for IoT; this article is not one of them), but we wanted to better evaluate the different schema models against the type of queries we generally expect our platform to handle.

A generic IoT query

IoT applications generally have devices that report sensor data at regular intervals (as well as irregular events, but we will leave those aside for the current discussion). This sampling produces time-series data, which demands windowed queries. However, IoT applications also have a topological mapping that requires queries to range over a group of devices (let's assume each is uniquely identified by a device id); this can be for region-specific or collocation-related analysis.

Thus, while real-time data can typically be looked at per device, analytical queries will range over both time and device ids. This is the generic query our customers in the IoT domain were asking for.

Even when following several best practices for time-series schema design in Cassandra, a few other constraints crop up that leave no particular schema design a clear winner and require detailed benchmarking (hence this effort). One is ensuring that partitions do not grow exceptionally large, so we need a deterministic way to break data down by size, e.g. on a monthly basis. Another concern is partitioning so that all data does not go to a single partition at the same time (e.g. hotspots due to the partition key containing ONLY the date). Together these make denormalizing data for Cassandra difficult, especially when the range over devices and time cannot be restricted.

To evaluate this, we came up with a few schema configurations in which we mainly change the PRIMARY KEY (which determines query performance). We consider a simple device whose packet contains the dev_id, date, and timestamp for a single voltage sensor (different sensors have different tables for now; we could also keep them in one table, but that is a distraction from the current evaluation).

-- C* table1
CREATE TABLE test_timeseries.timeseries_with_date1 (
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((dev_id, rec_time), day)
) WITH CLUSTERING ORDER BY (day ASC);

-- C* table2
CREATE TABLE test_timeseries.timeseries_with_date2 (
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY (dev_id, day, rec_time)
) WITH CLUSTERING ORDER BY (day ASC, rec_time ASC);

-- C* table3
CREATE TABLE test_timeseries.timeseries_with_date3 (
    day date,
    dev_id text,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY (day, dev_id, rec_time)
) WITH CLUSTERING ORDER BY (dev_id ASC, rec_time ASC);

-- C* table4
CREATE TABLE test_timeseries.timeseries_with_date4 (
    day date,
    dev_id text,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((day, dev_id), rec_time)
) WITH CLUSTERING ORDER BY (rec_time ASC);

-- C* table5
CREATE TABLE test_timeseries.timeseries_with_date5 (
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((dev_id, day), rec_time)
) WITH CLUSTERING ORDER BY (rec_time ASC);

Evaluation Framework:

We ran the tests on a virtual machine with 8 vCPUs and 30 GB of memory running Ubuntu Server 16.04. Cassandra and Spark both run on this machine as single-node deployments. We use Cassandra 3.11.2, Spark 2.2.0, and Spark-Cassandra-Connector 2.0.7-s_2.11. We query through Spark so that its Catalyst engine can apply whatever query optimization is possible.

Testing Procedure

The data set contains data from 1000 devices, each sending one packet per minute over a total duration of two months (July and August 2017), or 62 days. Each table therefore contains 89.28 million rows.
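The row count above, and the partition sizes that each candidate key implies, follow from simple arithmetic. The sketch below is only a back-of-the-envelope check: the variable names are ours for illustration, it counts rows rather than bytes on disk, and the table1 entry assumes a device never sends two packets with the same timestamp.

```python
# Back-of-the-envelope row counts for the first data set (illustrative names).
DEVICES = 1000
DAYS = 62                           # July and August 2017
ROWS_PER_DEVICE_PER_DAY = 24 * 60   # one packet per minute

total_rows = DEVICES * DAYS * ROWS_PER_DEVICE_PER_DAY

# Rows that land in one partition for each choice of partition key.
rows_per_partition = {
    "(dev_id, rec_time)": 1,                        # table1 (assumption: unique timestamps)
    "dev_id": DAYS * ROWS_PER_DEVICE_PER_DAY,       # table2
    "day": DEVICES * ROWS_PER_DEVICE_PER_DAY,       # table3
    "(day, dev_id)": ROWS_PER_DEVICE_PER_DAY,       # table4 and table5
}

print(total_rows)  # 89280000
for key, rows in rows_per_partition.items():
    print(key, rows)
```

Note that the dev_id partition keeps growing for as long as the device reports, while the day partition grows with the number of devices; only the composite keys stay bounded at one day of data per device.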

Initially we benchmarked these 5 tables with three different querying methods available in Spark. We ran a simple count query on each table for different numbers of devices over time windows of varying length in days. We discuss each method in turn:

1. where() function of the DataFrame API: Conditions on day and dev_id are given through the where() function, followed by the count() function on the DataFrame. This query is performed as follows:

### Loading Table from Cassandra ###
from pyspark.sql.functions import col

table1_df = spark.read.format("org.apache.spark.sql.cassandra")\
    .options(keyspace = 'test_timeseries', table = 'timeseries_with_date').load()
#### Querying table; devices is a Python list of dev_id strings ####
query_table1_df = table1_df.where((col("day") >= "2017-07-01") & (col("day") <= "2017-07-04") & \
    (col("dev_id").isin(devices))).count()

2. Spark SQL query using a range: Here we run queries through the Spark SQL API and give the duration as a range. This makes the queries simple to write and improves code readability. This query is performed as follows:

#### Loading Table from Cassandra and Registering Temp View ####
spark.read.format("org.apache.spark.sql.cassandra")\
    .options(keyspace = 'test_timeseries', table = 'timeseries_with_date')\
    .load().createOrReplaceTempView("table1")
#### Querying table; devices is a string holding a comma-separated list of all quoted dev_ids ####
query_table1 = spark.sql("SELECT COUNT(1) FROM table1 WHERE day >= cast('2017-07-01' as date) AND \
day <= cast('2017-07-15' as date) AND dev_id  IN(" + devices + ")")

3. Spark SQL query using IN(): Here we run queries through the Spark SQL API and give the dates as a list through IN(). This is to check whether a range performs better than passing the days through IN(). This query is performed as follows:

## dates is a string containing all days and devices is a string containing all dev_ids;
## both strings must include their enclosing parentheses for the SQL to be valid
query1 = "SELECT COUNT(1) FROM table1 WHERE day IN " + dates + " AND dev_id  IN " + devices
query_table1 = spark.sql(query1)
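The dates and devices strings used in the IN() query are not shown in this article. A minimal sketch of how such strings could be built is given here; the helper names and the device id format are hypothetical, and the naive quoting is only suitable for trusted values.

```python
from datetime import date, timedelta

def sql_in_list(values):
    """Render values as a parenthesised SQL list, e.g. ('a', 'b')."""
    return "(" + ", ".join("'{}'".format(v) for v in values) + ")"

def days_between(start, end):
    """Return every calendar day from start to end, inclusive."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]

# Hypothetical device ids; the real naming scheme is not given in this article.
devices = sql_in_list("dev_{}".format(i) for i in range(1, 4))
dates = sql_in_list(d.isoformat() for d in days_between(date(2017, 7, 1), date(2017, 7, 4)))

query1 = "SELECT COUNT(1) FROM table1 WHERE day IN " + dates + " AND dev_id IN " + devices
print(query1)
```

Because the helper emits the enclosing parentheses, the result can be concatenated directly after IN, as in the third query method.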

The results of these queries are in the "Without Year" sheet referenced in the Results section. Times are given as minutes:seconds.milliseconds (MM:SS.sss).
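To compare measurements numerically, the MM:SS.sss strings first need converting to seconds. The helper below is a small sketch for that purpose; the function name is ours, and it assumes every value has exactly one colon.

```python
def parse_mmss(value):
    """Convert a 'MM:SS.sss' string into seconds as a float."""
    minutes, seconds = value.split(":")
    return int(minutes) * 60 + float(seconds)

# Two measurements expressed in the sheet's time format.
fast = parse_mmss("00:00.230")
slow = parse_mmss("07:47.107")
print(round(slow / fast))
```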

Adding another field for performance

After reading Jon Haddad’s article [2], “Cassandra Time Series Data Modeling For Massive Scale”, we added an extra year field to the partition keys of the last four PRIMARY KEYs above. Our new data set contains data from 1000 devices, each sending one packet per minute over a total duration of four months (November 2017 to February 2018), or 120 days, which ensures that the data falls into two different partitions by year. We now have four additional tables with the same columns plus year, and the following primary keys:

  1.     PRIMARY KEY ((year, dev_id), day, rec_time)
  2.     PRIMARY KEY ((year, day), dev_id, rec_time)
  3.     PRIMARY KEY ((year, dev_id, day), rec_time)
  4.     PRIMARY KEY ((year, day, dev_id), rec_time)
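As a quick check of how the second data set splits across the year component, the sketch below counts the days that fall into each year bucket. It assumes the year value is simply the calendar year of the day column, which is the natural reading but is not spelled out above.

```python
from collections import Counter
from datetime import date, timedelta

start, end = date(2017, 11, 1), date(2018, 2, 28)
days = [start + timedelta(days=i) for i in range((end - start).days + 1)]

# Assumption: the year in the partition key is the calendar year of each day.
days_per_year = Counter(d.year for d in days)

print(len(days))            # 120
print(dict(days_per_year))  # {2017: 61, 2018: 59}
```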

 

Results:

We compiled the results of our experiments in a publicly visible sheet [6]. While we encourage the reader to look at it in detail, we reproduce one table from it here for quick analysis.

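Results of range queries, by PRIMARY KEY (times in MM:SS.sss)

| Devices | Days | ((dev_id, rec_time), day) | (dev_id, day, rec_time) | (day, dev_id, rec_time) | ((day, dev_id), rec_time) | ((dev_id, day), rec_time) |
|---------|------|---------------------------|-------------------------|-------------------------|---------------------------|---------------------------|
| 1       | 7    | 04:52.122 | 00:00.230 | 00:03.012 | 05:42.777 | 05:18.067 |
| 1       | 15   | 06:14.781 | 00:00.279 | 00:03.061 | 05:41.019 | 05:11.892 |
| 1       | 31   | 08:39.629 | 00:00.442 | 00:03.040 | 05:43.673 | 05:20.663 |
| 1       | 62   | 12:01.712 | 00:00.663 | 00:02.959 | 05:37.107 | 05:11.035 |
| 200     | 7    | 04:49.883 | 00:12.854 | 08:34.849 | 05:36.605 | 05:09.085 |
| 200     | 15   | 06:11.309 | 00:26.254 | 08:25.701 | 05:32.405 | 05:08.722 |
| 200     | 31   | 08:35.602 | 00:52.574 | 08:29.676 | 05:38.050 | 05:13.895 |
| 200     | 62   | 12:03.532 | 01:35.376 | 08:50.611 | 05:51.098 | 05:26.876 |
| *All    | 7    | 04:50.976 | 01:03.672 | 08:18.475 | 05:33.617 | 05:01.340 |
| *All    | 15   | 06:08.793 | 02:11.358 | 08:26.815 | 05:30.564 | 05:04.733 |
| *All    | 31   | 08:29.881 | 04:25.930 | 08:29.870 | 05:35.547 | 05:07.087 |
| *All    | 62   | 11:50.367 | 07:47.107 | 08:23.726 | 05:26.092 | 05:02.737 |

Results of IN() queries, by PRIMARY KEY (times in MM:SS.sss; the last column carries no key in the source header)

| Devices | Days | (dev_id, day, rec_time) | (day, dev_id, rec_time) | ((day, dev_id), rec_time) | ((dev_id, day), rec_time) | (key not given) |
|---------|------|-------------------------|-------------------------|---------------------------|---------------------------|-----------------|
| 1       | 7    | 00:03.321 | 01:08.453 | 07:07.437 | 06:42.062 | 00:00.006 |
| 1       | 15   | 00:02.987 | 02:20.630 | 06:53.116 | 06:15.101 | 00:00.008 |
| 1       | 31   | 00:02.957 | 04:36.413 | 06:52.872 | 06:30.830 | 00:00.011 |
| 1       | 62   | 00:03.356 | 10:05.550 | 07:28.427 | 06:44.361 | 00:00.012 |
| 200     | 7    | 02:02.895 | 01:03.806 | 06:56.284 | 06:33.109 | 00:00.195 |
| 200     | 15   | 02:01.879 | 02:24.666 | 07:14.921 | 06:28.774 | 00:00.398 |
| 200     | 31   | 02:05.051 | 04:46.592 | 07:04.965 | 06:18.810 | 00:00.484 |
| 200     | 62   | 02:04.110 | 09:09.966 | 06:48.372 | 06:25.203 | 00:01.980 |
| *All    | 7    | 11:09.877 | 01:06.493 | 06:40.871 | 06:02.694 | 00:00.636 |
| *All    | 15   | 11:09.101 | 02:13.587 | 06:58.010 | 06:55.275 | 00:01.071 |
| *All    | 31   | 11:39.159 | 04:34.028 | 06:52.817 | 06:00.138 | 00:03.681 |
| *All    | 62   | 11:08.557 | 08:47.441 | 07:00.556 | 06:31.979 | 00:06.879 |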

Evaluation of Results

Here we will briefly discuss the behavior of each PRIMARY KEY with respect to query time and its effect on C* cluster.

  1. PRIMARY KEY ((dev_id, rec_time), day): Our queries filter on day and dev_id, not on rec_time, so having rec_time ahead of day made this the worst case in most of our tests.
  2. PRIMARY KEY (dev_id, day, rec_time): This seems to be the best case for execution time with both range queries and IN() queries. Its major problem is partition size, because all data for one device is stored in a single partition.
  3. PRIMARY KEY (day, dev_id, rec_time): It did not perform well with range queries, but with IN() queries the execution time is directly proportional to the number of days, irrespective of the number of devices. It creates a hotspot in the C* cluster, because a single partition holds the data for a whole day, which also results in large partitions.
  4. PRIMARY KEY ((day, dev_id), rec_time): The execution time is proportional to the total table size and almost the same regardless of the number of days and devices queried. It does, however, solve the problems of large partitions and hotspots in the C* cluster.
  5. PRIMARY KEY ((dev_id, day), rec_time): Its behaviour is almost the same as that of the 4th PRIMARY KEY.
  6. PRIMARY KEY ((year, dev_id), day, rec_time): It is good for a small number of devices but the worst case for a large number of devices. It partly addresses the problem of keeping all data for a single device in one partition, because it partitions on year and device id.
  7. PRIMARY KEY ((year, day), dev_id, rec_time): The execution time depends only on the number of days, irrespective of the number of devices queried, and it is never the worst case. It has the same hotspot issue as the 3rd PRIMARY KEY.
  8. PRIMARY KEY ((year, dev_id, day), rec_time): Its behavior is similar to that of the 4th PRIMARY KEY. Adding the year field only adds cost, as the date is already part of the key.
  9. PRIMARY KEY ((year, day, dev_id), rec_time): Its behavior is similar to that of the 8th PRIMARY KEY.

Other Observations:

We observed that a multi-field partition key allows fast querying only if “=” is used on its fields, going left to right. Once an IN() (for specifying, e.g., a range of time or a list of devices) has been used on one field in that order, any further use of IN() removes the benefit (i.e. it results in a near full table scan).
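With a composite partition key such as (day, dev_id), a partition can only be located when every component of the key is known. One way to reason about a window query is therefore to enumerate the partitions it touches. The sketch below does only that counting; it is not how our benchmark queries were issued, and the function name and device ids are made up for illustration.

```python
from datetime import date, timedelta
from itertools import product

def partitions_to_read(dev_ids, first_day, last_day):
    """List every (day, dev_id) partition that a device/day window touches."""
    n_days = (last_day - first_day).days + 1
    days = [first_day + timedelta(days=i) for i in range(n_days)]
    return list(product(days, dev_ids))

# Hypothetical device ids, for illustration only.
dev_ids = ["dev_{}".format(i) for i in range(200)]
keys = partitions_to_read(dev_ids, date(2017, 7, 1), date(2017, 7, 15))
print(len(keys))  # 3000
```

Each tuple corresponds to one fully specified partition key, so a query over 200 devices and 15 days amounts to 3000 single-partition reads when both fields can be matched exactly.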

Another useful observation was that using IN() to query for days is less effective than using a range query.

Conclusion:

The main takeaway from this benchmarking is that no single schema answers our IoT use case without drawbacks. While ((day, dev_id), rec_time) gives a constant response time, that time depends entirely on the total data size (a full scan). On the other hand, (dev_id, day, rec_time) and its counterpart (day, dev_id, rec_time) provide acceptable results, but the former suffers from very large partitions and the latter from hotspots while writing.

As such, it seems that using Cassandra to serve both OLTP and OLAP queries for an IoT use case is perhaps not the best decision. This requires us to now combine Cassandra’s industry-strength robustness and performance with another database that enables fast queries for the typical IoT use case. We are currently evaluating Druid [3], InfluxDB [4], and Kudu [5] as companion databases in this regard. We will keep everyone posted about the final decision and the performance results.
