Previous
Overview
Create a pipeline that runs a scheduled MQL aggregation against your captured data and stores the results as precomputed summaries. For an overview of how pipelines work, see Data pipelines overview.
This example creates a pipeline that computes hourly temperature averages grouped by location:
viam datapipelines create \
--org-id=<org-id> \
--name=hourly-temp-avg \
--schedule="0 * * * *" \
--data-source-type=standard \
--mql='[{"$match": {"component_name": "temperature-sensor"}}, {"$group": {"_id": "$location_id", "avg_temp": {"$avg": "$data.readings.temperature"}, "count": {"$sum": 1}}}, {"$project": {"location": "$_id", "avg_temp": 1, "count": 1, "_id": 0}}]' \
--enable-backfill=true
The CLI prints the pipeline ID on success. Save this ID to query results and manage the pipeline.
| Flag | Required | Description |
|---|---|---|
--org-id | Yes | Your organization ID. |
--name | Yes | A descriptive name. Must be unique within the organization. |
--schedule | Yes | A cron expression in UTC. Also determines the query time window. See Cron schedule. |
--mql | One of --mql or --mql-path | The MQL aggregation pipeline as a JSON string. |
--mql-path | One of --mql or --mql-path | Path to a file containing the MQL aggregation pipeline as JSON. |
--enable-backfill | Yes | Whether to process historical time windows. true or false. |
--data-source-type | Yes | standard or hotstorage. |
For complex queries, use --mql-path to read from a file:
viam datapipelines create \
--org-id=<org-id> \
--name=hourly-temp-avg \
--schedule="0 * * * *" \
--data-source-type=standard \
--mql-path=./my-pipeline.json \
--enable-backfill=true
Where my-pipeline.json contains:
[
{ "$match": { "component_name": "temperature-sensor" } },
{
"$group": {
"_id": "$location_id",
"avg_temp": { "$avg": "$data.readings.temperature" },
"count": { "$sum": 1 }
}
},
{
"$project": {
"location": "$_id",
"avg_temp": 1,
"count": 1,
"_id": 0
}
}
]
import asyncio
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType
API_KEY = "YOUR-API-KEY"
API_KEY_ID = "YOUR-API-KEY-ID"
ORG_ID = "YOUR-ORGANIZATION-ID"
async def main():
opts = ViamClient.Options.with_api_key(
api_key=API_KEY,
api_key_id=API_KEY_ID
)
client = await ViamClient.create_from_dial_options(opts)
data_client = client.data_client
# Returns the pipeline ID
pipeline_id = await data_client.create_data_pipeline(
organization_id=ORG_ID,
name="hourly-temp-avg",
mql_binary=[
{"$match": {"component_name": "temperature-sensor"}},
{"$group": {
"_id": "$location_id",
"avg_temp": {"$avg": "$data.readings.temperature"},
"count": {"$sum": 1},
}},
{"$project": {
"location": "$_id",
"avg_temp": 1,
"count": 1,
"_id": 0,
}},
],
schedule="0 * * * *",
data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD,
enable_backfill=False,
)
print(f"Created pipeline: {pipeline_id}")
client.close()
if __name__ == "__main__":
asyncio.run(main())
package main
import (
"context"
"fmt"
"go.viam.com/rdk/app"
"go.viam.com/rdk/logging"
)
func main() {
ctx := context.Background()
logger := logging.NewDebugLogger("pipeline")
viamClient, err := app.CreateViamClientWithAPIKey(
ctx, app.Options{}, "YOUR-API-KEY", "YOUR-API-KEY-ID", logger)
if err != nil {
logger.Fatal(err)
}
defer viamClient.Close()
dataClient := viamClient.DataClient()
mqlStages := []map[string]interface{}{
{"$match": map[string]interface{}{
"component_name": "temperature-sensor",
}},
{"$group": map[string]interface{}{
"_id": "$location_id",
"avg_temp": map[string]interface{}{"$avg": "$data.readings.temperature"},
"count": map[string]interface{}{"$sum": 1},
}},
{"$project": map[string]interface{}{
"location": "$_id",
"avg_temp": 1,
"count": 1,
"_id": 0,
}},
}
pipelineID, err := dataClient.CreateDataPipeline(
ctx, "YOUR-ORGANIZATION-ID", "hourly-temp-avg",
mqlStages, "0 * * * *", false,
&app.CreateDataPipelineOptions{
TabularDataSourceType: app.TabularDataSourceTypeStandard,
},
)
if err != nil {
logger.Fatal(err)
}
fmt.Printf("Created pipeline: %s\n", pipelineID)
}
To get your credentials:
Find your organization ID in the Viam app by clicking your organization name and selecting Settings.
After creating a pipeline, see Query pipeline results to access the output, and Examples and tips for MQL patterns for common robotics use cases.
Was this page helpful?
Glad to hear it! If you have any other feedback please let us know:
We're sorry about that. To help us improve, please tell us what we can do better:
Thank you!