# Load MarTech Data into SingleStore

This tutorial builds a MarTech data application that delivers ads to users based on their behavior and location history. The application tracks user preferences, purchases, and geolocation to provide instant offers based on a user's current location. It also lets you monitor conversion rates and purchases, and add locations.

## Prerequisites

Connect to a SingleStore Helios deployment or a SingleStore instance deployed on a self-managed cluster running on AWS.

## Create Database and Tables

Connect to your SingleStore Helios deployment and run the following SQL commands to create a database named **martech** and twelve associated tables.

For SingleStore Helios deployments, you can also run these commands in the **SQL Editor**. To open it, select **Cloud Portal > Editor** > **Open SQL Editor**.

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

```sql
DROP DATABASE IF EXISTS martech;
CREATE DATABASE martech;
USE martech;

CREATE ROWSTORE REFERENCE TABLE cities (
  city_id bigint(20) NOT NULL,
  city_name text COLLATE utf8mb4_bin NOT NULL,
  center geographypoint NOT NULL,
  diameter double DEFAULT NULL,
  PRIMARY KEY (city_id)
); 

CREATE TABLE locations (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  lonlat geographypoint NOT NULL,
  olc_8 text COLLATE utf8mb4_bin NOT NULL,
  SHARD KEY __SHARDKEY (city_id,subscriber_id),
  SORT KEY ts (ts),
  KEY city_id (city_id,subscriber_id) USING HASH,
  KEY olc_8 (olc_8) USING HASH
);

CREATE TABLE notifications (
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  offer_id bigint(20) NOT NULL,
  cost_cents bigint(20) NOT NULL,
  lonlat geographypoint NOT NULL,
  SHARD KEY __SHARDKEY (city_id,subscriber_id),
  SORT KEY ts (ts)
);

CREATE ROWSTORE REFERENCE TABLE offers (
  offer_id bigint(20) NOT NULL AUTO_INCREMENT,
  customer text COLLATE utf8mb4_bin NOT NULL,
  enabled tinyint(1) NOT NULL DEFAULT 1,
  notification_zone geography NOT NULL,
  segment_ids JSON COLLATE utf8mb4_bin NOT NULL,
  notification_content text COLLATE utf8mb4_bin NOT NULL,
  notification_target text COLLATE utf8mb4_bin NOT NULL,
  maximum_bid_cents bigint(20) NOT NULL,
  PRIMARY KEY (offer_id),
  KEY customer (customer),
  KEY notification_target (notification_target),
  KEY notification_zone (notification_zone) WITH (RESOLUTION = 8)
);

CREATE TABLE purchases (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  vendor text COLLATE utf8mb4_bin NOT NULL,
  SHARD KEY __SHARDKEY (city_id,subscriber_id),
  SORT KEY ts (ts),
  KEY vendor (vendor) USING HASH
);

CREATE TABLE requests (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  domain text COLLATE utf8mb4_bin NOT NULL,
  SHARD KEY __SHARDKEY (city_id,subscriber_id),
  SORT KEY ts (ts),
  KEY domain (domain) USING HASH
);

CREATE ROWSTORE REFERENCE TABLE segments (
  segment_id bigint(20) NOT NULL,
  valid_interval enum('minute','hour','day','week','month') COLLATE utf8mb4_bin NOT NULL,
  filter_kind enum('olc_8','request','purchase') COLLATE utf8mb4_bin NOT NULL,
  filter_value text COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (segment_id),
  UNIQUE KEY valid_interval (valid_interval,filter_kind,filter_value),
  KEY filter_kind (filter_kind,filter_value)
);

CREATE ROWSTORE TABLE sessions (
  session_id text COLLATE utf8mb4_bin NOT NULL,
  is_controller tinyint(1) NOT NULL DEFAULT 0,
  expires_at datetime(6) NOT NULL,
  PRIMARY KEY (session_id)
);

CREATE ROWSTORE TABLE subscriber_segments (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  segment_id bigint(20) NOT NULL,
  expires_at datetime(6) NOT NULL,
  PRIMARY KEY (city_id,subscriber_id,segment_id),
  SHARD KEY city_id (city_id,subscriber_id)
);

CREATE ROWSTORE TABLE subscribers (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  current_location geographypoint NOT NULL,
  PRIMARY KEY (city_id,subscriber_id),
  KEY current_location (current_location)
);

CREATE ROWSTORE TABLE subscribers_last_notification (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  last_notification datetime(6) DEFAULT NULL,
  PRIMARY KEY (city_id,subscriber_id),
  KEY last_notification (last_notification)
);

CREATE ROWSTORE TABLE worldcities (
  city_id bigint(20) NOT NULL,
  city_name text COLLATE utf8mb4_bin NOT NULL,
  center geographypoint NOT NULL,
  PRIMARY KEY (city_id),
  KEY center (center)
);

```
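To confirm that the schema was created as expected, a quick check along the following lines may help. This is a minimal sketch that assumes your user can read `information_schema`; the count should match the twelve tables defined above.

```sql
USE martech;

-- List the tables that were just created.
SHOW TABLES;

-- Count the tables in the martech database; twelve are expected.
SELECT COUNT(*) AS table_count
FROM information_schema.tables
WHERE table_schema = 'martech';
```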

## Create Functions

This part of the tutorial shows how to create functions that are used to develop the stored procedures. There are two types of functions used in this tutorial, namely user-defined functions (UDFs) and table-valued functions (TVFs). For related information, refer to [CREATE FUNCTION (UDF)](https://docs.singlestore.com/cloud/reference/sql-reference/procedural-sql-reference/create-function-udf.md) and [CREATE FUNCTION (TVF)](https://docs.singlestore.com/cloud/reference/sql-reference/procedural-sql-reference/create-function-tvf.md).

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

Run the following SQL commands to create the user-defined functions (UDFs):

```sql
USE martech;

DELIMITER //
CREATE OR REPLACE FUNCTION date_add_dynamic(_dt DATETIME(6), _interval enum('second','minute','hour','day','week','month')) RETURNS DATETIME(6) AS
   BEGIN
       RETURN CASE _interval
           WHEN "second" THEN _dt + INTERVAL 1 SECOND
           WHEN "minute" THEN _dt + INTERVAL 1 MINUTE
           WHEN "hour" THEN _dt + INTERVAL 1 HOUR
           WHEN "day" THEN _dt + INTERVAL 1 DAY
           WHEN "week" THEN _dt + INTERVAL 1 WEEK
           WHEN "month" THEN _dt + INTERVAL 1 MONTH
       END;
   END //

CREATE OR REPLACE FUNCTION date_sub_dynamic(_dt DATETIME(6) NULL, _interval enum('second','minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NULL) RETURNS DATETIME(6) NULL AS
 BEGIN
   RETURN CASE _interval
     WHEN "second" THEN _dt - INTERVAL 1 SECOND
     WHEN "minute" THEN _dt - INTERVAL 1 MINUTE
     WHEN "hour" THEN _dt - INTERVAL 1 HOUR
     WHEN "day" THEN _dt - INTERVAL 1 DAY
     WHEN "week" THEN _dt - INTERVAL 1 WEEK
     WHEN "month" THEN _dt - INTERVAL 1 MONTH
   END;
 END //


CREATE OR REPLACE FUNCTION encode_open_location_code(_lonlat geographypoint NULL, codeLength int(11) NULL DEFAULT 12)
   RETURNS text CHARACTER SET utf8 COLLATE utf8_general_ci NULL
   AS
   DECLARE
       SEPARATOR_ text = '+';
       SEPARATOR_POSITION_ int = 8;
       PADDING_CHARACTER_ text = '0';
       CODE_ALPHABET_ text = '23456789CFGHJMPQRVWX';
       ENCODING_BASE_ int = CHARACTER_LENGTH(CODE_ALPHABET_);
       LATITUDE_MAX_ int = 90;
       LONGITUDE_MAX_ int = 180;
       MAX_DIGIT_COUNT_ int = 15;
       PAIR_CODE_LENGTH_ int = 10;
       PAIR_PRECISION_ decimal = POWER(ENCODING_BASE_, 3);
       GRID_CODE_LENGTH_ int = MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_;
       GRID_COLUMNS_ int = 4;
       GRID_ROWS_ int = 5;
       FINAL_LAT_PRECISION_ decimal = PAIR_PRECISION_ * POWER(GRID_ROWS_, MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_);
       FINAL_LNG_PRECISION_ decimal = PAIR_PRECISION_ * POWER(GRID_COLUMNS_, MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_);
       latitude double = geography_latitude(_lonlat);
       longitude double = geography_longitude(_lonlat);
       code text = '';
       latVal decimal = 0;
       lngVal decimal = 0;
       latDigit smallint;
       lngDigit smallint;
       ndx smallint;
       i_ smallint;
   BEGIN
       IF ((codeLength < 2) OR ((codeLength < PAIR_CODE_LENGTH_) AND (codeLength % 2 = 1)) OR (codeLength > MAX_DIGIT_COUNT_)) THEN
           RAISE USER_EXCEPTION(CONCAT('Invalid Open Location Code length - ', codeLength));
       END IF;
       latVal = floor(round((latitude + LATITUDE_MAX_) * FINAL_LAT_PRECISION_, 6));
       lngVal = floor(round((longitude + LONGITUDE_MAX_) * FINAL_LNG_PRECISION_, 6));

       IF (codeLength > PAIR_CODE_LENGTH_) THEN
           i_ = 0;
           WHILE (i_ < (MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_)) LOOP
               latDigit = latVal % GRID_ROWS_;
               lngDigit = lngVal % GRID_COLUMNS_;
               ndx = (latDigit * GRID_COLUMNS_) + lngDigit;
               code = concat(substr(CODE_ALPHABET_, ndx + 1, 1), code);
               latVal = latVal DIV GRID_ROWS_;
               lngVal = lngVal DIV GRID_COLUMNS_;
               i_ = i_ + 1;
           END LOOP;
       ELSE
           latVal = latVal DIV power(GRID_ROWS_, GRID_CODE_LENGTH_);
           lngVal = lngVal DIV power(GRID_COLUMNS_, GRID_CODE_LENGTH_);
       END IF;

       i_ = 0;
       WHILE (i_ < (PAIR_CODE_LENGTH_ / 2)) LOOP
           code = concat(substr(CODE_ALPHABET_, (lngVal % ENCODING_BASE_) + 1, 1), code);
           code = concat(substr(CODE_ALPHABET_, (latVal % ENCODING_BASE_) + 1, 1), code);
           latVal = latVal DIV ENCODING_BASE_;
           lngVal = lngVal DIV ENCODING_BASE_;
           i_ = i_ + 1;
       END LOOP;

       code = concat(
           substr(code, 1, SEPARATOR_POSITION_),
           SEPARATOR_,
           substr(code, SEPARATOR_POSITION_ + 1)
       );

       IF (codeLength > SEPARATOR_POSITION_) THEN
           RETURN substr(code, 1, codeLength+1);
       ELSE
           RETURN substr(code, 1, codeLength);
       END IF;
   END //
DELIMITER ;

```
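Once the UDFs exist, you can call them directly to see how they behave. The following is an illustrative sketch: the timestamp and the coordinates are arbitrary sample values, not part of the dataset. `date_add_dynamic` and `date_sub_dynamic` shift a timestamp by one unit of the named interval, and `encode_open_location_code` turns a point into an Open Location Code of the requested length.

```sql
USE martech;

-- Shift a fixed timestamp forward and backward by one hour.
SELECT
    date_add_dynamic('2024-01-01 00:00:00' :> DATETIME(6), 'hour') AS plus_one_hour,
    date_sub_dynamic('2024-01-01 00:00:00' :> DATETIME(6), 'hour') AS minus_one_hour;

-- Encode a sample point (longitude, latitude) as an 8-character code,
-- the same length that is stored in the olc_8 column.
SELECT encode_open_location_code(GEOGRAPHY_POINT(-74.0060, 40.7128), 8) AS olc_8;
```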

Run the following SQL commands to create the table-valued functions (TVFs):

```sql
USE martech;

CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_locations(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
   RETURN
       SELECT locations.city_id AS city_id,
           locations.subscriber_id AS subscriber_id,
           segments.segment_id AS segment_id,
           MAX(date_add_dynamic(locations.ts,segments.valid_interval)) AS expires_at
       FROM (segments as segments  JOIN locations as locations )
       WHERE ((segments.filter_kind = 'olc_8')
           AND
               (segments.filter_value = locations.olc_8)
           AND
               (locations.ts >= date_sub_dynamic( NOW(6),segments.valid_interval)) AND (locations.ts >= _since) AND (locations.ts < _until))
       GROUP BY 1, 2, 3;
  

CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_requests(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
   RETURN
       SELECT requests.city_id AS city_id,
           requests.subscriber_id AS subscriber_id,
           segments.segment_id AS segment_id,
           MAX(date_add_dynamic(requests.ts,segments.valid_interval)) AS expires_at
       FROM (segments as segments  JOIN requests as requests )
       WHERE ((segments.filter_kind = 'request')
           AND
               (segments.filter_value = requests.domain)
           AND
               (requests.ts >= date_sub_dynamic( NOW(6),segments.valid_interval))
           AND
               (requests.ts >= _since) AND (requests.ts < _until))
       GROUP BY 1, 2, 3;

CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_purchases(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
   RETURN
       SELECT purchases.city_id AS city_id,
           purchases.subscriber_id AS subscriber_id,
           segments.segment_id AS segment_id,
           MAX(date_add_dynamic(purchases.ts,segments.valid_interval)) AS expires_at
       FROM (segments as segments  JOIN purchases as purchases )
       WHERE ((segments.filter_kind = 'purchase')
           AND
               (segments.filter_value = purchases.vendor)
           AND
               (purchases.ts >= date_sub_dynamic( NOW(6),segments.valid_interval)) AND (purchases.ts >= _since) AND (purchases.ts < _until))
       GROUP BY 1, 2, 3;


CREATE OR REPLACE FUNCTION dynamic_subscriber_segments(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
   RETURN
       SELECT dynamic_subscriber_segments_locations.city_id AS city_id,
           dynamic_subscriber_segments_locations.subscriber_id AS subscriber_id,
           dynamic_subscriber_segments_locations.segment_id AS segment_id,
           dynamic_subscriber_segments_locations.expires_at AS expires_at
       FROM dynamic_subscriber_segments_locations(_since, _until) as dynamic_subscriber_segments_locations 
       UNION ALL
       SELECT dynamic_subscriber_segments_requests.city_id AS city_id,
           dynamic_subscriber_segments_requests.subscriber_id AS subscriber_id,
           dynamic_subscriber_segments_requests.segment_id AS segment_id,
           dynamic_subscriber_segments_requests.expires_at AS expires_at
       FROM dynamic_subscriber_segments_requests(_since, _until) as dynamic_subscriber_segments_requests 
       UNION ALL
       (SELECT dynamic_subscriber_segments_purchases.city_id AS city_id,
           dynamic_subscriber_segments_purchases.subscriber_id AS subscriber_id,
           dynamic_subscriber_segments_purchases.segment_id AS segment_id,
           dynamic_subscriber_segments_purchases.expires_at AS expires_at
       FROM dynamic_subscriber_segments_purchases(_since, _until) as dynamic_subscriber_segments_purchases );

CREATE OR REPLACE FUNCTION match_offers_to_subscribers(_interval enum('second','minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NULL)
   RETURNS TABLE
   AS
   RETURN WITH phase_1 AS
   (SELECT offers.offer_id AS offer_id,
       offers.customer AS customer,
       offers.enabled AS enabled,
       offers.notification_zone AS notification_zone,
       offers.segment_ids AS segment_ids,
       offers.notification_content AS notification_content,
       offers.notification_target AS notification_target,
       offers.maximum_bid_cents AS maximum_bid_cents,
       subscribers.city_id AS city_id,
       subscribers.subscriber_id AS subscriber_id,
       subscribers.current_location AS current_location
   FROM (offers as offers  JOIN (subscribers as subscribers
       LEFT JOIN subscribers_last_notification as subscribers_last_notification
       WITH (table_convert_subselect = TRUE)
       ON ((subscribers.city_id = subscribers_last_notification.city_id)
       AND (subscribers.subscriber_id = subscribers_last_notification.subscriber_id))))
       WHERE ((offers.enabled = 1) AND  GEOGRAPHY_CONTAINS(offers.notification_zone,subscribers.current_location)
       AND ( ISNULL(subscribers_last_notification.last_notification)
       OR (subscribers_last_notification.last_notification < date_sub_dynamic( NOW(),_interval)))
       AND (NOT  EXISTS( SELECT n.ts AS ts, n.city_id AS city_id, n.subscriber_id AS subscriber_id,
           n.offer_id AS offer_id, n.cost_cents AS cost_cents,
           n.lonlat AS lonlat FROM notifications as n 
           WHERE ((n.ts > date_sub_dynamic( NOW(),_interval))
           AND (offers.offer_id = n.offer_id) AND (subscribers.city_id = n.city_id)
           AND (subscribers.subscriber_id = n.subscriber_id)) LIMIT 1 )))), phase_2 AS (SELECT phase_1.offer_id AS offer_id,
               phase_1.customer AS customer, phase_1.enabled AS enabled, phase_1.notification_zone AS notification_zone,
               phase_1.segment_ids AS segment_ids, phase_1.notification_content AS notification_content,
               phase_1.notification_target AS notification_target, phase_1.maximum_bid_cents AS maximum_bid_cents,
               phase_1.city_id AS city_id, phase_1.subscriber_id AS subscriber_id, phase_1.current_location AS current_location, 
               ROW_NUMBER() OVER (PARTITION BY phase_1.city_id, phase_1.offer_id, phase_1.subscriber_id) AS num_matching_segments
           FROM (( phase_1 AS phase_1 JOIN  TABLE( JSON_TO_ARRAY(phase_1.segment_ids)) AS segment_ids)
               LEFT  JOIN subscriber_segments as segment 
               ON ((phase_1.city_id = segment.city_id) AND (phase_1.subscriber_id = segment.subscriber_id)
               AND ((segment_ids.table_col:>bigint(20) NULL) = segment.segment_id))))
               SELECT phase_2.city_id AS city_id, phase_2.subscriber_id AS subscriber_id, 
                   LAST_VALUE(phase_2.offer_id) OVER (PARTITION BY phase_2.city_id, phase_2.subscriber_id ORDER BY phase_2.maximum_bid_cents) AS best_offer_id, 
                   LAST_VALUE(phase_2.maximum_bid_cents) OVER (PARTITION BY phase_2.city_id, phase_2.subscriber_id ORDER BY phase_2.maximum_bid_cents) AS cost_cents,
                   phase_2.current_location AS current_location
               FROM  phase_2 AS phase_2
               WHERE ( JSON_LENGTH(phase_2.segment_ids) = phase_2.num_matching_segments)
               GROUP BY 1, 2;

```
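A TVF is queried in the `FROM` clause like a table. The sketch below shows one way to call two of the functions defined above; the one-day window and the `'minute'` interval are arbitrary choices for illustration. Until data has been loaded, both queries are expected to return no rows.

```sql
USE martech;

-- Segment memberships derived from activity in the last day.
SELECT *
FROM dynamic_subscriber_segments(NOW(6) - INTERVAL 1 DAY, NOW(6))
LIMIT 5;

-- Preview candidate matches without inserting any notifications.
SELECT *
FROM match_offers_to_subscribers('minute')
LIMIT 5;
```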

## Create Stored Procedures

This part of the tutorial shows how to create stored procedures that are used to load data using pipelines.

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

Run the following SQL commands to create the stored procedures:

```sql
USE martech;

DELIMITER //
CREATE OR REPLACE PROCEDURE process_locations(_batch QUERY(subscriber_id BIGINT(20), offset_x DOUBLE, offset_y DOUBLE)) AS
   DECLARE
       _expanded QUERY(city_id BIGINT, subscriber_id BIGINT, lonlat GEOGRAPHYPOINT) = SELECT
           city_id, subscriber_id,
           GEOGRAPHY_POINT(
           GEOGRAPHY_LONGITUDE(center) + (offset_x * diameter),
           GEOGRAPHY_LATITUDE(center) + (offset_y * diameter)
           ) AS lonlat
       FROM _batch, cities;
   BEGIN
       INSERT INTO subscribers (city_id, subscriber_id, current_location)
       SELECT city_id, subscriber_id, lonlat
       FROM _expanded
       ON DUPLICATE KEY UPDATE current_location = VALUES(current_location);

       INSERT INTO locations (city_id, subscriber_id, ts, lonlat, olc_8)
       SELECT
           city_id,
           subscriber_id,
           now(6) AS ts,
           lonlat,
           encode_open_location_code(lonlat, 8) AS olc_8
       FROM _expanded;
END //

CREATE OR REPLACE PROCEDURE process_purchases(_batch QUERY(subscriber_id BIGINT(20), vendor TEXT)) AS
   BEGIN
       INSERT INTO purchases (city_id, subscriber_id, ts, vendor)
       SELECT city_id, subscriber_id, now(6) AS ts, vendor
       FROM _batch, cities;
END //

CREATE OR REPLACE PROCEDURE process_requests(_batch QUERY(subscriber_id BIGINT(20), domain TEXT)) AS
   BEGIN
       INSERT INTO requests (city_id, subscriber_id, ts, domain)
       SELECT city_id, subscriber_id, now(6) AS ts, domain
       FROM _batch, cities;
END //

CREATE OR REPLACE PROCEDURE prune_segments(_until datetime(6)) AS
   BEGIN
       DELETE FROM subscriber_segments WHERE expires_at <= _until;
END //

CREATE OR REPLACE PROCEDURE run_matching_process(_interval enum('second','minute','hour','day','week','month')) RETURNS BIGINT(20) AS
   DECLARE
       _ts DATETIME = NOW(6);
       _count BIGINT;
   BEGIN
       INSERT INTO notifications SELECT _ts, * FROM match_offers_to_subscribers(_interval);

       _count = row_count();

       INSERT INTO subscribers_last_notification
       SELECT city_id, subscriber_id, ts
       FROM notifications
       WHERE ts = _ts
       ON DUPLICATE KEY UPDATE last_notification = _ts;

       RETURN _count;
END //

CREATE OR REPLACE PROCEDURE update_segments(_since DATETIME(6), _until DATETIME(6)) AS
   BEGIN
       INSERT INTO subscriber_segments
       SELECT * FROM dynamic_subscriber_segments(_since, _until)
       ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);
END //

CREATE OR REPLACE PROCEDURE update_sessions(_session_id TEXT, _lease_duration_sections INT(11)) AS
   DECLARE
       _num_alive_controllers QUERY(c INT) =
           SELECT COUNT(*) FROM sessions
           WHERE is_controller AND expires_at > NOW(6);

       _num_transactions QUERY(i INT) = SELECT @@trancount;
   BEGIN
       INSERT INTO sessions
       SET
       session_id = _session_id,
       expires_at = NOW() + INTERVAL _lease_duration_sections SECOND
   ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);

   START TRANSACTION;

   IF SCALAR(_num_alive_controllers) = 0 THEN
       UPDATE sessions
       SET is_controller = (session_id = _session_id);
   END IF;

   ECHO SELECT
       session_id, is_controller, expires_at
   FROM sessions
   WHERE session_id = _session_id;

   COMMIT;

   DELETE FROM sessions
   WHERE NOW(6) > (expires_at + INTERVAL (_lease_duration_sections * 2) SECOND);

   EXCEPTION
       WHEN OTHERS THEN
       IF SCALAR(_num_transactions) > 0 THEN
           ROLLBACK;
       END IF;
END //


DELIMITER ;

```
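To check that the functions and procedures were created, and to see one procedure in action, something like the following sketch can be used. The session ID `example-session` and the 60-second lease are hypothetical values chosen for illustration; the final `DELETE` removes the example row again.

```sql
USE martech;

-- Confirm that the functions and procedures exist.
SHOW FUNCTIONS;
SHOW PROCEDURES;

-- Register a hypothetical session with a 60-second lease.
-- The procedure echoes the resulting row from the sessions table.
CALL update_sessions('example-session', 60);

-- Remove the example row.
DELETE FROM sessions WHERE session_id = 'example-session';
```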

## Load Data with Pipelines

This part of the tutorial shows how to ingest MarTech data from a public AWS S3 bucket into the SingleStore database using pipelines.

> **📝 Note**: The **SQL Editor** only runs the queries that you select, so ensure you have them all selected before selecting **Run**.

1. Run the following SQL commands to create the pipelines:
   ```sql
   USE martech;

   CREATE OR REPLACE PIPELINE cities
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/cities.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE cities;

   CREATE OR REPLACE PIPELINE locations
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/locations.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE locations;

   CREATE OR REPLACE PIPELINE notifications
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/notifications.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE notifications;

   CREATE OR REPLACE PIPELINE offers
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/offers.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE offers;

   CREATE OR REPLACE PIPELINE purchases
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/purchases.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE purchases;

   CREATE OR REPLACE PIPELINE requests
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/requests.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE requests;

   CREATE OR REPLACE PIPELINE segments
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/segments.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE segments;

   CREATE OR REPLACE PIPELINE sessions
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/sessions.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE sessions;

   CREATE OR REPLACE PIPELINE subscriber_segments
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscriber_segments.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE subscriber_segments;

   CREATE OR REPLACE PIPELINE subscribers
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscribers.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE subscribers;

   CREATE OR REPLACE PIPELINE subscribers_last_notification
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscribers_last_notification.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE subscribers_last_notification;

   CREATE OR REPLACE PIPELINE worldcities
   AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/worldcities.csv'
   CONFIG '{"region":"us-east-1"}'
   SKIP DUPLICATE KEY ERRORS
   INTO TABLE worldcities;
   ```

2. Run the following SQL commands to start the pipelines:
   ```sql
   USE martech;
   START ALL PIPELINES;

   ```

Once the **Success** message is returned for all the created pipelines, SingleStore starts ingesting the data from the S3 bucket.
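To see at a glance whether the pipelines are running and whether any of them reported problems, a check such as the following sketch may be useful. It assumes the `pipelines_errors` information schema view is available in your deployment.

```sql
USE martech;

-- Show each pipeline and its current state (for example, Running or Stopped).
SHOW PIPELINES;

-- List any errors the pipelines have reported so far.
SELECT pipeline_name, error_type, error_message
FROM information_schema.pipelines_errors
WHERE database_name = 'martech';
```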

## Verify the Pipeline Success

Query the `pipelines_files` information schema view to inspect the progress of the pipelines. For example, run the following command to check whether the `worldcities` pipeline has finished ingesting data.

```sql
SELECT * FROM information_schema.pipelines_files 
WHERE pipeline_name = "worldcities";
```
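To review all pipelines at once rather than one at a time, the same view can be aggregated. The sketch below counts files per pipeline and state; a pipeline has typically finished when none of its files remain in the `Unloaded` state.

```sql
SELECT pipeline_name, file_state, COUNT(*) AS file_count
FROM information_schema.pipelines_files
WHERE database_name = 'martech'
GROUP BY pipeline_name, file_state
ORDER BY pipeline_name;
```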

You can also run the pipelines in the foreground to easily verify that all the data has been ingested.

```sql
START PIPELINE <pipeline_name> FOREGROUND;
```
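As a concrete illustration, the following sketch runs the `worldcities` pipeline in the foreground and then counts the loaded rows. It assumes the pipeline was already started in the background, and that the `IF RUNNING` clause of `STOP PIPELINE` is supported by your version.

```sql
USE martech;

-- Stop the background pipeline first, since a pipeline that is
-- already running cannot be started again.
STOP PIPELINE IF RUNNING worldcities;

-- Returns once all currently available data has been loaded.
START PIPELINE worldcities FOREGROUND;

-- Check how many rows were loaded.
SELECT COUNT(*) AS row_count FROM worldcities;
```

After a foreground run completes, the pipeline is no longer running. Start it again in the background if you want it to keep polling the bucket.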

For SingleStore Helios deployments, you can also monitor the progress of your pipelines on the Cloud Portal. Select **Ingestion > Pipelines**, and then select a deployment from the list.

## Run Queries on Data

You can start running queries once the data is loaded into your SingleStore database. Here are some example queries to be used with the MarTech dataset.

## Query 1: Send Notifications

This query calls the `match_offers_to_subscribers` function, which matches offers with subscribers, and records a notification for each match in the `notifications` table.

```sql
USE martech;

INSERT INTO notifications (ts,city_id, subscriber_id, offer_id, cost_cents, lonlat)
SELECT now(),* FROM match_offers_to_subscribers("second");

```
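To see what the statement above produced, you can summarize the most recent rows in `notifications`. This is a sketch only; the one-minute window and the limit of ten rows are arbitrary choices.

```sql
USE martech;

-- Count notifications created in the last minute, grouped by offer.
SELECT
    offer_id,
    COUNT(*) AS notifications_sent,
    SUM(cost_cents) AS total_cost_cents
FROM notifications
WHERE ts > NOW(6) - INTERVAL 1 MINUTE
GROUP BY offer_id
ORDER BY notifications_sent DESC
LIMIT 10;
```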

## Query 2: Analyze Conversion Rate

For each offer customer, this query counts the notifications sent and the purchases that followed them, then calculates and ranks the resulting conversion rate.

> **📝 Note**: As time progresses, more users will have made purchases, and the conversion rate may change.

```sql
USE martech;

SELECT
    *, 
    CONCAT(FORMAT((total_conversions / total_notifications) * 100,2),"%") AS conversion_rate, 
    RANK() OVER (ORDER BY ((total_conversions / total_notifications)) desc) AS rank
FROM (
    SELECT
        offer_notification.customer,offer_notification.notification_zone,
        COUNT(offer_notification.offer_id) as total_notifications,
        COUNT(purchases.vendor) as total_conversions 
    FROM (
        SELECT offers.offer_id, offers.customer , notifications.ts, notifications.city_id, notifications.subscriber_id, offers.notification_zone
        FROM offers, notifications
        WHERE offers.offer_id = notifications.offer_id
    ) offer_notification
    LEFT JOIN purchases ON
        offer_notification.city_id = purchases.city_id
        AND offer_notification.subscriber_id = purchases.subscriber_id
        AND purchases.ts > offer_notification.ts
        AND purchases.vendor = offer_notification.customer
    GROUP BY 1
    LIMIT 5	/* remove LIMIT to see full result */
);


```

```output

+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
| customer    | notification_zone                                                                                                                                                          | total_notifications | total_conversions | conversion_rate | rank |
+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
| Eayo        | POLYGON((-74.00250000 40.73750000, -74.00000000 40.73750000, -74.00000000 40.74000000, -74.00250000 40.74000000, -74.00250000 40.73750000))                          |                 104 |                46 | 44.23%          |    1 |
| Linklinks   | POLYGON((-73.99750000 40.74000000, -73.99500000 40.74000000, -73.99500000 40.74250000, -73.99750000 40.74250000, -73.99750000 40.74000000))                          |                  99 |                29 | 29.29%          |    2 |
| Eabox       | POLYGON((-73.99000000 40.74000000, -73.98750000 40.74000000, -73.98750000 40.74250000, -73.99000000 40.74250000, -73.99000000 40.74000000))                          |                 587 |               158 | 26.92%          |    3 |
| Oyoba       | POLYGON((-74.00500000 40.73500000, -74.00250000 40.73500000, -74.00250000 40.73750000, -74.00500000 40.73750000, -74.00500000 40.73500000))                          |                 273 |                73 | 26.74%          |    4 |
| Brightbean  | POLYGON((-73.98250000 40.72500000, -73.98000000 40.72500000, -73.98000000 40.72750000, -73.98250000 40.72750000, -73.98250000 40.72500000))                          |                 164 |                25 | 15.24%          |    5 |
+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+

```

## Connect to the Analytics Application

To connect the MarTech data application to an analytics application and monitor the conversion rate of previously sent notifications, visit <https://digital-marketing.labs.singlestore.com/>. Enter the connection details of your SingleStore deployment:

![A connection configuration page of real time digital marketing for martech analytics application.](https://images.contentstack.io/v3/assets/bltac01ee6daa3a1e14/blt82fc5afa7922c774/6a3330994fb13c0bc9333a4f/Real_time_digital_marketing-dj0acL.png)

Select the **Connect** button. The web-based analytics application opens and shows a visual representation of the data.

![A dashboard of martech analytics application](https://images.contentstack.io/v3/assets/bltac01ee6daa3a1e14/bltdff2ce6d5490a234/6a3330fb22bc51510e424745/Martech_dashboard-bXh6sP.png)

***

Modified at: October 28, 2024

Source: [/cloud/getting-started-with-singlestore-helios/next-steps-and-examples/sample-data/load-martech-data-into-singlestore/](https://docs.singlestore.com/cloud/getting-started-with-singlestore-helios/next-steps-and-examples/sample-data/load-martech-data-into-singlestore/)

