Load Stock Trading Data into SingleStore

This tutorial shows how to run analytical queries with low latency on a SingleStore database containing over 5 million rows of simulated stock trading data.

Prerequisites

Connect to a SingleStore Helios deployment or a SingleStore instance deployed on a self-managed cluster running on AWS.

Create Database and Tables

Connect to your SingleStore deployment and run the following SQL commands to create a database named trades and two associated tables.

For SingleStore Helios deployments, you can also run these commands in the SQL Editor, select Cloud Portal > Develop > Data Studio > SQL Editor.

Note

The SQL Editor only runs the queries you have selected, so make sure you have them all selected before selecting Run.

DROP DATABASE IF EXISTS trades;
CREATE DATABASE trades;
USE trades;
CREATE TABLE company (
symbol char(5) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
name varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
last_sale varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
market_cap varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
IPO_year float DEFAULT NULL,
sector varchar(80) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
industry varchar(80) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
summary_quote varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
extra varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
SORT KEY __UNORDERED ()
, SHARD KEY ()
);
CREATE TABLE trade (
id bigint(20) NOT NULL,
stock_symbol char(5) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
shares decimal(18,4) NOT NULL,
share_price decimal(18,4) NOT NULL,
trade_time datetime(6) NOT NULL,
SORT KEY stock_symbol (stock_symbol),
SHARD KEY __SHARDKEY (stock_symbol)
);

Create Functions

This part of the tutorial shows how to create functions that are used to develop the stored procedures. This tutorial uses user-defined functions (UDFs). For related information, refer to CREATE FUNCTION (UDF).

Note

The SQL Editor only runs the queries you have selected, so make sure you have them all selected before selecting Run.

Run the following SQL commands to create the user-defined functions (UDFs):

USE trades;
DELIMITER //
CREATE OR REPLACE FUNCTION marketcap_to_DECIMAL(s varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci NULL) RETURNS decimal(18,2) NULL AS
DECLARE
m CHAR(1) = SUBSTR(s, LENGTH(s), 1);
raw_v DECIMAL(18,2) = SUBSTR(s, 2, LENGTH(s) - 1);
v DECIMAL(18,2) = NULL;
BEGIN
IF m = "B" THEN
v = raw_v * 1000;
ELSE
v = raw_v;
END IF;
RETURN v;
END //
DELIMITER ;

Create Stored Procedures

This part of the tutorial shows how to create stored procedures that are used to load data using pipelines.

Note

The SQL Editor only runs the queries you have selected, so make sure you have them all selected before selecting Run.

Run the following SQL commands to create the stored procedures:

USE trades;
DELIMITER //
CREATE OR REPLACE PROCEDURE iter_stocks(iterations int(11) NULL) RETURNS void AS
DECLARE
tickers ARRAY(CHAR(5));
prices ARRAY(DECIMAL(18,4));
last_ids ARRAY(bigINT);
counts ARRAY(INT);
next_id bigINT = 1;
ticker CHAR(5);
price DECIMAL(18,4);
c INT;
rand DECIMAL(18,4);
tickers_q QUERY(t CHAR(5), p DECIMAL(18,4), lid BIGINT, c INT) = SELECT stock_symbol, share_price, MIN(id), COUNT(*) FROM trade GROUP BY stock_symbol;
q ARRAY(RECORD(t CHAR(5), p DECIMAL(18,4), lid bigINT, c INT));
q_count QUERY(c INT) = SELECT COUNT(*) FROM trade;
total_c INT;
BEGIN
q = COLLECT(tickers_q);
tickers = CREATE_ARRAY(LENGTH(q));
prices = CREATE_ARRAY(LENGTH(q));
last_ids = CREATE_ARRAY(LENGTH(q));
counts = CREATE_ARRAY(LENGTH(q));
total_c = SCALAR(q_count);
FOR r IN 0..LENGTH(q)-1 LOOP
tickers[r] = q[r].t;
prices[r] = q[r].p;
last_ids[r] = q[r].lid;
counts[r] = q[r].c;
END LOOP;
FOR j IN 0..(iterations-1) LOOP
FOR i IN 0..LENGTH(tickers)-1 LOOP
ticker = tickers[i];
price = prices[i];
next_id = last_ids[i];
c = counts[i];
rand = POW(-1, FLOOR(RAND()*2)) * RAND();
INSERT INTO trade
SELECT id + total_c, stock_symbol, shares, share_price + rand, trade_time FROM trade WHERE stock_symbol = ticker AND id >= next_id;
prices[i] = price + rand;
last_ids[i] = next_id + total_c;
END LOOP;
END LOOP;
END //
CREATE OR REPLACE PROCEDURE seed_trades(num_trades int(11) NULL) RETURNS int(11) NULL AS
DECLARE
ranked_companies ARRAY(RECORD(symbol CHAR(5), _rank INT));
DECLARE
q QUERY(symbol CHAR(5), _rank INT) =
SELECT symbol, rank() OVER (ORDER BY marketcap_to_DECIMAL(market_cap)) AS _rank
FROM company
WHERE LENGTH(symbol) < 5
ORDER BY _rank DESC LIMIT 200;
i INT = 0;
rank_num INT;
next_id INT = 1;
sym CHAR(5);
price_base DECIMAL(18,4);
current_prices ARRAY(INT);
l ARRAY(RECORD(symbol CHAR(5), _rank INT));
BEGIN
l = collect(q);
FOR r IN l LOOP
i += 1;
rank_num = r._rank;
sym = r.symbol;
price_base = FLOOR(rand() * 50) + 50;
FOR j IN 1..((rank_num / 10) + RAND() * 10) LOOP
INSERT trade VALUES(
next_id,
sym,
FLOOR(1 + RAND() * 10) * 100,
price_base,
DATE_ADD(NOW(), INTERVAL RAND() * 6 HOUR));
next_id += 1;
IF next_id > num_trades THEN RETURN(next_id); END IF;
END LOOP;
END LOOP;
RETURN(next_id);
END //
DELIMITER ;

