# Python

**Dependencies**

* [SingleStore Python Client](https://docs.singlestore.com/db/v9.1/developer-resources/connect-with-application-development-tools/connect-with-python/connect-using-the-singlestore-python-client.md)

**Code**

> **📝 Note**: The following script is only a proof of concept. For production purposes or loading large amounts of data, use [LOAD DATA](https://docs.singlestore.com/db/v9.1/reference/sql-reference/data-manipulation-language-dml/load-data.md) or [pipelines](https://docs.singlestore.com/db/v9.1/load-data/about-singlestore-pipelines.md).

```python
#!/usr/bin/env python3

import os
import sys
import time
import threading
import argparse

import singlestoredb as s2

# Command-line interface: connection details first, then the database name,
# then the workload tuning knobs.
parser = argparse.ArgumentParser()

parser.add_argument(
    "--host",
    default=None,
    help="The hostname of the SingleStore node to connect to",
)
parser.add_argument(
    "--port",
    type=int,
    default=None,
    help="The port of the SingleStore node to connect to",
)
parser.add_argument(
    "--user",
    default="root",
    help="The user of the SingleStore node to connect to",
)
parser.add_argument(
    "--password",
    default="",
    help="The password of the SingleStore node to connect to",
)

parser.add_argument(
    "--database",
    default="simple_benchmark",
    help="The database to use - note: this database should not exist",
)

parser.add_argument(
    "--num-workers",
    type=int,
    default=10,
    help="The number of insert threads",
)
parser.add_argument(
    "--time",
    type=int,
    default=30,
    help="The number of seconds to run the benchmark for",
)

# Parsed once at import time; the rest of the script reads `options` directly.
options = parser.parse_args()

# Connection target; both stay None until the __main__ section assigns them.
HOST = None
PORT = None

# Benchmark table name and the number of rows sent per INSERT statement.
TABLE = "tbl"
BATCH_SIZE = 5000

# Build the multi-row INSERT once up front so every execution reuses the
# same pre-generated statement text.
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (
    TABLE,
    ",".join("(1)" for _ in range(BATCH_SIZE)),
)

def get_connection(host=None, port=None, db=options.database):
    """Open and return a new connection with autocommit switched on.

    ``host`` and ``port`` fall back to the module-level ``HOST`` and ``PORT``
    when left as ``None``. ``db`` is the database to connect to and defaults
    to the ``--database`` option.
    """
    target_host = HOST if host is None else host
    target_port = PORT if port is None else port

    connection = s2.connect(
        host=target_host,
        port=target_port,
        user=options.user,
        password=options.password,
        database=db,
    )
    connection.autocommit(True)
    return connection

class InsertWorker(threading.Thread):
    """Daemon thread that executes the batched INSERT statement in a loop.

    Each worker opens its own connection and keeps executing ``QUERY_TEXT``
    until the shared ``stopping`` event is set.

    Attributes:
        stopping: event checked before every batch; setting it ends the loop.
        exception: the exception that terminated ``run()``, or ``None`` if
            the worker has not failed.
    """

    def __init__(self, stopping):
        super().__init__()
        self.stopping = stopping
        self.daemon = True
        self.exception = None

    def run(self):
        """Insert batches until ``stopping`` is set.

        Any exception is stored on ``self.exception`` and then re-raised, so
        the failure is still reported by the threading machinery while also
        being inspectable by the code that joins the worker.
        """
        try:
            with get_connection() as conn:
                with conn.cursor() as cur:
                    while not self.stopping.is_set():
                        cur.execute(QUERY_TEXT)
        except Exception as exc:
            self.exception = exc
            raise

def test_connection():
    """Check that the server is reachable with the supplied connection details.

    Opens a connection to ``information_schema`` and checks the result of
    ``is_connected()``. If connecting raises ``s2.Error`` or the connection
    reports that it is not connected, prints a diagnostic and exits with
    status 1.
    """
    failure = None
    try:
        with get_connection(db="information_schema") as conn:
            connected = conn.is_connected()
    except s2.Error as err:
        connected = False
        failure = err

    if connected:
        return

    print("Unable to connect to SingleStore with provided connection details.")
    print("Please verify that SingleStore is running @ %s:%s" % (HOST, PORT))
    if failure is not None:
        # Show the real cause; it may be something other than an unreachable
        # server (for example rejected credentials).
        print("Underlying error: %s" % failure)
    sys.exit(1)

def setup_test_db():
    """Create the database and table used by this benchmark.

    Connects through ``information_schema``, creates ``options.database``,
    switches to it and creates the ``TABLE`` table. If ``CREATE DATABASE``
    raises ``s2.Error``, prints an explanation and exits with status 1.
    """

    with get_connection(db="information_schema") as conn:
        with conn.cursor() as cur:
            print('Creating database %s' % options.database)

            try:
                # note: the following query will fail if there is an existing database
                cur.execute('CREATE DATABASE %s' % options.database)
            except s2.Error as err:
                print("Database %s already exists - since we drop the database at" % options.database)
                print("the end of this script, please specify an un-used database")
                print("with the --database flag.")
                # The failure is not necessarily "already exists" (it could be
                # a permissions or naming problem), so show the actual error.
                print("Underlying error: %s" % err)
                sys.exit(1)

            cur.execute('USE %s' % options.database)

            cur.execute('CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE)

def warmup():
    """Execute the workload query once on a fresh connection."""
    print('Warming up workload')
    with get_connection() as connection, connection.cursor() as cursor:
        cursor.execute(QUERY_TEXT)

def _count_rows():
    """Return the current number of rows in the benchmark table."""
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) AS count FROM %s" % TABLE)
            return cur.fetchone()[0]


def run_benchmark():
    """Run a set of InsertWorkers and report their insert throughput.

    Starts ``options.num_workers`` workers, lets them run for
    ``options.time`` seconds, signals them to stop and waits for them to
    finish. Prints the number of rows added during the run and the resulting
    rows-per-second rate.
    """

    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    # Rows already in the table (e.g. from the warm-up batch) must not be
    # credited to the workers.
    baseline = _count_rows()

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)

    started = time.monotonic()
    for worker in workers:
        worker.start()
    time.sleep(options.time)

    print('Stopping workload')

    stopping.set()
    for worker in workers:
        worker.join()
    # Measure up to the end of join(): batches that were in flight when the
    # stop signal was sent are counted, so their time must be counted too.
    elapsed = time.monotonic() - started

    count = _count_rows() - baseline

    # Guard against a zero-length run instead of dividing by zero.
    rate = count / elapsed if elapsed > 0 else 0.0

    print("%d rows inserted using %d threads" % (count, options.num_workers))
    print("%.1f rows per second" % rate)

def cleanup():
    """Drop the benchmark database, ignoring any client error.

    This is best-effort: an ``s2.Error`` raised while connecting or while
    dropping the database is deliberately swallowed.
    """
    drop_statement = 'DROP DATABASE IF EXISTS %s' % options.database
    try:
        with get_connection() as connection, connection.cursor() as cursor:
            cursor.execute(drop_statement)
    except s2.Error:
        pass

if __name__ == '__main__':
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306

    # Best-effort removal of a database left behind by an earlier run.
    # Note that this drops any existing database with the chosen name.
    cleanup()

    try:
        test_connection()
        setup_test_db()
        warmup()
        run_benchmark()
    except KeyboardInterrupt:
        print("Interrupted... exiting...")
    finally:
        # Drop the benchmark database at the end of the script, whether the
        # run finished, was interrupted, or exited early.
        cleanup()
```

***

Modified at: March 26, 2025

Source: [/db/v9.1/developer-resources/concurrent-multi-insert-examples/python/](https://docs.singlestore.com/db/v9.1/developer-resources/concurrent-multi-insert-examples/python/)

(An index of the documentation is available at /llms.txt)
