Create Stored Procedures
This part of the tutorial shows how to create the stored procedures that the pipelines use to load data.
Note
The SQL Editor only runs the queries you have selected, so make sure you have them all selected before selecting Run.
Run the following SQL commands to create the stored procedures:
USE martech;

DELIMITER //

-- process_locations
-- Takes a batch of (subscriber_id, offset_x, offset_y) rows, expands it
-- against every row of cities, and records the resulting position both as
-- the current location of the subscriber and as a new row in locations.
--   _batch: query-typed parameter with one row per subscriber offset.
CREATE OR REPLACE PROCEDURE process_locations(
    _batch QUERY(subscriber_id BIGINT(20), offset_x DOUBLE, offset_y DOUBLE)
) AS
DECLARE
    -- One row per (batch row, city) pair. city_id, center and diameter are
    -- not part of _batch, so they are supplied by cities. The point is the
    -- city center shifted by the offset scaled by the city diameter.
    _expanded QUERY(city_id BIGINT, subscriber_id BIGINT, lonlat GEOGRAPHYPOINT) =
        SELECT
            city_id,
            subscriber_id,
            GEOGRAPHY_POINT(
                GEOGRAPHY_LONGITUDE(center) + (offset_x * diameter),
                GEOGRAPHY_LATITUDE(center) + (offset_y * diameter)
            ) AS lonlat
        FROM _batch
        -- Explicit Cartesian product: there is no join predicate on purpose.
        CROSS JOIN cities;
BEGIN
    -- Upsert the latest known position of each subscriber.
    INSERT INTO subscribers (city_id, subscriber_id, current_location)
    SELECT
        city_id,
        subscriber_id,
        lonlat
    FROM _expanded
    ON DUPLICATE KEY UPDATE current_location = VALUES(current_location);

    -- Append the same positions to the location history, stamped with the
    -- current time and an 8-character open location code.
    INSERT INTO locations (city_id, subscriber_id, ts, lonlat, olc_8)
    SELECT
        city_id,
        subscriber_id,
        NOW(6) AS ts,
        lonlat,
        encode_open_location_code(lonlat, 8) AS olc_8
    FROM _expanded;
END //

-- process_purchases
-- Inserts one purchases row per (batch row, city) pair, stamped with the
-- current time.
--   _batch: query-typed parameter with (subscriber_id, vendor) rows.
CREATE OR REPLACE PROCEDURE process_purchases(
    _batch QUERY(subscriber_id BIGINT(20), vendor TEXT)
) AS
BEGIN
    INSERT INTO purchases (city_id, subscriber_id, ts, vendor)
    SELECT
        city_id,
        subscriber_id,
        NOW(6) AS ts,
        vendor
    FROM _batch
    -- Explicit Cartesian product: every batch row is repeated for every city.
    CROSS JOIN cities;
END //

-- process_requests
-- Inserts one requests row per (batch row, city) pair, stamped with the
-- current time.
--   _batch: query-typed parameter with (subscriber_id, domain) rows.
CREATE OR REPLACE PROCEDURE process_requests(
    _batch QUERY(subscriber_id BIGINT(20), domain TEXT)
) AS
BEGIN
    INSERT INTO requests (city_id, subscriber_id, ts, domain)
    SELECT
        city_id,
        subscriber_id,
        NOW(6) AS ts,
        domain
    FROM _batch
    -- Explicit Cartesian product: every batch row is repeated for every city.
    CROSS JOIN cities;
END //

-- prune_segments
-- Deletes subscriber_segments rows whose expires_at is at or before _until.
CREATE OR REPLACE PROCEDURE prune_segments(_until DATETIME(6)) AS
BEGIN
    DELETE FROM subscriber_segments
    WHERE expires_at <= _until;
END //

-- run_matching_process
-- Inserts the rows returned by match_offers_to_subscribers(_interval) into
-- notifications under a single timestamp, then records that timestamp as the
-- last notification time for every subscriber notified in this run.
--   _interval: passed through unchanged to match_offers_to_subscribers.
--   Returns:   the number of notification rows inserted by this run.
CREATE OR REPLACE PROCEDURE run_matching_process(
    _interval ENUM('second','minute','hour','day','week','month')
) RETURNS BIGINT(20) AS
DECLARE
    -- Microsecond precision keeps the stamp of one run distinct from another
    -- run in the same second. A plain DATETIME would truncate NOW(6) and let
    -- the ts = _ts filter below pick up rows written by an earlier run.
    _ts DATETIME(6) = NOW(6);
    _count BIGINT;
BEGIN
    INSERT INTO notifications
    SELECT _ts, *
    FROM match_offers_to_subscribers(_interval);

    -- Capture the insert count before the next statement overwrites it.
    _count = ROW_COUNT();

    -- Only rows written by this run carry _ts.
    INSERT INTO subscribers_last_notification
    SELECT
        city_id,
        subscriber_id,
        ts
    FROM notifications
    WHERE ts = _ts
    ON DUPLICATE KEY UPDATE last_notification = _ts;

    RETURN _count;
END //

-- update_segments
-- Upserts the rows returned by dynamic_subscriber_segments(_since, _until)
-- into subscriber_segments, refreshing expires_at on rows that already exist.
CREATE OR REPLACE PROCEDURE update_segments(_since DATETIME(6), _until DATETIME(6)) AS
BEGIN
    INSERT INTO subscriber_segments
    SELECT *
    FROM dynamic_subscriber_segments(_since, _until)
    ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);
END //

-- update_sessions
-- Creates or renews the lease of a session, promotes it to controller when
-- no unexpired controller exists, echoes the session row back to the caller,
-- and removes sessions that expired long ago.
--   _session_id:              identifier of the calling session.
--   _lease_duration_sections: lease length in seconds (it is used as a
--                             SECOND interval below).
CREATE OR REPLACE PROCEDURE update_sessions(
    _session_id TEXT,
    _lease_duration_sections INT(11)
) AS
DECLARE
    -- Number of controller sessions whose lease has not expired.
    _num_alive_controllers QUERY(c INT) =
        SELECT COUNT(*)
        FROM sessions
        WHERE is_controller AND expires_at > NOW(6);

    -- Reads @@trancount so the exception handler only rolls back when a
    -- transaction is actually open.
    _num_transactions QUERY(i INT) = SELECT @@trancount;
BEGIN
    -- Create the session or extend its lease. NOW(6) matches the precision
    -- used by every other timestamp comparison in this procedure.
    INSERT INTO sessions
    SET
        session_id = _session_id,
        expires_at = NOW(6) + INTERVAL _lease_duration_sections SECOND
    ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at);

    START TRANSACTION;

    IF SCALAR(_num_alive_controllers) = 0 THEN
        -- Intentionally unfiltered: every row is rewritten so that the
        -- calling session becomes the only controller.
        UPDATE sessions
        SET is_controller = (session_id = _session_id);
    END IF;

    -- Return the current state of the calling session to the client.
    ECHO SELECT
        session_id,
        is_controller,
        expires_at
    FROM sessions
    WHERE session_id = _session_id;

    COMMIT;

    -- Remove sessions whose lease ended more than two lease durations ago.
    DELETE FROM sessions
    WHERE NOW(6) > (expires_at + INTERVAL (_lease_duration_sections * 2) SECOND);

EXCEPTION
    WHEN OTHERS THEN
        -- NOTE(review): the error is swallowed after the rollback, so the
        -- caller is not told about the failure. Confirm this is intended.
        IF SCALAR(_num_transactions) > 0 THEN
            ROLLBACK;
        END IF;
END //

DELIMITER ;
Last modified: October 10, 2024