Today we will take a look at Spark's built-in machine learning library, MLlib (Spark MLlib Guide). KMeans is a popular clustering method. Clustering methods are used when there is no class to be predicted; instead, instances are divided into groups, or clusters. The clusters will hopefully represent some underlying mechanism that draws each instance to a particular cluster, and the instances assigned to a cluster should strongly resemble each other. A typical use case for KMeans is segmentation of data. For example, suppose you are studying heart disease and you have a theory that individuals with heart disease are overweight. You have collected data from individuals with and without heart disease, along with weight-related measurements such as body mass index, waist-to-hip ratio, skinfold thickness, and actual weight. KMeans can be used to cluster the data into groups for further analysis and to test the theory. You can find out more about KMeans on Wikipedia (Wikipedia: KMeans).
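Under the hood, each KMeans iteration assigns every instance to its nearest cluster center (by Euclidean distance) and then recomputes each center as the mean of its assigned instances. Here is a minimal pure-Scala sketch of the assignment step, using made-up 2-D points rather than our stock data:

```scala
object KMeansSketch {
  type Point = Array[Double]

  // squared Euclidean distance between two points
  def distSq(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // the assignment step: each point goes to the index of its nearest center
  def assign(points: Seq[Point], centers: Seq[Point]): Seq[Int] =
    points.map(p => centers.indices.minBy(i => distSq(p, centers(i))))
}
```

MLlib does this same work in a distributed fashion, repeating the assign-and-recompute loop until the centers stop moving (or an iteration limit is hit).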
The data we are going to use in today's example is stock market data with the ConnorsRSI indicator; you can learn more at ConnorsRSI. Below is a sample of the data. ConnorsRSI is a composite indicator made up of RSI_CLOSE_3, PERCENT_RANK_100, and RSI_STREAK_2. We will pass these attributes, along with the actual ConnorsRSI (CRSI) and RSI2, into our KMeans algorithm. This data is already normalized from 0 to 100. The other columns, ID, LABEL, RTN5, FIVE_DAY_GL, and CLOSE, will be used for further analysis once we cluster the instances; they will not be passed into the KMeans algorithm.
Sample Data (CSV): 1988 instances of SPY
Column            Description
ID                Uniquely identifies the instance, as date:symbol.
LABEL             Whether the close was up ("UP") or down ("DN") from the previous day's close.
RTN5              The return over the past 5 days.
FIVE_DAY_GL       The return over the next 5 days.
CLOSE             Closing price.
RSI2              Relative Strength Index (2 days).
RSI_CLOSE_3       Relative Strength Index (3 days).
RSI_STREAK_2      Relative Strength Index (2 days) for streak durations based on the closing price.
PERCENT_RANK_100  The percentage rank value over the last 100 days, comparing today's return to the last 100 returns.
CRSI              The ConnorsRSI indicator: (RSI_CLOSE_3 + RSI_STREAK_2 + PERCENT_RANK_100) / 3.
ID  LABEL  RTN5  FIVE_DAY_GL  CLOSE  RSI2  RSI_CLOSE_3  PERCENT_RANK_100  RSI_STREAK_2  CRSI 
20150916:SPY  UP  2.76708  3.28704  200.18  91.5775  81.572  84  73.2035  79.5918 
20150915:SPY  UP  0.521704  2.29265  198.46  83.4467  72.9477  92  60.6273  75.1917 
20150914:SPY  DN  1.77579  0.22958  196.01  47.0239  51.3076  31  25.807  36.0382 
20150911:SPY  UP  0.60854  0.65569  196.74  69.9559  61.0005  76  76.643  71.2145 
20150910:SPY  UP  0.225168  1.98111  195.85  57.2462  53.9258  79  65.2266  66.0508 
20150909:SPY  DN  1.5748  2.76708  194.79  42.8488  46.1728  7  31.9797  28.3842 
20150908:SPY  UP  0.12141  0.521704  197.43  73.7949  64.0751  98  61.2696  74.4483 
20150904:SPY  DN  3.35709  1.77579  192.59  22.4626  31.7166  6  28.549  22.0886 
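The CRSI column can be checked directly against its three components using the formula above. A small Scala helper makes the calculation explicit; plugging in the first sample row (20150916:SPY) reproduces its CRSI of 79.5918:

```scala
// CRSI = (RSI_CLOSE_3 + RSI_STREAK_2 + PERCENT_RANK_100) / 3
def crsi(rsiClose3: Double, rsiStreak2: Double, percentRank100: Double): Double =
  (rsiClose3 + rsiStreak2 + percentRank100) / 3.0

// crsi(81.572, 73.2035, 84.0) evaluates to roughly 79.5918
```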
The KMeans algorithm needs to be told how many clusters (K) to group the instances into. For our example, let's start with two clusters to see if they have a relationship to the label, "UP" or "DN". The Apache Spark Scala documentation has the details on all the methods for KMeans and KMeansModel at KMeansModel.
Below is the Scala code, which you can run in a Zeppelin notebook or spark-shell on your HDInsight cluster with Spark (HDInsight).
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.sql.functions._
// load file and remove header
val data = sc.textFile("wasb:///data/spykmeans.csv")
val header = data.first
val rows = data.filter(l => l != header)
// define case class
case class CC1(ID: String, LABEL: String, RTN5: Double, FIVE_DAY_GL: Double, CLOSE: Double, RSI2: Double, RSI_CLOSE_3: Double, PERCENT_RANK_100: Double, RSI_STREAK_2: Double, CRSI: Double)
// comma separator split
val allSplit = rows.map(line => line.split(","))
// map parts to case class
val allData = allSplit.map( p => CC1( p(0).toString, p(1).toString, p(2).trim.toDouble, p(3).trim.toDouble, p(4).trim.toDouble, p(5).trim.toDouble, p(6).trim.toDouble, p(7).trim.toDouble, p(8).trim.toDouble, p(9).trim.toDouble))
// convert rdd to dataframe
val allDF = allData.toDF()
// convert back to rdd and cache the data
val rowsRDD = allDF.rdd.map(r => (r.getString(0), r.getString(1), r.getDouble(2), r.getDouble(3), r.getDouble(4), r.getDouble(5), r.getDouble(6), r.getDouble(7), r.getDouble(8), r.getDouble(9) ))
rowsRDD.cache()
// create an RDD of Vectors to pass to KMeans, and cache it. We pass in RSI2, RSI_CLOSE_3,
// PERCENT_RANK_100, RSI_STREAK_2 and CRSI: the attributes used to assign each instance to a cluster
val vectors = allDF.rdd.map(r => Vectors.dense( r.getDouble(5), r.getDouble(6), r.getDouble(7), r.getDouble(8), r.getDouble(9) ))
vectors.cache()
//KMeans model with 2 clusters and 20 iterations
val kMeansModel = KMeans.train(vectors, 2, 20)
//Print the center of each cluster
kMeansModel.clusterCenters.foreach(println)
// Get the prediction from the model with the ID so we can link them back to other information
val predictions = rowsRDD.map{r => (r._1, kMeansModel.predict(Vectors.dense(r._6, r._7, r._8, r._9, r._10) ))}
// convert the rdd to a dataframe
val predDF = predictions.toDF("ID", "CLUSTER")
The code imports the Vectors, KMeans, and SQL methods we need. It then loads the .csv file from disk and removes the header line that holds our column descriptions. We then define a case class, split the columns on commas, and map the data into the case class. We then convert the RDD into a dataframe. Next we map the dataframe back to an RDD and cache the data. We then create an RDD of the 5 columns we want to pass to the KMeans algorithm and cache it. We want these RDDs cached because KMeans is a very iterative algorithm, and caching speeds up performance. We then create the kMeansModel, passing in the vector RDD that holds our attributes and specifying that we want two clusters and 20 iterations. We then print the centers of the clusters. Now that the model is created, we get the cluster predictions along with an ID so that we can uniquely identify each instance with the cluster it was assigned to. We then convert this back to a dataframe to analyze.
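One way to sanity-check the choice of K is the within-set sum of squared errors (WSSSE), which MLlib's KMeansModel exposes as computeCost. A pure-Scala sketch of what that cost measures:

```scala
object Wssse {
  type Point = Array[Double]

  // squared Euclidean distance between two points
  def distSq(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // sum, over all points, of the squared distance to the nearest center
  def cost(points: Seq[Point], centers: Seq[Point]): Double =
    points.map(p => centers.map(c => distSq(p, c)).min).sum
}
```

In our example this would be kMeansModel.computeCost(vectors); training with several values of K and looking for the "elbow" where the cost stops dropping sharply is a common heuristic for picking K.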
Below is a subset of the allDF dataframe with our data.
Below is a subset of our predDF dataframe with the ID and the CLUSTER. We now have a unique identifier and the cluster the KMeans algorithm assigned each instance to. Also displayed is the mean of each attribute passed into the KMeans algorithm, for each of Cluster 0 and Cluster 1. You can see that the means are very close within each cluster: around 27 for Cluster 0 and around 71 for Cluster 1.
Because the allDF and predDF dataframes have a common column we can join them and do more analysis.
// join the dataframes on ID (spark 1.4.1)
val t = allDF.join(predDF, "ID")
Now we have all of our data combined with the CLUSTER that the KMeans algorithm assigned each instance to and we can continue our investigation.
Let's display a subset of each cluster. It looks like cluster 0 is mostly DN labels, with attributes averaging around 27 as the cluster centers indicated. Cluster 1 is mostly UP labels, with attributes averaging around 71.
// review a subset of each cluster
t.filter("CLUSTER = 0").show()
t.filter("CLUSTER = 1").show()
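Rather than eyeballing a subset, we can tally the labels per cluster. In Spark this would be something like t.groupBy("CLUSTER", "LABEL").count().show(); the same idea in plain Scala, on hypothetical (cluster, label) pairs:

```scala
// count label occurrences within each cluster: (cluster, label) -> count
def labelCounts(rows: Seq[(Int, String)]): Map[(Int, String), Int] =
  rows.groupBy(identity).mapValues(_.size).toMap
```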
Let's get descriptive statistics on each of our clusters. This covers all the instances in each cluster, not just a subset. It gives us the count, mean, stddev, min, and max for all numeric columns in the dataframe; we filter each by CLUSTER.
// get descriptive statistics for each cluster
t.filter("CLUSTER = 0").describe().show()
t.filter("CLUSTER = 1").describe().show()
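For reference, the mean and standard deviation that describe() reports can be sketched in plain Scala. (The exact stddev formula may vary by Spark version; this sketch uses the n-1 sample form.)

```scala
// sample mean and sample standard deviation of a column of values
def meanAndStddev(xs: Seq[Double]): (Double, Double) = {
  val n = xs.size
  val mean = xs.sum / n
  // sample variance divides by n - 1
  val variance = xs.map(x => (x - mean) * (x - mean)).sum / (n - 1)
  (mean, math.sqrt(variance))
}
```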
So what can we infer from the output of our KMeans clusters?

Cluster 0 has lower ConnorsRSI (CRSI), with a mean of 27. Cluster 1 has higher CRSI, with a mean of 71. Could these be areas to initiate buy and sell signals?

Cluster 0 has mostly DN labels, and Cluster 1 has mostly UP labels.

Cluster 0 has a mean gain of 0.28% five days later, while Cluster 1 has a mean loss of 0.03% five days later.

Cluster 0 has a mean loss of 1.22% over the five days before, and Cluster 1 has a mean gain of 1.15% over the five days before. Does this suggest markets revert to their mean?

Both clusters have min/max 5-day returns ranging between a gain of 19.40% and a loss of 19.79%.
This is just the tip of the iceberg, and it raises further questions, but it gives an example of using HDInsight and Spark to start your own KMeans analysis. Spark MLlib has many algorithms to explore, including SVMs, logistic regression, linear regression, naive Bayes, decision trees, random forests, basic statistics, and more. The implementation of these algorithms in Spark MLlib is designed for distributed clusters, so you can do machine learning on big data. Next, I think I'll run the analysis on all data for the AMEX, NASDAQ, and NYSE stock exchanges and see if the pattern holds up!
Bill
Thank you for this post. I ran the above code in spark-shell and it works, but when I compile it in a Maven project I get the error below. Thanks in advance.
error: No TypeTag available for CC1
val allDF = allData.toDF()
This is my code, and I get the error exactly at allData.toDF.
case class CC1(Phone: String, Vmail: Double, DayMins: Double, EveMins: Double, NightMins: Double, IntlMins: Double)
val allSplit = rows.map(line => line.split(","))
val allData = allSplit.map( p => CC1( p(0).toString, p(1).toDouble, p(2).toDouble, p(3).toDouble, p(4).toDouble, p(5).toDouble ))
import sqlContext.implicits._
val allDF = allData.toDF()
Did you fix this issue?
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import sqlContext.sql
import org.apache.spark.sql.functions._
Helpful. Thanks much!