Load Data with Pipelines

This part of the tutorial shows how to ingest stock trades data from a public AWS S3 bucket into the SingleStore database using pipelines.

Note

The SQL Editor only runs the queries you have selected, so make sure you have them all selected before selecting Run.

  1. Run the following SQL commands to create the pipelines:

    USE trades;
    CREATE OR REPLACE PIPELINE company
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/trades/company.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE company;
    CREATE OR REPLACE PIPELINE trade
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/trades/trade.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE trade;
  2. Run the following SQL commands to start the pipelines:

    USE trades;
    START ALL PIPELINES;

Once the Success message is returned for all the created pipelines, SingleStore starts ingesting the data from the S3 bucket.

Verify the Pipeline Success

Query the pipelines_files information schema view to inspect the progress of the pipelines. For example, run the following command to check whether the company pipeline has finished ingesting data.

SELECT * FROM information_schema.pipelines_files
WHERE pipeline_name = "company";

You can also run the pipelines in foreground to easily verify that all the data has been ingested.

START PIPELINE <pipeline_name> FOREGROUND;

For SingleStore Helios deployments, you can also monitor the progress of your pipelines on the Cloud Portal, select Deployments > <your_workspace_group> > Databases > trades > Pipelines.

Run Queries on Data

You can start running queries once the data is loaded into your SingleStore database. Here are some example queries to be used with the Stocks Trading dataset.

Query 1: Finds the most traded stocks

This query finds the most traded stocks.

USE trades;
SELECT stock_symbol, COUNT(*) AS c
FROM trade
GROUP BY stock_symbol
ORDER BY c DESC LIMIT 5;
+--------------+-------+
| stock_symbol | c     |
+--------------+-------+
| TIG          | 30401 |
| FB           | 30300 |
| MNGA         | 30300 |
| QCOM         | 30199 |
| KHC          | 30199 |
+--------------+-------+
5 rows in set (0.11 sec)

Query 2: Finds the most volatile stocks

This query finds the most volatile stocks which have the highest variance in prices.

USE trades;
SELECT stock_symbol, VARIANCE(share_price) var
FROM trade
GROUP BY stock_symbol
ORDER BY var DESC
LIMIT 5; /* Remove LIMIT to see full result*/
+--------------+-------------+
| stock_symbol | var         |
+--------------+-------------+
| IBKR         | 32.24672973 |
| XRAY         | 28.09745295 |
| ODFL         | 25.18033964 |
| ALGN         | 24.47217863 |
| VXUS         | 24.11941421 |
+--------------+-------------+
5 rows in set (1.74 sec)

Query 3: Portfolio Aggregation

This is a portfolio aggregation query that uses Common Table Expression (CTE), JOIN, and window functions. It also computes minimum, maximum, standard deviation, weighted average, and percentiles for each company stock.

USE trades;
WITH folio AS (
SELECT id, stock_symbol, shares, share_price, trade_time
FROM trade
),
AggCalcs AS (
SELECT
stock_symbol AS ACsymb,
MAX(share_price) AS pmax,
MIN(share_price) AS pmin,
STD(share_price) AS pstd,
SUM(share_price*shares)/SUM(shares) AS avg_pps, ## Weighted Average
SUM(share_price*shares) AS total_pvalue
FROM trade
GROUP BY 1
)
SELECT
DISTINCT folio.stock_symbol,
avg_pps,
pmin,
pmax,
percentile_cont(.25) WITHIN group (ORDER BY share_price) OVER (PARTITION BY stock_symbol) AS Q1,
percentile_cont(.5) WITHIN group (ORDER BY share_price) OVER (PARTITION BY stock_symbol) AS median,
percentile_cont(.75) WITHIN group (ORDER BY share_price) OVER (PARTITION BY stock_symbol) AS Q3
FROM folio
JOIN AggCalcs ON (folio.stock_symbol = ACsymb)
ORDER BY folio.stock_symbol
LIMIT 5; /* Remove LIMIT to see full result*/
+--------------+-----------------+---------+---------+-------------+-------------+-------------+
| stock_symbol | avg_pps         | pmin    | pmax    | Q1          | median      | Q3          |
+--------------+-----------------+---------+---------+-------------+-------------+-------------+
| AABA         | 90.104679207921 | 86.7900 | 93.5923 | 88.89430000 | 90.02250000 | 91.32900000 |
| AAL          | 81.627300000000 | 78.1108 | 85.8548 | 79.65410000 | 80.57210000 | 84.51420000 |
| AAPL         | 74.203350495050 | 71.3886 | 77.0869 | 73.46370000 | 74.15330000 | 75.05420000 |
| ABMD         | 77.400400990099 | 70.7616 | 85.1131 | 73.57460000 | 76.65900000 | 80.89240000 |
| ACGL         | 73.807070297030 | 70.4365 | 79.1696 | 72.21950000 | 73.09320000 | 75.56110000 |
+--------------+-----------------+---------+---------+-------------+-------------+-------------+
5 rows in set (3.78 sec)

Last modified: October 28, 2024

Was this article helpful?