#RioOlympics2016 Twitter Sentiment Analytics using Azure Stream Analytics & Azure ML with Predictive Analytics using PowerBI

[bing_translator]

Last year December 2015, Azure Machine Learning Team announced the availability of integration of Azure Machine Learning deployed web services into Azure Stream Analytics as 'Functions'. This feature is currently in preview till the date of writing this blog. So, in this blog post, we are going to see how to build a realtime twitter streaming sentiment analytics using Azure Service Bus Event Hub, Azure Stream Analytics + Azure ML predictive solution function & finally building visualization using Microsoft PowerBI.  

Here goes the end to end solution architecture diagram of realtime Twitter streaming sentiment analytics using raw tweets events collection to pushing events into Event Hub, then publishing through ASA through AML predictive analytics sentiment values & finally implementing realtime sentiment scoring dashboard using PowerBI.

 

[caption id="attachment_55" align="aligncenter" width="719"]Solution Architecture Solution Architecture[/caption]

 

So, first the realtime tweets are getting collected using Java eclipse code using Twitter4J library. So, the tweets event are collected based on this following keywords , #RioOlympics2016, #Rio2016 etc. & stored directly as .csv file into OneDrive. (Tweets event can be stored even directly event hubs as well).

package com.hadoop;

import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import com.opencsv.CSVWriter;

import twitter4j.FilterQuery;
import twitter4j.Place;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.User;
import twitter4j.conf.ConfigurationBuilder;

public class TwitterText{

public static void main(String[] args) {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("");
cb.setOAuthConsumerSecret("");
cb.setOAuthAccessToken("");
cb.setOAuthAccessTokenSecret("");

// gets Twitter instance with default credentials
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

StatusListener listener = new StatusListener() {

@Override
public void onException(Exception arg0) {
// TODO Auto-generated method stub

}

@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
// TODO Auto-generated method stub

}

@Override
public void onScrubGeo(long arg0, long arg1) {
// TODO Auto-generated method stub

}

@Override
public void onStatus(Status status) {
User user = status.getUser();

// gets Username
String username = status.getUser().getScreenName();
// System.out.println(username);
String profileLocation = user.getLocation();

// System.out.println(profileLocation);
long tweetId = status.getId();
// System.out.println(tweetId);
String content = status.getText();
// System.out.println(content +"\n");
String text = content;
String csv = "C:\\OneDrive\\RealtimeTweets\\RealTweets.csv";
String tweetText = null;
CSVWriter writer = null;
try {

tweetText = content;
writer = new CSVWriter(new FileWriter(csv,true));
String[] finalOutput = tweetText.split(",");
writer.writeNext(finalOutput,true);
System.out.println(tweetText);
writer.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

@Override
public void onTrackLimitationNotice(int arg0) {
// TODO Auto-generated method stub

}

@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub

}

};
FilterQuery fq = new FilterQuery();

String keywords[] = {"#Rio2016","#RioOlympics2016"};

fq.track(keywords);

twitterStream.addListener(listener);
twitterStream.filter(fq);

}
}

Here's how it looks the output.

TwitterJava

 

Next, customize the Service Bus event hub to push realtime tweets into Event hub from OneDrive Storage.

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.VisualBasic.FileIO;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SentTweetsToEventHub
{
class Program
{

static string eventHubName = "<your Event Hub name>";
static string connectionString = GetServiceBusConnectionString();
static string data = string.Empty;

static void Main(string[] args)
{
string[] filePath = Directory.GetFiles(@"C:\", "*.csv");
string csv_file_path = string.Empty;
int size = filePath.Length;
for (int i = 0; i < size; i++)
{
Console.WriteLine(filePath[i]);
csv_file_path = filePath[i];
}

DataTable csvData = GetDataTableFromCSVFile(csv_file_path);
Console.WriteLine("Rows count:" + csvData.Rows.Count);
DataTable table = csvData;
foreach (DataRow row in table.Rows)
{
//// Console.WriteLine("---Row---");
foreach (var item in row.ItemArray)
{

data = item.ToString();
Console.Write(data);

var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
//while (true)
//{

try
{
foreach (DataRow rows in table.Rows)
{
var info = new Tweets
{

text = rows.ItemArray[0].ToString(),

};

var serializedString = JsonConvert.SerializeObject(info);
var message = data;
Console.WriteLine("{0}> Sending events: {1}", DateTime.Now.ToString(), serializedString.ToString());
eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(serializedString.ToString())));

}
}
catch (Exception ex)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), ex.Message);
Console.ResetColor();
}
Task.Delay(200);
//}

}

}
// Console.ReadLine();

Console.WriteLine("Press Ctrl-C to stop the sender process");
Console.WriteLine("Press Enter to start now");
Console.ReadLine();

// SendingRandomMessages().Wait();

}

private static DataTable GetDataTableFromCSVFile(string csv_file_path)
{
DataTable csvData = new DataTable();
string data = string.Empty;
try
{
using (TextFieldParser csvReader = new TextFieldParser(csv_file_path))
{
csvReader.SetDelimiters(new string[] { "," });
csvReader.HasFieldsEnclosedInQuotes = true;

//read column names
string[] colFields = csvReader.ReadFields();
foreach (string column in colFields)
{
DataColumn datecolumn = new DataColumn(column);
datecolumn.AllowDBNull = true;
csvData.Columns.Add(datecolumn);
}
while (!csvReader.EndOfData)
{
string[] fieldData = csvReader.ReadFields();

for (int i = 0; i < fieldData.Length; i++)
{
if (fieldData[i] == "")
{
fieldData[i] = null;
}
}
csvData.Rows.Add(fieldData);

}
}
}
catch (Exception ex)
{

}
return csvData;
}

private static string GetServiceBusConnectionString()
{
string connectionString = ConfigurationManager.AppSettings["EventHubConnectionString"];
if (string.IsNullOrEmpty(connectionString))
{
Console.WriteLine("Did not find Service Bus connections string in appsettings (app.config)");
return string.Empty;
}
ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);
builder.TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp;
return builder.ToString();
}

static async Task SendingRandomMessages()
{
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
while (true)
{
try
{
var message = data;
Console.WriteLine("{0}> Sending events: {1}", DateTime.Now.ToString(), message);
await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
}
catch (Exception ex)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), ex.Message);
Console.ResetColor();
}
await Task.Delay(200);
}

}

}
}

 

Now, configure the ASA job to add Azure Event Hub as data stream & add ASA-SQL query along with AML function. The ASA-SQL with Azure machine learning twitter sentiment predictive analytics function looks like the following.

ASA-SQL

 

Provision Twitter Predictive Sentiment analysis model from Cortana Intelligence gallery. Download the mini-twitter sentiment analysis predictive model from here . Take the ML web service URL & key from the Excel 2010 template & paste it within ASA 'Functions' tab to integrate AML analysis model.

AML

Next, add the output as Azure Blob storage to store the tweets & sentiment scores & implement powerbi sentiment analytics from it.

output

 

Finally, collect the sentiment scores output .csv files from the blob storage. Implement PowerBI dashboard from it.

Blob

 

The final Tweet events with sentiment scores output looks like as the following screenshot.

Scores PBI