Python

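Dependencies

The script imports the SingleStore Python client (`singlestoredb`). Install it first, for example with `pip install singlestoredb`.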

Code

Note

The following script is only a proof of concept. For production purposes or loading large amounts of data, use LOAD DATA or pipelines.

#!/usr/bin/env python3
import os
import sys
import time
import threading
import argparse
import singlestoredb as database
# Command-line interface. Every flag is optional; the defaults describe a
# local, password-less root login and a 30 second run with 10 threads.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--host",
    default=None,
    help="The hostname of the SingleStore node to connect to",
)
parser.add_argument(
    "--port",
    type=int,
    default=None,
    help="The port of the SingleStore node to connect to",
)
parser.add_argument(
    "--user",
    default="root",
    help="The user of the SingleStore node to connect to",
)
parser.add_argument(
    "--password",
    default="",
    help="The password of the SingleStore node to connect to",
)
parser.add_argument(
    "--database",
    default="simple_benchmark",
    help="The database to use - note: this database should not exist",
)
parser.add_argument(
    "--num-workers",
    type=int,
    default=10,
    help="The number of insert threads",
)
parser.add_argument(
    "--time",
    type=int,
    default=30,
    help="The number of seconds to run the benchmark for",
)
options = parser.parse_args()

# Connection target. Both stay None here and are assigned in the
# ``if __name__ == '__main__'`` section before any connection is opened.
HOST = PORT = None

# Name of the benchmark table and the number of rows sent per INSERT.
TABLE = "tbl"
BATCH_SIZE = 5000

# Pre-generate the workload query: one multi-row INSERT carrying BATCH_SIZE
# copies of the tuple "(1)".
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (
    TABLE,
    ",".join("(1)" for _ in range(BATCH_SIZE)),
)
def get_connection(host=None, port=None, db=options.database):
    """Open and return a new connection with autocommit switched on.

    Args:
        host: Server hostname; the module-level ``HOST`` is used when None.
        port: Server port; the module-level ``PORT`` is used when None.
        db: Database to connect to. Defaults to the ``--database`` option
            as it was when this function was defined.

    Returns:
        The connection object produced by ``database.connect``. The user and
        password are taken from the parsed command-line options.
    """
    connection = database.connect(
        host=HOST if host is None else host,
        port=PORT if port is None else port,
        user=options.user,
        password=options.password,
        database=db,
    )
    connection.autocommit(True)
    return connection
class InsertWorker(threading.Thread):
    """A daemon thread that executes the batch INSERT in a loop.

    Attributes:
        stopping: Event checked before every INSERT; once it is set the
            loop ends and the thread finishes.
        exception: None while the worker is healthy; otherwise the exception
            that ended ``run``, so the launching thread can inspect it after
            ``join()``.
    """

    def __init__(self, stopping):
        """Create the worker.

        Args:
            stopping: A ``threading.Event`` shared with the launcher and used
                to request shutdown.
        """
        # Daemon threads do not keep the interpreter alive on their own.
        super().__init__(daemon=True)
        self.stopping = stopping
        self.exception = None

    def run(self):
        """Insert batches on a private connection until ``stopping`` is set.

        Raises:
            Exception: Whatever the connection or query raised. The error is
                stored in ``self.exception`` first and then re-raised, so
                the default thread exception reporting still happens.
        """
        try:
            with get_connection() as conn:
                with conn.cursor() as cur:
                    while not self.stopping.is_set():
                        cur.execute(QUERY_TEXT)
        except Exception as exc:
            # Record the failure; previously this attribute was never set.
            self.exception = exc
            raise
def test_connection():
    """Verify the server is reachable; exit with status 1 when it is not.

    A connection to ``information_schema`` is opened with the configured
    credentials. The check fails when opening it raises ``database.Error``
    or when the connection reports that it is not connected.

    Raises:
        SystemExit: With code 1 after printing a diagnostic, if the check
            fails.
    """
    try:
        with get_connection(db="information_schema") as conn:
            # Keep the result: is_connected() answers through its return
            # value, so discarding it would let a dead connection pass.
            connected = conn.is_connected()
    except database.Error:
        connected = False

    if not connected:
        print("Unable to connect to SingleStore with provided connection details.")
        print("Please verify that SingleStore is running @ %s:%s" % (HOST, PORT))
        sys.exit(1)
def setup_test_db():
    """Create the benchmark database and the table the workers insert into.

    Connects through ``information_schema``, creates the database named by
    ``--database``, switches to it and creates the table ``TABLE`` with an
    auto-increment ``id`` primary key and an integer ``val`` column.

    Raises:
        SystemExit: With code 1 if CREATE DATABASE raises ``database.Error``.
    """
    db_name = options.database
    with get_connection(db="information_schema") as conn, conn.cursor() as cur:
        print('Creating database %s' % db_name)
        try:
            # note: the following query will fail if there is an existing database
            cur.execute('CREATE DATABASE %s' % db_name)
        except database.Error:
            for message in (
                "Database %s already exists - since we drop the database at" % db_name,
                "the end of this script, please specify an un-used database",
                "with the --database flag.",
            ):
                print(message)
            sys.exit(1)

        cur.execute('USE %s' % db_name)
        cur.execute(
            'CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE
        )
def warmup():
    """Execute the pre-built INSERT batch once on a fresh connection."""
    print('Warming up workload')
    with get_connection() as conn, conn.cursor() as cur:
        cur.execute(QUERY_TEXT)
def run_benchmark():
    """Run a set of InsertWorkers and report their throughput.

    Starts ``--num-workers`` threads, sleeps for ``--time`` seconds, signals
    the workers to stop and waits for them to finish. It then counts the
    rows in ``TABLE`` and prints the total and the rows-per-second rate.

    The rate is computed over the measured wall-clock time from just before
    the first worker starts until the last one has been joined. Workers
    finish their in-flight batch after the stop signal, so dividing by the
    requested duration would overstate throughput (and fail for
    ``--time 0``).

    NOTE: the row count is a plain COUNT(*) over ``TABLE``, so it includes
    any rows already in the table when the workers start.
    """
    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)

    started = time.monotonic()
    for worker in workers:
        worker.start()

    time.sleep(options.time)

    print('Stopping workload')
    stopping.set()
    for worker in workers:
        worker.join()
    elapsed = time.monotonic() - started

    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) AS count FROM %s" % TABLE)
            count = cur.fetchall()[0][0]

    # Guard the division so a zero-length run reports 0.0 instead of raising.
    rate = count / elapsed if elapsed > 0 else 0.0
    print("%d rows inserted using %d threads" % (count, options.num_workers))
    print("%.1f rows per second" % rate)
def cleanup():
    """Drop the database this benchmark is using, if it exists.

    This is best effort: a ``database.Error`` is reported as a warning and
    never propagated, so cleanup cannot mask the outcome of the run itself.

    The connection goes through ``information_schema`` rather than the
    benchmark database, so the DROP can be issued whether or not that
    database currently exists.
    """
    try:
        with get_connection(db="information_schema") as conn:
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS %s' % options.database)
    except database.Error as err:
        # Keep going, but do not hide the failure from the user.
        print("Warning: could not drop database %s: %s" % (options.database, err))
if __name__ == '__main__':
    # Resolve the connection target, falling back to a local server on the
    # default port when the flags were not given.
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306

    # Remove leftovers from an earlier run.
    # NOTE: this drops any existing database named by --database.
    cleanup()
    try:
        test_connection()
        setup_test_db()
        warmup()
        run_benchmark()
    except KeyboardInterrupt:
        print("Interrupted... exiting...")
    finally:
        # Drop the benchmark database on every exit path (normal completion,
        # Ctrl-C, or an error), as the setup message promises.
        cleanup()

Last modified: August 22, 2024

Was this article helpful?

Verification instructions

Note: You must install cosign to verify the authenticity of the SingleStore file.

Use the following steps to verify the authenticity of singlestoredb-server, singlestoredb-toolbox, singlestoredb-studio, and singlestore-client SingleStore files that have been downloaded.

You may perform the following steps on any computer that can run cosign, such as the main deployment host of the cluster.

  1. (Optional) Run the following command to view the associated signature files.

    curl <signature-listing-URL>
  2. Download the signature file from the SingleStore release server.

    • Option 1: Click the Download Signature button next to the SingleStore file.

    • Option 2: Copy and paste the following URL into the address bar of your browser and save the signature file.

    • Option 3: Run the following command to download the signature file.

      curl -O <signature-file-URL>
  3. After the signature file has been downloaded, run the following command to verify the authenticity of the SingleStore file.

    echo -n <blob-to-verify> |
    cosign verify-blob --certificate-oidc-issuer https://oidc.eks.us-east-1.amazonaws.com/id/CCDCDBA1379A5596AB5B2E46DCA385BC \
    --certificate-identity https://kubernetes.io/namespaces/freya-production/serviceaccounts/job-worker \
    --bundle <downloaded-signature-file> \
    --new-bundle-format -
    Verified OK