Python

Dependencies

Code

Note

The following script is only a proof of concept. For production purposes or loading large amounts of data, use LOAD DATA or pipelines.

#!/usr/bin/env python3
import os
import sys
import time
import threading
import argparse
import singlestoredb as database
# Command-line interface: connection details plus the benchmark sizing knobs.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--host",
    default=None,
    help="The hostname of the SingleStore node to connect to",
)
parser.add_argument(
    "--port",
    type=int,
    default=None,
    help="The port of the SingleStore node to connect to",
)
parser.add_argument(
    "--user",
    default="root",
    help="The user of the SingleStore node to connect to",
)
parser.add_argument(
    "--password",
    default="",
    help="The password of the SingleStore node to connect to",
)
parser.add_argument(
    "--database",
    default="simple_benchmark",
    help="The database to use - note: this database should not exist",
)
parser.add_argument(
    "--num-workers",
    type=int,
    default=10,
    help="The number of insert threads",
)
parser.add_argument(
    "--time",
    type=int,
    default=30,
    help="The number of seconds to run the benchmark for",
)
options = parser.parse_args()

# Connection target; assigned from the parsed options in the __main__ section.
HOST = None
PORT = None

# Name of the benchmark table and the number of rows sent per INSERT statement.
TABLE = "tbl"
BATCH_SIZE = 5000

# Pre-generate the workload query: a single multi-row INSERT carrying
# BATCH_SIZE copies of the row (1), built once and reused for every execution.
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (
    TABLE,
    ",".join("(1)" for _ in range(BATCH_SIZE)),
)
def get_connection(host=None, port=None, db=options.database):
    """Open and return a new database connection with autocommit enabled.

    ``host`` and ``port`` fall back to the module-level ``HOST`` and ``PORT``
    when left as ``None``. ``db`` is the database to select on connect and
    defaults to the one given by ``--database``. Credentials always come from
    the parsed command-line options.
    """
    target_host = HOST if host is None else host
    target_port = PORT if port is None else port
    connection = database.connect(
        host=target_host,
        port=target_port,
        user=options.user,
        password=options.password,
        database=db,
    )
    connection.autocommit(True)
    return connection
class InsertWorker(threading.Thread):
    """A daemon thread which executes QUERY_TEXT in a loop until told to stop.

    Args:
        stopping: an event-like object; the loop ends once ``stopping.is_set()``
            returns true (checked between statements, so a batch that is
            already executing is allowed to finish).

    Attributes:
        exception: ``None`` while the worker is healthy; the exception that
            terminated ``run()`` if the worker failed.
    """

    def __init__(self, stopping):
        super(InsertWorker, self).__init__()
        self.stopping = stopping
        self.daemon = True
        # Populated by run() when the insert loop dies with an error.
        self.exception = None

    def run(self):
        """Open a dedicated connection and insert batches until stopped."""
        try:
            with get_connection() as conn:
                with conn.cursor() as cur:
                    while not self.stopping.is_set():
                        cur.execute(QUERY_TEXT)
        except Exception as exc:
            # Record the failure on the worker so it can be inspected after
            # join(), then re-raise so it is still reported the same way an
            # uncaught exception in a thread normally is.
            self.exception = exc
            raise
def test_connection():
    """Check that the configured SingleStore node can be reached.

    Opens a short-lived connection to ``information_schema`` and asks it
    whether it is connected. If connecting raises ``database.Error`` or the
    connection reports that it is not connected, prints guidance and exits
    the process with status 1.
    """
    try:
        with get_connection(db="information_schema") as conn:
            # Use the answer instead of discarding it: a connection object
            # that reports "not connected" must count as a failure too.
            connected = conn.is_connected()
    except database.Error:
        connected = False

    if not connected:
        print("Unable to connect to SingleStore with provided connection details.")
        print("Please verify that SingleStore is running @ %s:%s" % (HOST, PORT))
        sys.exit(1)
def setup_test_db():
    """Create the database and table for this benchmark to use.

    Connects through ``information_schema``, creates the database named by
    ``--database``, switches to it and creates the benchmark table. If the
    database cannot be created, prints the server's error together with
    guidance and exits the process with status 1.
    """
    with get_connection(db="information_schema") as conn:
        with conn.cursor() as cur:
            print('Creating database %s' % options.database)
            try:
                # note: the following query will fail if there is an existing database
                cur.execute('CREATE DATABASE %s' % options.database)
            except database.Error as err:
                # Not every failure means "already exists" (it may be a
                # permission or naming problem), so show the real error
                # alongside the usual guidance.
                print("Unable to create database %s: %s" % (options.database, err))
                print("If the database already exists: since we drop the database at")
                print("the end of this script, please specify an un-used database")
                print("with the --database flag.")
                sys.exit(1)
            cur.execute('USE %s' % options.database)
            cur.execute('CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE)
def warmup():
    """Execute the workload query a single time before the timed run."""
    print('Warming up workload')
    with get_connection() as connection, connection.cursor() as cursor:
        cursor.execute(QUERY_TEXT)
def run_benchmark():
    """Run a set of InsertWorkers and report their insert throughput.

    Starts ``options.num_workers`` workers, lets them run for roughly
    ``options.time`` seconds, signals them to stop and waits for them to
    finish. The reported row count excludes rows that were already in the
    table before the workers started (such as the warm-up batch), and the
    rate is computed from the measured wall-clock time between starting the
    workers and the last one finishing.
    """
    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    # Rows present before the workers start must not be credited to them.
    baseline = _count_rows()

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)
    started = time.monotonic()
    for worker in workers:
        worker.start()
    time.sleep(options.time)
    print('Stopping workload')
    stopping.set()
    # Workers only notice the stop signal between statements, so joining can
    # take noticeably longer than the requested duration.
    for worker in workers:
        worker.join()
    elapsed = time.monotonic() - started

    count = _count_rows() - baseline
    print("%d rows inserted using %d threads" % (count, options.num_workers))
    # Guard the division so a zero-length run reports 0.0 instead of crashing.
    rate = count / elapsed if elapsed > 0 else 0.0
    print("%.1f rows per second" % rate)


def _count_rows():
    """Return the current number of rows in the benchmark table."""
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) AS count FROM %s" % TABLE)
            return cur.fetchall()[0][0]
def cleanup():
    """Drop the database this benchmark is using, if it exists.

    This is best effort: a failure is reported as a warning on stderr and
    never raised, so callers can invoke it unconditionally.
    """
    try:
        # Connect through information_schema rather than the database being
        # dropped, so the connection does not depend on that database
        # existing and IF EXISTS can do its job.
        with get_connection(db="information_schema") as conn:
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS %s' % options.database)
    except database.Error as err:
        print(
            "Warning: could not drop database %s: %s" % (options.database, err),
            file=sys.stderr,
        )
if __name__ == '__main__':
    # Resolve the connection target, falling back to a local default node.
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306
    # NOTE(review): this drops any database already named by --database
    # before the run starts; confirm that is intended, given the --database
    # help text asks for a database that does not exist.
    cleanup()
    try:
        test_connection()
        setup_test_db()
        warmup()
        run_benchmark()
    except KeyboardInterrupt:
        print("Interrupted... exiting...")
    finally:
        # Always remove the benchmark database afterwards, whether the run
        # finished, was interrupted, or exited early.
        cleanup()

Last modified: February 23, 2024

Was this article helpful?