Python
Dependencies
Code
Note
The following script is only a proof of concept. For production purposes or loading large amounts of data, use LOAD DATA or pipelines.
#!/usr/bin/env python3
"""Proof-of-concept multi-threaded INSERT benchmark.

The script creates a scratch database and table, runs a number of threads
that each execute a pre-built multi-row INSERT in a loop for a fixed amount
of time, prints the resulting row count and rows-per-second figure, and
finally drops the scratch database again.
"""

import os
import sys
import time
import threading
import argparse

import singlestoredb as database

parser = argparse.ArgumentParser()
parser.add_argument("--host", default=None,
                    help="The hostname of the SingleStoreDB Cloud node to connect to")
parser.add_argument("--port", default=None, type=int,
                    help="The port of the SingleStoreDB Cloud node to connect to")
parser.add_argument("--user", default="root",
                    help="The user of the SingleStoreDB Cloud node to connect to")
parser.add_argument("--password", default="",
                    help="The password of the SingleStoreDB Cloud node to connect to")
parser.add_argument("--database", default="simple_benchmark",
                    help="The database to use - note: this database should not exist")
parser.add_argument("--num-workers", type=int, default=10,
                    help="The number of insert threads")
parser.add_argument("--time", type=int, default=30,
                    help="The number of seconds to run the benchmark for")

options = parser.parse_args()

# Assigned from the command-line options in the __main__ block below;
# get_connection() falls back to these when no host/port is passed in.
HOST = None
PORT = None

TABLE = "tbl"
BATCH_SIZE = 5000

# Pre-generate the workload query: one INSERT carrying BATCH_SIZE rows.
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (
    TABLE, ",".join(["(1)"] * BATCH_SIZE))


def get_connection(host=None, port=None, db=options.database):
    """Return a new autocommit connection to the database.

    Args:
        host: Host to connect to; defaults to the module-level HOST.
        port: Port to connect to; defaults to the module-level PORT.
        db: Database to select on connect; defaults to ``--database``.
    """
    if host is None:
        host = HOST
    if port is None:
        port = PORT

    out = database.connect(
        host=host,
        port=port,
        user=options.user,
        password=options.password,
        database=db)
    out.autocommit(True)
    return out


class InsertWorker(threading.Thread):
    """A daemon thread which executes the batch INSERT in a loop.

    Attributes are plain instance fields:
      stopping  -- threading.Event; the loop ends once it is set.
      exception -- the exception that terminated run(), or None.
    """

    def __init__(self, stopping):
        super().__init__()
        self.stopping = stopping
        self.daemon = True
        self.exception = None

    def run(self):
        """Insert batches until ``stopping`` is set or an error occurs."""
        try:
            with get_connection() as conn:
                with conn.cursor() as cur:
                    while not self.stopping.is_set():
                        cur.execute(QUERY_TEXT)
        except Exception as exc:
            # Record the failure so run_benchmark() can report it, then
            # re-raise so the usual thread traceback is still printed.
            self.exception = exc
            raise


def test_connection():
    """Exit with status 1 if the server cannot be reached."""
    try:
        with get_connection(db="information_schema") as conn:
            conn.is_connected()
    except database.Error:
        print("Unable to connect to SingleStoreDB Cloud with provided connection details.")
        print("Please verify that SingleStoreDB Cloud is running @ %s:%s" % (HOST, PORT))
        sys.exit(1)


def setup_test_db():
    """Create the database and table for this benchmark to use.

    Exits with status 1 if the CREATE DATABASE statement fails.
    """
    with get_connection(db="information_schema") as conn:
        with conn.cursor() as cur:
            print('Creating database %s' % options.database)
            try:
                # note: the following query will fail if there is an existing database
                cur.execute('CREATE DATABASE %s' % options.database)
            except database.Error:
                print("Database %s already exists - since we drop the database at" % options.database)
                print("the end of this script, please specify an un-used database")
                print("with the --database flag.")
                sys.exit(1)

            cur.execute('USE %s' % options.database)
            cur.execute('CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE)


def warmup():
    """Execute the workload query once before the timed run."""
    print('Warming up workload')
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(QUERY_TEXT)


def run_benchmark():
    """Run a set of InsertWorkers and report their performance."""
    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)

    for worker in workers:
        worker.start()
    time.sleep(options.time)

    print('Stopping workload')
    stopping.set()
    for worker in workers:
        worker.join()

    # A worker that died early inserted for less than the full duration,
    # so make that visible next to the throughput figure.
    failed = [worker for worker in workers if worker.exception is not None]
    if failed:
        print("Warning: %d of %d workers stopped early due to an error: %r"
              % (len(failed), len(workers), failed[0].exception))

    with get_connection() as conn:
        with conn.cursor() as cur:
            # NOTE: this count also includes the batch inserted by warmup().
            cur.execute("SELECT COUNT(*) AS count FROM %s" % TABLE)
            count = cur.fetchall()[0][0]

    print("%d rows inserted using %d threads" % (count, options.num_workers))
    print("%.1f rows per second" % (count / float(options.time)))


def cleanup():
    """Drop the database this benchmark is using (best effort).

    Connects through ``information_schema`` rather than the benchmark
    database itself, so the connection succeeds whether or not that
    database exists and ``IF EXISTS`` can do its job.
    """
    try:
        with get_connection(db="information_schema") as conn:
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS %s' % options.database)
    except database.Error:
        # Deliberately ignored: cleanup must never mask the real outcome
        # (e.g. when the server is unreachable in the first place).
        pass


if __name__ == '__main__':
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306

    cleanup()
    try:
        test_connection()
        setup_test_db()
        warmup()
        run_benchmark()
    except KeyboardInterrupt:
        print("Interrupted... exiting...")
    finally:
        # Drop the scratch database at the end of the run, as the message
        # in setup_test_db() promises.
        cleanup()