Export data
Integrate Viam’s captured data with your own database or data infrastructure. This is useful when compliance requires you to control where data resides, when you already have a MongoDB cluster with dashboards and pipelines built on it, or when you want to reduce dependency on any single vendor at the storage layer.
Viam handles reliable machine-to-cloud transport. Your database handles long-term storage, retention, access control, and downstream analytics.
| Pattern | Mechanism | Latency | Best For |
|---|---|---|---|
| API/SDK queries | Pull from Viam programmatically | On-demand | Dashboards, bulk export, ad-hoc analysis |
| CLI export | Batch download to local files | Manual | One-time migration, backups |
| Direct MongoDB connection | Query through connection URI | On-demand | External tools, custom sync scripts |
In this architecture, Viam handles everything from the machine to the cloud: data capture on the machine, local buffering when connectivity drops, and reliable sync to Viam’s cloud storage. Your infrastructure takes over from there. A sync script or direct connection pulls data from Viam and writes it to your database. Your team manages retention, access control, roll-ups, and downstream consumers against your own cluster.
Viam provides a MongoDB Atlas Data Federation interface to your captured data. Before you can connect, you need to set up credentials.
Log in to the Viam CLI:
viam login
Find your organization ID:
viam organizations list
Copy the ID for the organization whose data you want to access.
Configure a database password for your organization. This creates a database user tied to your organization:
viam data database configure --org-id=<YOUR-ORG-ID> --password=<NEW-PASSWORD>
Choose a strong password. You will use it in connection URIs and scripts.
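One minimal way to generate a strong random password is Python's standard library (any password manager works equally well):

```python
import secrets

# 24 random bytes encoded URL-safe yields a 32-character password
# drawn from letters, digits, '-' and '_', so it needs no shell escaping.
password = secrets.token_urlsafe(24)
print(password)
```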
Retrieve the connection hostname:
viam data database hostname --org-id=<YOUR-ORG-ID>
The output includes the hostname and a connection URI. Save both values.
With credentials configured, you can connect to your Viam data using any MongoDB client. This step uses mongosh (the MongoDB shell) to verify the connection and explore your data interactively.
Install mongosh if you do not have it:
brew install mongosh
On Linux, follow the mongosh installation guide.
Connect using the connection URI from step 1:
mongosh "mongodb://db-user-<YOUR-ORG-ID>:<PASSWORD>@<HOSTNAME>/?ssl=true&authSource=admin"
Replace <YOUR-ORG-ID>, <PASSWORD>, and <HOSTNAME> with your values.
Switch to the sensor data database:
use sensorData
List available collections:
show collections
You should see a readings collection containing your captured tabular data.
Query a few documents to verify:
db.readings.find().sort({time_received: -1}).limit(5)
This connection URI works with any MongoDB-compatible tool: Compass, Studio 3T, database drivers in any language, or your own applications.
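If your password contains characters with special meaning in URIs (such as `@`, `/`, `:`, or `%`), percent-encode it when building the connection string. A small sketch using hypothetical placeholder values (substitute your own org ID, password, and hostname):

```python
from urllib.parse import quote_plus

# Hypothetical placeholder values; replace with your own.
org_id = "abcd1234-org-id"
password = "p@ss/word"
hostname = "data-federation-abcd1234.a.query.mongodb.net"

# quote_plus percent-encodes characters that would otherwise
# break URI parsing (here '@' becomes %40 and '/' becomes %2F).
uri = (
    f"mongodb://db-user-{org_id}:{quote_plus(password)}"
    f"@{hostname}/?ssl=true&authSource=admin"
)
print(uri)
```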
For one-time exports, backups, or migration, the Viam CLI provides bulk export commands.
Export tabular data (sensor readings):
Tabular export requires you to specify the exact part, resource, and method:
viam data export tabular \
--part-id=<YOUR-PART-ID> \
--resource-name=my-sensor \
--resource-subtype=sensor \
--method=Readings \
--destination=./export
You can add time filters to narrow the export:
viam data export tabular \
--part-id=<YOUR-PART-ID> \
--resource-name=my-sensor \
--resource-subtype=sensor \
--method=Readings \
--start=2025-01-01T00:00:00Z \
--end=2025-02-01T00:00:00Z \
--destination=./january-export
Export binary data (images, point clouds):
Binary export supports broad filtering by organization, location, or machine:
viam data export binary filter --org-ids=<YOUR-ORG-ID> --destination=./export
Once exported, you can load these files into your own database, convert them to CSV for analysis tools, or archive them for compliance.
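As a sketch of the loading step, the following parses exported tabular files back into Python dictionaries. It assumes the export wrote newline-delimited JSON; adjust the glob pattern if your CLI version produces a different file extension.

```python
import json
from pathlib import Path


def load_exported_records(export_dir: str) -> list[dict]:
    """Parse newline-delimited JSON files under an export directory.

    Assumes .ndjson files, one JSON document per line; blank lines
    are skipped. Adjust the glob if your export format differs.
    """
    records = []
    for path in Path(export_dir).glob("**/*.ndjson"):
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line:
                    records.append(json.loads(line))
    return records
```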
For automated workflows, use the Viam SDK to query data from Viam and write it directly to your own MongoDB cluster.
Install dependencies:
pip install viam-sdk pymongo
Save this as sync_to_db.py:
import asyncio
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from pymongo import MongoClient
API_KEY = "<your-api-key>"
API_KEY_ID = "<your-api-key-id>"
ORG_ID = "<your-org-id>"
# Your own MongoDB cluster.
MY_MONGO_URI = "mongodb+srv://user:pass@your-cluster.mongodb.net/"
MY_DB_NAME = "my_robot_data"
MY_COLLECTION_NAME = "sensor_readings"
async def connect() -> ViamClient:
dial_options = DialOptions(
credentials=Credentials(
type="api-key",
payload=API_KEY,
),
auth_entity=API_KEY_ID,
)
return await ViamClient.create_from_dial_options(dial_options)
async def main():
# Connect to Viam.
viam_client = await connect()
data_client = viam_client.data_client
# Query recent data from Viam.
data = await data_client.tabular_data_by_mql(
organization_id=ORG_ID,
mql_binary=[
{"$match": {
"component_name": "my-sensor",
"time_received": {
"$gte": {"$date": "2025-01-01T00:00:00Z"},
},
}},
{"$sort": {"time_received": 1}},
{"$limit": 1000},
],
)
print(f"Pulled {len(data)} records from Viam.")
# Connect to your own database and insert.
my_client = MongoClient(MY_MONGO_URI)
my_db = my_client[MY_DB_NAME]
my_collection = my_db[MY_COLLECTION_NAME]
inserted = 0
for record in data:
my_collection.insert_one(record)
inserted += 1
print(f"Inserted {inserted} records into {MY_DB_NAME}.{MY_COLLECTION_NAME}.")
my_client.close()
viam_client.close()
if __name__ == "__main__":
asyncio.run(main())
Run it:
python sync_to_db.py
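Note that re-running the script with an overlapping time window inserts duplicate documents. One way to make the write idempotent is to replace `insert_one` with an upsert keyed on fields that uniquely identify a reading. This sketch assumes `component_name` plus `time_received` is unique per record; verify that against your own data before relying on it:

```python
def upsert_key(record: dict) -> dict:
    """Build a filter that identifies a unique reading.

    Assumes each record carries component_name and time_received,
    and that the pair is unique. With this filter,
    replace_one(filter, record, upsert=True) makes re-runs
    idempotent: overlapping windows update rows instead of
    duplicating them.
    """
    return {
        "component_name": record.get("component_name"),
        "time_received": record.get("time_received"),
    }


# Usage inside the insert loop, replacing insert_one:
#   my_collection.replace_one(upsert_key(record), record, upsert=True)
```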
Install dependencies:
mkdir sync-data && cd sync-data
go mod init sync-data
go get go.viam.com/rdk
go get go.mongodb.org/mongo-driver/mongo
Save this as main.go:
package main
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.viam.com/rdk/app"
"go.viam.com/rdk/logging"
)
func main() {
apiKey := "YOUR-API-KEY"
apiKeyID := "YOUR-API-KEY-ID"
orgID := "YOUR-ORGANIZATION-ID"
myMongoURI := "mongodb+srv://user:pass@your-cluster.mongodb.net/"
ctx := context.Background()
logger := logging.NewDebugLogger("sync-data")
// Connect to Viam.
viamClient, err := app.CreateViamClientWithAPIKey(
ctx, app.Options{}, apiKey, apiKeyID, logger)
if err != nil {
logger.Fatal(err)
}
defer viamClient.Close()
dataClient := viamClient.DataClient()
// Query recent data from Viam.
mqlStages := []map[string]interface{}{
{"$match": map[string]interface{}{
"component_name": "my-sensor",
"time_received": map[string]interface{}{
"$gte": map[string]interface{}{
"$date": "2025-01-01T00:00:00Z",
},
},
}},
{"$sort": map[string]interface{}{"time_received": 1}},
{"$limit": 1000},
}
tabularData, err := dataClient.TabularDataByMQL(ctx, orgID, mqlStages, nil)
if err != nil {
logger.Fatal(err)
}
fmt.Printf("Pulled %d records from Viam.\n", len(tabularData))
// Connect to your own MongoDB cluster.
mongoClient, err := mongo.Connect(ctx,
options.Client().ApplyURI(myMongoURI))
if err != nil {
logger.Fatal(err)
}
defer mongoClient.Disconnect(ctx)
collection := mongoClient.Database("my_robot_data").Collection("sensor_readings")
inserted := 0
for _, record := range tabularData {
_, err := collection.InsertOne(ctx, record)
if err != nil {
logger.Errorf("failed to insert record: %v", err)
continue
}
inserted++
}
fmt.Printf("Inserted %d records into my_robot_data.sensor_readings.\n", inserted)
}
Run it:
go run main.go
To get your credentials:
Find your organization ID in the Viam app by clicking your organization name and selecting Settings. You can create an API key and API key ID from the same settings page.
A one-time pull is useful, but most teams need ongoing sync. The key is tracking the last sync timestamp so each run only pulls new data.
Save this as ongoing_sync.py:
import asyncio
import json
import os
from datetime import datetime, timezone
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from pymongo import MongoClient
API_KEY = "<your-api-key>"
API_KEY_ID = "<your-api-key-id>"
ORG_ID = "<your-org-id>"
MY_MONGO_URI = "mongodb+srv://user:pass@your-cluster.mongodb.net/"
MY_DB_NAME = "my_robot_data"
MY_COLLECTION_NAME = "sensor_readings"
# File to persist the last sync timestamp between runs.
STATE_FILE = "sync_state.json"
def load_last_sync_time() -> str:
"""Load the last sync timestamp from the state file."""
if os.path.exists(STATE_FILE):
with open(STATE_FILE, "r") as f:
state = json.load(f)
return state.get("last_sync_time", "2000-01-01T00:00:00Z")
return "2000-01-01T00:00:00Z"
def save_last_sync_time(timestamp: str) -> None:
"""Save the last sync timestamp to the state file."""
with open(STATE_FILE, "w") as f:
json.dump({"last_sync_time": timestamp}, f)
async def connect() -> ViamClient:
dial_options = DialOptions(
credentials=Credentials(
type="api-key",
payload=API_KEY,
),
auth_entity=API_KEY_ID,
)
return await ViamClient.create_from_dial_options(dial_options)
async def sync_once():
last_sync = load_last_sync_time()
print(f"Syncing data received after {last_sync}")
viam_client = await connect()
data_client = viam_client.data_client
# Pull only data received since the last sync.
data = await data_client.tabular_data_by_mql(
organization_id=ORG_ID,
mql_binary=[
{"$match": {
"time_received": {
"$gt": {"$date": last_sync},
},
}},
{"$sort": {"time_received": 1}},
{"$limit": 5000},
],
)
if not data:
print("No new data to sync.")
viam_client.close()
return
print(f"Pulled {len(data)} new records.")
# Insert into your database.
my_client = MongoClient(MY_MONGO_URI)
my_db = my_client[MY_DB_NAME]
my_collection = my_db[MY_COLLECTION_NAME]
latest_time = last_sync
inserted = 0
for record in data:
my_collection.insert_one(record)
inserted += 1
# Track the most recent timestamp.
record_time = record.get("time_received", "")
if hasattr(record_time, "isoformat"):
record_time = record_time.isoformat()
if str(record_time) > latest_time:
latest_time = str(record_time)
# Persist the latest timestamp for the next run.
save_last_sync_time(latest_time)
print(f"Inserted {inserted} records. Last sync time: {latest_time}")
my_client.close()
viam_client.close()
if __name__ == "__main__":
asyncio.run(sync_once())
Save this as main.go:
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.viam.com/rdk/app"
"go.viam.com/rdk/logging"
)
const stateFile = "sync_state.json"
type syncState struct {
LastSyncTime string `json:"last_sync_time"`
}
func loadLastSyncTime() string {
data, err := os.ReadFile(stateFile)
if err != nil {
return "2000-01-01T00:00:00Z"
}
var state syncState
if err := json.Unmarshal(data, &state); err != nil {
return "2000-01-01T00:00:00Z"
}
return state.LastSyncTime
}
func saveLastSyncTime(t string) error {
data, err := json.Marshal(syncState{LastSyncTime: t})
if err != nil {
return err
}
return os.WriteFile(stateFile, data, 0644)
}
func main() {
apiKey := "YOUR-API-KEY"
apiKeyID := "YOUR-API-KEY-ID"
orgID := "YOUR-ORGANIZATION-ID"
myMongoURI := "mongodb+srv://user:pass@your-cluster.mongodb.net/"
ctx := context.Background()
logger := logging.NewDebugLogger("sync-data")
lastSync := loadLastSyncTime()
fmt.Printf("Syncing data received after %s\n", lastSync)
// Connect to Viam.
viamClient, err := app.CreateViamClientWithAPIKey(
ctx, app.Options{}, apiKey, apiKeyID, logger)
if err != nil {
logger.Fatal(err)
}
defer viamClient.Close()
dataClient := viamClient.DataClient()
// Pull only data received since the last sync.
mqlStages := []map[string]interface{}{
{"$match": map[string]interface{}{
"time_received": map[string]interface{}{
"$gt": map[string]interface{}{
"$date": lastSync,
},
},
}},
{"$sort": map[string]interface{}{"time_received": 1}},
{"$limit": 5000},
}
tabularData, err := dataClient.TabularDataByMQL(ctx, orgID, mqlStages, nil)
if err != nil {
logger.Fatal(err)
}
if len(tabularData) == 0 {
fmt.Println("No new data to sync.")
return
}
fmt.Printf("Pulled %d new records.\n", len(tabularData))
// Connect to your own MongoDB cluster.
mongoClient, err := mongo.Connect(ctx,
options.Client().ApplyURI(myMongoURI))
if err != nil {
logger.Fatal(err)
}
defer mongoClient.Disconnect(ctx)
collection := mongoClient.Database("my_robot_data").Collection("sensor_readings")
latestTime := lastSync
inserted := 0
for _, record := range tabularData {
_, err := collection.InsertOne(ctx, record)
if err != nil {
logger.Errorf("failed to insert record: %v", err)
continue
}
inserted++
// Track the most recent timestamp.
if tr, ok := record["time_received"]; ok {
var ts string
switch v := tr.(type) {
case time.Time:
ts = v.Format(time.RFC3339)
case string:
ts = v
}
if ts > latestTime {
latestTime = ts
}
}
}
// Persist the latest timestamp for the next run.
if err := saveLastSyncTime(latestTime); err != nil {
logger.Errorf("failed to save sync state: %v", err)
}
fmt.Printf("Inserted %d records. Last sync time: %s\n", inserted, latestTime)
}
Run the sync script on a schedule using cron (Linux/macOS) or Task Scheduler (Windows). For example, to sync every 15 minutes:
crontab -e
Add this line (adjust the path to your script):
*/15 * * * * cd /path/to/sync-data && python ongoing_sync.py >> sync.log 2>&1
For the Go version, compile first and run the binary:
go build -o sync-data .
*/15 * * * * cd /path/to/sync-data && ./sync-data >> sync.log 2>&1
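If cron is not available, a minimal in-process alternative is a loop with a sleep. This sketch shells out to the Python sync script from the previous step and runs until the process is killed:

```python
import subprocess
import time

INTERVAL_SECONDS = 15 * 60  # match the 15-minute cron cadence


def run_sync_loop() -> None:
    """Re-run the sync script on a fixed interval until killed."""
    while True:
        # check=False: log-and-continue semantics; a failed run
        # should not stop future attempts.
        subprocess.run(["python", "ongoing_sync.py"], check=False)
        time.sleep(INTERVAL_SECONDS)
```

A process supervisor such as systemd gives you restarts and logging for free; this loop is only a stopgap.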
Configure credentials. Run the CLI commands from step 1 to set up database access. Verify you get a hostname and connection URI back.
Connect with mongosh. Use the connection URI to connect and run db.readings.find().limit(3) in the sensorData database. Verify you see your captured data.
Export with the CLI. Run the viam data export tabular command from step 3 (with your part ID, resource name, subtype, and method) using --destination=./test-export, and confirm files appear in the ./test-export/data/ directory.
Run the one-time sync script. Copy the Python or Go script from step 4, fill in your credentials and MongoDB URI, and run it. Verify records appear in your destination database.
Run the ongoing sync script twice. Run the sync script from step 5. Note the count of inserted records. Wait for new data to be captured by your machine, then run it again. The second run should only pull data received after the first run’s latest timestamp.