# Load Stock Trading Data into SingleStore

This tutorial shows how to run analytical queries with low latency on a SingleStore database containing over 5 million rows of simulated stock trading data.

## Prerequisites

Connect to a SingleStore Helios deployment or a SingleStore instance deployed on a self-managed cluster running on AWS.

## Create Database and Tables

Connect to your SingleStore deployment and run the following SQL commands to create a database named **trades** and two associated tables.

For SingleStore Helios deployments, you can also run these commands in the **SQL Editor**. To open it, select **Cloud Portal > Editor > SQL Editor**.

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

```sql
-- Start from a clean slate. WARNING: this permanently drops any existing
-- `trades` database, including all of its tables and data, before recreating it.
DROP DATABASE IF EXISTS trades;
CREATE DATABASE trades;
-- Make `trades` the default database for the statements that follow.
USE trades;

-- Reference data about each company; populated by the `company` pipeline.
-- last_sale and market_cap are stored as text; market_cap is converted to a
-- number on demand by marketcap_to_DECIMAL.
CREATE TABLE company (
  symbol        CHAR(5)      COLLATE utf8mb4_bin NOT NULL,
  name          VARCHAR(500) COLLATE utf8mb4_bin DEFAULT NULL,
  last_sale     VARCHAR(10)  COLLATE utf8mb4_bin DEFAULT NULL,
  market_cap    VARCHAR(15)  COLLATE utf8mb4_bin DEFAULT NULL,
  IPO_year      FLOAT        DEFAULT NULL,
  sector        VARCHAR(80)  COLLATE utf8mb4_bin DEFAULT NULL,
  industry      VARCHAR(80)  COLLATE utf8mb4_bin DEFAULT NULL,
  summary_quote VARCHAR(50)  COLLATE utf8mb4_bin DEFAULT NULL,
  extra         VARCHAR(50)  COLLATE utf8mb4_bin DEFAULT NULL,
  -- Both keys are declared empty: no sort column and no shard column.
  SORT KEY __UNORDERED (),
  SHARD KEY ()
);

-- Individual trades; populated by the `trade` pipeline and by the
-- seed_trades / iter_stocks procedures.
CREATE TABLE trade (
  id           BIGINT(20)    NOT NULL,
  stock_symbol CHAR(5)       COLLATE utf8mb4_bin NOT NULL,
  shares       DECIMAL(18,4) NOT NULL,
  share_price  DECIMAL(18,4) NOT NULL,
  trade_time   DATETIME(6)   NOT NULL,
  -- Rows are both sorted and sharded by stock_symbol.
  SORT KEY stock_symbol (stock_symbol),
  SHARD KEY __SHARDKEY (stock_symbol)
);

```

## Create Functions

This part of the tutorial shows how to create functions that are used to develop the stored procedures. This tutorial uses user-defined functions (UDFs). For related information, refer to [CREATE FUNCTION (UDF)](https://docs.singlestore.com/cloud/reference/sql-reference/procedural-sql-reference/create-function-udf.md).

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

Run the following SQL commands to create the user-defined functions (UDFs):

```sql
USE trades;

DELIMITER //

-- Converts a market-cap string to a DECIMAL(18,2).
--
-- Parameter:
--   s  market-cap text; the first character is skipped and the last character
--      is inspected as a magnitude suffix.
-- Returns:
--   The numeric part multiplied by 1000 when the last character is "B",
--   otherwise the numeric part unchanged. Returns NULL when s is NULL.
--
-- NOTE(review): presumably inputs look like "$1.5B" / "$345.6M", so the result
-- is expressed in millions -- confirm against the company data.
-- NOTE(review): the substring below still contains the trailing suffix letter,
-- so the conversion relies on the implicit string-to-DECIMAL cast ignoring
-- it -- confirm this holds for your data conversion settings.
CREATE OR REPLACE FUNCTION marketcap_to_DECIMAL(s varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci NULL) RETURNS decimal(18,2) NULL AS
   DECLARE
     -- Last character of the input (the magnitude suffix, if any).
     suffix CHAR(1) = SUBSTR(s, LENGTH(s), 1);
     -- Everything after the first character, implicitly cast to DECIMAL.
     amount DECIMAL(18,2) = SUBSTR(s, 2, LENGTH(s) - 1);
   BEGIN
     IF suffix = "B" THEN
       RETURN amount * 1000;
     END IF;
     RETURN amount;
   END //

DELIMITER ;
```

## Create Stored Procedures

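This part of the tutorial shows how to create stored procedures that generate simulated trade data: `seed_trades` inserts randomized trades for the highest-ranked companies, and `iter_stocks` multiplies the existing trades by copying them with small random price changes. These procedures are separate from the pipelines created in the next section, which load data directly into the tables.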

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

Run the following SQL commands to create the stored procedures:

```sql
USE trades;

DELIMITER //

-- Grows the trade table by repeatedly copying the trades that already exist.
--
-- Parameter:
--   iterations  number of copy passes to run over all stock symbols.
--
-- On entry the procedure snapshots, per stock_symbol, a share_price, the
-- smallest id, and the row count, plus the total number of rows in trade
-- (total_c). Each pass then re-inserts, for every symbol, the rows whose id is
-- at or above that symbol's current id threshold, with id shifted up by
-- total_c and share_price offset by one random value per symbol and pass.
-- Returns nothing.
CREATE OR REPLACE PROCEDURE iter_stocks(iterations int(11) NULL) RETURNS void AS
    DECLARE
         -- Parallel arrays, one slot per stock symbol, filled from tickers_q.
         tickers ARRAY(CHAR(5));
         prices ARRAY(DECIMAL(18,4));
         last_ids ARRAY(bigINT);
         counts ARRAY(INT);
         next_id bigINT = 1;
         ticker CHAR(5);
         price DECIMAL(18,4);
         c INT;
         rand DECIMAL(18,4);
         -- Per-symbol snapshot. Despite its name, lid holds MIN(id) for the symbol.
         -- NOTE(review): share_price is neither grouped nor aggregated here;
         -- presumably an arbitrary row's value is returned per symbol -- confirm.
         tickers_q QUERY(t CHAR(5), p DECIMAL(18,4), lid BIGINT, c INT) = SELECT stock_symbol, share_price, MIN(id), COUNT(*) FROM trade GROUP BY stock_symbol;
         q ARRAY(RECORD(t CHAR(5), p DECIMAL(18,4), lid bigINT, c INT));
         -- Total number of rows in trade, used as the id offset for copied rows.
         q_count QUERY(c INT) = SELECT COUNT(*) FROM trade;
         total_c INT;
    BEGIN
         q = COLLECT(tickers_q);
         tickers = CREATE_ARRAY(LENGTH(q));
         prices = CREATE_ARRAY(LENGTH(q));
         last_ids = CREATE_ARRAY(LENGTH(q));
         counts = CREATE_ARRAY(LENGTH(q));
         total_c = SCALAR(q_count);
         -- Unpack the collected records into the parallel arrays.
         FOR r IN 0..LENGTH(q)-1 LOOP
              tickers[r] = q[r].t;
              prices[r] = q[r].p;
              last_ids[r] = q[r].lid;
              counts[r] = q[r].c;
         END LOOP;
         FOR j IN 0..(iterations-1) LOOP
              FOR i IN 0..LENGTH(tickers)-1 LOOP
                   ticker = tickers[i];
                   price = prices[i];
                   next_id = last_ids[i];
                   -- c is assigned here but not read again in this procedure.
                   c = counts[i];
                   -- RAND() scaled by +1 or -1; the sign comes from POW(-1, 0 or 1).
                   rand = POW(-1, FLOOR(RAND()*2)) * RAND();
                   -- Copy this symbol's rows at or above the id threshold, shifting
                   -- id by total_c and applying the same price offset to each row.
                   INSERT INTO trade
                   SELECT id + total_c, stock_symbol, shares, share_price + rand, trade_time FROM trade WHERE stock_symbol = ticker AND id >= next_id;
                   -- prices[i] is tracked here but does not feed the INSERT above.
                   prices[i] = price + rand;
                   -- Raise the threshold by total_c so the next pass starts from the
                   -- shifted ids. NOTE(review): presumably this selects only the rows
                   -- copied in this pass; that assumes the original ids do not exceed
                   -- total_c -- TODO confirm.
                   last_ids[i] = next_id + total_c;
              END LOOP;
         END LOOP;
    END //

-- Seeds the trade table with randomized trades for up to 200 companies.
--
-- Parameter:
--   num_trades  cap on the number of rows to insert. The cap is checked after
--               each insert; if it is NULL the comparison is never true and
--               the procedure runs through every selected company.
-- Returns:
--   The next unused id, that is, the number of rows inserted plus one.
--
-- Companies are ranked by marketcap_to_DECIMAL(market_cap); only symbols
-- shorter than five characters are considered, and the 200 with the highest
-- rank number are used. The number of trades generated per company grows with
-- its rank number (rank / 10 plus a random amount up to 10).
CREATE OR REPLACE PROCEDURE seed_trades(num_trades int(11) NULL) RETURNS int(11) NULL AS
    DECLARE
         q QUERY(symbol CHAR(5), _rank INT) =
              SELECT symbol, rank() OVER (ORDER BY marketcap_to_DECIMAL(market_cap)) AS _rank
              FROM company
              WHERE LENGTH(symbol) < 5
              ORDER BY _rank DESC LIMIT 200;
         rank_num INT;
         next_id INT = 1;
         sym CHAR(5);
         price_base DECIMAL(18,4);
         l ARRAY(RECORD(symbol CHAR(5), _rank INT));
    BEGIN
         l = collect(q);
         FOR r IN l LOOP
              rank_num = r._rank;
              sym = r.symbol;
              -- One whole-number base price in the range 50..99 per company.
              price_base = FLOOR(rand() * 50) + 50;
              FOR j IN 1..((rank_num / 10) + RAND() * 10) LOOP
                   -- shares is a multiple of 100 between 100 and 1000; trade_time is
                   -- NOW() plus a random offset of RAND() * 6 hours.
                   INSERT INTO trade (id, stock_symbol, shares, share_price, trade_time) VALUES (
                        next_id,
                        sym,
                        FLOOR(1 + RAND() * 10) * 100,
                        price_base,
                        DATE_ADD(NOW(), INTERVAL RAND() * 6 HOUR));
                   next_id += 1;
                   -- Stop as soon as the requested number of rows has been inserted.
                   IF next_id > num_trades THEN RETURN(next_id); END IF;
              END LOOP;
         END LOOP;
         RETURN(next_id);
    END //


DELIMITER ;

```

## Load Data with Pipelines

This part of the tutorial shows how to ingest stock trades data from a public AWS S3 bucket into the SingleStore database using pipelines.

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

1. Run the following SQL commands to create the pipelines:
   ```sql
   USE trades;

   -- Pipeline that reads company.csv from the example S3 bucket (us-east-1)
   -- and loads it into the company table. CREATE OR REPLACE makes this
   -- statement safe to re-run.
   CREATE OR REPLACE PIPELINE company
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/trades/company.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE company;

   -- Pipeline that reads trade.csv from the example S3 bucket (us-east-1)
   -- and loads it into the trade table. CREATE OR REPLACE makes this
   -- statement safe to re-run.
   CREATE OR REPLACE PIPELINE trade
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/trades/trade.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE trade;
   ```

2. Run the following SQL commands to start the pipelines:
   ```sql
   USE trades;
   -- Start the pipelines created in the previous step (company and trade).
   START ALL PIPELINES;

   ```

Once the **Success** message is returned for all the created pipelines, SingleStore starts ingesting the data from the S3 bucket.

## Verify the Pipeline Success

Query the `pipelines_files` information schema view to inspect the progress of the pipelines. For example, run the following command to check whether the `company` pipeline has finished ingesting data.

```sql
-- List the files tracked by the `company` pipeline together with their state;
-- a file has been ingested once its FILE_STATE is Loaded.
-- Pipeline names are only unique within a database, so the database is
-- filtered as well to avoid picking up a same-named pipeline elsewhere.
SELECT *
FROM information_schema.pipelines_files
WHERE database_name = 'trades'
  AND pipeline_name = 'company';
```

You can also run a pipeline in the foreground to easily verify that all the data has been ingested.

```sql
-- Replace <pipeline_name> with the pipeline to run, for example company or trade.
START PIPELINE <pipeline_name> FOREGROUND;
```

For SingleStore Helios deployments, you can also monitor the progress of your pipelines on the Cloud Portal. Select **Ingestion > Pipelines**, and then select a deployment from the list.

## Run Queries on Data

You can start running queries once the data is loaded into your SingleStore database. Here are some example queries to be used with the Stocks Trading dataset.

## Query 1: Finds the most traded stocks

This query finds the most traded stocks.

```sql
USE trades;

-- Five symbols with the largest number of trade rows.
SELECT
    stock_symbol,
    COUNT(*) AS c
FROM trade
GROUP BY stock_symbol
ORDER BY c DESC
LIMIT 5;


```

```output

+--------------+-------+
| stock_symbol | c     |
+--------------+-------+
| TIG          | 30401 |
| FB           | 30300 |
| MNGA         | 30300 |
| QCOM         | 30199 |
| KHC          | 30199 |
+--------------+-------+

```

## Query 2: Finds the most volatile stocks

This query finds the most volatile stocks, that is, the stocks with the highest variance in share price.

```sql
USE trades;

-- Five symbols whose share price varies the most.
SELECT
    stock_symbol,
    VARIANCE(share_price) AS var
FROM trade
GROUP BY stock_symbol
ORDER BY var DESC
LIMIT 5; -- Remove the LIMIT clause to see the full result.


```

```output

+--------------+-------------+
| stock_symbol | var         |
+--------------+-------------+
| IBKR         | 32.24672973 |
| XRAY         | 28.09745295 |
| ODFL         | 25.18033964 |
| ALGN         | 24.47217863 |
| VXUS         | 24.11941421 |
+--------------+-------------+

```

## Query 3: Portfolio Aggregation

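This is a portfolio aggregation query that uses common table expressions (CTEs), a JOIN, and window functions. For each stock symbol it returns the minimum and maximum price, the share-weighted average price, and the 25th, 50th, and 75th percentiles of the price. The standard deviation and total traded value are also computed in the aggregation CTE but are not included in the final output.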

```sql
USE trades;

WITH folio AS (
    -- Row-level trades; the percentile window functions run over these rows.
    SELECT
        id,
        stock_symbol,
        shares,
        share_price,
        trade_time
    FROM trade
),
agg_calcs AS (
    -- One row per symbol: price extremes, spread, and value totals.
    SELECT
        stock_symbol AS agg_symbol,
        MAX(share_price) AS pmax,
        MIN(share_price) AS pmin,
        STD(share_price) AS pstd,
        SUM(share_price * shares) / SUM(shares) AS avg_pps,  -- share-weighted average price
        SUM(share_price * shares) AS total_pvalue
    FROM trade
    GROUP BY stock_symbol
)
-- Every trade row of a symbol yields the same values, so DISTINCT collapses
-- the joined rows to one row per symbol.
SELECT DISTINCT
    folio.stock_symbol,
    agg_calcs.avg_pps,
    agg_calcs.pmin,
    agg_calcs.pmax,
    PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY folio.share_price)
        OVER (PARTITION BY folio.stock_symbol) AS Q1,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY folio.share_price)
        OVER (PARTITION BY folio.stock_symbol) AS median,
    PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY folio.share_price)
        OVER (PARTITION BY folio.stock_symbol) AS Q3
FROM folio
INNER JOIN agg_calcs
    ON folio.stock_symbol = agg_calcs.agg_symbol
ORDER BY folio.stock_symbol
LIMIT 5; -- Remove the LIMIT clause to see the full result.


```

```output

+--------------+-----------------+---------+---------+-------------+-------------+-------------+
| stock_symbol | avg_pps         | pmin    | pmax    | Q1          | median      | Q3          |
+--------------+-----------------+---------+---------+-------------+-------------+-------------+
| AABA         | 90.104679207921 | 86.7900 | 93.5923 | 88.89430000 | 90.02250000 | 91.32900000 |
| AAL          | 81.627300000000 | 78.1108 | 85.8548 | 79.65410000 | 80.57210000 | 84.51420000 |
| AAPL         | 74.203350495050 | 71.3886 | 77.0869 | 73.46370000 | 74.15330000 | 75.05420000 |
| ABMD         | 77.400400990099 | 70.7616 | 85.1131 | 73.57460000 | 76.65900000 | 80.89240000 |
| ACGL         | 73.807070297030 | 70.4365 | 79.1696 | 72.21950000 | 73.09320000 | 75.56110000 |
+--------------+-----------------+---------+---------+-------------+-------------+-------------+

```

***

Modified at: October 28, 2024

Source: [/cloud/getting-started-with-singlestore-helios/next-steps-and-examples/sample-data/load-stock-trading-data-into-singlestore/](https://docs.singlestore.com/cloud/getting-started-with-singlestore-helios/next-steps-and-examples/sample-data/load-stock-trading-data-into-singlestore/)

(An index of the documentation is available at /llms.txt)
