# Configure Cluster Monitoring with the Operator

## HTTPS Connections

Use the following instructions to configure monitoring with HTTPS connections. To use HTTP connections, skip to [Configure the Exporter Process](https://docs.singlestore.com/#section-idm4480543891112033590369071496.md).

## Create an SSL Secret

Create a Secret containing SSL certificates that will be used for HTTPS connections. The Secret must be named `<cluster-name>-additional-secrets` to be automatically mounted to each pod of the cluster.

## Option 1: Use kubectl

Use `kubectl` to create the Secret.

```shell
kubectl create secret generic <cluster-name>-additional-secrets \
  --from-file=ssl-crt=<path_to_server-cert.pem> \
  --from-file=ssl-key=<path_to_server-key.pem> \
  --from-file=ssl-ca=<path_to_ca-cert.pem>
```

## Option 2: Declare an SSL Secret in a YAML File

The `data` section of the secret must have the following key/value pairs:

* `ssl-crt`: The Base64-encoded server certificate
* `ssl-key`: The Base64-encoded server private key
* `ssl-ca`: The Base64-encoded Certificate Authority (CA) certificate

For example:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: <cluster-name>-additional-secrets
type: Opaque
data:
  ssl-ca:  ...WdNQWtOQk1SWXdGQ...
  ssl-crt: ...U5wYzJOdk1ROHdEU...
  ssl-key: ...HaVBOTytQaEh2QSt...
```

**Note**: Replace `<cluster-name>` with your SingleStore cluster name.
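
The Base64 values for the `data` section can be generated directly from the PEM files. The following is a minimal sketch rather than part of the documented procedure: the helper names `b64_file` and `render_ssl_secret` and their argument order are hypothetical, and it assumes a `base64` utility is available on the workstation.

```shell
# Hypothetical helper: Base64-encode a file as a single line.
# Newlines are stripped because some base64 implementations wrap output.
b64_file() {
  base64 < "$1" | tr -d '\n'
}

# Hypothetical helper: print a Secret manifest for the given cluster name
# and the server certificate, server key, and CA certificate files.
render_ssl_secret() {
  cluster_name="$1"; crt="$2"; key="$3"; ca="$4"
  cat <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: ${cluster_name}-additional-secrets
type: Opaque
data:
  ssl-ca: $(b64_file "$ca")
  ssl-crt: $(b64_file "$crt")
  ssl-key: $(b64_file "$key")
EOF
}

# Example usage (file names are placeholders):
#   render_ssl_secret <cluster-name> server-cert.pem server-key.pem ca-cert.pem > ssl-secret.yaml
#   kubectl apply -f ssl-secret.yaml
```

Generating the manifest this way avoids copying encoded values by hand, which is a common source of truncated or wrapped certificate data.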

## Confirm that the Keys are Mounted to the Cluster

1. Exec into the Master Aggregator (MA) pod.
   ```shell
   kubectl exec -it node-<cluster-name>-master-0 -c node -- /bin/sh
   ```

2. Confirm that the following files are present in the `/etc/memsql/extra-secret` directory.
   ```
   ssl-crt
   ssl-key
   ssl-ca
   ```

Refer to [SSL Secure Connections](https://docs.singlestore.com/db/v9.1/security/encryption/ssl-secure-connections.md) for more information.
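
The file check in step 2 can also be scripted. The following is a minimal sketch intended to be run inside the pod; the function name `check_ssl_files` is hypothetical, and the directory defaults to the mount path named above.

```shell
# Hypothetical helper: report whether the three Secret keys are present as
# files in the given directory (default: /etc/memsql/extra-secret).
# Returns 0 when all files exist and 1 when any file is missing.
check_ssl_files() {
  dir="${1:-/etc/memsql/extra-secret}"
  missing=0
  for f in ssl-crt ssl-key ssl-ca; do
    if [ -f "$dir/$f" ]; then
      echo "found:   $dir/$f"
    else
      echo "missing: $dir/$f"
      missing=1
    fi
  done
  return "$missing"
}

# Example usage:
#   check_ssl_files && echo "all SSL files are mounted"
```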

## Add the Exporter SSL Args

1. In the `sdb-operator.yaml` file on the Source cluster, add the following argument to the `args` list in the `sdb-operator` section.
   ```yaml
   "--master-exporter-parameters",
   "--config.ssl-cert=/etc/memsql/extra-secret/ssl-crt
   --config.ssl-key=/etc/memsql/extra-secret/ssl-key --config.use-https --config.user=root --no-cluster-collect.info_schema.tables
   --no-cluster-collect.info_schema.tablestats
   --no-collect.info_schema.tables --no-collect.info_schema.tablestats"
   ```
   Note that this is a single `master-exporter-parameters` argument and the remainder is its value.

   If the cluster is configured to use the `root` user with SSL, an additional `--config.ssl-ca=/etc/memsql/ssl/ca-cert.pem` argument must be added to the `--master-exporter-parameters` value.

   When modified, the file will resemble the following.
   ```yaml
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: sdb-operator
     labels:
       app.kubernetes.io/component: operator
   spec:
     replicas: 1
     selector:
       matchLabels:
         name: sdb-operator
     template:
       metadata:
         labels:
           name: sdb-operator
       spec:
         serviceAccountName: sdb-operator
         containers:
           - name: sdb-operator
             image: operator_image_tag
             imagePullPolicy: Always
             args: [
               # Cause the operator to merge rather than replace annotations on services
               "--merge-service-annotations",
               # Allow the process inside the container to have read/write access to the `/var/lib/memsql` volume.
               "--fs-group-id", "5555",
               "--cluster-id", "sdb-cluster",
               "--master-exporter-parameters",
               "--config.ssl-cert=/etc/memsql/extra-secret/ssl-crt --config.ssl-key=/etc/memsql/extra-secret/ssl-key --config.use-https --config.user=root --no-cluster-collect.info_schema.tables --no-cluster-collect.info_schema.tablestats --no-collect.info_schema.tables --no-collect.info_schema.tablestats"
               ]
             env:
               - name: WATCH_NAMESPACE
                 valueFrom:
                   fieldRef:
                     fieldPath: metadata.namespace
               - name: POD_NAME
                 valueFrom:
                   fieldRef:
                     fieldPath: metadata.name
               - name: OPERATOR_NAME
                 value: "sdb-operator"
   ```

2. Apply the changes to the cluster.
   ```shell
   kubectl apply -f sdb-operator.yaml
   ```

3. Confirm that the Operator pod is running.
   ```shell
   kubectl get pods

   ```
   ```output

   memsql-operator-758ffb66c8-5sn4l      1/1     Running
   ```

4. Run the following command to force a restart of the `memsql_exporter` container on the master pod.
   ```shell
   kubectl exec -it node-<memsql-cluster-name>-master-0 -c exporter -- /bin/sh -c "kill 1"
   ```
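
Because the exporter flags in step 1 form one space-separated value, it can help to assemble that value programmatically before pasting it into `sdb-operator.yaml`. The following is a minimal sketch; the function name `exporter_parameters` and its `with-ca` switch are hypothetical, while the flags themselves are the ones listed in step 1.

```shell
# Hypothetical helper: join the exporter flags into the single
# space-separated value that follows "--master-exporter-parameters".
# Pass "with-ca" to append the CA flag used when root connects with SSL.
exporter_parameters() {
  params="--config.ssl-cert=/etc/memsql/extra-secret/ssl-crt"
  params="$params --config.ssl-key=/etc/memsql/extra-secret/ssl-key"
  params="$params --config.use-https --config.user=root"
  params="$params --no-cluster-collect.info_schema.tables"
  params="$params --no-cluster-collect.info_schema.tablestats"
  params="$params --no-collect.info_schema.tables"
  params="$params --no-collect.info_schema.tablestats"
  if [ "${1:-}" = "with-ca" ]; then
    params="$params --config.ssl-ca=/etc/memsql/ssl/ca-cert.pem"
  fi
  printf '%s\n' "$params"
}

# Example usage:
#   exporter_parameters with-ca
```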

## Configure the Exporter Process

The monitoring exporter should already be running in a container in the Master node Pod on the Source cluster.

If the Metrics and Source clusters are the same or are located in the same Kubernetes cluster (in different namespaces, for example), no further action is required, and you may skip to the [next step](https://docs.singlestore.com/#section-idm4604985730918433590373732789.md).

If the Metrics and Source clusters are located in different Kubernetes clusters, the exporter process must be exposed outside the Source cluster as a service (such as a Load Balancer service), and this service must be accessible from all nodes in the Metrics cluster.

For example:

1. Retrieve the `ownerReferences` UID.
   ```shell
   kubectl get svc svc-<cluster-name>-ddl -o jsonpath='{.metadata.ownerReferences}'
   ```

2. Modify the `svc-k8s-cluster-exporter.yaml` file using the UID value retrieved in the above step.
   ```yaml
   apiVersion: v1
   kind: Service
   metadata:
     annotations:
       custom: annotations
       service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout: "4000"
     labels:
       app.kubernetes.io/component: master
       app.kubernetes.io/instance: <memsql-cluster-name>
       app.kubernetes.io/name: memsql-cluster
       custom: label
     name: svc-<memsql-cluster-name>-exporter
     namespace: default
     ownerReferences:
     - apiVersion: memsql.com/v1alpha1
       controller: true
       kind: MemsqlCluster
       name: <memsql-cluster-name>
       uid: <ownerReferences-UID>         # Update with ownerReferences UID
   spec:
     externalTrafficPolicy: Cluster
     ipFamilies:
     - IPv4
     ipFamilyPolicy: SingleStack
     ports:
     - name: prometheus
       port: 9104
       protocol: TCP
     selector:
       app.kubernetes.io/instance: <memsql-cluster-name>
       app.kubernetes.io/name: memsql-cluster
       statefulset.kubernetes.io/pod-name: node-<memsql-cluster-name>-master-0
     sessionAffinity: None
     type: LoadBalancer

   ```

3. Create the exporter service.
   ```shell
   kubectl create -f svc-k8s-cluster-exporter.yaml
   ```
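
Once the service exists, its external address can be read from the service status. The following is a minimal sketch; the function name `exporter_service_address` is hypothetical, the service name follows the `svc-<memsql-cluster-name>-exporter` pattern from the manifest above, and the output may be empty until the load balancer has been provisioned.

```shell
# Hypothetical helper: print the external hostname or IP address that the
# load balancer assigned to the exporter service. Depending on the
# provider, only one of the two fields is normally populated.
exporter_service_address() {
  cluster_name="$1"
  namespace="${2:-default}"
  kubectl get svc "svc-${cluster_name}-exporter" -n "$namespace" \
    -o jsonpath='{.status.loadBalancer.ingress[0].hostname}{.status.loadBalancer.ingress[0].ip}'
}

# Example usage:
#   exporter_service_address <memsql-cluster-name> default
```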

## Configure the Metrics Database

## Determine the Hostname of the Exporter

From the previous step, if the Metrics and Source clusters are the same or are located in the same Kubernetes cluster, then `<name of the master pod>.svc-<cluster name>.<namespace containing the Source cluster master pod>.svc.cluster.local` can be used as the exporter hostname in this section.

However, if the Metrics and Source clusters are located in different Kubernetes clusters, then a hostname/IP address of the created service that can be reached by each node of the Metrics cluster can be used as the exporter hostname in this section.
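
For the same-cluster case, the hostname pattern above can be filled in mechanically. The following is a minimal sketch; the function name `exporter_hostname` is hypothetical, and it assumes the master pod is named `node-<cluster-name>-master-0`, as in the earlier `kubectl exec` commands.

```shell
# Hypothetical helper: build the in-cluster exporter hostname from the
# cluster name and the namespace that contains the Source cluster's
# master pod (default namespace: "default").
exporter_hostname() {
  cluster_name="$1"
  namespace="${2:-default}"
  master_pod="node-${cluster_name}-master-0"
  printf '%s.svc-%s.%s.svc.cluster.local\n' "$master_pod" "$cluster_name" "$namespace"
}

# Example usage:
#   exporter_hostname sdb-cluster default
```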

## Automatically Configure the Metrics Database

## Create and Apply the Tools RBAC

Either use an existing account with sufficient permissions or create a service account that can be used for running the configuration Pod.

1. Save the following to a `tools-rbac.yaml` file.
   ```yaml
   apiVersion: v1
   kind: ServiceAccount
   metadata:
     name: tools
   ---
   apiVersion: rbac.authorization.k8s.io/v1
   kind: Role
   metadata:
     namespace: default
     name: tools-role
   rules:
     - apiGroups:
         - ""
       resources:
         - pods
         - services
         - namespaces
       verbs:
         - get
         - list
     - apiGroups: [ "" ]
       resources: [ "pods/exec" ]
       verbs: [ "create" ]
     - apiGroups:
         - apps
       resources:
         - statefulsets
       verbs:
         - get
         - list
     - apiGroups:
         - memsql.com
       resources:
         - '*'
       verbs:
         - get
         - list
   ---
   apiVersion: rbac.authorization.k8s.io/v1
   kind: RoleBinding
   metadata:
     name: tools
     namespace: default
   subjects:
     - kind: ServiceAccount
       name: tools
   roleRef:
     kind: Role
     name: tools-role
     apiGroup: rbac.authorization.k8s.io

   ```

2. Apply the `tools-rbac.yaml` file to the cluster. This creates a `tools` service account with the required permissions.
   ```shell
   kubectl apply -f tools-rbac.yaml
   ```
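
After the RBAC is applied, the permissions can be spot-checked with `kubectl auth can-i` by impersonating the service account. The following is a minimal sketch; the function name `check_tools_rbac` is hypothetical, the checks shown are a sample of the rules in `tools-role`, and the user running it must be permitted to impersonate service accounts.

```shell
# Hypothetical helper: ask the API server whether the "tools" service
# account may perform a sample of the actions granted by tools-role.
# Prints one "<verb> <resource>: <answer>" line per check.
check_tools_rbac() {
  namespace="${1:-default}"
  account="system:serviceaccount:${namespace}:tools"
  for check in "get pods" "list services" "create pods/exec" "get statefulsets.apps"; do
    # $check is intentionally unquoted so it splits into verb and resource.
    # "|| true" keeps the loop going when the answer is "no" (exit code 1).
    answer=$(kubectl auth can-i $check --as="$account" -n "$namespace" 2>/dev/null) || true
    echo "$check: ${answer:-unknown}"
  done
}

# Example usage:
#   check_tools_rbac default
```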

## Create and Apply the Start Monitoring Job

## Use Kubernetes Secrets for Credentials

Store database credentials in Kubernetes Secrets. This approach improves security and follows Kubernetes best practices for managing sensitive information.

1. Create a Kubernetes Secret

   Create a Secret that stores the monitoring user credentials:
   ```bash
   kubectl create secret generic monitoring-user-creds \
     --from-literal=user=<monitoring_user> \
     --from-literal=password=<monitoring_password>
   ```
   Replace `<monitoring_user>` and `<monitoring_password>` with the database user name and password.

2. Reference the Secret in `start-monitoring-job.yaml`

   Update `start-monitoring-job.yaml` to reference the Secret through environment variables:
   ```yaml
   env:
     - name: USER
       valueFrom:
         secretKeyRef:
           name: monitoring-user-creds
           key: user

     - name: PASSWORD
       valueFrom:
         secretKeyRef:
           name: monitoring-user-creds
           key: password
   ```
   This configuration differs from the hardcoded approach in the following ways:

   * The `--user` and `--password` flags reference environment variables (`$(USER)` and `$(PASSWORD)`).
   * The `env` section maps the environment variables to values stored in the `monitoring-user-creds` Secret.
   * Credentials are no longer exposed directly in the YAML file.

3. Apply the Job

   Apply the updated Job configuration:
   ```bash
   kubectl apply -f start-monitoring-job.yaml
   ```

> **📝 Note**: Existing cluster monitoring instances can be configured to collect event traces after upgrading a cluster to SingleStore v8.5 or later. Refer to [Query History](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/query-history.md) for more information on how to fully enable this feature.
>
> 1) Add `--collect-event-traces` to your existing `start-monitoring-job.yaml` file.
>
>    **HTTP Connections**
>    ```yaml
>    [...]
>    command: ["sdb-admin",
>       "start-monitoring-kube",
>       "--user=$(USER)",
>       "--password=$(PASSWORD)",
>       "--collect-event-traces",
>       "--exporter-host=<exporter-hostname>",
>       "--yes",
>       <other options…>
>       ]
>
>    env:
>      - name: USER
>        valueFrom:
>          secretKeyRef:
>            name: monitoring-user-creds
>            key: user
>      - name: PASSWORD
>        valueFrom:
>          secretKeyRef:
>            name: monitoring-user-creds
>            key: password
>    [...]
>
>    ```
>    **HTTPS Connections**
>    ```yaml
>    [...]
>    command: ["sdb-admin",
>       "start-monitoring-kube",
>       "--user=$(USER)",
>       "--password=$(PASSWORD)",
>       "--collect-event-traces",
>       "--exporter-host=<exporter-hostname>",
>       "--ssl-ca=/etc/memsql/extra-secret/ssl-ca",
>       "--yes"
>       <other options…>
>       ]
>
>    env:
>      - name: USER
>        valueFrom:
>          secretKeyRef:
>            name: monitoring-user-creds
>            key: user
>      - name: PASSWORD
>        valueFrom:
>          secretKeyRef:
>            name: monitoring-user-creds
>            key: password
>    [...]
>
>
>    ```
>
> 2) Restart monitoring.
>    ```shell
>    kubectl apply -f start-monitoring-job.yaml
>    ```

The following YAML creates a job that sets up the `metrics` database and the associated pipelines.

## With Internet Access

1. Modify the `start-monitoring-job.yaml` file so that it resembles the following. Note that:

   1. `<exporter-hostname>` must be replaced with the exporter hostname from the [Determine the Hostname of the Exporter](https://docs.singlestore.com/#section-idm4566299833192033586931698618.md) step.

   2. `<other options…>` must be removed or replaced with the options available in [sdb-admin start-monitoring-kube](https://docs.singlestore.com/db/v9.1/reference/singlestore-tools-reference/sdb-admin-commands/start-monitoring-kube.md).

   3. The `env` section references the Kubernetes Secret created in [Use Kubernetes Secrets for Credentials](https://docs.singlestore.com/#section-id235607703963385.md) to securely provide credentials to the monitoring job.

   **HTTP Connections**
   ```yaml
   apiVersion: batch/v1
   kind: Job
   metadata:
     name: toolbox-start-monitoring
   spec:
     template:
       spec:
         serviceAccountName: tools
         containers:
         - name: toolbox-start-monitoring
           image: singlestore/tools:alma-v1.11.6-1.17.2-cc87b449d97fd7cde78fdc4621c2aec45cc9a6cb
           imagePullPolicy: IfNotPresent
           command: ["sdb-admin",
                     "start-monitoring-kube",
                     "--user=$(USER)",
                     "--password=$(PASSWORD)",
                     "--collect-event-traces",
                     "--exporter-host=<exporter-hostname>",
                     "--yes"
                     <other options…>
                     ]
           env:
             - name: USER
               valueFrom:
                 secretKeyRef:
                   name: monitoring-user-creds
                   key: user
             - name: PASSWORD
               valueFrom:
                 secretKeyRef:
                   name: monitoring-user-creds
                   key: password
         restartPolicy: Never
     backoffLimit: 2
   ```
   **HTTPS Connections**

   Update the following lines from the above definition:
   ```yaml
           command: ["sdb-admin",
                     "start-monitoring-kube",
                     "--user=$(USER)",
                     "--password=$(PASSWORD)",
                     "--collect-event-traces",
                     "--exporter-host=<exporter-hostname>",
                     "--yes"
                     <other options…>
                     ]
   ```
   to:
   ```yaml
           command: ["sdb-admin",
                     "start-monitoring-kube",
                     "--user=$(USER)",
                     "--password=$(PASSWORD)",
                     "--collect-event-traces",
                     "--exporter-host=<exporter-hostname>",
                     "--ssl-ca=/etc/memsql/extra-secret/ssl-ca",
                     "--yes"
                     <other options…>
                     ]
   ```

2. Run the following command to apply the changes in the `start-monitoring-job.yaml` file.
   ```shell
   kubectl apply -f start-monitoring-job.yaml
   ```

* *Without Internet Access*

  The following steps are only required for deployment environments that cannot access the internet and therefore require the use of a local registry.

  1) Run the following command to pull the SingleStore Tools image from Docker Hub.
     ```shell
     docker pull singlestore/tools:alma-v1.11.6-1.17.2-cc87b449d97fd7cde78fdc4621c2aec45cc9a6cb
     ```

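  2) Tag this image for a container image registry that is accessible by your deployment, and then push it to that registry.
     ```shell
     docker tag singlestore/tools:alma-v1.11.6-1.17.2-cc87b449d97fd7cde78fdc4621c2aec45cc9a6cb <internal-registry-name>/singlestore/tools:alma-v1.11.6-1.17.2-cc87b449d97fd7cde78fdc4621c2aec45cc9a6cb
     docker push <internal-registry-name>/singlestore/tools:alma-v1.11.6-1.17.2-cc87b449d97fd7cde78fdc4621c2aec45cc9a6cb
     ```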

  3) Modify the `start-monitoring-job.yaml` file so that it resembles the following. Note that:

     1. `<database-user>` must be replaced with the desired database user, such as the admin user.

     2. `<database-user-password>` must be replaced with this database user’s password.

     3. `<exporter-hostname>` must be replaced with the exporter hostname from the [Determine the Hostname of the Exporter](https://docs.singlestore.com/#section-idm4566299833192033586931698618.md) step.

     4. `<other options…>` must be removed or replaced with the options available in [sdb-admin start-monitoring-kube](https://docs.singlestore.com/db/v9.1/reference/singlestore-tools-reference/sdb-admin-commands/start-monitoring-kube.md).

     **HTTP Connections**
     ```yaml
     apiVersion: batch/v1
     kind: Job
     metadata:
       name: toolbox-start-monitoring
     spec:
       template:
         spec:
           serviceAccountName: tools
           containers:
           - name: toolbox-start-monitoring
             image: <internal-registry-name>/singlestore/tools:alma-v1.11.6-1.17.2-cc87b449d97fd7cde78fdc4621c2aec45cc9a6cb  # Update this line
             imagePullPolicy: IfNotPresent
             command: ["sdb-admin",
                       "start-monitoring-kube",
                       "--user=<database-user>",
                       "--password=<database-user-password>",
                       "--collect-event-traces",
                       "--exporter-host=<exporter-hostname>",
                       "--yes"
                       <other options…>
                       ]
           restartPolicy: Never
       backoffLimit: 2

     ```
     **HTTPS Connections**

     Update the following lines from the above definition:
     ```yaml
             command: ["sdb-admin",
                       "start-monitoring-kube",
                       "--user=<database-user>",
                       "--password=<database-user-password>",
                       "--collect-event-traces",
                       "--exporter-host=<exporter-hostname>",
                       "--yes"
                       <other options…>
                       ]
     ```
     to:
     ```yaml
             command: ["sdb-admin",
                       "start-monitoring-kube",
                       "--user=<database-user>",
                       "--password=<database-user-password>",
                       "--collect-event-traces",
                       "--exporter-host=<exporter-hostname>",
                       "--ssl-ca=/etc/memsql/extra-secret/ssl-ca",
                       "--yes"
                       <other options…>
                       ]
     ```

  4) Run the following command to apply the changes in the `start-monitoring-job.yaml` file.
     ```shell
     kubectl apply -f start-monitoring-job.yaml
     ```

## Confirm that the Start Monitoring Job is Running

Run the following command to confirm that the job has finished successfully. The `COMPLETIONS` column displays `1/1` for `toolbox-start-monitoring` when the job has completed.

```shell
kubectl get jobs

```

```output

NAME                       COMPLETIONS   DURATION   AGE
toolbox-start-monitoring   1/1           13s        21s

```
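
In scripts, waiting on the Job's `Complete` condition can replace polling `kubectl get jobs`. The following is a minimal sketch; the function name `wait_for_monitoring_job` and the 120-second default timeout are hypothetical choices, while the Job name is the one defined in `start-monitoring-job.yaml`.

```shell
# Hypothetical helper: block until the Job reports the Complete condition
# or the timeout (default 120s, an arbitrary choice) expires.
# Returns a non-zero exit code if the timeout is reached first.
wait_for_monitoring_job() {
  timeout="${1:-120s}"
  kubectl wait --for=condition=complete job/toolbox-start-monitoring --timeout="$timeout"
}

# Example usage:
#   wait_for_monitoring_job 60s && echo "monitoring job completed"
```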

You may remove the finished job by running the following command.

```shell
kubectl delete -f start-monitoring-job.yaml
```

As of Kubernetes 1.23, `ttlSecondsAfterFinished: <seconds>` may be added to the job spec to automatically remove the job once the specified number of seconds has elapsed after it finishes.

* *Manually Configure the Metrics Database*

  > **📝 Note**: The following steps are only required if the `metrics` database cannot be configured [automatically](https://docs.singlestore.com/#section-idm4554611581256033586934764558.md).

  The `metrics` database can either reside in the Source cluster, or in a dedicated Metrics cluster.

  ## `metrics` Database DDL

  Run the following SQL scripts to configure the database.

  **Please note the following**:

  - The database that is created to store monitoring data is named `metrics` by default. You must edit these SQL statements manually to change the database name.
  - The database user is `root`. To create the `metrics` database as a user other than `root`, run the following SQL statements and specify the desired database user for `dbmon` and this user’s password for `<password>`.
    ```sql
    CREATE USER 'dbmon' IDENTIFIED BY '<password>';
    GRANT SELECT, CREATE, INSERT, UPDATE, DELETE, EXECUTE, INDEX, ALTER, DROP, CREATE DATABASE, LOCK TABLES, CREATE VIEW, SHOW VIEW, CREATE ROUTINE ON metrics.* to 'dbmon'@'%';
    GRANT CREATE PIPELINE, DROP PIPELINE, ALTER PIPELINE, START PIPELINE, SHOW PIPELINE ON metrics.* to 'dbmon'@'%';
    ```
  - These commands must be run on the Master Aggregator of the Metrics cluster.

  Create the `metrics` database and associated tables by copying, pasting, and running each set of SQL statements below.

  You may also download the latest [metrics database DDL SQL file](https://assets.contentstack.io/v3/assets/bltac01ee6daa3a1e14/blt57aa9f7ce49466f7/metrics-database-ddl-85_and_later.zip) from SingleStore.

  ## View the `metrics` Database DDL

  ```sql
  create database if not exists metrics;

  CREATE TABLE IF NOT EXISTS metrics.config (
      `hostport`                 TINYBLOB NOT NULL,
      `retention_period_minutes` INT NOT NULL,
      `purge_frequency_minutes`  INT NOT NULL,
      `purge_limit`              INT NOT NULL,
      PRIMARY KEY(`hostport`)
  );

  CREATE TABLE IF NOT EXISTS metrics.`metrics` (
      `labels`                                                                         JSON,
      `name`                                                                           TINYBLOB NOT NULL,
      `memsql_tags`                                                                    JSON COLLATE utf8_bin,
      `cluster`           AS `memsql_tags`::$cluster                                   PERSISTED TINYBLOB,
      `exporter_hostport` AS `memsql_tags`::$host                                      PERSISTED TINYBLOB,
      `host`              AS `labels`::$host                                           PERSISTED TINYBLOB,
      `port`              as `labels`::$port                                           PERSISTED SMALLINT(6),
      `role`              as `labels`::$role                                           PERSISTED TINYBLOB,
      `extractor`         as substring_index(`name`, '_', 1)                           PERSISTED TINYBLOB,
      `subsystem`         as substring_index(substring_index(`name`, '_', 2), '_', -1) PERSISTED TINYBLOB,
      `job`               AS `memsql_tags`::$push_job                                  PERSISTED TINYBLOB,
      `value`                                                                          DOUBLE NOT NULL,
      `intval`            AS cast(floor(`value`) AS signed integer)                    PERSISTED BIGINT(20),
      `time_sec`                                                                       BIGINT(20) NOT NULL,
      SORT KEY `name` (`exporter_hostport`, `cluster`, `time_sec`, `extractor`, `subsystem`, `host`, `role`, `name`),
      SHARD KEY()
  );

  CREATE TABLE IF NOT EXISTS metrics.query_event_history (
      `cluster`                                   VARCHAR(512) NOT NULL,
      `exporter_hostport` AS `memsql_tags`::$host PERSISTED TINYBLOB,
      `memsql_tags`                               JSON NOT NULL,
      `ts`                                        TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `node_id`                                   VARCHAR(512),
      `node_ip_addr`                              VARCHAR(512),
      `node_port`                                 VARCHAR(512),
      `node_start_epoch_s`                        BIGINT(4) NOT NULL,
      `event_id`                                  BIGINT(4) NOT NULL,
      `event_time`                                TIMESTAMP NOT NULL,
      `event_type`                                VARCHAR(512) NOT NULL,
      `activity_name`                             VARCHAR(512),
      `connection_id`                             BIGINT,
      `database_name`                             VARCHAR(512),
      `duration_ms`                               BIGINT,
      `error_code`                                VARCHAR(512),
      `error_message`                             TEXT,
      `plan_id`                                   BIGINT,
      `query_text`                                LONGTEXT,
      `resource_pool_name`                        VARCHAR(512),
      `start_time`                                TIMESTAMP,
      `success`                                   BOOLEAN,
      `user_name`                                 VARCHAR(256),
      UNIQUE KEY (`cluster`, `node_ip_addr`, `node_start_epoch_s`, `event_id`, `activity_name`) USING HASH,
      SORT KEY (`exporter_hostport`, `cluster`, `database_name`, `start_time`),
      SHARD KEY (`cluster`, `activity_name`)
  );

  CREATE TABLE IF NOT EXISTS metrics.act_samples (
      `cluster`                                   VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
      `exporter_hostport` AS `memsql_tags`::$host PERSISTED TINYBLOB,
      `memsql_tags`                               JSON NOT NULL,
      `ts`                                        TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `ACTIVITY_TYPE`                             VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
      `ACTIVITY_NAME`                             VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
      `AGGREGATOR_ACTIVITY_NAME`                  VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
      `DATABASE_NAME`                             VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
      `NODE_ID`                                   SMALLINT(6) DEFAULT NULL,
      `PARTITION_ID`                              SMALLINT(6) DEFAULT NULL,
      `CPU_TIME_MS`                               BIGINT(4) DEFAULT NULL,
      `CPU_WAIT_TIME_MS`                          BIGINT(4) DEFAULT NULL,
      `ELAPSED_TIME_MS`                           BIGINT(4) DEFAULT NULL,
      `LOCK_TIME_MS`                              BIGINT(4)  DEFAULT NULL,
      `NETWORK_TIME_MS`                           BIGINT(4)  DEFAULT NULL,
      `DISK_TIME_MS`                              BIGINT(4) DEFAULT NULL,
      `DISK_B`                                    BIGINT(4) DEFAULT NULL,
      `DISK_LOGICAL_READ_B`                       BIGINT(4) DEFAULT NULL,
      `DISK_LOGICAL_WRITE_B`                      BIGINT(4) DEFAULT NULL,
      `DISK_PHYSICAL_READ_B`                      BIGINT(4) DEFAULT NULL,
      `DISK_PHYSICAL_WRITE_B`                     BIGINT(4) DEFAULT NULL,
      `LOCK_ROW_TIME_MS`                          BIGINT(4) DEFAULT NULL,
      `LOG_FLUSH_TIME_MS`                         BIGINT(4) DEFAULT NULL,
      `LOG_BUFFER_TIME_MS`                        BIGINT(4) DEFAULT NULL,
      `LOG_BUFFER_WRITE_B`                        BIGINT(4) DEFAULT NULL,
      `NETWORK_B`                                 BIGINT(4) DEFAULT NULL,
      `NETWORK_LOGICAL_RECV_B`                    BIGINT(4) DEFAULT NULL,
      `NETWORK_LOGICAL_SEND_B`                    BIGINT(4) DEFAULT NULL,
      `MEMORY_BS`                                 BIGINT(4) DEFAULT NULL,
      `MEMORY_MAJOR_FAULTS`                       BIGINT(4) DEFAULT NULL,
      `RUN_COUNT`                                 BIGINT(4)  NULL DEFAULT '0',
      `SUCCESS_COUNT`                             BIGINT(4)  NULL DEFAULT '0',
      `FAILURE_COUNT`                             BIGINT(4)  NULL DEFAULT '0',
      SORT KEY (`exporter_hostport`, `cluster`, `ts`, `ACTIVITY_TYPE`,`ACTIVITY_NAME`,
      `AGGREGATOR_ACTIVITY_NAME`,`DATABASE_NAME`,`NODE_ID`,`PARTITION_ID`),
      SHARD KEY()
  );

  create table IF NOT EXISTS metrics.mv_queries
  (
      `cluster`                                   VARCHAR(512) NOT NULL,
      `exporter_hostport` AS `memsql_tags`::$host PERSISTED TINYBLOB,
      `memsql_tags`                               JSON NOT NULL,
      `ts`                                        TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      `ACTIVITY_NAME`                             VARCHAR(512),
      `QUERY_TEXT`                                LONGTEXT,
      `PLAN_WARNINGS`                             VARCHAR(8192),
      SORT KEY (`exporter_hostport`, `cluster`, `ts`, `ACTIVITY_NAME`),
      SHARD KEY()
  );

  create table IF NOT EXISTS metrics.cluster_info
  (
      `cluster`                                   VARCHAR(512) NOT NULL,
      `exporter_hostport` AS `memsql_tags`::$host PERSISTED TINYBLOB,
      `memsql_tags`                               JSON NOT NULL,
      `ts`                                        TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `status`                                    JSON NOT NULL,
      SORT KEY (`exporter_hostport`, `cluster`, `ts`),
      SHARD KEY()
  );

  create table IF NOT EXISTS metrics.db_status
  (
      `cluster`           AS `memsql_tags`::$cluster   PERSISTED TINYBLOB,
      `exporter_hostport` AS `memsql_tags`::$host      PERSISTED TINYBLOB,
      `memsql_tags`                                    JSON NOT NULL,
      `ts`                                             TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `keys`                                           JSON NOT NULL,
      `values`                                         JSON NOT NULL,
      database_name       AS `keys`::$database_name    PERSISTED TINYBLOB,
      `role`              as `keys`::$role             PERSISTED tinyblob,
      num_partitions      as `values`::$num_partitions PERSISTED SMALLINT(6),
      `online`            as `values`::$online         PERSISTED SMALLINT(6),
      `offline`           as `values`::$offline        PERSISTED SMALLINT(6),
      replicating         as `values`::$replicating    PERSISTED SMALLINT(6),
      recovering          as `values`::$recovering     PERSISTED SMALLINT(6),
      pending             as `values`::$pending        PERSISTED SMALLINT(6),
      transition          as `values`::$transition     PERSISTED SMALLINT(6),
      unrecoverable       as `values`::$unrecoverable  PERSISTED SMALLINT(6),
      ref_db_state        as `values`::$ref_db_state   PERSISTED TINYBLOB,
      summary             as `values`::$summary        PERSISTED TINYBLOB,
      SORT KEY (`exporter_hostport`, `cluster`, `ts`, `database_name`),
      SHARD KEY()
  );

  create table if not exists metrics.mv_events
  (
      `cluster`           AS `memsql_tags`::$cluster PERSISTED TINYBLOB,
      `exporter_hostport` AS `memsql_tags`::$host    PERSISTED TINYBLOB,
      `memsql_tags`                                  JSON NOT NULL,
      `ts`                                           TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `keys`                                         JSON NOT NULL,
      `values`                                       JSON NOT NULL,
      `event_ts`                                     TIMESTAMP,
      `event_type`        AS `keys`::$event_type     PERSISTED TINYBLOB,
      `origin_node_id`    AS `keys`::$origin_node_id PERSISTED SMALLINT,
      `ip_addr`           AS `values`::$ip_addr      PERSISTED TINYBLOB,
      `port`              AS `values`::$port         PERSISTED SMALLINT,
      `type`              AS `values`::$type         PERSISTED TINYBLOB,
      `severity`          as `keys`::$severity       PERSISTED TINYBLOB,
      `details`           as `values`::$details      PERSISTED TINYBLOB,
      SORT KEY (`exporter_hostport`, `cluster`, `event_ts`, `origin_node_id`),
      SHARD KEY()
  );

  create table if not exists metrics.mv_nodes
  (
      `cluster`                AS `memsql_tags`::$cluster            PERSISTED TINYBLOB,
      `exporter_hostport`      AS `memsql_tags`::$host               PERSISTED TINYBLOB,
      `memsql_tags`                                                  JSON NOT NULL,
      `ts`                                                           TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `keys`                                                         JSON NOT NULL,
      `values`                                                       JSON NOT NULL,
      `node_id`                AS `keys`::$node_id                   PERSISTED SMALLINT,
      `ip_addr`                AS `keys`::$ip_addr                   PERSISTED TINYBLOB,
      `port`                   AS `keys`::$port                      PERSISTED SMALLINT,
      `role`                   AS `keys`::$role                      PERSISTED TINYBLOB,
      `external_host`          AS `keys`::$external_host             PERSISTED TINYBLOB,
      `external_port`          AS `keys`::$external_port             PERSISTED SMALLINT,
      `state`                  AS `values`::$state                   PERSISTED TINYBLOB,
      `availability_group`     AS `values`::$availability_group      PERSISTED SMALLINT,
      `num_cpus`               AS `values`::$num_cpus                PERSISTED BIGINT,
      `max_memory_mb`          AS `values`::$max_memory_mb           PERSISTED BIGINT,
      `max_table_memory_mb`    AS `values`::$max_table_memory_mb     PERSISTED BIGINT,
      `memory_used_mb`         AS `values`::$memory_used_mb          PERSISTED BIGINT,
      `table_memory_used_mb`   AS `values`::$table_memory_used_mb    PERSISTED BIGINT,
      `total_data_disk_mb`     AS `values`::$total_data_disk_mb      PERSISTED BIGINT,
      `available_data_disk_mb` AS `values`::$available_data_disk_mb  PERSISTED BIGINT,
      `uptime`                 AS `values`::$uptime                  PERSISTED BIGINT,
      `version`                AS `values`::$version                 PERSISTED TINYBLOB,
      SORT KEY(`exporter_hostport`, `cluster`, `ts`, `node_id`),
      SHARD KEY()
  );

  create table if not exists metrics.connection_attributes
  (
      `cluster`           AS `memsql_tags`::$cluster PERSISTED TINYBLOB,
      `exporter_hostport` AS `memsql_tags`::$host    PERSISTED TINYBLOB,
      `memsql_tags`                                  JSON NOT NULL,
      `ts`                                           TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `node_id`                                      BIGINT,
      `user`                                         VARCHAR(256),
      `connection_id`                                BIGINT,
      `attribute_name`                               VARCHAR(256),
      `attribute_value`                              VARCHAR(256),
      SORT KEY(`exporter_hostport`, `cluster`, `ts`, `connection_id`),
      SHARD KEY()
  );

  create table if not exists metrics.pipeline_metrics (
          `cluster`           AS `memsql_tags`::$cluster PERSISTED TINYBLOB,
          `exporter_hostport` AS `memsql_tags`::$host    PERSISTED TINYBLOB,
          `memsql_tags`                                  JSON NOT NULL,
          `ts`                                           TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
          `database_name`                                VARCHAR(512) NOT NULL,
          `pipeline_name`                                VARCHAR(512) NOT NULL,
          `pipeline_id`                                  INTEGER NOT NULL,
          `state`                                        VARCHAR(50) NOT NULL, -- possible values Running, Error, Stopped, Running Foreground etc
          `create_time`                                  VARCHAR(50) NOT NULL,
          `alter_time`                                   VARCHAR(50) NOT NULL,
      SORT KEY(`exporter_hostport`, `cluster`, `ts`, `pipeline_name`),
      SHARD KEY()
  );

  create table if not exists metrics.pipeline_errors (
          `cluster`                                   VARCHAR(512) NOT NULL,
          `exporter_hostport` AS `memsql_tags`::$host    PERSISTED TINYBLOB,
          `memsql_tags`                                  JSON NOT NULL,
          `ts`                                           TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
          `database_name`                                VARCHAR(512) NOT NULL,
          `pipeline_name`                                VARCHAR(512) NOT NULL,
          `batch_id`                                     INTEGER NOT NULL,
          `batch_host`                                   VARCHAR(512) NOT NULL,
          `batch_port`                                   INTEGER NOT NULL,
          `error_type`                                   VARCHAR(20) NOT NULL, -- possible values Error, Warning
          `error_code`                                   INTEGER NOT NULL,
          `error_message`                                TEXT NOT NULL,
          `error_id`                                     INTEGER NOT NULL,
          `error_unix_time`                              VARCHAR(50) NOT NULL,
      SORT KEY (`exporter_hostport`, `cluster`, `ts`, `pipeline_name`),
      UNIQUE KEY `PRIMARY` (`cluster`,`database_name`, `pipeline_name`, `error_id`, `error_unix_time`) using hash,
      SHARD KEY (`cluster`,`database_name`, `pipeline_name`, `error_id`, `error_unix_time`)
  );

  create table if not exists metrics.purge_log
  (
      `ts`           TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `hostport`     TINYBLOB NOT NULL,
      `rows_deleted` BIGINT NOT NULL,
      `status`       TINYBLOB NOT NULL,
      KEY(`ts`, `hostport`),
      SHARD KEY()
  );

  delimiter //
  create or replace procedure metrics.purge_metrics(current_ts TIMESTAMP, current_hostport TINYBLOB,
      retention_period INT) AS
  declare rc int = 1;
  declare earliest_ts INT = UNIX_TIMESTAMP(TIMESTAMPADD(MINUTE, -retention_period, current_ts));
  begin
      delete from metrics where exporter_hostport = current_hostport and time_sec < earliest_ts;
      rc = row_count();
      update purge_log set rows_deleted = rows_deleted + rc where `hostport` = current_hostport and `ts` = current_ts;
  end //
  delimiter ;

  delimiter //
  create or replace procedure metrics.purge_blobs(current_ts TIMESTAMP, current_hostport TINYBLOB,
      retention_period INT) AS
  declare rc int = 1;
  declare earliest_ts TIMESTAMP = TIMESTAMPADD(MINUTE, -retention_period, current_ts);
  begin
      delete from act_samples where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc = row_count();
      delete from mv_events where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      delete from mv_queries where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      delete from cluster_info where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      delete from db_status where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      delete from mv_nodes where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      delete from connection_attributes where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      delete from pipeline_metrics where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      delete from pipeline_errors where `exporter_hostport` = current_hostport and ts < earliest_ts;
      rc += row_count();
      update purge_log set rows_deleted = rows_deleted + rc where `hostport` = current_hostport and `ts` = current_ts;
  end //
  delimiter ;

  -- This procedure purges monitoring data that is older than the retention period set in the config table.
  -- To avoid purging too frequently, the purge_log table tracks when the last purge
  -- was completed. A new purge does not start until enough time has passed since the previous purge,
  -- as controlled by `purge_frequency_minutes` in the config table.
  delimiter //
  CREATE or replace PROCEDURE metrics.purge_data(current_hostport TINYBLOB) AS
  declare
      need_purge_qry QUERY(need_purge BOOLEAN) =
          select (TIMESTAMPDIFF(
              MINUTE,
              (select ifnull(max(ts), from_unixtime(0)) from purge_log
                where hostport = current_hostport),
              CURRENT_TIMESTAMP()
          )>=(select ifnull(ANY_VALUE(purge_frequency_minutes), 60)
                from config
               where hostport = current_hostport) );
      retention_period_qry QUERY(retention_period INT) =
          select ifnull(ANY_VALUE(retention_period_minutes), 10080)
          from config
          where hostport = current_hostport;
      retention_period INT = SCALAR(retention_period_qry);
      current_ts TIMESTAMP;
  begin
      IF SCALAR(need_purge_qry) THEN
          BEGIN
              select current_timestamp() into current_ts;
              insert into purge_log values (current_ts, current_hostport, 0, "RUNNING");
              call purge_metrics(current_ts, current_hostport, retention_period);
              call purge_blobs(current_ts, current_hostport, retention_period);
              call purge_event_traces(current_ts, current_hostport, retention_period);
              update purge_log set status = "FINISHED" where `ts` = current_ts;
              EXCEPTION
                  WHEN OTHERS THEN
                      update purge_log set status = "FAILED" where `ts` = current_ts and hostport = current_hostport;
              delete from purge_log where TIMESTAMPDIFF(MINUTE, ts, CURRENT_TIMESTAMP()) > 525600;
          END;
      END IF;
  end //
  delimiter ;

  delimiter //
  CREATE or replace PROCEDURE metrics.load_metrics (metrics_pipe query(`labels` JSON, `name` TINYBLOB, `memsql_tags` JSON COLLATE utf8_bin,`value` DOUBLE NOT NULL,`time_sec` BIGINT(20) NOT NULL)) AS
  DECLARE hostport TINYBLOB;
  BEGIN
      INSERT INTO metrics (
          `labels`,
          `name`,
          `memsql_tags`,
          `value`,
          `time_sec`
      ) SELECT
          `labels`,
          `name`,
          `memsql_tags`,
          `value`,
          `time_sec`
          from metrics_pipe;
      SELECT IFNULL(ANY_VALUE(`memsql_tags`::$host), "") INTO hostport FROM metrics_pipe;
      CALL purge_data(hostport);
  END //
  delimiter ;

  delimiter //
  create or replace procedure metrics.load_blobs (blobs_pipe query(type text, `keys` JSON, `vals` JSON, time_sec BIGINT, `memsql_tags` JSON NULL)) AS
  DECLARE hostport TINYBLOB;
  begin
      insert into act_samples (
          `cluster`,
          `memsql_tags`,
          `ts`,
          `ACTIVITY_TYPE`,
          `ACTIVITY_NAME`,
          `AGGREGATOR_ACTIVITY_NAME`,
          `NODE_ID`,
          `PARTITION_ID`,
          `DATABASE_NAME`,
          `CPU_TIME_MS`,
          `CPU_WAIT_TIME_MS`,
          `ELAPSED_TIME_MS`,
          `LOCK_TIME_MS` ,
          `NETWORK_TIME_MS` ,
          `DISK_TIME_MS`,
          `DISK_B`,
          `DISK_LOGICAL_READ_B`,
          `DISK_LOGICAL_WRITE_B`,
          `DISK_PHYSICAL_READ_B`,
          `DISK_PHYSICAL_WRITE_B`,
          `LOCK_ROW_TIME_MS`,
          `LOG_FLUSH_TIME_MS`,
          `LOG_BUFFER_TIME_MS`,
          `LOG_BUFFER_WRITE_B`,
          `NETWORK_B`,
          `NETWORK_LOGICAL_RECV_B`,
          `NETWORK_LOGICAL_SEND_B`,
          `MEMORY_BS`,
          `MEMORY_MAJOR_FAULTS`,
          `RUN_COUNT`,
          `SUCCESS_COUNT`,
          `FAILURE_COUNT`
      ) select
      `memsql_tags`::$cluster,
      `memsql_tags`,
      FROM_UNIXTIME(time_sec),
      `keys`::$activity_type,
      `keys`::$activity_name,
      `keys`::$aggregator_activity_name,
      `keys`::node_id,
      `keys`::partition_id,
      `keys`::$db_name,
      `vals`::cpu_time_ms,
      `vals`::cpu_wait_time_ms,
      `vals`::elapsed_time_ms,
      `vals`::lock_time_ms,
      `vals`::network_time_ms,
      `vals`::disk_time_ms,
      `vals`::disk_b,
      `vals`::disk_logical_read_b,
      `vals`::disk_logical_write_b,
      `vals`::disk_physical_read_b,
      `vals`::disk_physical_write_b,
      `vals`::lock_row_time_ms,
      `vals`::log_flush_time_ms,
      `vals`::log_buffer_time_ms,
      `vals`::log_buffer_write_b,
      `vals`::network_b,
      `vals`::network_logical_recv_b,
      `vals`::network_logical_send_b,
      `vals`::memory_bs,
      `vals`::memory_major_faults,
      `vals`::run_count,
      `vals`::success_count,
      `vals`::failure_count
      from blobs_pipe
      where type = 'activity';

      insert into mv_events (
          `memsql_tags`,
          `keys`,
          `values`,
          `ts`,
          `event_ts`
      )
      select
      `memsql_tags`,
      `keys`,
      `vals`,
      FROM_UNIXTIME(time_sec),
      FROM_UNIXTIME(`keys`::$event_time)
      from blobs_pipe b
      left join (SELECT distinct cluster,
          origin_node_id,
          event_ts from mv_events) k
      ON b.`memsql_tags`::$cluster = k.cluster
      and b.`keys`::$origin_node_id = k.origin_node_id
      and b.`keys`::$event_time = k.event_ts
      where type = 'event'
      and k.cluster is null
      and k.origin_node_id is null;

      insert into mv_queries (
          `cluster`,
          `ACTIVITY_NAME`,
          `memsql_tags`,
          `ts`,
          `QUERY_TEXT`,
          `PLAN_WARNINGS`
      )
      select
      `memsql_tags`::$cluster,
      `keys`::$activity_name,
      -- memsql_tags and time_sec are the same for the whole batch
      ANY_VALUE(`memsql_tags`),
      FROM_UNIXTIME(ANY_VALUE(time_sec)),
      ANY_VALUE(`vals`::$query_text),
      ANY_VALUE(`vals`::$plan_warnings)
      from blobs_pipe b
      left join (SELECT distinct cluster, ACTIVITY_NAME from mv_queries) k
      ON b.`memsql_tags`::$cluster = k.cluster and b.`keys`::$activity_name = k.ACTIVITY_NAME
      where type = 'query'
      and k.cluster is null and k.ACTIVITY_NAME is null
      group by 1, 2;

      insert into cluster_info (
          `cluster`,
          `memsql_tags`,
          `ts`,
          `status`
      ) select
      `memsql_tags`::$cluster,
      `memsql_tags`,
      FROM_UNIXTIME(time_sec),
      `vals`
      from blobs_pipe
      where type = 'cluster';

      insert into db_status (
          `memsql_tags`,
          `keys`,
          `values`,
          `ts`
      ) select
      `memsql_tags`, `keys`, `vals`, FROM_UNIXTIME(time_sec)
      from blobs_pipe
      where type = 'db_state';

      insert into mv_nodes (
          `memsql_tags`,
          `keys`,
          `values`,
          `ts`
      ) select
      `memsql_tags`, `keys`, `vals`, FROM_UNIXTIME(time_sec)
      from blobs_pipe
      where type = 'node';

      insert into connection_attributes (
          `memsql_tags`,
          `node_id`,
          `connection_id`,
          `user`,
          `attribute_name`,
          `attribute_value`,
          `ts`
      ) select
      memsql_tags, vals::$node_id, vals::$connection_id, vals::$user, vals::$attribute_name, vals::$attribute_value,
      FROM_UNIXTIME(time_sec)
      from blobs_pipe
      where type = 'conn_attrs';

      insert into pipeline_metrics (
          `memsql_tags`,
          `database_name`,
          `pipeline_name`,
          `pipeline_id`,
          `state`,
          `create_time`,
          `alter_time`,
          `ts`
      ) select
      memsql_tags,`keys`::$database_name, `keys`::$pipeline_name, `keys`::$pipeline_id,
      vals::$state, vals::$create_time, vals::$alter_time, FROM_UNIXTIME(time_sec)
      from blobs_pipe
      where type = 'pipeline_metric';

      insert into pipeline_errors (
          `cluster`,
          `memsql_tags`,
          `database_name`,
          `pipeline_name`,
          `batch_id`,
          `batch_host`,
          `batch_port`,
          `error_type`,
          `error_code`,
          `error_message`,
          `error_id`,
          `error_unix_time`,
          `ts`
      ) select
       `memsql_tags`::$cluster, memsql_tags, `keys`::$database_name, `keys`::$pipeline_name, `keys`::$batch_id,
      `keys`::$batch_host, `keys`::$batch_port, `vals`::$error_type, `vals`::$error_code,
      `vals`::$error_message, `vals`::$error_id, `vals`::$ERROR_UNIX_TIMESTAMP, FROM_UNIXTIME(time_sec)
      from blobs_pipe
      where type = 'pipeline_error' ON DUPLICATE KEY UPDATE `ts` = values(ts);

      SELECT IFNULL(ANY_VALUE(`memsql_tags`::$host), "") INTO hostport FROM blobs_pipe;
      CALL purge_data(hostport);
  end //
  delimiter ;

  delimiter //
  create or replace procedure metrics.purge_event_traces(current_ts TIMESTAMP,
      current_hostport TINYBLOB, retention_period INT) AS
  declare rc int = 1;
  declare earliest_ts TIMESTAMP = TIMESTAMPADD(MINUTE, -retention_period, current_ts);
  begin
      delete from query_event_history where exporter_hostport = current_hostport and start_time < earliest_ts;
      rc = row_count();
      update purge_log set rows_deleted = rows_deleted + rc where `hostport` = current_hostport and `ts` = current_ts;
  end //
  delimiter ;

  delimiter //
  CREATE OR REPLACE PROCEDURE metrics.load_event_traces (event_traces_pipe query(type text, `keys` JSON, `vals` JSON, time_sec BIGINT, `memsql_tags` JSON NULL)) AS
  DECLARE hostport TINYBLOB;
  begin
  INSERT INTO query_event_history (
      `cluster`,
      `memsql_tags`,
      `ts`,
      `node_id`,
      `node_ip_addr`,
      `node_port`,
      `node_start_epoch_s`,
      `event_id`,
      `event_time`,
      `event_type`,
      `activity_name`,
      `connection_id`,
      `database_name`,
      `duration_ms`,
      `error_code`,
      `error_message`,
      `plan_id`,
      `query_text`,
      `resource_pool_name`,
      `start_time`,
      `success`,
      `user_name`
  ) SELECT
        `memsql_tags`::$cluster,
        `memsql_tags`,
        FROM_UNIXTIME(time_sec),
        `keys`::$`node_id`,
        `keys`::$`ip_addr`,
        `keys`::$`port`,
        `keys`::$`node_start_epoch_s`,
        `keys`::$`event_id`,
        `keys`::$`event_time`,
        `keys`::$`event_type`,
        `vals`::$`activity_name`,
        `vals`::$`connection_id`,
        `vals`::$`context_database`,
        `vals`::$`duration_ms`,
        `vals`::$`error_code`,
        `vals`::$`error_message`,
        `vals`::$`plan_id`,
        `vals`::$`query_text`,
        `vals`::$`resource_pool_name`,
        `vals`::$`start_time`,
        `vals`::$`success`,
        `vals`::$`user_name`
  FROM event_traces_pipe
  WHERE type = 'events'
  ON DUPLICATE KEY UPDATE `ts` = VALUES(`ts`);

  SELECT IFNULL(ANY_VALUE(`memsql_tags`::$host), '') INTO hostport FROM event_traces_pipe;
  CALL purge_data(hostport);
  end //
  delimiter ;

  -- Grafana Stuff

  delimiter //
  CREATE OR REPLACE FUNCTION metrics.`trim_metric`(input VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, prefix VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL) RETURNS VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL AS begin
    return
    CASE
    WHEN input is not null and prefix is not null and locate(prefix, input, 1) > 0 THEN
      UPPER(substr(input, locate(prefix, input, 1) + length(prefix), length(input) - length(prefix)))
    ELSE input end;
  end //
  delimiter ;

  delimiter //
  CREATE OR REPLACE FUNCTION metrics.`trim_host`(input VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, c int default 1) RETURNS VARCHAR(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL AS begin
   return
   CASE
   WHEN input RLIKE '([[:digit:]]{1,3}\\.){3}[[:digit:]]{1,3}' THEN
    input
   ELSE substring_index(input, '.', c) end;
  end //
  delimiter ;

  -- helper table for integers 1..100
  drop table if exists metrics.ints;

  create reference table metrics.ints
  (
      f int key
  );

  insert into metrics.ints values
  (1), (2), (3), (4), (5), (6), (7), (8), (9), (10),
  (11), (12), (13), (14), (15), (16), (17), (18), (19), (20),
  (21), (22), (23), (24), (25), (26), (27), (28), (29), (30),
  (31), (32), (33), (34), (35), (36), (37), (38), (39), (40),
  (41), (42), (43), (44), (45), (46), (47), (48), (49), (50),
  (51), (52), (53), (54), (55), (56), (57), (58), (59), (60),
  (61), (62), (63), (64), (65), (66), (67), (68), (69), (70),
  (71), (72), (73), (74), (75), (76), (77), (78), (79), (80),
  (81), (82), (83), (84), (85), (86), (87), (88), (89), (90),
  (91), (92), (93), (94), (95), (96), (97), (98), (99), (100);
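
  -- Illustrative sketch only, not part of the original DDL: a per-cluster override
  -- in metrics.config, which purge_data reads for the matching hostport.
  -- The column list is assumed from the settings this page describes, and the values
  -- are placeholders. The hostport value must match the exporter_hostport stored for
  -- that Source cluster. Uncomment and adjust before running.
  -- insert into metrics.config (hostport, retention_period_minutes, purge_frequency_minutes, purge_limit)
  -- values ('<hostport of the Source cluster exporter>', 20160, 120, 10000);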

  ```

## `metrics` Database DDL Variables

The following variables may be set via the `metrics` DDL file.

  - Monitoring database name: Default is `metrics`.
  - By default, the `metrics.config` table is empty.

    In the case of multi-cluster monitoring, an existing monitoring configuration can be overridden for a specific Source cluster by manually adding a row to this table and specifying:

    * The `<exporter-host><exporter-port>` of the Source cluster, in `hostport` form
    * A `retention_period_minutes` value, in minutes
    * A `purge_frequency_minutes` value, in minutes
    * A `purge_limit` value, an integer
  - `retention_period_minutes` = `10080` minutes (default), which equals `7` days: Period of time to retain monitoring data for a specific Source cluster.
  - `purge_frequency_minutes` = `60` (default), minutes: Time between purges of expired monitoring data for a specific Source cluster.
  - `purge_limit` = `10000` (default), integer: Maximum number of entries deleted during a monitoring data purge for a specific Source cluster.
  - Purge log retention period = `525600` minutes (default), which equals `365` days: Period of time to retain purge log entries.
  - `batch_interval` = `15000` (default), milliseconds: Time interval between data collections.
  - `high_cardinality_metrics` = `false` (default). When set to `true`, additional monitoring data is collected, and a higher load is placed on the Source cluster(s) during collection.
  - `monitoring_version`: No default. Can be specified as either Major.minor version (such as `7.3`) or Major.minor.patch version (such as `7.3.25`)
  - `sample_queries` = `true` (default): Controls whether the queries run on the Source cluster(s) also include the entire query text (from `information_schema.mv_queries`). Query sampling is enabled by default; set this to `false` to stop collecting query text.

## Pipelines DDL

Run the following in a SQL client.

**Note**: You must edit `exporter-host` and `port` in the following SQL statements to align with where your exporter process resides.

  * The `exporter-host` is typically the host of your Source cluster’s Master Aggregator that’s running the exporter, and the endpoint URL must include the scheme (`http://`, or `https://` for HTTPS connections).
  * The default port for the endpoint is `9104`.
  * The IP address provided in the `exporter-host` argument must be resolvable by all hosts in the cluster. If the cluster you are monitoring has hosts that contain leaf nodes (as is common in a multi-host vs. single-host deployment), do not use `localhost`. Instead, provide the explicit IP address or hostname that the other hosts in the cluster can resolve.

  1. Start the `metrics` pipeline.

     **HTTP Connections**
     ```sql
     CREATE OR REPLACE PIPELINE metrics.metrics AS
     LOAD DATA prometheus_exporter 
     "http://<exporter-host>:<exporter-port>/cluster-metrics"
     CONFIG '{"is_memsql_internal":true, "high_cardinality_metrics":false,
     "monitoring_version":"<SingleStore version of the metrics cluster>"}'
     batch_interval 15000
     INTO PROCEDURE `load_metrics` FORMAT JSON;

     ```
     ```sql
     START PIPELINE IF NOT RUNNING metrics.metrics;

     ```
     **HTTPS Connections**
     ```sql
     CREATE OR REPLACE PIPELINE metrics.metrics AS
     LOAD DATA prometheus_exporter 
     "https://<exporter-host>:<exporter-port>/cluster-metrics"
     CONFIG '{"is_memsql_internal":true, "high_cardinality_metrics":false,
     "monitoring_version":"<SingleStore version of the metrics cluster>",
     "ssl_ca":"/etc/memsql/extra-secret/ssl-ca"}'
     batch_interval 15000
     INTO PROCEDURE `load_metrics` FORMAT JSON;
     ```
     ```sql
     START PIPELINE IF NOT RUNNING metrics.metrics;
     ```

  2. Start the `blobs` pipeline.

     **HTTP Connections**
     ```sql
     CREATE OR REPLACE PIPELINE metrics.blobs AS
     LOAD DATA prometheus_exporter 
     "http://<exporter-host>:<exporter-port>/samples"
     CONFIG '{"is_memsql_internal":true,
     "download_type":"samples", "sample_queries":true,
     "monitoring_version":"<SingleStore version of the metrics cluster>"}'
     batch_interval 15000
     INTO PROCEDURE `load_blobs` FORMAT JSON;

     ```
     ```sql
     START PIPELINE IF NOT RUNNING metrics.blobs;

     ```
     **HTTPS Connections**
     ```sql
     CREATE OR REPLACE PIPELINE metrics.blobs AS
     LOAD DATA prometheus_exporter 
     "https://<exporter-host>:<exporter-port>/samples"
     CONFIG '{"is_memsql_internal":true,
     "download_type":"samples", "sample_queries":true,
     "monitoring_version":"<SingleStore version of the metrics cluster>",
     "ssl_ca":"/etc/memsql/extra-secret/ssl-ca"}'
     batch_interval 15000
     INTO PROCEDURE `load_blobs` FORMAT JSON;
     ```
     ```sql
     START PIPELINE IF NOT RUNNING metrics.blobs;
     ```

  3. Start the event traces pipeline.

     **HTTP Connections**
     ```sql
     -- The pipeline name contains a hyphen, so it is quoted with backticks.
     create or replace pipeline metrics.`event-traces` as
     load data prometheus_exporter
     'http://<exporter-host>:<exporter-port>/event-traces'
     config '{"fetch_latest":true, "is_memsql_internal":true, "download_type":"samples", "monitoring_version":"<SingleStore version of the metrics cluster>"}'
     batch_interval 15000
     into procedure `load_event_traces` format json;
     ```
     ```sql
     start pipeline if not running metrics.`event-traces`;
     ```
     **HTTPS Connections**
     ```sql
     -- The pipeline name contains a hyphen, so it is quoted with backticks.
     -- To trust a directory of CA certificates instead of a single file,
     -- replace "ssl_ca" with "ssl_capath":"<path to CA directory>".
     create or replace pipeline metrics.`event-traces` as
     load data prometheus_exporter
     'https://<exporter-host>:<exporter-port>/event-traces'
     config '{"fetch_latest":true, "is_memsql_internal":true, "download_type":"samples", "monitoring_version":"<SingleStore version of the metrics cluster>",
     "ssl_ca":"/etc/memsql/extra-secret/ssl-ca"}'
     batch_interval 15000
     into procedure `load_event_traces` format json;
     ```
     ```sql
     start pipeline if not running metrics.`event-traces`;
     ```
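
After the three pipelines are started, it can help to confirm that they are running and that purges are being recorded. The following is a minimal sketch, not an official verification procedure. It assumes the `information_schema.PIPELINES` view exposes `DATABASE_NAME`, `PIPELINE_NAME`, and `STATE` columns, and that the monitoring database uses the default name `metrics`. The `purge_log` columns are taken from the DDL on this page.

```sql
-- State of each monitoring pipeline; a healthy pipeline is expected to report "Running".
SELECT PIPELINE_NAME, STATE
FROM information_schema.PIPELINES
WHERE DATABASE_NAME = 'metrics';

-- Most recent purge runs written by metrics.purge_data.
-- status is RUNNING, FINISHED, or FAILED, as set in the procedure.
SELECT ts, hostport, rows_deleted, status
FROM metrics.purge_log
ORDER BY ts DESC
LIMIT 10;
```

An empty `purge_log` result shortly after setup may simply mean that no batch has been loaded yet, because `purge_data` is only called from the load procedures.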

***

Modified at: June 2, 2026

Source: [/db/v9.1/reference/singlestore-operator-reference/monitor-your-kubernetes-cluster/configure-cluster-monitoring-with-the-operator/](https://docs.singlestore.com/db/v9.1/reference/singlestore-operator-reference/monitor-your-kubernetes-cluster/configure-cluster-monitoring-with-the-operator/)

