Working with Python UDFs
Equivalent Data Types
When defining Python UDFs, input parameters and return values must be mapped between Python and SingleStore data types.
The following are the default Python to SingleStore mappings:

Python Type | SingleStore Type
---|---
bool | BOOL
int | BIGINT
float | DOUBLE
str | TEXT
bytes | BLOB
For finer control, use the singlestoredb.functions.dtypes module to specify types such as SMALLINT, VARCHAR, DECIMAL, or JSON.
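For instance, a scalar UDF that relies only on the default mappings can be declared with plain Python type annotations. The function below is a hypothetical sketch; only the @udf decorator and the annotations matter here, and the exact SQL type chosen for int is an assumption based on the default mapping table above.

from singlestoredb.functions import udf

@udf
def describe_order(order_id: int, total: float, note: str) -> str:
    # Default mappings apply: float -> DOUBLE, str -> TEXT, int -> a SQL integer type (typically BIGINT).
    return f'Order {order_id}: {total:.2f} ({note})'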
Handling NULL Values
By default, Python UDFs and TVFs do not allow NULL values. NULL support depends on whether the Python function is scalar or vectorized.
Scalar Python Functions (UDFs and TVFs)
- Use Optional[...] (or the ... | None type in Python 3.10+) to allow parameters and return values to accept NULL (see the sketch after this list).
- This makes the entire parameter or return value nullable.
- If any argument is NULL, the function can return NULL as needed.
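For example, the following scalar UDF sketch (the function name and logic are illustrative) accepts NULL inputs and returns NULL when appropriate:

from typing import Optional

from singlestoredb.functions import udf

@udf
def safe_ratio(numerator: Optional[float], denominator: Optional[float]) -> Optional[float]:
    # A NULL argument arrives as None; returning None produces a NULL result.
    if numerator is None or denominator is None or denominator == 0:
        return None
    return numerator / denominator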
Vectorized Python Functions (UDFs and TVFs)
- Optional[...] cannot be used for element-level NULLs inside vectors.
- Instead, use the Masked annotation for parameters and return values that must support NULL (see the sketch after this list).
- A Masked value consists of:
  - A data vector containing the non-NULL values.
  - A boolean mask vector, where True indicates a NULL element.
- This ensures that NULL values are preserved and propagated correctly across inputs and outputs.
- In Python TVFs, each output column can independently use Masked to indicate nullability.
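The following is a rough sketch of this pattern. It assumes Masked can be imported from singlestoredb.functions, that a Masked parameter unpacks into a (data, mask) pair, and that a Masked return value is constructed from the same pair; confirm the exact annotation syntax against the singlestoredb SDK reference.

import numpy as np
import numpy.typing as npt

from singlestoredb.functions import udf, Masked

@udf
async def masked_double(x: Masked[npt.NDArray[np.float64]]) -> Masked[npt.NDArray[np.float64]]:
    values, nulls = x                  # data vector plus boolean mask (True = NULL)
    return Masked(values * 2, nulls)   # NULL elements remain NULL in the output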
Overriding Parameter and Return Value Types
You can specify an output schema using the returns= parameter in the @udf decorator. The schema can be specified as either:

- A list of SQL types with name= specified, or
- A NamedTuple, TypedDict, or pydantic.BaseModel

The output schema is inferred automatically from the schema class; the class does not need to be returned explicitly and is used only to define the output schema.
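As an illustration, following the same pattern as Example 1 below, a hypothetical vectorized TVF can declare its output columns through a NamedTuple passed to returns= while returning a DataFrame (the function name and the 20% tax rate are made up for this sketch):

import typing

import numpy as np
import numpy.typing as npt
import pandas as pd

from singlestoredb.functions import udf, Table

# Schema class: defines output column names and types only.
class PriceWithTax(typing.NamedTuple):
    price: float
    tax: float

@udf(returns=PriceWithTax)
async def price_with_tax(price: npt.NDArray[np.float64]) -> Table[pd.DataFrame]:
    # The function returns a DataFrame; the NamedTuple itself is never instantiated.
    return Table(pd.DataFrame({'price': price, 'tax': price * 0.2}))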
Cancelling Running Python UDFs
When a query that uses a Python UDF is cancelled or when the connection between the database engine and the Python UDF server is broken, the Python UDF execution continues until it can be safely interrupted.
- Synchronous Python UDFs
  - Scalar Python UDFs: Cancellation can only occur between row function calls.
  - Vectorized Python UDFs: Cancellation occurs only after all rows in the current batch are processed.
- Asynchronous Python UDFs
  - Cancellation is detected more quickly.
  - When a disconnect is detected, an asyncio.CancelledError is raised the next time the Python UDF becomes active.
  - If synchronous operations are used inside an async Python UDF, they must complete before cancellation occurs.
  - Nested async calls are cancelled as soon as they activate again.

SingleStore recommends using async before the function definition for better cancellation handling.
Note
Use asynchronous Python UDFs and async
libraries wherever possible to ensure that Python UDFs can be cancelled promptly.
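As a rough illustration of this recommendation, an async UDF can await regularly and, if needed, catch asyncio.CancelledError to release resources before re-raising. The function below is a hypothetical sketch, not part of the SingleStore API:

import asyncio

import numpy as np
import numpy.typing as npt

from singlestoredb.functions import udf

@udf
async def slow_score(values: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
    try:
        # Awaiting gives the event loop a chance to deliver a cancellation promptly.
        await asyncio.sleep(0)
        return values * 2
    except asyncio.CancelledError:
        # Release any resources held by this call, then let the cancellation propagate.
        raise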
Timeouts
A timeout can be applied directly in the @udf
decorator using the timeout=
parameter.
- The value must be specified in seconds.
- If the timeout period is exceeded, the Python UDF is cancelled automatically.
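For example, the following hypothetical UDF is cancelled automatically if it runs longer than 30 seconds (the function body is a stand-in for a slow external call):

import asyncio

from singlestoredb.functions import udf

@udf(timeout=30)
async def slow_lookup(key: str) -> str:
    await asyncio.sleep(1)  # stand-in for a slow external call
    return key.upper()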
User Permissions
To allow users other than the organization owner to execute Python UDFs or TVFs, the organization owner must grant the appropriate permissions.
Required Permissions
- EXECUTE (Object-Level Permission)
  - Required to execute a function in a specific database.
  - Can be granted on all functions in a database or on a specific function.
  - The following example demonstrates the syntax:

    GRANT EXECUTE ON <db_name>.<function_name> TO 'user'@'%';

- OUTBOUND (Global Permission)
  - Required to allow functions to make external network requests.
  - Must be granted globally.
  - The following example demonstrates the syntax:

    GRANT OUTBOUND ON *.* TO 'user'@'%';
Note
Both permissions must be granted to ensure the user can execute Python UDFs or TVFs successfully.
Examples
Example 1: Calculate Sales Metrics with Vectorized Python TVFs
This example demonstrates a vectorized Python TVF that computes total and average sales per row and returns a table with named columns.
import typing

import numpy as np
import pandas as pd
from singlestoredb.functions import udf, Table
import numpy.typing as npt

# Define output schema using NamedTuple
class SalesOutput(typing.NamedTuple):
    total_sales: float
    average_sales: float
    category: str

@udf(returns=SalesOutput)
async def vector_sales(
    units_sold: npt.NDArray[np.float64],   # Vector of units sold
    unit_price: npt.NDArray[np.float64],   # Vector of unit prices
    category: npt.NDArray[np.str_]         # Vector of categories
) -> Table[pd.DataFrame]:
    """Calculate total and average sales per row with category.

    Parameters
    ----------
    units_sold : np.ndarray[np.float64]
        Number of units sold
    unit_price : np.ndarray[np.float64]
        Price per unit
    category : np.ndarray[str]
        Category name for each row

    Returns
    -------
    pd.DataFrame
        Table with columns:
        - total_sales
        - average_sales
        - category
    """
    # Compute vectorized total and average
    total_sales = units_sold * unit_price
    average_sales = total_sales / np.maximum(units_sold, 1)  # Avoid divide by zero

    # Return as a DataFrame with named columns
    df = pd.DataFrame({
        'total_sales': total_sales,
        'average_sales': average_sales,
        'category': category
    })
    return Table(df)

# Start Python UDF server
import singlestoredb.apps as apps
connection_info = await apps.run_udf_app()
This results in the following external function:
CREATE EXTERNAL FUNCTION `vector_sales`(
  `units_sold` DOUBLE NOT NULL,
  `unit_price` DOUBLE NOT NULL,
  `category` TEXT NOT NULL)
RETURNS TABLE(
  `total_sales` DOUBLE NOT NULL,
  `average_sales` DOUBLE NOT NULL,
  `category` TEXT NOT NULL)
AS REMOTE SERVICE "http://<svchost>/<endpoint>/invoke" FORMAT ROWDAT_1;
Example 2: Vector Embeddings
This example demonstrates a Python UDF that can connect to any deployed embedding service in SingleStore Aura.
import base64
import sys

import numpy as np
import requests
import numpy.typing as npt
from singlestoredb.functions import udf

# Configuration for the embedding service
MODEL_NAME = 'name-of-embedding-service'
MODEL_URL = 'url-of-embedding-server'
TOKEN = 'api-token-of-embedding-service'

HEADERS = {
    'accept': 'application/json',
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {TOKEN}',
}

@udf
async def mixedbread_embeddings(
    text: npt.NDArray[np.str_],      # Input: array of strings
) -> npt.NDArray[np.bytes_]:         # Output: array of bytes
    """Generate vector embeddings for an array of text inputs.

    Parameters
    ----------
    text : numpy.ndarray
        Array of input strings

    Returns
    -------
    numpy.ndarray
        Array of byte embeddings
    """
    # Send request to embedding service
    res = requests.post(
        MODEL_URL,
        headers=HEADERS,
        json=dict(
            model=MODEL_NAME,
            input=text.tolist(),
            encoding_format='base64',
        ),
    )

    # Raise an error if the request failed
    if res.status_code >= 400:
        print(res.content.decode('utf8'), file=sys.stderr)
        raise RuntimeError(res.content.decode('utf8'))

    # Decode embeddings and return as a NumPy array of bytes
    return np.array(
        [base64.b64decode(x['embedding']) for x in res.json()['data']],
        dtype=object,  # Preserve byte arrays correctly
    )

# Start Python UDF server
import singlestoredb.apps as apps
connection_info = await apps.run_udf_app()
This results in the following external function:
CREATE EXTERNAL FUNCTION `mixedbread_embeddings`(`text` TEXT NOT NULL)
RETURNS BLOB NOT NULL
AS REMOTE SERVICE "base-url-of-udf-server-including-port"
FORMAT ROWDAT_1;
Example 3: OpenAI Integration with Structured Output
This example demonstrates a Python table-valued function (TVF) that returns structured output using a pydantic.BaseModel.
import os
from typing import List

from pydantic import BaseModel, Field
from singlestoredb.functions import udf, Table
import instructor
from openai import OpenAI

# Create OpenAI client via instructor wrapper
client = instructor.from_openai(OpenAI(api_key=os.getenv("OPENAI_API_KEY")))

# Define output schema
class Synonym(BaseModel):
    word: str = Field(description='Synonym of the given word')
    score: float = Field(description='Closeness score from 0.0 to 1.0')

# Define the TVF
@udf
async def get_synonyms(word: str) -> Table[List[Synonym]]:
    """Return a list of synonyms of the given word and a score."""
    return Table(client.create(
        model='gpt-4o-mini',
        messages=[
            dict(role='system', content='You are a helpful assistant'),
            dict(
                role='user',
                content=f'''
                    * Get a list of synonyms of the word "{word}"
                    * Limit the number of results to 10
                ''',
            ),
        ],
        response_model=List[Synonym],
        max_retries=0,
    ))

# Start Python UDF server
import singlestoredb.apps as apps
connection_info = await apps.run_udf_app()
This results in the following external function:
CREATE EXTERNAL FUNCTION `get_synonyms`(`word` TEXT NOT NULL)
RETURNS TABLE(
  `word` TEXT NOT NULL,
  `score` DOUBLE NOT NULL)
AS REMOTE SERVICE "http://<svchost>/<endpoint>/invoke" FORMAT ROWDAT_1;
Run the following command to retrieve the synonyms of the word “danger”.
SELECT * FROM get_synonyms('danger');
The command returns the following:
word | score
---|---
hazard | 0.90
risk | 0.85
threat | 0.88
peril | 0.90
jeopardy | 0.87
threatening | 0.80
threatening situation | 0.70
threatened | 0.75
insecurities | 0.60
unsafe | 0.65
Example 4: Machine Learning Model Scoring
This example demonstrates how to use a pre-trained Keras model stored in Stage to score a collection of input parameters.
import numpy as np
import pandas as pd
import numpy.typing as npt
import singlestoredb.notebook as nb
from tensorflow.keras.models import load_model
from singlestoredb.functions import udf

# Download model from Stage
nb.stage.download_file('my_model.keras', local_path='my_model.keras')

# Load the model
model = load_model('my_model.keras')

@udf
async def keras_score(
    param_1: npt.NDArray[np.float32],
    param_2: npt.NDArray[np.float32],
    param_3: npt.NDArray[np.float32],
) -> npt.NDArray[np.float32]:
    """Score rows using a Keras model.

    Parameters
    ----------
    param_1 : np.ndarray[np.float32]
        First input parameter.
    param_2 : np.ndarray[np.float32]
        Second input parameter.
    param_3 : np.ndarray[np.float32]
        Third input parameter.

    Returns
    -------
    np.ndarray[np.float32]
        Predicted scores from the model.
    """
    X_test = pd.DataFrame({
        'param_1': param_1,
        'param_2': param_2,
        'param_3': param_3
    })
    return model.predict(X_test).reshape((-1,))

# Start Python UDF server
import singlestoredb.apps as apps
connection_info = await apps.run_udf_app()
Last modified: September 18, 2025