Metric Data Analysis using the Apache Spark

A blog post by Jiri Kremser

cassandra | metrics | spark



The goal of this blog post is to show how to easily connect Apache Spark to the Cassandra with the metric data; perform simple transformation on the data as well as some examples of calculating correlation between the two time series data streams. Last but not least we will show how to create a k-means clustering model.

Spark logo

The situation is visualized on the figure bellow. After the WildFly Agent was collecting the data for some time period and reporting them to Hawkular Metrics. Hawkular Metrics stores them into Cassandra to which we will be connecting with Spark.

Components interaction

Initialization

First we need to create a spark context with the spark.cassandra.connection.host property correctly set.

val conf = new SparkConf()
  .setAppName("HelloHawkular")
  .setMaster("local")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)

It is very simple, the only assumption here is that the spark-cassandra-connector jar is on the class path.

Note
Apache Spark is completely written in Scala so also the examples are written in Scala. Although, it provides also Java and Python APIs, using the language it has been written in is the best choice, because one can track down the calls in the (native) source code.

Reading the data from Cassandra

Let’s create a "Resilient Distributed Dataset" (RDD) from Cassandra table called "data" in "hawkular_metrics" keyspace. Resilient Distributed Datasets have plenty of useful operations defined. RDD is a leitmotif in Spark, all the data manipulation is represented by chaining the operations on RDDs. It is an immutable structure that inherently supports parallelization. Generally speaking, RDDs can be created as a result of calling a method on another RDD(s) or by calling a method the SparkContext.

val rdd: RDD[CassandraRow] = sc.cassandraTable("hawkular_metrics", "data")
println(rdd.count)
println(rdd.first)

This prints the number of rows (in my case 245609) and the very first row, just to have an idea how it looks like:

CassandraRow{tenant_id: hawkular, type: 0, metric: MI~R~[e69a19f7-76e7-4fd2-8ed5-864d1570f3ff/Local~/deployment=hawkular-alerts-rest.war/subsystem=ejb3/singleton-bean=PartitionManagerImpl]~MT~Singleton EJB Metrics~Execution Time, dpart: 0, time: baef5a00-6e31-11e6-96d2-5f826fbc8eb1, aggregates: {}, availability: null, data_retention: null, l_value: null, n_value: 0.0, s_value: null, tags: {}}

The data table contains all the metrics so we need to filter only what we are interested in. RDD provides a groupBy method that takes a function that is applied on all the data in the RDD and returns the key by which the grouping should be done. We can group by the metric. This can be useful when working with more metrics. But for our purposes, let’s just do simple filter method.

val feedId = "e69a19f7-76e7-4fd2-8ed5-864d1570f3ff"
val metric1 = "Total Memory"
val metric2 = "Available Memory"
val total: RDD[Double] = rdd.filter(x => {
    val metricId = x.getString("metric")
    metricId.contains(feedId) && metricId.contains(metric1)
  })
  .map(_.getDouble("n_value"))
  .repartition(8)

val free: RDD[Double] = rdd.filter(x => {
    val metricId = x.getString("metric")
    metricId.contains(feedId) && metricId.contains(metric2)
  })
  .map(_.getDouble("n_value"))
  .repartition(8)

The repartition() method needs to be called because methods like filter, flatMap and similar have impact on the number of elements in each partition. We need those two RDDs to have the same amount of partitions and the same amount of elements in each partition in order to be able to apply some statistical methods on them. Method called coalesce is an alternative option to use here. Perhaps even better because it does not shuffle the data over the network in case of multi node environment. However, the data model of Hawkular metrics ensures that data from one metric are always on the same node.

Now, we have two RDDs. The total and free that represent the amounts of memory that was "free" on the platform and the total value.

Doing something fancy

Let us create a derived RDD that will represent the used memory on the platform.

val used: RDD[Double] = total.zip(free).map(e => e._1 - e._2)

What we have done here is that we have created a new RDD, representing a metric data that were calculated using two other metric streams. The transformation was trivial, but RDD provides also methods for aggregation.

Now, let’s calculate the correlation between the used and free memory. There should be some correlation, right?

val correlation: Double = Statistics.corr(used, free, "pearson")
println(s"Correlation is: $correlation")

This will print Correlation is: -0.9999999999999781. That means there is total negative correlation between the two (the higher the first one, the lower the second one). This example was artificial, but in general, one can calculate the correlation between any two metric streams. Of course, correlation doesn’t imply causation, so we can’t extract any higher level information like business rules here. But we obtain some information that metrics are somehow related together and with further analysis we can for example detect, that change in one metric stream always precedes the change in the second metric stream. Again, it’s not causation, but something stronger than correlation. Granger causality test is possible method, nonetheless, this is out of the scope of this blog post.

Last thing, I wanted to show is the k-means clustering.

val usedMemoryVector = used.map(x => Vectors.dense(x))
val numClusters = 3
val numIterations = 20
val clusters: KMeansModel = KMeans.train(usedMemoryVector, numClusters, numIterations)
println("Cluster centers:")
clusters.clusterCenters.foreach(println)

The code above will print the three cluster representatives. It is also possible to test the new data point by calling clusters.predics(point) that will return the expected cluster the point is attracted to.

There is much more in the MLlib. One can find outliers, common patterns, do a classification, etc.

Tip
All the code examples can be downloaded and run against the Cassandra with the metric data.

Summary

By using the spark-cassandra-connector, we were able to get the data from C* as RDD, call the simple operations on RDDs and some smart functions from the MLlib package, but we entirely neglected the temporal aspect of the data. The only reason it worked reasonably well is because the data is sorted in the table by the timestamp. There is a Spark library for time series data out there called spark-ts. Nice improvement would be connecting the spark-cassandra-connector and spark-ts to provide convenient way of working with time series data on top of Cassandra.

It is also worth mentioning that Spark does not offer only data processing on the data that was obtained by some "data pump". Spark is so popular because it allows running the algorithms close to the data. In our case we had only single headed Cassandra, but in general it can be run on a cluster consisting of multiple nodes. This, of course, requires the wisely chosen partition keys in the schema respecting it when working with RDDs.




Published by Jiri Kremser on 31 August 2016

redhatlogo-white

© 2016 | Hawkular is released under Apache License v2.0