Load MarTech Data into SingleStore

This tutorial creates a MarTech data application that delivers ads to users based on their behavior and location history. The application tracks user preferences, purchases, and geolocation to deliver instant offers when subscribers enter a target location, and it lets you monitor the resulting conversion rates and purchases.

Prerequisites

Connect to a SingleStore Helios deployment or a SingleStore instance deployed on a self-managed cluster running on AWS.

Create Database and Tables

Connect to your SingleStore deployment and run the following SQL commands to create a database named martech and its twelve associated tables.

For SingleStore Helios deployments, you can also run these commands in the SQL Editor: select Cloud Portal > Develop > Data Studio > SQL Editor.

Note

The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.

DROP DATABASE IF EXISTS martech;
CREATE DATABASE martech;
USE martech;
CREATE ROWSTORE REFERENCE TABLE cities (
city_id bigint(20) NOT NULL,
city_name text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
center geographypoint NOT NULL,
diameter double DEFAULT NULL,
PRIMARY KEY (city_id)
);
CREATE TABLE locations (
city_id bigint(20) NOT NULL,
subscriber_id bigint(20) NOT NULL,
ts datetime(6) NOT NULL SERIES TIMESTAMP,
lonlat geographypoint NOT NULL,
olc_8 text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
SHARD KEY __SHARDKEY (city_id,subscriber_id),
SORT KEY ts (ts),
KEY city_id (city_id,subscriber_id) USING HASH,
KEY olc_8 (olc_8) USING HASH
);
CREATE TABLE notifications (
ts datetime(6) NOT NULL SERIES TIMESTAMP,
city_id bigint(20) NOT NULL,
subscriber_id bigint(20) NOT NULL,
offer_id bigint(20) NOT NULL,
cost_cents bigint(20) NOT NULL,
lonlat geographypoint NOT NULL,
SHARD KEY __SHARDKEY (city_id,subscriber_id),
SORT KEY ts (ts)
);
CREATE ROWSTORE REFERENCE TABLE offers (
offer_id bigint(20) NOT NULL AUTO_INCREMENT,
customer text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
enabled tinyint(1) NOT NULL DEFAULT 1,
notification_zone geography NOT NULL,
segment_ids JSON COLLATE utf8mb4_bin NOT NULL,
notification_content text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
notification_target text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
maximum_bid_cents bigint(20) NOT NULL,
PRIMARY KEY (offer_id),
KEY customer (customer),
KEY notification_target (notification_target),
KEY notification_zone (notification_zone) WITH (RESOLUTION = 8)
);
CREATE TABLE purchases (
city_id bigint(20) NOT NULL,
subscriber_id bigint(20) NOT NULL,
ts datetime(6) NOT NULL SERIES TIMESTAMP,
vendor text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
SHARD KEY __SHARDKEY (city_id,subscriber_id),
SORT KEY ts (ts),
KEY vendor (vendor) USING HASH
);
CREATE TABLE requests (
city_id bigint(20) NOT NULL,
subscriber_id bigint(20) NOT NULL,
ts datetime(6) NOT NULL SERIES TIMESTAMP,
domain text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
SHARD KEY __SHARDKEY (city_id,subscriber_id),
SORT KEY ts (ts),
KEY domain (domain) USING HASH
);
CREATE ROWSTORE REFERENCE TABLE segments (
segment_id bigint(20) NOT NULL,
valid_interval enum('minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
filter_kind enum('olc_8','request','purchase') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
filter_value text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
PRIMARY KEY (segment_id),
UNIQUE KEY valid_interval (valid_interval,filter_kind,filter_value),
KEY filter_kind (filter_kind,filter_value)
);
CREATE ROWSTORE TABLE sessions (
session_id text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
is_controller tinyint(1) NOT NULL DEFAULT 0,
expires_at datetime(6) NOT NULL,
PRIMARY KEY (session_id)
);
CREATE ROWSTORE TABLE subscriber_segments (
city_id bigint(20) NOT NULL,
subscriber_id bigint(20) NOT NULL,
segment_id bigint(20) NOT NULL,
expires_at datetime(6) NOT NULL,
PRIMARY KEY (city_id,subscriber_id,segment_id),
SHARD KEY city_id (city_id,subscriber_id)
);
CREATE ROWSTORE TABLE subscribers (
city_id bigint(20) NOT NULL,
subscriber_id bigint(20) NOT NULL,
current_location geographypoint NOT NULL,
PRIMARY KEY (city_id,subscriber_id),
KEY current_location (current_location)
);
CREATE ROWSTORE TABLE subscribers_last_notification (
city_id bigint(20) NOT NULL,
subscriber_id bigint(20) NOT NULL,
last_notification datetime(6) DEFAULT NULL,
PRIMARY KEY (city_id,subscriber_id),
KEY last_notification (last_notification)
);
CREATE ROWSTORE TABLE worldcities (
city_id bigint(20) NOT NULL,
city_name text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
center geographypoint NOT NULL,
PRIMARY KEY (city_id),
KEY center (center)
);
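
As an optional sanity check, you can list the tables to confirm that all twelve were created. The following command should return the tables defined above:

SHOW TABLES;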

Create Functions

This part of the tutorial shows how to create the functions that the stored procedures depend on. The tutorial uses two types of functions: user-defined functions (UDFs) and table-valued functions (TVFs). For related information, refer to CREATE FUNCTION (UDF) and CREATE FUNCTION (TVF).

Note

The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.

Run the following SQL commands to create the user-defined functions (UDFs):

USE martech;
DELIMITER //
CREATE OR REPLACE FUNCTION date_add_dynamic(_dt DATETIME(6), _interval enum('second','minute','hour','day','week','month')) RETURNS DATETIME(6) AS
BEGIN
RETURN CASE _interval
WHEN "second" THEN _dt + INTERVAL 1 SECOND
WHEN "minute" THEN _dt + INTERVAL 1 MINUTE
WHEN "hour" THEN _dt + INTERVAL 1 HOUR
WHEN "day" THEN _dt + INTERVAL 1 DAY
WHEN "week" THEN _dt + INTERVAL 1 WEEK
WHEN "month" THEN _dt + INTERVAL 1 MONTH
END;
END //
CREATE OR REPLACE FUNCTION date_sub_dynamic(_dt DATETIME(6) NULL, _interval enum('second','minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NULL) RETURNS DATETIME(6) NULL AS
BEGIN
RETURN CASE _interval
WHEN "second" THEN _dt - INTERVAL 1 SECOND
WHEN "minute" THEN _dt - INTERVAL 1 MINUTE
WHEN "hour" THEN _dt - INTERVAL 1 HOUR
WHEN "day" THEN _dt - INTERVAL 1 DAY
WHEN "week" THEN _dt - INTERVAL 1 WEEK
WHEN "month" THEN _dt - INTERVAL 1 MONTH
END;
END //
CREATE OR REPLACE FUNCTION encode_open_location_code(_lonlat geographypoint NULL, codeLength int(11) NULL DEFAULT 12)
RETURNS text CHARACTER SET utf8 COLLATE utf8_general_ci NULL
AS
DECLARE
SEPARATOR_ text = '+';
SEPARATOR_POSITION_ int = 8;
PADDING_CHARACTER_ text = '0';
CODE_ALPHABET_ text = '23456789CFGHJMPQRVWX';
ENCODING_BASE_ int = CHARACTER_LENGTH(CODE_ALPHABET_);
LATITUDE_MAX_ int = 90;
LONGITUDE_MAX_ int = 180;
MAX_DIGIT_COUNT_ int = 15;
PAIR_CODE_LENGTH_ int = 10;
PAIR_PRECISION_ decimal = POWER(ENCODING_BASE_, 3);
GRID_CODE_LENGTH_ int = MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_;
GRID_COLUMNS_ int = 4;
GRID_ROWS_ int = 5;
FINAL_LAT_PRECISION_ decimal = PAIR_PRECISION_ * POWER(GRID_ROWS_, MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_);
FINAL_LNG_PRECISION_ decimal = PAIR_PRECISION_ * POWER(GRID_COLUMNS_, MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_);
latitude double = geography_latitude(_lonlat);
longitude double = geography_longitude(_lonlat);
code text = '';
latVal decimal = 0;
lngVal decimal = 0;
latDigit smallint;
lngDigit smallint;
ndx smallint;
i_ smallint;
BEGIN
IF ((codeLength < 2) OR ((codeLength < PAIR_CODE_LENGTH_) AND (codeLength % 2 = 1)) OR (codeLength > MAX_DIGIT_COUNT_)) THEN
RAISE USER_EXCEPTION(CONCAT('Invalid Open Location Code length - ', codeLength));
END IF;
latVal = floor(round((latitude + LATITUDE_MAX_) * FINAL_LAT_PRECISION_, 6));
lngVal = floor(round((longitude + LONGITUDE_MAX_) * FINAL_LNG_PRECISION_, 6));
IF (codeLength > PAIR_CODE_LENGTH_) THEN
i_ = 0;
WHILE (i_ < (MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_)) LOOP
latDigit = latVal % GRID_ROWS_;
lngDigit = lngVal % GRID_COLUMNS_;
ndx = (latDigit * GRID_COLUMNS_) + lngDigit;
code = concat(substr(CODE_ALPHABET_, ndx + 1, 1), code);
latVal = latVal DIV GRID_ROWS_;
lngVal = lngVal DIV GRID_COLUMNS_;
i_ = i_ + 1;
END LOOP;
ELSE
latVal = latVal DIV power(GRID_ROWS_, GRID_CODE_LENGTH_);
lngVal = lngVal DIV power(GRID_COLUMNS_, GRID_CODE_LENGTH_);
END IF;
i_ = 0;
WHILE (i_ < (PAIR_CODE_LENGTH_ / 2)) LOOP
code = concat(substr(CODE_ALPHABET_, (lngVal % ENCODING_BASE_) + 1, 1), code);
code = concat(substr(CODE_ALPHABET_, (latVal % ENCODING_BASE_) + 1, 1), code);
latVal = latVal DIV ENCODING_BASE_;
lngVal = lngVal DIV ENCODING_BASE_;
i_ = i_ + 1;
END LOOP;
code = concat(
substr(code, 1, SEPARATOR_POSITION_),
SEPARATOR_,
substr(code, SEPARATOR_POSITION_ + 1)
);
IF (codeLength > SEPARATOR_POSITION_) THEN
RETURN substr(code, 1, codeLength+1);
ELSE
RETURN substr(code, 1, codeLength);
END IF;
END //
DELIMITER ;
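
Optionally, you can verify the UDFs with simple SELECT statements before moving on. The coordinate below is an arbitrary example point, not part of the tutorial dataset:

SELECT date_add_dynamic(NOW(6), 'hour');
SELECT encode_open_location_code(GEOGRAPHY_POINT(-74.0060, 40.7128), 8);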

Run the following SQL commands to create the table-valued functions (TVFs):

USE martech;
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_locations(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
SELECT locations.city_id AS city_id,
locations.subscriber_id AS subscriber_id,
segments.segment_id AS segment_id,
MAX(date_add_dynamic(locations.ts,segments.valid_interval)) AS expires_at
FROM (segments as segments JOIN locations as locations )
WHERE ((segments.filter_kind = 'olc_8')
AND
(segments.filter_value = locations.olc_8)
AND
(locations.ts >= date_sub_dynamic( NOW(6),segments.valid_interval)) AND (locations.ts >= _since) AND (locations.ts < _until))
GROUP BY 1, 2, 3;
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_requests(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
SELECT requests.city_id AS city_id,
requests.subscriber_id AS subscriber_id,
segments.segment_id AS segment_id,
MAX(date_add_dynamic(requests.ts,segments.valid_interval)) AS expires_at
FROM (segments as segments JOIN requests as requests )
WHERE ((segments.filter_kind = 'request')
AND
(segments.filter_value = requests.domain)
AND
(requests.ts >= date_sub_dynamic( NOW(6),segments.valid_interval))
AND
(requests.ts >= _since) AND (requests.ts < _until))
GROUP BY 1, 2, 3;
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_purchases(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
SELECT purchases.city_id AS city_id,
purchases.subscriber_id AS subscriber_id,
segments.segment_id AS segment_id,
MAX(date_add_dynamic(purchases.ts,segments.valid_interval)) AS expires_at
FROM (segments as segments JOIN purchases as purchases )
WHERE ((segments.filter_kind = 'purchase')
AND
(segments.filter_value = purchases.vendor)
AND
(purchases.ts >= date_sub_dynamic( NOW(6),segments.valid_interval)) AND (purchases.ts >= _since) AND (purchases.ts < _until))
GROUP BY 1, 2, 3;
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
SELECT dynamic_subscriber_segments_locations.city_id AS city_id,
dynamic_subscriber_segments_locations.subscriber_id AS subscriber_id,
dynamic_subscriber_segments_locations.segment_id AS segment_id,
dynamic_subscriber_segments_locations.expires_at AS expires_at
FROM dynamic_subscriber_segments_locations(_since, _until) as dynamic_subscriber_segments_locations
UNION ALL
SELECT dynamic_subscriber_segments_requests.city_id AS city_id,
dynamic_subscriber_segments_requests.subscriber_id AS subscriber_id,
dynamic_subscriber_segments_requests.segment_id AS segment_id,
dynamic_subscriber_segments_requests.expires_at AS expires_at
FROM dynamic_subscriber_segments_requests(_since, _until) as dynamic_subscriber_segments_requests
UNION ALL
(SELECT dynamic_subscriber_segments_purchases.city_id AS city_id,
dynamic_subscriber_segments_purchases.subscriber_id AS subscriber_id,
dynamic_subscriber_segments_purchases.segment_id AS segment_id,
dynamic_subscriber_segments_purchases.expires_at AS expires_at
FROM dynamic_subscriber_segments_purchases(_since, _until) as dynamic_subscriber_segments_purchases );
CREATE OR REPLACE FUNCTION match_offers_to_subscribers(_interval enum('second','minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NULL)
RETURNS TABLE
AS
RETURN WITH phase_1 AS
(SELECT offers.offer_id AS offer_id,
offers.customer AS customer,
offers.enabled AS enabled,
offers.notification_zone AS notification_zone,
offers.segment_ids AS segment_ids,
offers.notification_content AS notification_content,
offers.notification_target AS notification_target,
offers.maximum_bid_cents AS maximum_bid_cents,
subscribers.city_id AS city_id,
subscribers.subscriber_id AS subscriber_id,
subscribers.current_location AS current_location
FROM (offers as offers JOIN (subscribers as subscribers
LEFT JOIN subscribers_last_notification as subscribers_last_notification
WITH (table_convert_subselect = TRUE)
ON ((subscribers.city_id = subscribers_last_notification.city_id)
AND (subscribers.subscriber_id = subscribers_last_notification.subscriber_id))))
WHERE ((offers.enabled = 1) AND GEOGRAPHY_CONTAINS(offers.notification_zone,subscribers.current_location)
AND ( ISNULL(subscribers_last_notification.last_notification)
OR (subscribers_last_notification.last_notification < date_sub_dynamic( NOW(),_interval)))
AND (NOT EXISTS( SELECT n.ts AS ts, n.city_id AS city_id, n.subscriber_id AS subscriber_id,
n.offer_id AS offer_id, n.cost_cents AS cost_cents,
n.lonlat AS lonlat FROM notifications as n
WHERE ((n.ts > date_sub_dynamic( NOW(),_interval))
AND (offers.offer_id = n.offer_id) AND (subscribers.city_id = n.city_id)
AND (subscribers.subscriber_id = n.subscriber_id)) LIMIT 1 )))), phase_2 AS (SELECT phase_1.offer_id AS offer_id,
phase_1.customer AS customer, phase_1.enabled AS enabled, phase_1.notification_zone AS notification_zone,
phase_1.segment_ids AS segment_ids, phase_1.notification_content AS notification_content,
phase_1.notification_target AS notification_target, phase_1.maximum_bid_cents AS maximum_bid_cents,
phase_1.city_id AS city_id, phase_1.subscriber_id AS subscriber_id, phase_1.current_location AS current_location,
ROW_NUMBER() OVER (PARTITION BY phase_1.city_id, phase_1.offer_id, phase_1.subscriber_id) AS num_matching_segments
FROM (( phase_1 AS phase_1 JOIN TABLE( JSON_TO_ARRAY(phase_1.segment_ids)) AS segment_ids)
LEFT JOIN subscriber_segments as segment
ON ((phase_1.city_id = segment.city_id) AND (phase_1.subscriber_id = segment.subscriber_id)
AND ((segment_ids.table_col:>bigint(20) NULL) = segment.segment_id))))
SELECT phase_2.city_id AS city_id, phase_2.subscriber_id AS subscriber_id,
LAST_VALUE(phase_2.offer_id) OVER (PARTITION BY phase_2.city_id, phase_2.subscriber_id ORDER BY phase_2.maximum_bid_cents) AS best_offer_id,
LAST_VALUE(phase_2.maximum_bid_cents) OVER (PARTITION BY phase_2.city_id, phase_2.subscriber_id ORDER BY phase_2.maximum_bid_cents) AS cost_cents,
phase_2.current_location AS current_location
FROM phase_2 AS phase_2
WHERE ( JSON_LENGTH(phase_2.segment_ids) = phase_2.num_matching_segments)
GROUP BY 1, 2;
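
A TVF is queried like a table. After the data is loaded (see the pipeline steps below), you can inspect a TVF's output directly; the one-hour window below is an arbitrary example:

SELECT * FROM dynamic_subscriber_segments(NOW(6) - INTERVAL 1 HOUR, NOW(6)) AS s LIMIT 10;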

Create Stored Procedures

This part of the tutorial shows how to create the stored procedures that process the data loaded by pipelines and drive the offer-matching workflow.

Note

The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.

Run the following SQL commands to create the stored procedures:

USE martech;
DELIMITER //
CREATE OR REPLACE PROCEDURE process_locations(_batch QUERY(subscriber_id BIGINT(20), offset_x DOUBLE, offset_y DOUBLE)) AS
DECLARE
_expanded QUERY(city_id BIGINT, subscriber_id BIGINT, lonlat GEOGRAPHYPOINT) = SELECT
city_id, subscriber_id,
GEOGRAPHY_POINT(
GEOGRAPHY_LONGITUDE(center) + (offset_x * diameter),
GEOGRAPHY_LATITUDE(center) + (offset_y * diameter)
) AS lonlat
FROM _batch, cities;
BEGIN
INSERT INTO subscribers (city_id, subscriber_id, current_location)
SELECT city_id, subscriber_id, lonlat
FROM _expanded
ON DUPLICATE KEY UPDATE current_location = VALUES(current_location);
INSERT INTO locations (city_id, subscriber_id, ts, lonlat, olc_8)
SELECT
city_id,
subscriber_id,
now(6) AS ts,
lonlat,
encode_open_location_code(lonlat, 8) AS olc_8
FROM _expanded;
END //
CREATE OR REPLACE PROCEDURE process_purchases(_batch QUERY(subscriber_id BIGINT(20), vendor TEXT)) AS
BEGIN
INSERT INTO purchases (city_id, subscriber_id, ts, vendor)
SELECT city_id, subscriber_id, now(6) AS ts, vendor
FROM _batch, cities;
END //
CREATE OR REPLACE PROCEDURE process_requests(_batch QUERY(subscriber_id BIGINT(20), domain TEXT)) AS
BEGIN
INSERT INTO requests (city_id, subscriber_id, ts, domain)
SELECT city_id, subscriber_id, now(6) AS ts, domain
FROM _batch, cities;
END//
CREATE OR REPLACE PROCEDURE prune_segments(_until datetime(6)) AS
BEGIN
DELETE FROM subscriber_segments WHERE expires_at <= _until;
END //
CREATE OR REPLACE PROCEDURE run_matching_process(_interval enum('second','minute','hour','day','week','month')) RETURNS BIGINT(20) AS
DECLARE
_ts DATETIME = NOW(6);
_count BIGINT;
BEGIN
INSERT INTO notifications SELECT _ts, * FROM match_offers_to_subscribers(_interval);
_count = row_count();
INSERT INTO subscribers_last_notification
SELECT city_id, subscriber_id, ts
FROM notifications
WHERE ts = _ts
ON DUPLICATE KEY UPDATE last_notification = _ts;
RETURN _count;
END //
CREATE OR REPLACE PROCEDURE update_segments(_since DATETIME(6), _until DATETIME(6)) AS
BEGIN
INSERT INTO subscriber_segments
SELECT * FROM dynamic_subscriber_segments(_since, _until)
ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);
END //
CREATE OR REPLACE PROCEDURE update_sessions(_session_id TEXT, _lease_duration_sections INT(11)) AS
DECLARE
_num_alive_controllers QUERY(c INT) =
SELECT COUNT(*) FROM sessions
WHERE is_controller AND expires_at > NOW(6);
_num_transactions QUERY(i INT) = SELECT @@trancount;
BEGIN
INSERT INTO sessions
SET
session_id = _session_id,
expires_at = NOW() + INTERVAL _lease_duration_sections SECOND
ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);
START TRANSACTION;
IF SCALAR(_num_alive_controllers) = 0 THEN
UPDATE sessions
SET is_controller = (session_id = _session_id);
END IF;
ECHO SELECT
session_id, is_controller, expires_at
FROM sessions
WHERE session_id = _session_id;
COMMIT;
DELETE FROM sessions
WHERE NOW(6) > (expires_at + INTERVAL (_lease_duration_sections * 2) SECOND);
EXCEPTION
WHEN OTHERS THEN
IF SCALAR(_num_transactions) > 0 THEN
ROLLBACK;
END IF;
END //
DELIMITER ;
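
Stored procedures are invoked with CALL. For example, after the data is loaded you can refresh the segment mappings and run the matching process manually; the one-hour window and the 'minute' interval below are arbitrary choices, and the results depend on the data present at the time:

CALL update_segments(NOW(6) - INTERVAL 1 HOUR, NOW(6));
CALL run_matching_process('minute');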

Load Data with Pipelines

This part of the tutorial shows how to ingest MarTech data from a public AWS S3 bucket into the SingleStore database using pipelines.

Note

The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.

  1. Run the following SQL commands to create the pipelines:

    USE martech;
    CREATE OR REPLACE PIPELINE cities
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/cities.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE cities;
    CREATE OR REPLACE PIPELINE locations
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/locations.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE locations;
    CREATE OR REPLACE PIPELINE notifications
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/notifications.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE notifications;
    CREATE OR REPLACE PIPELINE offers
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/offers.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE offers;
    CREATE OR REPLACE PIPELINE purchases
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/purchases.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE purchases;
    CREATE OR REPLACE PIPELINE requests
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/requests.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE requests;
    CREATE OR REPLACE PIPELINE segments
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/segments.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE segments;
    CREATE OR REPLACE PIPELINE sessions
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/sessions.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE sessions;
    CREATE OR REPLACE PIPELINE subscriber_segments
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscriber_segments.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE subscriber_segments;
    CREATE OR REPLACE PIPELINE subscribers
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscribers.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE subscribers;
    CREATE OR REPLACE PIPELINE subscribers_last_notification
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscribers_last_notification.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE subscribers_last_notification;
    CREATE OR REPLACE PIPELINE worldcities
    AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/worldcities.csv'
    CONFIG '{"region":"us-east-1"}'
    SKIP DUPLICATE KEY ERRORS
    INTO TABLE worldcities;
  2. Run the following SQL commands to start the pipelines:

    USE martech;
    START ALL PIPELINES;

Once the Success message is returned for all the created pipelines, SingleStore starts ingesting the data from the S3 bucket.
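
To confirm that each pipeline was created and is running, you can list the pipelines in the database along with their current state:

SHOW PIPELINES;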

Verify the Pipeline Success

Query the pipelines_files information schema view to inspect the progress of the pipelines. For example, run the following command to check whether the worldcities pipeline has finished ingesting data.

SELECT * FROM information_schema.pipelines_files
WHERE pipeline_name = "worldcities";
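
To summarize progress across all pipelines at once, you can aggregate the same view by file state. This sketch assumes the file_state values reported by SingleStore (for example, Loaded and Unloaded):

SELECT pipeline_name, file_state, COUNT(*) AS files
FROM information_schema.pipelines_files
GROUP BY pipeline_name, file_state
ORDER BY pipeline_name;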

You can also run a pipeline in the foreground to easily verify that all the data has been ingested.

START PIPELINE <pipeline_name> FOREGROUND;

For SingleStore Helios deployments, you can also monitor the progress of your pipelines on the Cloud Portal: select Deployments > <your_workspace_group> > Databases > martech > Pipelines.

Run Queries on Data

You can start running queries once the data is loaded into your SingleStore database. The following are some example queries for the MarTech dataset.

Query 1: Send Notifications

This query calls the match_offers_to_subscribers function to match offers with subscribers and inserts the resulting notifications.

USE martech;
INSERT INTO notifications (ts,city_id, subscriber_id, offer_id, cost_cents, lonlat)
SELECT now(),* FROM match_offers_to_subscribers("second");
Query OK, 5234 rows affected (2.166 s)
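
To inspect the notifications that were just inserted, you can group the most recent batch by offer; the one-minute window below is an arbitrary choice:

SELECT offer_id, COUNT(*) AS notifications_sent
FROM notifications
WHERE ts > NOW(6) - INTERVAL 1 MINUTE
GROUP BY offer_id
ORDER BY notifications_sent DESC
LIMIT 5;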

Query 2: Analyze Conversion Rate

This query calculates the number of customers who have made purchases so far and determines the conversion rate.

Note

As time progresses, more users will have made purchases, and the conversion rate may change.

USE martech;
SELECT
*,
CONCAT(FORMAT((total_conversions / total_notifications) * 100,2),"%") AS conversion_rate,
RANK() OVER (ORDER BY ((total_conversions / total_notifications)) desc) AS rank
FROM (
SELECT
offer_notification.customer,offer_notification.notification_zone,
COUNT(offer_notification.offer_id) as total_notifications,
COUNT(purchases.vendor) as total_conversions
FROM (
SELECT offers.offer_id, offers.customer , notifications.ts, notifications.city_id, notifications.subscriber_id, offers.notification_zone
FROM offers, notifications
WHERE offers.offer_id = notifications.offer_id
) offer_notification
LEFT JOIN purchases ON
offer_notification.city_id = purchases.city_id
AND offer_notification.subscriber_id = purchases.subscriber_id
AND purchases.ts > offer_notification.ts
AND purchases.vendor = offer_notification.customer
GROUP BY 1
LIMIT 5 /* remove LIMIT to see full result */
);
+----------+---------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
| customer | notification_zone                                                                                                                           | total_notifications | total_conversions | conversion_rate | rank |
+----------+---------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
| Jazzy    | POLYGON((-74.01000000 40.71250000, -74.00750000 40.71250000, -74.00750000 40.71500000, -74.01000000 40.71500000, -74.01000000 40.71250000)) |                  90 |                51 | 56.67%          |    1 |
| Eidel    | POLYGON((-74.00000000 40.74500000, -73.99750000 40.74500000, -73.99750000 40.74750000, -74.00000000 40.74750000, -74.00000000 40.74500000)) |                  78 |                34 | 43.59%          |    2 |
| Skibox   | POLYGON((-74.01500000 40.72500000, -74.01250000 40.72500000, -74.01250000 40.72750000, -74.01500000 40.72750000, -74.01500000 40.72500000)) |                  40 |                17 | 42.50%          |    3 |
| Kazu     | POLYGON((-73.99750000 40.72250000, -73.99500000 40.72250000, -73.99500000 40.72500000, -73.99750000 40.72500000, -73.99750000 40.72250000)) |                  88 |                24 | 27.27%          |    4 |
| Rhyloo   | POLYGON((-73.98250000 40.72250000, -73.98000000 40.72250000, -73.98000000 40.72500000, -73.98250000 40.72500000, -73.98250000 40.72250000)) |                 181 |                23 | 12.71%          |    5 |
+----------+---------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
5 rows in set (0.51 sec)

Connect to the Analytics Application

To connect the MarTech data application to an analytics application and monitor the conversion rate of previously sent notifications, visit https://digital-marketing.labs.singlestore.com/ and enter the connection configuration of your SingleStore deployment:

The connection configuration page of the MarTech analytics application.

Select the Connect button. The web-based analytics application opens and shows a visual representation of the data.

A dashboard of the MarTech analytics application.
