Using SQL Service Broker for asynchronous external script (R / Python) execution in OLTP systems

Authored by Arvind Shyamsundar (Microsoft)
Credits: Prasad Wagle, Srinivasa Babu Valluri, Arun Jayapal, Ranga Bondada, Anand Joseph (members of the Sayint by Zen3 team)
Reviewers: Nellie Gustafsson, Umachandar Jayachandran, Dimitri Furman (Microsoft)

This blog post was inspired by our recent work with the Sayint dev team, who are part of Zen3 Infosolutions. SQLCAT has been working with them as they adopt SQL Server 2017. Microsoft introduced the ability to invoke external Python scripts in SQL Server 2017, and this capability to move ‘intelligence’ closer to the data was a major motivation for the Sayint team to adopt SQL Server 2017.

Sayint Application Overview

The Sayint application has many layers through which data flows:

  • It all starts with the Automatic Speech Recognition (ASR) server. This server takes audio recordings of conversations between a customer and a call center executive, and then converts that to text.
  • This text is then recorded into a database and in parallel sent to a message queue.
  • The Sayint application uses several Python scripts which are operationalized in a separate application server to read the same call transcript from the message queue.
  • Once a Python script reads the message, it processes that call transcript in various ways, one of which is to find ‘n-grams’ in the text.
  • These n-grams are then recorded into the same database. The diagram below shows the data flow at a high level:

Sayint Architecture

The multiple paths that the data takes, and the external application server, are moving parts which add to the latency and complexity of the solution. This is one of the places where the in-database execution of Python code in SQL Server 2017 appeared to be a valuable simplification for the Sayint development team.

Motivation

Given the capability for external script execution in SQL Server, one scenario is that some fairly lightweight external (Python) scripts might be invoked very frequently in an OLTP system. In fact, there are special optimizations in SQL Server 2017 to directly invoke specific types of ML models using the new native / real-time scoring functions.

But what if you had a (more expensive / long running) external script which you ideally wanted to run as ‘quickly’ as possible when new data arrives, but did not want that expensive external script to block the OLTP insert operation?

That is the scenario we address in this blog post. Here, we will show you how you can use the asynchronous execution mechanism offered by SQL Server Service Broker to ‘queue’ up data inside SQL Server, which can then be asynchronously passed to a Python script, with the results of that Python script then stored back into SQL Server.

This is effectively similar to the external message queue pattern but has some key advantages:

  • The solution is integrated within the data store, leading to fewer moving parts and lower complexity
  • Because the solution is in-database, we don’t need to make copies of the data. We just need to know what data has to be processed (effectively a ‘pointer to the data’ is what we need).

Service Broker also offers options to govern the number of readers of the queue, thereby ensuring predictable throughput without affecting core database operations.
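
To make the ‘pointer to the data’ idea concrete, here is a minimal Python sketch of the pattern. It is not the Service Broker implementation itself: the dictionary `base_table`, the list `ngram_table`, and the functions `insert_call` and `worker` are hypothetical in-memory stand-ins. The insert path stores the row and enqueues only its key, and a separate reader looks the text up by key later.

```python
import queue
import threading

# Hypothetical in-memory stand-ins for the database tables.
base_table = {}    # call_id -> transcript text
ngram_table = []   # (call_id, token) rows written by the worker
work_queue = queue.Queue()  # carries call_id values only, never the text


def insert_call(call_id, transcript):
    """Fast path: store the row, then enqueue only its key."""
    base_table[call_id] = transcript
    work_queue.put(call_id)


def worker():
    """Slow path: look the text up by key and process it off the insert path."""
    while True:
        call_id = work_queue.get()
        if call_id is None:  # sentinel value: stop the worker
            work_queue.task_done()
            break
        for token in base_table[call_id].split():
            ngram_table.append((call_id, token))
        work_queue.task_done()


reader = threading.Thread(target=worker)
reader.start()
insert_call(1, "the quick brown fox")
insert_call(2, "the lazy dog")
work_queue.put(None)
work_queue.join()
reader.join()
print(len(ngram_table))  # one row per whitespace-separated token
```

Note that `insert_call` returns as soon as the key is queued; the cost of processing the text is paid entirely by the reader thread.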

Implementation Details

We assume that you have some background about how SQL Server Service Broker helps decouple execution of code inside of SQL Server. If not, please take some time to review the documentation on the feature. There are also many excellent articles in the public domain on the general application of Service Broker.

The chart below shows the sequence of operations involved in our solution:

image

With that overview in mind, let us take a look at sample code which illustrates exactly how this works!

Tables and Indexes

First let’s create a table which contains the raw call data. The ASR application inserts into this table. Note the call_id which will serve as a ‘unique key’ for incoming calls.

CREATE TABLE dbo.BaseTable (
     call_id    INT PRIMARY KEY NOT NULL,
     callrecord NVARCHAR (MAX)
);

Next, let’s create a table which will store the ‘shredded’ n-grams corresponding to each call_id:

CREATE TABLE dbo.NGramTable (
     call_id   INT            NOT NULL,
     ngramid   INT            IDENTITY (1, 1) NOT NULL,
     ngramtext NVARCHAR (MAX),
     CONSTRAINT PK_NGramTable PRIMARY KEY (ngramid),
     INDEX CCI_NGramTable CLUSTERED COLUMNSTORE,
     INDEX NC_NGramTable NONCLUSTERED (call_id)
);

In the above table, note that we have declared a Clustered Columnstore Index. Here’s why:

  • The primary purpose is to enable high performance querying on aggregates (such as COUNT).
  • The second purpose is to save storage space: given that n-grams are naturally repetitive, the dictionary compression offered by Columnstore Indexes is really effective.
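
The intuition behind the second point can be sketched in a few lines of Python. This is only an illustration of dictionary encoding in general, using a made-up, highly repetitive column; it is not how SQL Server physically stores columnstore segments, and the function name `dictionary_encode` is ours.

```python
def dictionary_encode(values):
    """Replace each string with a small integer id into a list of distinct values."""
    dictionary = []   # distinct values, in first-seen order
    ids = {}          # value -> position in dictionary
    encoded = []
    for value in values:
        if value not in ids:
            ids[value] = len(dictionary)
            dictionary.append(value)
        encoded.append(ids[value])
    return dictionary, encoded


# Hypothetical n-gram column with heavy repetition across calls.
column = ["quick brown", "brown fox", "lazy dog"] * 1000
dictionary, encoded = dictionary_encode(column)

raw_chars = sum(len(v) for v in column)       # characters stored without encoding
dict_chars = sum(len(v) for v in dictionary)  # characters stored once in the dictionary
print(len(dictionary), raw_chars, dict_chars)
```

The more often the same n-gram text repeats, the smaller the dictionary is relative to the raw column, since each repeated string is replaced by a small integer id.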

In the Sayint application, there is a need to perform full-text search queries over the n-grams (“show me all n-grams which contain a specific keyword”). To support those queries, we proceed to create a SQL Full-Text Index on the n-gram text column:

CREATE FULLTEXT CATALOG [mydb_catalog] WITH ACCENT_SENSITIVITY = ON;

CREATE FULLTEXT INDEX ON [dbo].NGramTable
     (ngramtext)
     KEY INDEX PK_NGramTable
     ON mydb_catalog;

Service Broker Objects

With the tables and indexes created, we now proceed to create the necessary Service Broker constructs. First, we enable Service Broker for this database:

ALTER DATABASE mydb SET ENABLE_BROKER;

Well, that was simple enough! Next, let’s create message types and associated contracts:

CREATE MESSAGE TYPE [RequestMessage]
     VALIDATION = WELL_FORMED_XML;

CREATE MESSAGE TYPE [ReplyMessage]
     VALIDATION = WELL_FORMED_XML;

CREATE CONTRACT [SampleContract]
     ([RequestMessage] SENT BY INITIATOR, [ReplyMessage] SENT BY TARGET);

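Lastly, we create the queues and associated services. We have to do this for both the target and the initiator, though in our specific implementation the initiator queue does not carry any application messages (it only receives the end-of-dialog notifications sent back when the target ends a conversation).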

CREATE QUEUE TargetQueue;

CREATE SERVICE [TargetService]
     ON QUEUE TargetQueue
     ([SampleContract]);

CREATE QUEUE InitiatorQueue;

CREATE SERVICE [InitiatorService]
     ON QUEUE InitiatorQueue;

Stored Procedures

We first create a stored procedure which takes a list of call_ids and computes the n-grams for those calls. This stored procedure is the one that actually invokes the Python code using sp_execute_external_script.

CREATE PROCEDURE GenerateNGrams
(
     -- varchar is okay here because we only have numbers
     @listOfCallIds varchar(max)
)
AS
BEGIN
     EXECUTE sp_execute_external_script @language = N'Python', @script = N'
import nltk
from nltk import corpus
import pandas as pd
import string

# Rows destined for OutputDataSet; generate_ngrams appends to this list
mydict = []
# Maps every punctuation character to None so that str.translate removes it
translate_table = dict((ord(char), None) for char in string.punctuation)
# Build the stop word set once instead of re-reading the list for every word
stop_words = set(corpus.stopwords.words("english"))

def generate_ngrams(row):
     inputText = row["InputText"]
     nopunc = inputText.translate(translate_table)
     raw_words = nltk.word_tokenize(nopunc)

     words = [word for word in raw_words if word not in stop_words]

     my_bigrams = list(nltk.bigrams(words))
     # Trigrams are computed here, but this sample only emits bigrams
     my_trigrams = nltk.trigrams(words)

     for bigram in my_bigrams:
         rowdict = {}
         rowdict["call_id"] = row["call_id"]
         rowdict["ngram_text"] = (bigram[0] + " " + bigram[1])
         mydict.append(rowdict)
     return

# Called for its side effect of filling mydict; the return value is not used
result = InputDataSet.apply(lambda row: generate_ngrams(row), axis=1)
OutputDataSet = pd.DataFrame(mydict)
'
, @input_data_1 = N' SELECT B.call_id, B.callrecord as InputText FROM BaseTable B
WHERE EXISTS(SELECT * FROM STRING_SPLIT(@listOfCallIds, '','') as t WHERE CAST(t.value as int) = B.call_id)'
, @params = N'@listOfCallIds varchar(max)'
, @listOfCallIds = @listOfCallIds
END
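
If you want to experiment with the text-processing logic outside of SQL Server, the sketch below mirrors the steps of the script above in plain Python. It makes two simplifying assumptions that differ from the procedure: whitespace splitting stands in for nltk.word_tokenize, and a tiny hand-picked `STOP_WORDS` set stands in for the NLTK English stop word list. The function name `bigrams_for_call` is ours, for illustration only.

```python
import string

# Assumption: a tiny hand-picked stop word set; the procedure uses the NLTK English list.
STOP_WORDS = {"the", "over", "a", "of", "and"}
# Same idea as translate_table in the procedure: map every punctuation character to None.
TRANSLATE_TABLE = {ord(ch): None for ch in string.punctuation}


def bigrams_for_call(call_id, text):
    """Return (call_id, bigram) rows for one transcript."""
    no_punct = text.translate(TRANSLATE_TABLE)
    # Assumption: whitespace splitting stands in for nltk.word_tokenize.
    words = [w for w in no_punct.split() if w not in STOP_WORDS]
    # Pair each word with its successor, which is what a bigram is.
    return [(call_id, first + " " + second) for first, second in zip(words, words[1:])]


rows = bigrams_for_call(1, "the quick brown fox jumps over the lazy dog")
for row in rows:
    print(row)
```

With these assumptions, a transcript with k words left after stop word removal yields k - 1 bigram rows, which gives a quick way to sanity-check row counts.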

Next, we create an “activation stored procedure” which is invoked by the Service Broker queue whenever there are messages on the queue. This activation procedure reads from the queue, constructs a list of call_id values and then invokes the previous “n-gram creator” stored procedure:

CREATE PROCEDURE TargetActivProc
AS
BEGIN
DECLARE @RecvReqDlgHandle AS UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg AS XML;
DECLARE @RecvReqMsgName AS sysname;
DECLARE @listOfCallIds varchar(max);
WHILE (1 = 1)
     BEGIN
         BEGIN TRANSACTION;
         WAITFOR (RECEIVE TOP (1) @RecvReqDlgHandle = conversation_handle, @RecvReqMsg = message_body, @RecvReqMsgName = message_type_name FROM TargetQueue),  TIMEOUT 5000;
         IF (@@ROWCOUNT = 0)
             BEGIN
                 ROLLBACK;
                 BREAK;
             END
         IF @RecvReqMsgName = N'RequestMessage'
             BEGIN
                 -- Populate a local variable with a comma separated list of {call_id} values which are new
                 -- so that the external script invoked by GenerateNGrams can operate on the associated data
                 -- We avoid making a copy of the InputText itself as it will then occupy space in a temp table
                 -- as well as in a Pandas DF later on
                 SELECT @listOfCallIds = STRING_AGG(T.c.value('./@call_id', 'varchar(100)'), ',')
                 FROM   @RecvReqMsg.nodes('/Inserted') AS T(c);
                 -- Call the sp_execute_external_script wrapper procedure (GenerateNGrams) to generate n-grams,
                 -- then insert those into the n-gram table
                 INSERT NGramTable (call_id, ngramtext)
                 EXECUTE GenerateNGrams @listOfCallIds;
                 END CONVERSATION @RecvReqDlgHandle;
             END
         ELSE
             IF @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                 BEGIN
                     END CONVERSATION @RecvReqDlgHandle;
                 END
             ELSE
                 IF @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
                     BEGIN
                         END CONVERSATION @RecvReqDlgHandle;
                     END
         COMMIT TRANSACTION;
     END
END
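
The XQuery and STRING_AGG step in the activation procedure can be hard to visualize, so here is a rough Python equivalent. The message body shown is an assumption about what the sender places on the queue for a four-row insert (one `Inserted` element per row, with `call_id` as an attribute), which is the shape the `nodes('/Inserted')` expression expects. The helper name `call_ids_from_message` is hypothetical.

```python
import xml.etree.ElementTree as ET

# Assumed message body for a four-row insert: one <Inserted> element per row,
# with call_id carried as an attribute.
message_body = (
    '<Inserted call_id="1"/><Inserted call_id="2"/>'
    '<Inserted call_id="3"/><Inserted call_id="4"/>'
)


def call_ids_from_message(body):
    """Return the call_id attribute values as one comma-separated string."""
    # The body is an XML fragment with several top-level elements, so wrap it
    # in a synthetic root before handing it to a parser that requires one.
    root = ET.fromstring("<root>" + body + "</root>")
    return ",".join(node.attrib["call_id"] for node in root.findall("Inserted"))


list_of_call_ids = call_ids_from_message(message_body)
print(list_of_call_ids)

# The reverse step, analogous to STRING_SPLIT plus CAST in GenerateNGrams.
call_ids = [int(value) for value in list_of_call_ids.split(",")]
print(call_ids)
```

Passing only this short list of keys keeps the message small, regardless of how long the call transcripts themselves are.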

We then associate the activation procedure with the queue:

ALTER QUEUE TargetQueue WITH ACTIVATION (STATUS = ON, PROCEDURE_NAME = TargetActivProc, MAX_QUEUE_READERS = 5, EXECUTE AS SELF);
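
MAX_QUEUE_READERS caps how many copies of the activation procedure can run at once. As a loose analogy only (Service Broker decides for itself when to start additional readers), the Python sketch below pushes 100 hypothetical messages through a thread pool capped at five workers and records the peak concurrency. The names `process_message`, `active`, and `peak` are ours.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_QUEUE_READERS = 5   # mirrors the queue setting above
lock = threading.Lock()
active = 0              # readers currently running
peak = 0                # highest number of simultaneous readers observed


def process_message(call_id):
    """Stand-in for one activation procedure run; tracks concurrent readers."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.01)  # simulated external script work
    with lock:
        active -= 1
    return call_id


with ThreadPoolExecutor(max_workers=MAX_QUEUE_READERS) as pool:
    done = list(pool.map(process_message, range(100)))

print(len(done), peak)
```

However many messages are waiting, the number of simultaneous readers never exceeds the cap, which is what keeps the background work from crowding out the core database workload.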

Trigger

We finally create a trigger on the BaseTable which will be invoked automatically for each INSERT operation. This trigger takes the list of call_id values which have been inserted and transforms it into an XML document (using FOR XML), which is then sent to the Service Broker queue:

CREATE OR ALTER TRIGGER trg_Insert_BaseTable
     ON dbo.BaseTable
     FOR INSERT
     AS BEGIN
            DECLARE @InitDlgHandle AS UNIQUEIDENTIFIER;
            DECLARE @RequestMsg AS XML;
            
            SELECT @RequestMsg = (SELECT call_id
                                  FROM   Inserted
                                  FOR    XML AUTO);
            
            BEGIN TRANSACTION;
            BEGIN DIALOG @InitDlgHandle
                FROM SERVICE [InitiatorService]
                TO SERVICE N'TargetService'
                ON CONTRACT [SampleContract]
                WITH ENCRYPTION = OFF;
            
            -- Send a message on the conversation
            SEND ON CONVERSATION (@InitDlgHandle) MESSAGE TYPE [RequestMessage] (@RequestMsg);
            COMMIT TRANSACTION;
        END

Unit Testing

Let’s now unit test the operation of the trigger, activation procedure and the n-gram computation using Python! Let’s insert 4 rows in one go into the base table, simulating what the ASR application might do:

TRUNCATE TABLE BaseTable;
TRUNCATE TABLE NGramTable;
INSERT  INTO BaseTable
VALUES (1, 'the quick brown fox jumps over the lazy dog'),
(2, 'the lazy dog now jumps over the quick brown fox'),
(3, 'But I must explain to you how all this mistaken idea of denouncing of a pleasure and praising pain was born and I will give you a complete account of the system, and expound the actual teachings of the great explorer of the truth, the master-builder of human happiness. No one rejects, dislikes, or avoids pleasure itself, because it is pleasure, but because those who do not know how to pursue pleasure rationally encounter consequences that are extremely painful. Nor again is there anyone who loves or pursues or desires to obtain pain of itself, because it is pain, but occasionally circumstances occur in which toil and pain can procure him some great pleasure. To take a trivial example, which of us ever undertakes laborious physical exercise, except to obtain some advantage from it? But who has any right to find fault with a man who chooses to enjoy a pleasure that has no annoying consequences, or one who avoids a pain that produces no resultant pleasure?'),
(4, 'On the other hand, we denounce with righteous indignation and dislike men who are so beguiled and demoralized by the charms of pleasure of the moment, so blinded by desire, that they cannot foresee the pain and trouble that are bound to ensue; and equal blame belongs to those who fail in their duty through weakness of will, which is the same as saying through shrinking from toil and pain. These cases are perfectly simple and easy to distinguish. In a free hour, when our power of choice is untrammeled and when nothing prevents our being able to do what we like best, every pleasure is to be welcomed and every pain avoided. But in certain circumstances and owing to the claims of duty or the obligations of business it will frequently occur that pleasures have to be repudiated and annoyances accepted. The wise man therefore always holds in these matters to this principle of selection: he rejects pleasures to secure other greater pleasures, or else he endures pains to avoid worse pains.');

(In case you are wondering, the last 2 text samples in the unit test are taken from this Wikipedia page!)

Then we can take a look at the n-gram table:

SELECT *
FROM   NGramTable;

As you can see below, n-grams have been computed for multiple call_id values! (The results below have been truncated for readability; there are values for call_id 4 as well in the actual table.)

image

Now, let’s try a representative full-text query:

SELECT T.*
FROM   CONTAINSTABLE(NGramTable, ngramtext, 'fox') FT 
JOIN NGramTable T
ON T.ngramid = FT.[Key];

This query returns the following:

image

Stress Test

Let’s put this to the ultimate test – multiple threads concurrently inserting records. The ideal outcome is to see that the INSERT operations complete quickly, and that the Python.exe processes run asynchronously and finish the n-gram generation in due course. To facilitate the stress test, we create a SEQUENCE object in SQL and an associated stored procedure:

CREATE SEQUENCE CallIdsForTest AS INT
    START WITH 1  
    INCREMENT BY 1;
GO

CREATE OR ALTER PROCEDURE InsertCall
AS
BEGIN
	SET NOCOUNT ON

	INSERT  INTO BaseTable
	VALUES (NEXT VALUE FOR CallIdsForTest, 'But I must explain to you how all this mistaken idea of denouncing of a pleasure and praising pain was born and I will give you a complete account of the system, and expound the actual teachings of the great explorer of the truth, the master-builder of human happiness. No one rejects, dislikes, or avoids pleasure itself, because it is pleasure, but because those who do not know how to pursue pleasure rationally encounter consequences that are extremely painful. Nor again is there anyone who loves or pursues or desires to obtain pain of itself, because it is pain, but occasionally circumstances occur in which toil and pain can procure him some great pleasure. To take a trivial example, which of us ever undertakes laborious physical exercise, except to obtain some advantage from it? But who has any right to find fault with a man who chooses to enjoy a pleasure that has no annoying consequences, or one who avoids a pain that produces no resultant pleasure?')
END;

To simulate a workload, we use the OStress utility from the RML Utilities package to open a number of concurrent connections. The specific OStress command line used is shown below:

ostress -S.\SQL2017 -dmydb -Q"exec InsertCall" -n100 -q

Each execution of the stored procedure should eventually result in 84 n-gram values being populated in the NGramTable, so with the above stress test (100 executions) we should see 8400 records in the table. When we run the OStress command line, the 100 concurrent calls to the InsertCall stored procedure finish very quickly: in 1.403 seconds. This is good, because we do not want the front-end stored procedure to block in any way. The NGramTable takes around 88 seconds to be completely populated with the 8400 records. Almost all of that time is spent in Python.exe crunching away at the text (stripping punctuation, tokenizing, removing stop words, generating n-grams, etc.).

Resource Governance

When running the above stress test (with default settings for SQL Server), you will notice that the Python.exe instances end up using almost 100% of the CPU on the machine. This is expected because text processing is a CPU-bound activity. To balance the resource usage and restrict the external Python processes from using too much CPU, we can use the external resource governor setting for maximum CPU used by external instances of R / Python. It is as easy as running the code below, no restart required!

ALTER EXTERNAL RESOURCE POOL [default] WITH (max_cpu_percent=30, AFFINITY CPU = AUTO);

ALTER RESOURCE GOVERNOR RECONFIGURE;

After setting this external pool CPU limit to 30%, the overall CPU usage in the system is reduced, with the obvious tradeoff of higher overall latency in populating the n-grams into their table. In this case, the total time to populate the 8400 n-grams is around 234 seconds, or about 2.7 times the original 88 seconds. When you think about it, that is about right: given that the external processes now have just 30% of the CPU resources, we would expect processing time to increase to around 3x the old duration.
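
For readers who like to check the arithmetic, the short Python calculation below uses only the figures quoted in this post. The ‘ideal’ slowdown rests on an assumption of ours: that the work is perfectly CPU-bound and was using 100% of the CPU before the cap, which is a simplification.

```python
calls = 100
ngrams_per_call = 84
baseline_seconds = 88      # default external resource pool settings
capped_seconds = 234       # with max_cpu_percent = 30
cpu_cap = 0.30

total_rows = calls * ngrams_per_call
observed_slowdown = capped_seconds / baseline_seconds
# Assumption: perfectly CPU-bound work that used 100% of the CPU before the cap.
ideal_slowdown = 1 / cpu_cap

print(total_rows)
print(round(observed_slowdown, 2), round(ideal_slowdown, 2))
# Approximate n-gram rows produced per second, before and after the cap.
print(round(total_rows / baseline_seconds, 1), round(total_rows / capped_seconds, 1))
```

The observed slowdown comes out somewhat below the idealized figure, which is plausible given that not every second of the original 88 was spent at full CPU utilization.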

So by adjusting the Resource Governor limit you can achieve the right balance between resource usage on the system and external script processing time. More information about Resource Pools for external scripts is here and here.

Summary

In the actual Sayint application, the ability to query for n-grams containing a specific keyword is crucial. With this workflow within SQL Server 2017, that requirement is elegantly accomplished using a combination of several SQL Server features alongside the in-DB Python code – with the advantages of lower latency, data security and no data duplication.