Load MarTech Data into SingleStore
This tutorial creates a MarTech data application that delivers ads to users based on their behavior and location history.
Create Database and Tables
Connect to your SingleStore Helios deployment and run the following SQL commands to create a database named martech and twelve associated tables.
For SingleStore Helios deployments, you can also run these commands in the SQL Editor: select Cloud Portal > Develop > Data Studio > SQL Editor.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
DROP DATABASE IF EXISTS martech;
CREATE DATABASE martech;
USE martech;

CREATE ROWSTORE REFERENCE TABLE cities (
  city_id bigint(20) NOT NULL,
  city_name text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  center geographypoint NOT NULL,
  diameter double DEFAULT NULL,
  PRIMARY KEY (city_id)
);

CREATE TABLE locations (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  lonlat geographypoint NOT NULL,
  olc_8 text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  SHARD KEY __SHARDKEY (city_id, subscriber_id),
  SORT KEY ts (ts),
  KEY city_id (city_id, subscriber_id) USING HASH,
  KEY olc_8 (olc_8) USING HASH
);

CREATE TABLE notifications (
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  offer_id bigint(20) NOT NULL,
  cost_cents bigint(20) NOT NULL,
  lonlat geographypoint NOT NULL,
  SHARD KEY __SHARDKEY (city_id, subscriber_id),
  SORT KEY ts (ts)
);

CREATE ROWSTORE REFERENCE TABLE offers (
  offer_id bigint(20) NOT NULL AUTO_INCREMENT,
  customer text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  enabled tinyint(1) NOT NULL DEFAULT 1,
  notification_zone geography NOT NULL,
  segment_ids JSON COLLATE utf8mb4_bin NOT NULL,
  notification_content text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  notification_target text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  maximum_bid_cents bigint(20) NOT NULL,
  PRIMARY KEY (offer_id),
  KEY customer (customer),
  KEY notification_target (notification_target),
  KEY notification_zone (notification_zone) WITH (RESOLUTION = 8)
);

CREATE TABLE purchases (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  vendor text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  SHARD KEY __SHARDKEY (city_id, subscriber_id),
  SORT KEY ts (ts),
  KEY vendor (vendor) USING HASH
);

CREATE TABLE requests (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  ts datetime(6) NOT NULL SERIES TIMESTAMP,
  domain text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  SHARD KEY __SHARDKEY (city_id, subscriber_id),
  SORT KEY ts (ts),
  KEY domain (domain) USING HASH
);

CREATE ROWSTORE REFERENCE TABLE segments (
  segment_id bigint(20) NOT NULL,
  valid_interval enum('minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  filter_kind enum('olc_8','request','purchase') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  filter_value text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  PRIMARY KEY (segment_id),
  UNIQUE KEY valid_interval (valid_interval, filter_kind, filter_value),
  KEY filter_kind (filter_kind, filter_value)
);

CREATE ROWSTORE TABLE sessions (
  session_id text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  is_controller tinyint(1) NOT NULL DEFAULT 0,
  expires_at datetime(6) NOT NULL,
  PRIMARY KEY (session_id)
);

CREATE ROWSTORE TABLE subscriber_segments (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  segment_id bigint(20) NOT NULL,
  expires_at datetime(6) NOT NULL,
  PRIMARY KEY (city_id, subscriber_id, segment_id),
  SHARD KEY city_id (city_id, subscriber_id)
);

CREATE ROWSTORE TABLE subscribers (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  current_location geographypoint NOT NULL,
  PRIMARY KEY (city_id, subscriber_id),
  KEY current_location (current_location)
);

CREATE ROWSTORE TABLE subscribers_last_notification (
  city_id bigint(20) NOT NULL,
  subscriber_id bigint(20) NOT NULL,
  last_notification datetime(6) DEFAULT NULL,
  PRIMARY KEY (city_id, subscriber_id),
  KEY last_notification (last_notification)
);

CREATE ROWSTORE TABLE worldcities (
  city_id bigint(20) NOT NULL,
  city_name text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  center geographypoint NOT NULL,
  PRIMARY KEY (city_id),
  KEY center (center)
);
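To confirm that the schema was created, you can list the tables in the new database; all twelve should appear:

USE martech;
SHOW TABLES;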
Create Functions
This part of the tutorial shows how to create the functions that the stored procedures use.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
Run the following SQL commands to create the user-defined functions (UDFs):
USE martech;
DELIMITER //

CREATE OR REPLACE FUNCTION date_add_dynamic(
  _dt DATETIME(6),
  _interval enum('second','minute','hour','day','week','month')
) RETURNS DATETIME(6) AS
BEGIN
  RETURN CASE _interval
    WHEN "second" THEN _dt + INTERVAL 1 SECOND
    WHEN "minute" THEN _dt + INTERVAL 1 MINUTE
    WHEN "hour" THEN _dt + INTERVAL 1 HOUR
    WHEN "day" THEN _dt + INTERVAL 1 DAY
    WHEN "week" THEN _dt + INTERVAL 1 WEEK
    WHEN "month" THEN _dt + INTERVAL 1 MONTH
  END;
END //

CREATE OR REPLACE FUNCTION date_sub_dynamic(
  _dt DATETIME(6) NULL,
  _interval enum('second','minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NULL
) RETURNS DATETIME(6) NULL AS
BEGIN
  RETURN CASE _interval
    WHEN "second" THEN _dt - INTERVAL 1 SECOND
    WHEN "minute" THEN _dt - INTERVAL 1 MINUTE
    WHEN "hour" THEN _dt - INTERVAL 1 HOUR
    WHEN "day" THEN _dt - INTERVAL 1 DAY
    WHEN "week" THEN _dt - INTERVAL 1 WEEK
    WHEN "month" THEN _dt - INTERVAL 1 MONTH
  END;
END //

CREATE OR REPLACE FUNCTION encode_open_location_code(
  _lonlat geographypoint NULL,
  codeLength int(11) NULL DEFAULT 12
) RETURNS text CHARACTER SET utf8 COLLATE utf8_general_ci NULL AS
DECLARE
  SEPARATOR_ text = '+';
  SEPARATOR_POSITION_ int = 8;
  PADDING_CHARACTER_ text = '0';
  CODE_ALPHABET_ text = '23456789CFGHJMPQRVWX';
  ENCODING_BASE_ int = CHARACTER_LENGTH(CODE_ALPHABET_);
  LATITUDE_MAX_ int = 90;
  LONGITUDE_MAX_ int = 180;
  MAX_DIGIT_COUNT_ int = 15;
  PAIR_CODE_LENGTH_ int = 10;
  PAIR_PRECISION_ decimal = POWER(ENCODING_BASE_, 3);
  GRID_CODE_LENGTH_ int = MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_;
  GRID_COLUMNS_ int = 4;
  GRID_ROWS_ int = 5;
  FINAL_LAT_PRECISION_ decimal = PAIR_PRECISION_ * POWER(GRID_ROWS_, MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_);
  FINAL_LNG_PRECISION_ decimal = PAIR_PRECISION_ * POWER(GRID_COLUMNS_, MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_);
  latitude double = geography_latitude(_lonlat);
  longitude double = geography_longitude(_lonlat);
  code text = '';
  latVal decimal = 0;
  lngVal decimal = 0;
  latDigit smallint;
  lngDigit smallint;
  ndx smallint;
  i_ smallint;
BEGIN
  IF ((codeLength < 2) OR ((codeLength < PAIR_CODE_LENGTH_) AND (codeLength % 2 = 1)) OR (codeLength > MAX_DIGIT_COUNT_)) THEN
    RAISE USER_EXCEPTION(CONCAT('Invalid Open Location Code length - ', codeLength));
  END IF;
  latVal = floor(round((latitude + LATITUDE_MAX_) * FINAL_LAT_PRECISION_, 6));
  lngVal = floor(round((longitude + LONGITUDE_MAX_) * FINAL_LNG_PRECISION_, 6));
  IF (codeLength > PAIR_CODE_LENGTH_) THEN
    i_ = 0;
    WHILE (i_ < (MAX_DIGIT_COUNT_ - PAIR_CODE_LENGTH_)) LOOP
      latDigit = latVal % GRID_ROWS_;
      lngDigit = lngVal % GRID_COLUMNS_;
      ndx = (latDigit * GRID_COLUMNS_) + lngDigit;
      code = concat(substr(CODE_ALPHABET_, ndx + 1, 1), code);
      latVal = latVal DIV GRID_ROWS_;
      lngVal = lngVal DIV GRID_COLUMNS_;
      i_ = i_ + 1;
    END LOOP;
  ELSE
    latVal = latVal DIV power(GRID_ROWS_, GRID_CODE_LENGTH_);
    lngVal = lngVal DIV power(GRID_COLUMNS_, GRID_CODE_LENGTH_);
  END IF;
  i_ = 0;
  WHILE (i_ < (PAIR_CODE_LENGTH_ / 2)) LOOP
    code = concat(substr(CODE_ALPHABET_, (lngVal % ENCODING_BASE_) + 1, 1), code);
    code = concat(substr(CODE_ALPHABET_, (latVal % ENCODING_BASE_) + 1, 1), code);
    latVal = latVal DIV ENCODING_BASE_;
    lngVal = lngVal DIV ENCODING_BASE_;
    i_ = i_ + 1;
  END LOOP;
  code = concat(substr(code, 1, SEPARATOR_POSITION_), SEPARATOR_, substr(code, SEPARATOR_POSITION_ + 1));
  IF (codeLength > SEPARATOR_POSITION_) THEN
    RETURN substr(code, 1, codeLength + 1);
  ELSE
    RETURN substr(code, 1, codeLength);
  END IF;
END //

DELIMITER ;
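As a quick smoke test, you can call the new UDFs directly. The coordinates below are arbitrary illustrative values, not part of the dataset:

USE martech;
SELECT date_add_dynamic(NOW(6), 'day') AS one_day_later,
       date_sub_dynamic(NOW(6), 'hour') AS one_hour_earlier;
SELECT encode_open_location_code(GEOGRAPHY_POINT(-73.9857, 40.7484), 8) AS olc_8;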
Run the following SQL commands to create the table-valued functions (TVFs):
USE martech;

CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_locations(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
  SELECT
    locations.city_id AS city_id,
    locations.subscriber_id AS subscriber_id,
    segments.segment_id AS segment_id,
    MAX(date_add_dynamic(locations.ts, segments.valid_interval)) AS expires_at
  FROM (segments AS segments JOIN locations AS locations)
  WHERE ((segments.filter_kind = 'olc_8')
    AND (segments.filter_value = locations.olc_8)
    AND (locations.ts >= date_sub_dynamic(NOW(6), segments.valid_interval))
    AND (locations.ts >= _since) AND (locations.ts < _until))
  GROUP BY 1, 2, 3;

CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_requests(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
  SELECT
    requests.city_id AS city_id,
    requests.subscriber_id AS subscriber_id,
    segments.segment_id AS segment_id,
    MAX(date_add_dynamic(requests.ts, segments.valid_interval)) AS expires_at
  FROM (segments AS segments JOIN requests AS requests)
  WHERE ((segments.filter_kind = 'request')
    AND (segments.filter_value = requests.domain)
    AND (requests.ts >= date_sub_dynamic(NOW(6), segments.valid_interval))
    AND (requests.ts >= _since) AND (requests.ts < _until))
  GROUP BY 1, 2, 3;

CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_purchases(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
  SELECT
    purchases.city_id AS city_id,
    purchases.subscriber_id AS subscriber_id,
    segments.segment_id AS segment_id,
    MAX(date_add_dynamic(purchases.ts, segments.valid_interval)) AS expires_at
  FROM (segments AS segments JOIN purchases AS purchases)
  WHERE ((segments.filter_kind = 'purchase')
    AND (segments.filter_value = purchases.vendor)
    AND (purchases.ts >= date_sub_dynamic(NOW(6), segments.valid_interval))
    AND (purchases.ts >= _since) AND (purchases.ts < _until))
  GROUP BY 1, 2, 3;

CREATE OR REPLACE FUNCTION dynamic_subscriber_segments(_since DATETIME(6) NULL, _until DATETIME(6) NULL) RETURNS TABLE AS
RETURN
  SELECT
    dynamic_subscriber_segments_locations.city_id AS city_id,
    dynamic_subscriber_segments_locations.subscriber_id AS subscriber_id,
    dynamic_subscriber_segments_locations.segment_id AS segment_id,
    dynamic_subscriber_segments_locations.expires_at AS expires_at
  FROM dynamic_subscriber_segments_locations(_since, _until) AS dynamic_subscriber_segments_locations
  UNION ALL
  SELECT
    dynamic_subscriber_segments_requests.city_id AS city_id,
    dynamic_subscriber_segments_requests.subscriber_id AS subscriber_id,
    dynamic_subscriber_segments_requests.segment_id AS segment_id,
    dynamic_subscriber_segments_requests.expires_at AS expires_at
  FROM dynamic_subscriber_segments_requests(_since, _until) AS dynamic_subscriber_segments_requests
  UNION ALL
  (SELECT
    dynamic_subscriber_segments_purchases.city_id AS city_id,
    dynamic_subscriber_segments_purchases.subscriber_id AS subscriber_id,
    dynamic_subscriber_segments_purchases.segment_id AS segment_id,
    dynamic_subscriber_segments_purchases.expires_at AS expires_at
  FROM dynamic_subscriber_segments_purchases(_since, _until) AS dynamic_subscriber_segments_purchases);

CREATE OR REPLACE FUNCTION match_offers_to_subscribers(
  _interval enum('second','minute','hour','day','week','month') CHARACTER SET utf8 COLLATE utf8_general_ci NULL
) RETURNS TABLE AS
RETURN WITH phase_1 AS (
  SELECT
    offers.offer_id AS offer_id,
    offers.customer AS customer,
    offers.enabled AS enabled,
    offers.notification_zone AS notification_zone,
    offers.segment_ids AS segment_ids,
    offers.notification_content AS notification_content,
    offers.notification_target AS notification_target,
    offers.maximum_bid_cents AS maximum_bid_cents,
    subscribers.city_id AS city_id,
    subscribers.subscriber_id AS subscriber_id,
    subscribers.current_location AS current_location
  FROM (offers AS offers JOIN (subscribers AS subscribers
    LEFT JOIN subscribers_last_notification AS subscribers_last_notification
    WITH (table_convert_subselect = TRUE)
    ON ((subscribers.city_id = subscribers_last_notification.city_id)
      AND (subscribers.subscriber_id = subscribers_last_notification.subscriber_id))))
  WHERE ((offers.enabled = 1)
    AND GEOGRAPHY_CONTAINS(offers.notification_zone, subscribers.current_location)
    AND (ISNULL(subscribers_last_notification.last_notification)
      OR (subscribers_last_notification.last_notification < date_sub_dynamic(NOW(), _interval)))
    AND (NOT EXISTS (
      SELECT n.ts AS ts, n.city_id AS city_id, n.subscriber_id AS subscriber_id,
        n.offer_id AS offer_id, n.cost_cents AS cost_cents, n.lonlat AS lonlat
      FROM notifications AS n
      WHERE ((n.ts > date_sub_dynamic(NOW(), _interval))
        AND (offers.offer_id = n.offer_id)
        AND (subscribers.city_id = n.city_id)
        AND (subscribers.subscriber_id = n.subscriber_id))
      LIMIT 1)))
), phase_2 AS (
  SELECT
    phase_1.offer_id AS offer_id,
    phase_1.customer AS customer,
    phase_1.enabled AS enabled,
    phase_1.notification_zone AS notification_zone,
    phase_1.segment_ids AS segment_ids,
    phase_1.notification_content AS notification_content,
    phase_1.notification_target AS notification_target,
    phase_1.maximum_bid_cents AS maximum_bid_cents,
    phase_1.city_id AS city_id,
    phase_1.subscriber_id AS subscriber_id,
    phase_1.current_location AS current_location,
    ROW_NUMBER() OVER (PARTITION BY phase_1.city_id, phase_1.offer_id, phase_1.subscriber_id) AS num_matching_segments
  FROM ((phase_1 AS phase_1 JOIN TABLE(JSON_TO_ARRAY(phase_1.segment_ids)) AS segment_ids)
    LEFT JOIN subscriber_segments AS segment
    ON ((phase_1.city_id = segment.city_id)
      AND (phase_1.subscriber_id = segment.subscriber_id)
      AND ((segment_ids.table_col :> bigint(20) NULL) = segment.segment_id)))
)
SELECT
  phase_2.city_id AS city_id,
  phase_2.subscriber_id AS subscriber_id,
  LAST_VALUE(phase_2.offer_id) OVER (PARTITION BY phase_2.city_id, phase_2.subscriber_id ORDER BY phase_2.maximum_bid_cents) AS best_offer_id,
  LAST_VALUE(phase_2.maximum_bid_cents) OVER (PARTITION BY phase_2.city_id, phase_2.subscriber_id ORDER BY phase_2.maximum_bid_cents) AS cost_cents,
  phase_2.current_location AS current_location
FROM phase_2 AS phase_2
WHERE (JSON_LENGTH(phase_2.segment_ids) = phase_2.num_matching_segments)
GROUP BY 1, 2;
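Because TVFs are queried like tables, you can preview the segment assignments that a window of recent activity would produce. The one-hour window here is an arbitrary example:

SELECT *
FROM dynamic_subscriber_segments(NOW(6) - INTERVAL 1 HOUR, NOW(6))
LIMIT 10;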
Create Stored Procedures
This part of the tutorial shows how to create the stored procedures used to load data with pipelines.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
Run the following SQL commands to create the stored procedures:
USE martech;
DELIMITER //

CREATE OR REPLACE PROCEDURE process_locations(_batch QUERY(subscriber_id BIGINT(20), offset_x DOUBLE, offset_y DOUBLE)) AS
DECLARE
  _expanded QUERY(city_id BIGINT, subscriber_id BIGINT, lonlat GEOGRAPHYPOINT) = SELECT
    city_id, subscriber_id,
    GEOGRAPHY_POINT(
      GEOGRAPHY_LONGITUDE(center) + (offset_x * diameter),
      GEOGRAPHY_LATITUDE(center) + (offset_y * diameter)) AS lonlat
  FROM _batch, cities;
BEGIN
  INSERT INTO subscribers (city_id, subscriber_id, current_location)
  SELECT city_id, subscriber_id, lonlat
  FROM _expanded
  ON DUPLICATE KEY UPDATE current_location = VALUES(current_location);

  INSERT INTO locations (city_id, subscriber_id, ts, lonlat, olc_8)
  SELECT
    city_id,
    subscriber_id,
    now(6) AS ts,
    lonlat,
    encode_open_location_code(lonlat, 8) AS olc_8
  FROM _expanded;
END //

CREATE OR REPLACE PROCEDURE process_purchases(_batch QUERY(subscriber_id BIGINT(20), vendor TEXT)) AS
BEGIN
  INSERT INTO purchases (city_id, subscriber_id, ts, vendor)
  SELECT city_id, subscriber_id, now(6) AS ts, vendor
  FROM _batch, cities;
END //

CREATE OR REPLACE PROCEDURE process_requests(_batch QUERY(subscriber_id BIGINT(20), domain TEXT)) AS
BEGIN
  INSERT INTO requests (city_id, subscriber_id, ts, domain)
  SELECT city_id, subscriber_id, now(6) AS ts, domain
  FROM _batch, cities;
END //

CREATE OR REPLACE PROCEDURE prune_segments(_until datetime(6)) AS
BEGIN
  DELETE FROM subscriber_segments WHERE expires_at <= _until;
END //

CREATE OR REPLACE PROCEDURE run_matching_process(_interval enum('second','minute','hour','day','week','month')) RETURNS BIGINT(20) AS
DECLARE
  _ts DATETIME = NOW(6);
  _count BIGINT;
BEGIN
  INSERT INTO notifications SELECT _ts, * FROM match_offers_to_subscribers(_interval);
  _count = row_count();

  INSERT INTO subscribers_last_notification
  SELECT city_id, subscriber_id, ts
  FROM notifications
  WHERE ts = _ts
  ON DUPLICATE KEY UPDATE last_notification = _ts;

  RETURN _count;
END //

CREATE OR REPLACE PROCEDURE update_segments(_since DATETIME(6), _until DATETIME(6)) AS
BEGIN
  INSERT INTO subscriber_segments
  SELECT * FROM dynamic_subscriber_segments(_since, _until)
  ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);
END //

CREATE OR REPLACE PROCEDURE update_sessions(_session_id TEXT, _lease_duration_sections INT(11)) AS
DECLARE
  _num_alive_controllers QUERY(c INT) =
    SELECT COUNT(*) FROM sessions
    WHERE is_controller AND expires_at > NOW(6);
  _num_transactions QUERY(i INT) = SELECT @@trancount;
BEGIN
  INSERT INTO sessions
  SET
    session_id = _session_id,
    expires_at = NOW() + INTERVAL _lease_duration_sections SECOND
  ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);

  START TRANSACTION;
  IF SCALAR(_num_alive_controllers) = 0 THEN
    UPDATE sessions
    SET is_controller = (session_id = _session_id);
  END IF;

  ECHO SELECT
    session_id, is_controller, expires_at
  FROM sessions
  WHERE session_id = _session_id;
  COMMIT;

  DELETE FROM sessions
  WHERE NOW(6) > (expires_at + INTERVAL (_lease_duration_sections * 2) SECOND);

EXCEPTION
  WHEN OTHERS THEN
    IF SCALAR(_num_transactions) > 0 THEN
      ROLLBACK;
    END IF;
END //

DELIMITER ;
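After the procedures are created, they can be invoked with CALL. For example, this illustrative sequence refreshes segment membership for the last minute of activity and then prunes expired rows:

USE martech;
CALL update_segments(NOW(6) - INTERVAL 1 MINUTE, NOW(6));
CALL prune_segments(NOW(6));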
Load Data with Pipelines
This part of the tutorial shows how to ingest MarTech data from a public AWS S3 bucket into the SingleStore database using pipelines.
Note
The SQL Editor only runs the queries that you select, so ensure you have them all selected before selecting Run.
Run the following SQL commands to create the pipelines:
USE martech;

CREATE OR REPLACE PIPELINE cities
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/cities.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE cities;

CREATE OR REPLACE PIPELINE locations
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/locations.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE locations;

CREATE OR REPLACE PIPELINE notifications
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/notifications.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE notifications;

CREATE OR REPLACE PIPELINE offers
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/offers.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE offers;

CREATE OR REPLACE PIPELINE purchases
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/purchases.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE purchases;

CREATE OR REPLACE PIPELINE requests
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/requests.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE requests;

CREATE OR REPLACE PIPELINE segments
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/segments.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE segments;

CREATE OR REPLACE PIPELINE sessions
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/sessions.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE sessions;

CREATE OR REPLACE PIPELINE subscriber_segments
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscriber_segments.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE subscriber_segments;

CREATE OR REPLACE PIPELINE subscribers
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscribers.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE subscribers;

CREATE OR REPLACE PIPELINE subscribers_last_notification
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/subscribers_last_notification.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE subscribers_last_notification;

CREATE OR REPLACE PIPELINE worldcities
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/martech/worldcities.csv'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE worldcities;
Run the following SQL commands to start the pipelines:
USE martech;
START ALL PIPELINES;
Once the Success message is returned for all the created pipelines, SingleStore starts ingesting the data from the S3 bucket.
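To confirm that all pipelines are running, list them and check their state:

SHOW PIPELINES;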
Verify the Pipeline Success
Query the pipelines_files information schema view to inspect the progress of the pipelines. For example, the following query checks whether the worldcities pipeline has finished ingesting data.
SELECT * FROM information_schema.pipelines_files
WHERE pipeline_name = "worldcities";
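To summarize progress across all pipelines at once, you can group the same view by pipeline and file state:

SELECT pipeline_name, file_state, COUNT(*) AS file_count
FROM information_schema.pipelines_files
GROUP BY pipeline_name, file_state
ORDER BY pipeline_name, file_state;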
You can also run a pipeline in the foreground to easily verify that all the data has been ingested.
START PIPELINE <pipeline_name> FOREGROUND;
For SingleStore Helios deployments, you can also monitor the progress of your pipelines on the Cloud Portal: select Deployments > <your_
Run Queries on Data
You can start running queries once the data is loaded into your SingleStore database.
Query 1: Send Notifications
This query calls the match_offers_to_subscribers function to match offers with subscribers, then records a notification for each match.
USE martech;
INSERT INTO notifications (ts, city_id, subscriber_id, offer_id, cost_cents, lonlat)
SELECT now(), * FROM match_offers_to_subscribers("second");
Query OK, 5234 rows affected (2.166 s)
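As a quick check, you can count the notifications stamped within the last minute; run promptly after the insert, the count should line up with the rows-affected message above:

SELECT COUNT(*) AS recent_notifications
FROM notifications
WHERE ts >= NOW(6) - INTERVAL 1 MINUTE;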
Query 2: Analyze Conversion Rate
This query calculates, for each customer, how many notified subscribers have made purchases so far, and ranks the customers by conversion rate.
Note
As time progresses, more users will have made purchases, and the conversion rate may change.
USE martech;

SELECT
  *,
  CONCAT(FORMAT((total_conversions / total_notifications) * 100, 2), "%") AS conversion_rate,
  RANK() OVER (ORDER BY ((total_conversions / total_notifications)) DESC) AS rank
FROM (
  SELECT
    offer_notification.customer,
    offer_notification.notification_zone,
    COUNT(offer_notification.offer_id) AS total_notifications,
    COUNT(purchases.vendor) AS total_conversions
  FROM (
    SELECT offers.offer_id, offers.customer, notifications.ts,
      notifications.city_id, notifications.subscriber_id, offers.notification_zone
    FROM offers, notifications
    WHERE offers.offer_id = notifications.offer_id
  ) offer_notification
  LEFT JOIN purchases ON
    offer_notification.city_id = purchases.city_id
    AND offer_notification.subscriber_id = purchases.subscriber_id
    AND purchases.ts > offer_notification.ts
    AND purchases.vendor = offer_notification.customer
  GROUP BY 1
  LIMIT 5 /* remove LIMIT to see full result */
);
+----------+---------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
| customer | notification_zone | total_notifications | total_conversions | conversion_rate | rank |
+----------+---------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
| Jazzy | POLYGON((-74.01000000 40.71250000, -74.00750000 40.71250000, -74.00750000 40.71500000, -74.01000000 40.71500000, -74.01000000 40.71250000)) | 90 | 51 | 56.67% | 1 |
| Eidel | POLYGON((-74.00000000 40.74500000, -73.99750000 40.74500000, -73.99750000 40.74750000, -74.00000000 40.74750000, -74.00000000 40.74500000)) | 78 | 34 | 43.59% | 2 |
| Skibox | POLYGON((-74.01500000 40.72500000, -74.01250000 40.72500000, -74.01250000 40.72750000, -74.01500000 40.72750000, -74.01500000 40.72500000)) | 40 | 17 | 42.50% | 3 |
| Kazu | POLYGON((-73.99750000 40.72250000, -73.99500000 40.72250000, -73.99500000 40.72500000, -73.99750000 40.72500000, -73.99750000 40.72250000)) | 88 | 24 | 27.27% | 4 |
| Rhyloo | POLYGON((-73.98250000 40.72250000, -73.98000000 40.72250000, -73.98000000 40.72500000, -73.98250000 40.72500000, -73.98250000 40.72250000)) | 181 | 23 | 12.71% | 5 |
+----------+---------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-----------------+------+
5 rows in set (0.51 sec)
Connect to the Analytics Application
To connect the MarTech data application to an analytics application and monitor the conversion rate of previously sent notifications, visit https://digital-marketing.
Select the Connect button.
Last modified: October 28, 2024