Cloud Functions
Note
This is a Preview feature.
SingleStore Cloud Functions is a serverless, cloud-based service that allows users to run code without managing compute resources.
Create and manage cloud functions using the Cloud Portal.
Publish Cloud Functions using the Cloud Portal
Create a Cloud Function
Cloud Functions can be created for shared notebooks only.
Create a cloud function in one of two ways:

- Using Cloud Functions in the left navigation
- Using Shared Notebooks:
  1. Navigate to Editor > Shared.
  2. Select a shared notebook.
  3. Select Publish (on the top right).

New Cloud Function
After selecting Publish, a new dialog box appears.
| Publish Setting | Description |
|---|---|
| Publish as | Select Cloud Function. |
| Name | Enter a name for the cloud function. |
| Description | Enter the cloud function description. |
| Notebook | Select a shared notebook to publish as a cloud function. |
| Deployment | Select the SingleStore deployment (workspace) the notebook will connect to. Selecting a workspace allows the notebook to connect natively to the SingleStore databases it references. |
| Runtime | Select a runtime from the list. Note: This field is in preview. |
| Region | Select a region. |
| Idle Timeout | Select an idle timeout. Note: This field is in preview. |
Select Next.
Select Publish to publish the notebook as a cloud function.
Publish your first SingleStore Cloud function
Note
This notebook can be run on a Free Starter Workspace. To create a Free Starter Workspace, navigate to Start using the left nav. You can also use your existing Standard or Premium workspace with this notebook.
This Jupyter notebook will help you build your first cloud function, showcasing how to leverage the ultra-fast queries of SingleStore to build a responsive API server using FastAPI.
Create some simple tables
This setup establishes a basic relational structure to store some items information.
```sql
%%sql
DROP TABLE IF EXISTS items;
CREATE TABLE IF NOT EXISTS items (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    price FLOAT
);
```
Import the logging module to enable live logs.

```python
import logging
```
Create a Connection Pool
To run multiple simultaneous queries, we use sqlalchemy to create a pool of SQL connections to the workspace you selected. We also define methods to execute queries and transactions using a connection from this pool.
```python
from sqlalchemy import create_engine, text
import requests

ca_cert_url = "https://portal.singlestore.com/static/ca/singlestore_bundle.pem"
ca_cert_path = "/tmp/singlestore_bundle.pem"

response = requests.get(ca_cert_url)
with open(ca_cert_path, "wb") as f:
    f.write(response.content)

sql_connection_string = connection_url.replace("singlestoredb", "mysql+pymysql")
engine = create_engine(
    f"{sql_connection_string}?ssl_ca={ca_cert_path}",
    pool_size=10,    # Maximum number of connections in the pool is 10
    max_overflow=5,  # Allow up to 5 additional connections (temporary overflow)
    pool_timeout=30  # Wait up to 30 seconds for a connection from the pool
)
logging.info("Connection to workspace established successfully.")

def execute_query(query: str):
    logging.info(f"Executing query: {query}")
    with engine.connect() as connection:
        return connection.execute(text(query))

def execute_transaction(transactional_query: str):
    logging.info(f"Starting transaction for query: {transactional_query}")
    with engine.connect() as connection:
        transaction = connection.begin()
        try:
            result = connection.execute(text(transactional_query))
            transaction.commit()
            logging.info("Transaction committed successfully.")
            return result
        except Exception as e:
            transaction.rollback()
            logging.error(f"Transaction rolled back due to error: {e}")
            raise e
```
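The commit-on-success, roll-back-on-error pattern used by these helpers can be sketched self-contained with Python's built-in sqlite3 module. This is an illustration only: the in-memory SQLite database stands in for the SingleStore engine, and the table and values are made up for the example.

```python
import logging
import sqlite3

logging.basicConfig(level=logging.INFO)

# In-memory SQLite stands in for the SingleStore engine in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, price REAL)")

def execute_query(query: str):
    logging.info("Executing query: %s", query)
    return conn.execute(query)

def execute_transaction(query: str):
    # Commit on success, roll back on any error.
    logging.info("Starting transaction for query: %s", query)
    try:
        result = conn.execute(query)
        conn.commit()
        return result
    except Exception:
        conn.rollback()
        raise

execute_transaction("INSERT INTO items VALUES (1, 'widget', 9.99)")
rows = execute_query("SELECT * FROM items").fetchall()
print(rows)  # [(1, 'widget', 9.99)]
```

The same shape applies with the sqlalchemy engine: reads go through a short-lived pooled connection, while writes are wrapped in an explicit transaction so a failed statement never leaves partial state behind.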
Setup Environment
Let's set up the environment to run a FastAPI app, defining the data model and an executor to run the different requests in different threads simultaneously.
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from singlestoredb import connect
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Define the type of the data
class Item(BaseModel):
    id: int
    name: str
    price: float

# Create an executor that can execute queries on multiple threads simultaneously
executor = ThreadPoolExecutor()

def run_in_thread(fn, *args):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(executor, fn, *args)
```
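To see how run_in_thread keeps the event loop responsive, here is a minimal self-contained sketch; the sleeping function is a stand-in for a blocking database call, not part of the notebook itself.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()

def run_in_thread(fn, *args):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(executor, fn, *args)

def blocking_query(x):
    time.sleep(0.1)  # stands in for a blocking database call
    return x * 2

async def handler():
    # Both blocking calls run concurrently on the thread pool,
    # so the event loop is never blocked while they sleep.
    return await asyncio.gather(run_in_thread(blocking_query, 2),
                                run_in_thread(blocking_query, 3))

results = asyncio.run(handler())
print(results)  # [4, 6]
```

Because the two calls overlap on separate threads, the gather completes in roughly one sleep interval instead of two.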
Define FastAPI App
Next, we define a FastAPI app that can insert, query, and delete data from your table.
```python
app = FastAPI()

# Add logging middleware
@app.middleware("http")
async def log_requests(request, call_next):
    logging.info(f"Incoming request: {request.method} {request.url}")
    response = await call_next(request)
    logging.info(f"Response status: {response.status_code}")
    return response

# Get all items
@app.get("/items", response_model=list[Item])
async def get_items():
    def get_items_query():
        result = execute_query("SELECT * FROM items;")
        rows = result.fetchall()
        logging.info(f"Fetched {len(rows)} items from the database")
        return [{"id": row[0], "name": row[1], "price": row[2]} for row in rows]
    try:
        return await run_in_thread(get_items_query)
    except Exception as e:
        logging.error(f"Error fetching all items: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error fetching all items: {str(e)}")

# Insert an item
@app.post("/items", response_model=dict)
async def create_item(item: Item):
    def insert_item_query():
        logging.info(f"Inserting item: {item}")
        result = execute_transaction(
            f"INSERT INTO items (id, name, price) VALUES ({item.id}, '{item.name}', {item.price})"
        )
        logging.info(f"Item with id {item.id} inserted successfully")
        return {"message": f"Item with id {item.id} inserted successfully"}
    try:
        return await run_in_thread(insert_item_query)
    except Exception as e:
        logging.error(f"Error while inserting item with id {item.id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error while inserting item with id {item.id}: {str(e)}")

# Get item by id
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
    def get_item_query():
        result = execute_query(f"SELECT * FROM items WHERE id={item_id}")
        row = result.fetchone()
        if not row:
            logging.error(f"Item with id {item_id} not found")
            raise HTTPException(status_code=404, detail="Item not found")
        logging.info(f"Item with id {item_id} fetched successfully")
        return {"id": row[0], "name": row[1], "price": row[2]}
    try:
        return await run_in_thread(get_item_query)
    except HTTPException as e:
        raise e
    except Exception as e:
        logging.error(f"Error fetching item with id {item_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error fetching item with id {item_id}: {str(e)}")

# Delete item by id
@app.delete("/items/{item_id}", response_model=dict)
async def delete_item(item_id: int):
    logging.info(f"Deleting item with id {item_id}")
    def delete_item_query():
        result = execute_transaction(f"DELETE FROM items WHERE id={item_id}")
        logging.info(f"Number of rows deleted: {result.rowcount}")
        return {"message": f"number of rows deleted: {result.rowcount}"}
    try:
        return await run_in_thread(delete_item_query)
    except Exception as e:
        logging.error(f"Error deleting item with id {item_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error deleting item with id {item_id}: {str(e)}")
```
Start the FastAPI server
The link at which the cloud function is available interactively will be displayed.
```python
import singlestoredb.apps as apps

connection_info = await apps.run_function_app(app)
```
Publish Cloud Function
After validating the Cloud Function interactively, you can publish it and use it as an API server for your data!
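Once published, the cloud function can be called like any HTTP API. The sketch below builds a POST request with Python's urllib; the URL, the API key, and the Bearer-style Authorization header are placeholders and assumptions, so substitute the actual values from Copy URL and View API Keys in the Cloud Portal and confirm your deployment's auth scheme.

```python
import json
import urllib.request

# Placeholder values: substitute the URL and API key from the Cloud Portal.
BASE_URL = "https://example-cloud-function.svc.singlestore.com"
API_KEY = "<your-api-key>"

item = {"id": 1, "name": "widget", "price": 9.99}
req = urllib.request.Request(
    f"{BASE_URL}/items",
    data=json.dumps(item).encode("utf-8"),
    method="POST",
    headers={
        "Content-Type": "application/json",
        # Header scheme is an assumption; check your deployment's auth settings.
        "Authorization": f"Bearer {API_KEY}",
    },
)
# urllib.request.urlopen(req) would send the request to the live endpoint.
print(req.get_method(), req.full_url)
```

The same request body shape works for any HTTP client (curl, requests, fetch); the payload fields must match the Item model defined in the notebook.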

Manage an Existing Cloud Function
To manage an existing cloud function, select Cloud Functions in the left navigation. The following actions are available:

- View
- Update
- Share
- Delete
View an Existing Cloud Function
To view an existing cloud function, select the cloud function name under the Name column.
The details of each cloud function are displayed in the right navigation pane, along with the following actions:

- Copy URL
- View API Keys
- View Live Logs
- Share
- Update
- Delete
View API Keys
Refer to Aura App API Keys for related information.
View Live Logs
To view live logs of the selected cloud function, select View Live Logs from the ellipsis on the right side.
Update an Existing Cloud Function
To update an existing cloud function, select the ellipsis in the Actions column of the cloud function, and select Update.
A cloud function can also be updated from a shared notebook.
Select Update Cloud Function and select Update.
Share an Existing Cloud Function
To share an existing cloud function, select the ellipsis in the Actions column of the cloud function, and select Share.
Delete an Existing Cloud Function
To delete an existing cloud function, select the ellipsis in the Actions column of the cloud function, and select Delete.
Status of Cloud Functions
| Status | Description |
|---|---|
| Initializing | The notebook is creating or updating the cloud function. |
| Active | The notebook is successfully published as a cloud function. |
| Failed | The cloud function is not configured correctly. |
| Error | An error unrelated to the notebook code prevented the cloud function from initializing. |
Troubleshoot Cloud Functions
SingleStore automatically saves a snapshot of the notebook for each execution that fails.
| Error | Solution |
|---|---|
| Workspace Deleted | Update the cloud function and select a deployment. |
| Workspace Suspended | Resume the workspace, or create a new cloud function with a different deployment. |
| Database Detached | Reattach the database with the correct permissions, or create a new cloud function with a different database. |
| Notebook Deleted/Not Present | Create a new cloud function with a different notebook. |
| Internal Errors/Misc | Reach out to SingleStore Support or use the chat feature in the Cloud Portal. |
Last modified: February 3, 2026