Load Stock Trading Data into SingleStore
On this page
This tutorial shows how to run analytical queries with low latency on a SingleStore database containing over 5 million rows of simulated stock trading data.
Create Database and Tables
Connect to your SingleStore deployment and run the following SQL commands to create a database named trades and two associated tables.
For SingleStore Helios deployments, you can also run these commands in the SQL Editor, select Cloud Portal > Develop > Data Studio > SQL Editor.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
DROP DATABASE IF EXISTS trades;CREATE DATABASE trades;USE trades;CREATE TABLE company (symbol char(5) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,name varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,last_sale varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,market_cap varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,IPO_year float DEFAULT NULL,sector varchar(80) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,industry varchar(80) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,summary_quote varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,extra varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,SORT KEY __UNORDERED (), SHARD KEY ());CREATE TABLE trade (id bigint(20) NOT NULL,stock_symbol char(5) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,shares decimal(18,4) NOT NULL,share_price decimal(18,4) NOT NULL,trade_time datetime(6) NOT NULL,SORT KEY stock_symbol (stock_symbol),SHARD KEY __SHARDKEY (stock_symbol));
Create Functions
This part of the tutorial shows how to create functions that are used to develop the stored procedures.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
Run the following SQL commands to create the user-defined functions (UDFs):
USE trades;DELIMITER //CREATE OR REPLACE FUNCTION marketcap_to_DECIMAL(s varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci NULL) RETURNS decimal(18,2) NULL ASDECLAREm CHAR(1) = SUBSTR(s, LENGTH(s), 1);raw_v DECIMAL(18,2) = SUBSTR(s, 2, LENGTH(s) - 1);v DECIMAL(18,2) = NULL;BEGINIF m = "B" THENv = raw_v * 1000;ELSEv = raw_v;END IF;RETURN v;END //DELIMITER ;
Create Stored Procedures
This part of the tutorial shows how to create stored procedures that are used to load data using pipelines.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
Run the following SQL commands to create the stored procedures:
USE trades;DELIMITER //CREATE OR REPLACE PROCEDURE iter_stocks(iterations int(11) NULL) RETURNS void ASDECLAREtickers ARRAY(CHAR(5));prices ARRAY(DECIMAL(18,4));last_ids ARRAY(bigINT);counts ARRAY(INT);next_id bigINT = 1;ticker CHAR(5);price DECIMAL(18,4);c INT;rand DECIMAL(18,4);tickers_q QUERY(t CHAR(5), p DECIMAL(18,4), lid BIGINT, c INT) = SELECT stock_symbol, share_price, MIN(id), COUNT(*) FROM trade GROUP BY stock_symbol;q ARRAY(RECORD(t CHAR(5), p DECIMAL(18,4), lid bigINT, c INT));q_count QUERY(c INT) = SELECT COUNT(*) FROM trade;total_c INT;BEGINq = COLLECT(tickers_q);tickers = CREATE_ARRAY(LENGTH(q));prices = CREATE_ARRAY(LENGTH(q));last_ids = CREATE_ARRAY(LENGTH(q));counts = CREATE_ARRAY(LENGTH(q));total_c = SCALAR(q_count);FOR r IN 0..LENGTH(q)-1 LOOPtickers[r] = q[r].t;prices[r] = q[r].p;last_ids[r] = q[r].lid;counts[r] = q[r].c;END LOOP;FOR j IN 0..(iterations-1) LOOPFOR i IN 0..LENGTH(tickers)-1 LOOPticker = tickers[i];price = prices[i];next_id = last_ids[i];c = counts[i];rand = POW(-1, FLOOR(RAND()*2)) * RAND();INSERT INTO tradeSELECT id + total_c, stock_symbol, shares, share_price + rand, trade_time FROM trade WHERE stock_symbol = ticker AND id >= next_id;prices[i] = price + rand;last_ids[i] = next_id + total_c;END LOOP;END LOOP;END //CREATE OR REPLACE PROCEDURE seed_trades(num_trades int(11) NULL) RETURNS int(11) NULL ASDECLAREranked_companies ARRAY(RECORD(symbol CHAR(5), _rank INT));DECLAREq QUERY(symbol CHAR(5), _rank INT) =SELECT symbol, rank() OVER (ORDER BY marketcap_to_DECIMAL(market_cap)) AS _rankFROM companyWHERE LENGTH(symbol) < 5ORDER BY _rank DESC LIMIT 200;i INT = 0;rank_num INT;next_id INT = 1;sym CHAR(5);price_base DECIMAL(18,4);current_prices ARRAY(INT);l ARRAY(RECORD(symbol CHAR(5), _rank INT));BEGINl = collect(q);FOR r IN l LOOPi += 1;rank_num = r._rank;sym = r.symbol;price_base = FLOOR(rand() * 50) + 50;FOR j IN 1..((rank_num / 10) + RAND() * 10) LOOPINSERT trade VALUES(next_id,sym,FLOOR(1 + RAND() * 10) * 100,price_base,DATE_ADD(NOW(), INTERVAL RAND() * 6 HOUR));next_id += 1;IF next_id > num_trades THEN RETURN(next_id); END IF;END LOOP;END LOOP;RETURN(next_id);END //DELIMITER ;
Load Data with Pipelines
This part of the tutorial shows how to ingest stock trades data from a public AWS S3 bucket into the SingleStore database using pipelines.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
-
Run the following SQL commands to create the pipelines:
USE trades;CREATE OR REPLACE PIPELINE companyAS LOAD DATA S3 's3://singlestore-docs-example-datasets/trades/company.csv'CONFIG '{"region":"us-east-1"}'SKIP DUPLICATE KEY ERRORSINTO TABLE company;CREATE OR REPLACE PIPELINE tradeAS LOAD DATA S3 's3://singlestore-docs-example-datasets/trades/trade.csv'CONFIG '{"region":"us-east-1"}'SKIP DUPLICATE KEY ERRORSINTO TABLE trade; -
Run the following SQL commands to start the pipelines:
USE trades;START ALL PIPELINES;
Once the Success message is returned for all the created pipelines, SingleStore starts ingesting the data from the S3 bucket.
Verify the Pipeline Success
Query the pipelines_
information schema view to inspect the progress of the pipelines.company
pipeline has finished ingesting data.
SELECT * FROM information_schema.pipelines_filesWHERE pipeline_name = "company";
You can also run the pipelines in foreground to easily verify that all the data has been ingested.
START PIPELINE <pipeline_name> FOREGROUND;
For SingleStore Helios deployments, you can also monitor the progress of your pipelines on the Cloud Portal, select Deployments > <your_
Run Queries on Data
You can start running queries once the data is loaded into your SingleStore database.
Query 1: Finds the most traded stocks
This query finds the most traded stocks.
USE trades;SELECT stock_symbol, COUNT(*) AS cFROM tradeGROUP BY stock_symbolORDER BY c DESC LIMIT 5;
+--------------+-------+
| stock_symbol | c |
+--------------+-------+
| TIG | 30401 |
| FB | 30300 |
| MNGA | 30300 |
| QCOM | 30199 |
| KHC | 30199 |
+--------------+-------+
5 rows in set (0.11 sec)
Query 2: Finds the most volatile stocks
This query finds the most volatile stocks which have the highest variance in prices.
USE trades;SELECT stock_symbol, VARIANCE(share_price) varFROM tradeGROUP BY stock_symbolORDER BY var DESCLIMIT 5; /* Remove LIMIT to see full result*/
+--------------+-------------+
| stock_symbol | var |
+--------------+-------------+
| IBKR | 32.24672973 |
| XRAY | 28.09745295 |
| ODFL | 25.18033964 |
| ALGN | 24.47217863 |
| VXUS | 24.11941421 |
+--------------+-------------+
5 rows in set (1.74 sec)
Query 3: Portfolio Aggregation
This is a portfolio aggregation query that uses Common Table Expression (CTE), JOIN, and window functions.
USE trades;WITH folio AS (SELECT id, stock_symbol, shares, share_price, trade_timeFROM trade),AggCalcs AS (SELECTstock_symbol AS ACsymb,MAX(share_price) AS pmax,MIN(share_price) AS pmin,STD(share_price) AS pstd,SUM(share_price*shares)/SUM(shares) AS avg_pps, ## Weighted AverageSUM(share_price*shares) AS total_pvalueFROM tradeGROUP BY 1)SELECTDISTINCT folio.stock_symbol,avg_pps,pmin,pmax,percentile_cont(.25) WITHIN group (ORDER BY share_price) OVER (PARTITION BY stock_symbol) AS Q1,percentile_cont(.5) WITHIN group (ORDER BY share_price) OVER (PARTITION BY stock_symbol) AS median,percentile_cont(.75) WITHIN group (ORDER BY share_price) OVER (PARTITION BY stock_symbol) AS Q3FROM folioJOIN AggCalcs ON (folio.stock_symbol = ACsymb)ORDER BY folio.stock_symbolLIMIT 5; /* Remove LIMIT to see full result*/
+--------------+-----------------+---------+---------+-------------+-------------+-------------+
| stock_symbol | avg_pps | pmin | pmax | Q1 | median | Q3 |
+--------------+-----------------+---------+---------+-------------+-------------+-------------+
| AABA | 90.104679207921 | 86.7900 | 93.5923 | 88.89430000 | 90.02250000 | 91.32900000 |
| AAL | 81.627300000000 | 78.1108 | 85.8548 | 79.65410000 | 80.57210000 | 84.51420000 |
| AAPL | 74.203350495050 | 71.3886 | 77.0869 | 73.46370000 | 74.15330000 | 75.05420000 |
| ABMD | 77.400400990099 | 70.7616 | 85.1131 | 73.57460000 | 76.65900000 | 80.89240000 |
| ACGL | 73.807070297030 | 70.4365 | 79.1696 | 72.21950000 | 73.09320000 | 75.56110000 |
+--------------+-----------------+---------+---------+-------------+-------------+-------------+
5 rows in set (3.78 sec)
Last modified: October 28, 2024