Python
Warning
SingleStore 9.0 gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 8.9 is recommended for production workloads, which can later be upgraded to SingleStore 9.0.
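Dependencies
This example requires the SingleStore Python client, which the code imports as `singlestoredb` (install it with `pip install singlestoredb`).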
Code
#!/usr/bin/env python3
"""Simple multi-threaded INSERT benchmark for SingleStore.

The script creates a scratch database and table, runs a number of worker
threads that each execute a pre-built multi-row INSERT in a loop for a fixed
amount of time, prints the resulting insert rate, and finally drops the
scratch database again.
"""
import os
import sys
import time
import threading
import argparse

import singlestoredb as s2

parser = argparse.ArgumentParser()
parser.add_argument("--host", default=None, help="The hostname of the SingleStore node to connect to")
parser.add_argument("--port", default=None, type=int, help="The port of the SingleStore node to connect to")
parser.add_argument("--user", default="root", help="The user of the SingleStore node to connect to")
parser.add_argument("--password", default="", help="The password of the SingleStore node to connect to")
parser.add_argument("--database", default="simple_benchmark", help="The database to use - note: this database should not exist")
parser.add_argument("--num-workers", type=int, default=10, help="The number of insert threads")
parser.add_argument("--time", type=int, default=30, help="The number of seconds to run the benchmark for")
options = parser.parse_args()

# Connection target; filled in by main() from the command-line options.
HOST = None
PORT = None

TABLE = "tbl"
BATCH_SIZE = 5000

# Pre-generate the workload query: one INSERT carrying BATCH_SIZE "(1)" rows.
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (TABLE, ",".join(["(1)"] * BATCH_SIZE))

# Becomes True once setup_test_db() has created the benchmark database, so
# that main() only ever drops a database that this run created itself.
_database_created = False


def get_connection(host=None, port=None, db=options.database):
    """Return a new connection to the database with autocommit enabled.

    Args:
        host: Hostname to connect to; falls back to the module-level HOST.
        port: Port to connect to; falls back to the module-level PORT.
        db: Database to select; defaults to the --database option.
    """
    if host is None:
        host = HOST
    if port is None:
        port = PORT

    conn = s2.connect(
        host=host,
        port=port,
        user=options.user,
        password=options.password,
        database=db,
    )
    conn.autocommit(True)
    return conn


class InsertWorker(threading.Thread):
    """A daemon thread which executes the batch INSERT query in a loop.

    The loop runs until the shared ``stopping`` event is set.  If the worker
    fails, the exception is kept in ``self.exception`` so the main thread can
    report it after joining.
    """

    def __init__(self, stopping):
        super().__init__()
        self.stopping = stopping
        self.daemon = True
        self.exception = None

    def run(self):
        try:
            with get_connection() as conn:
                with conn.cursor() as cur:
                    while not self.stopping.is_set():
                        cur.execute(QUERY_TEXT)
        except Exception as exc:
            # Thread boundary: record the failure instead of losing it, so
            # run_benchmark() can surface it next to the results.
            self.exception = exc


def test_connection():
    """Verify that the server is reachable; exit with status 1 otherwise."""
    try:
        with get_connection(db="information_schema") as conn:
            connected = conn.is_connected()
    except s2.Error:
        connected = False

    if not connected:
        print("Unable to connect to SingleStore with provided connection details.")
        print("Please verify that SingleStore is running @ %s:%s" % (HOST, PORT))
        sys.exit(1)


def setup_test_db():
    """Create a database and table for this benchmark to use.

    Exits with status 1 if the CREATE DATABASE statement fails, which is
    reported to the user as the database already existing.
    """
    global _database_created

    with get_connection(db="information_schema") as conn:
        with conn.cursor() as cur:
            print('Creating database %s' % options.database)
            try:
                # note: the following query will fail if there is an existing database
                cur.execute('CREATE DATABASE %s' % options.database)
            except s2.Error:
                print("Database %s already exists - since we drop the database at" % options.database)
                print("the end of this script, please specify an un-used database")
                print("with the --database flag.")
                sys.exit(1)

            # From here on the database belongs to this run and must be
            # dropped again during cleanup.
            _database_created = True

            cur.execute('USE %s' % options.database)
            cur.execute('CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE)


def warmup():
    """Execute the workload query once before the timed run."""
    print('Warming up workload')
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(QUERY_TEXT)


def _count_rows():
    """Return the current number of rows in the benchmark table."""
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) AS count FROM %s" % TABLE)
            return cur.fetchall()[0][0]


def run_benchmark():
    """Run a set of InsertWorkers and record their performance."""
    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    # Rows already present (the warm-up batch) must not be credited to the
    # timed run.
    baseline = _count_rows()

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)

    started = time.monotonic()
    try:
        for worker in workers:
            worker.start()
        time.sleep(options.time)
        print('Stopping workload')
    finally:
        # Always stop the workers, even on Ctrl-C, so none of them is still
        # inserting when the database is dropped afterwards.
        stopping.set()
        for worker in workers:
            # Threads that were never started cannot be joined.
            if worker.is_alive():
                worker.join()
    # Measured wall time includes the final in-flight batches, unlike the
    # nominal --time value.
    elapsed = time.monotonic() - started

    for worker in workers:
        if worker.exception is not None:
            print("Worker %s failed: %r" % (worker.name, worker.exception), file=sys.stderr)

    count = _count_rows() - baseline
    print("%d rows inserted using %d threads" % (count, options.num_workers))
    if elapsed > 0:
        print("%.1f rows per second" % (count / elapsed))


def cleanup():
    """Cleanup the database this benchmark is using.

    Best effort: a failure to drop the database is reported on stderr but
    does not raise.
    """
    try:
        # Connect through information_schema so the drop does not depend on
        # being able to select the database that is being removed.
        with get_connection(db="information_schema") as conn:
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS %s' % options.database)
    except s2.Error as exc:
        print("Warning: could not drop database %s: %s" % (options.database, exc), file=sys.stderr)


def main():
    """Resolve the connection target, run the benchmark, then clean up."""
    global HOST, PORT
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306

    try:
        test_connection()
        setup_test_db()
        warmup()
        run_benchmark()
    except KeyboardInterrupt:
        print("Interrupted... exiting...")
    finally:
        # Only drop what this run created; a pre-existing database with the
        # same name is left untouched.
        if _database_created:
            cleanup()


if __name__ == '__main__':
    main()
Last modified: March 26, 2025