SOURCE_ID: import_from_snowflake
NAME: Import from Snowflake
CATEGORY: CRM

Run a Snowflake SQL query and import the result rows into Floqer, either as a
one-time pull or as an ongoing source that re-runs the query on a schedule and
brings in new rows incrementally. Imported rows become available to your Floqer
workflows for enrichment, scoring, outreach, and the like.

INDEX:
  1. Endpoints
  2. Pull mode (one-time vs ongoing)
  3. Body shape (preview + create)
  4. Field catalogue
  5. Dynamic options
  6. How to configure end-to-end
  7. Key notes
  8. Where it fits
  9. When to use

================================================================================
1. ENDPOINTS
================================================================================

Source identifier (used in every endpoint path): `import_from_snowflake`.

POST /api/v1/sources/import_from_snowflake/preview
  Scope: sources:read
  Runs the query and returns a sample page of result rows, without creating
  anything. Use this to validate the query / connection settings and inspect
  column shape before committing.

POST /api/v1/sources/import_from_snowflake
  Scope: sources:write
  Creates the source, imports query results immediately (when `pull_existing`),
  and, for an active source, re-runs the query on a schedule. Returns
  `source_instance_id` (the new source's UUID).

POST /api/v1/sources/import_from_snowflake/options/
  Scope: sources:read
  Resolves dynamic option values for a field: `warehouse`, `database`, `role`,
  `schema`, `table` (see §5).

GET /api/v1/sources//data
  Scope: sources:read
  Paginated rows imported into the created source. `` is the UUID returned by
  Create. Query: `page_no` (default 1), `page_size` (default 20, max 200).
  Source-agnostic; see concepts.txt §10.

POST /api/v1/sources//sync
  Scope: sources:write
  Connects the created source to a workflow and (by default) backfills it with
  the source's current rows. `` here is the UUID returned by Create.
  Body: { workflow_id, field_mapping, push_existing?, run? }
  `field_mapping` keys are `input.` references on the target workflow, values
  are the source fields to pull. This step is source-agnostic; see concepts.txt
  §10 for the full shape.

PATCH /api/v1/sources//status
  Scope: sources:write
  Pause or resume an active source by UUID.
  Body: {"status": "active" | "paused"}.
  Source-agnostic.

Every endpoint for this source first verifies the API-key user has an active
Snowflake connection. Missing connection → 424.

================================================================================
2. PULL MODE (ONE-TIME VS ONGOING)
================================================================================

`pull_mode` is required on the create endpoint (not on preview). Values:

  "static"   One-time import. The source runs the query once (when
             `pull_existing`) and stops, with no recurring schedule.
             `external_id` is ignored.

  "active"   Ongoing source. Runs the query initially (when `pull_existing`)
             AND re-runs it on the cadence in `schedule` (a cron string) until
             `expiration_date`, importing new rows incrementally. Set
             `external_id` to the column that uniquely identifies a row so
             re-runs don't re-import the same rows (see §4).

`schedule` is required when `pull_mode: active`. Otherwise it's ignored.

================================================================================
3. BODY SHAPE (PREVIEW + CREATE)
================================================================================

Field names are snake_case. Unknown top-level keys are rejected with 400: the
body is strict.
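
Because the body is strict and `schedule` depends on `pull_mode`, it can help to
check a create body client-side before sending it. The following is a minimal
sketch, not part of the Floqer API: the helper name `check_create_body` and its
message strings are hypothetical, and the field names are the ones listed in
this section. The server remains the authority on what it accepts.

```python
# Hypothetical client-side pre-check for a create body; not a Floqer API.
# Field names are taken from the "Preview accepts" / "Create accepts" lists.
PREVIEW_FIELDS = {"sql_query", "database", "warehouse", "table", "role",
                  "schema", "external_id"}
CREATE_ONLY_FIELDS = {"name", "pull_mode", "schedule", "expiration_date",
                      "pull_existing"}
REQUIRED_ON_CREATE = {"sql_query", "database", "warehouse", "table", "role",
                      "name", "pull_mode"}


def check_create_body(body):
    """Return a list of problems found in a create body (empty list = looks OK)."""
    problems = []
    allowed = PREVIEW_FIELDS | CREATE_ONLY_FIELDS

    # Unknown top-level keys would be rejected with 400 by the strict body.
    for key in sorted(set(body) - allowed):
        problems.append(f"unknown key: {key}")

    # Fields documented as required on create.
    for key in sorted(REQUIRED_ON_CREATE - set(body)):
        problems.append(f"missing required field: {key}")

    # Pull-mode rules: only two values, and "active" needs a cron schedule.
    mode = body.get("pull_mode")
    if mode is not None and mode not in ("static", "active"):
        problems.append('pull_mode must be "static" or "active"')
    if mode == "active" and not body.get("schedule"):
        problems.append("schedule is required when pull_mode is active")

    return problems


if __name__ == "__main__":
    draft = {
        "sql_query": "SELECT id, email, name FROM CUSTOMERS",
        "database": "MY_DB",        # placeholder value for illustration
        "warehouse": "MY_WH",       # placeholder value for illustration
        "table": "CUSTOMERS",
        "role": "MY_ROLE",          # placeholder value for illustration
        "name": "Customers import",
        "pull_mode": "active",      # no "schedule" yet, so one problem is reported
    }
    for problem in check_create_body(draft):
        print(problem)
```

An empty result only means the body passes these local checks; values such as
the cron string or the Snowflake object names are still validated server-side.
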

Preview accepts:
  sql_query        required: a SELECT statement
  database         required: Snowflake database name
  warehouse        required: warehouse name
  table            required: table name
  role             required: role name
  schema           string; optional
  external_id      string; optional (column used as the per-row dedupe key)

Create accepts all the preview fields plus:
  name             required: display name for the new source
  pull_mode        required: "static" | "active"
  schedule         cron string; required when pull_mode === "active"
  expiration_date  ISO date (YYYY-MM-DD); optional (active source only)
  pull_existing    boolean; optional, defaults to true (backfill on create)

================================================================================
4. FIELD CATALOGUE
================================================================================

sql_query (string) - required
  A SELECT statement whose result rows become the imported records. Each
  selected column becomes a field on the imported rows.

database (string) - required
  Snowflake database name. Resolve with the `database` dynamic-options call
  (§5).

warehouse (string) - required
  Warehouse used to run the query. Resolve with the `warehouse`
  dynamic-options call (§5).

table (string) - required
  Table name. Resolve with the `table` dynamic-options call (needs `database`
  + `schema` context).

role (string) - required
  Role used to authorize the query. Resolve with the `role` dynamic-options
  call (§5).

schema (string) - optional
  Schema name. Resolve with the `schema` dynamic-options call (needs
  `database` context).

external_id (string) - optional
  Name of the column to use as the per-row deduplication key for an active
  (incremental) import. Each imported row's stable identifier is set to
  `row[external_id]`, so re-runs of the query don't re-import the same rows.
  Omit to use the default incremental key. Ignored for static imports.

name (string) - required on create only
  Human-readable display name for the source.

pull_mode (string) - required on create only
  "static" (one-time) or "active" (ongoing). See §2.

schedule (string) - required on create when pull_mode === "active"
  Cron string for the re-run cadence (e.g. "0 12 * * *").

expiration_date (string) - optional, active only
  ISO date (YYYY-MM-DD) after which an active source stops re-running.

pull_existing (boolean) - optional, create only
  Defaults to true. When true, the query is run and existing result rows are
  imported on create.

================================================================================
5. DYNAMIC OPTIONS
================================================================================

Five dynamic-options field names are available for this source. All are POSTs
to /api/v1/sources/import_from_snowflake/options/ with body
`{ "context"?: {...} }`.

  warehouse  Context: none. Returns warehouse names.
  database   Context: none. Returns database names.
  role       Context: none. Returns role names.
  schema     Context: { "database": "" } (REQUIRED).
             Returns schemas in that database.
  table      Context: { "database": "", "schema": "" } (REQUIRED).
             Returns tables in that database + schema.

Each call returns `{value, label}` pairs; use the `value` directly in the
matching body field.

Resolve in order: warehouse / database / role first, then schema (needs
database), then table (needs database + schema).

Connection check: each call verifies the API-key user has an active Snowflake
connection first. Missing connection → 424.

================================================================================
6. HOW TO CONFIGURE END-TO-END
================================================================================

Typical flow for an AI agent building a Snowflake import source:

Step 1: Resolve connection settings
  POST .../options/warehouse   Body: {}   → pick warehouse
  POST .../options/database    Body: {}   → pick database
  POST .../options/role        Body: {}   → pick role
  POST .../options/schema      Body: {"context": {"database": ""}}
  POST .../options/table       Body: {"context": {"database": "", "schema": ""}}

Step 2: Preview before committing
  POST /api/v1/sources/import_from_snowflake/preview
  Body:
    {
      "sql_query": "SELECT id, email, name FROM CUSTOMERS",
      "database": "",
      "warehouse": "",
      "schema": "",
      "table": "CUSTOMERS",
      "role": ""
    }
  Check the returned `data[]` row shape (one key per selected column). Iterate
  on the query until you're happy; preview never creates anything.

Step 3: Create the source
  POST /api/v1/sources/import_from_snowflake
  Body: the preview fields from Step 2 + the create-only fields:
    "name": "",
    "pull_mode": "static" | "active",
    "schedule": "0 12 * * *",          // only when pull_mode=active
    "expiration_date": "2027-01-01",   // optional, active only
    "pull_existing": true,             // optional, defaults true
    "external_id": "id"                // active incremental key (optional)
  Response: { "status": 201, "data": { "source_instance_id": "", "name", "created_at" }}

Step 3b: (Optional) Poll imported rows while the import runs
  GET /api/v1/sources//data?page_no=1&page_size=20   ( = UUID from Step 3)
  Row keys match the columns your query selects. `total_count` grows until the
  import finishes.

Step 4: Sync the source into a workflow (so its rows flow downstream)
  POST /api/v1/sources//sync   ( = UUID from Step 3 )
  First build `field_mapping`:
    a. GET /api/v1/workflows → pick the destination workflow_id.
    b. GET /api/v1/workflows//sheets//inputs → each input has a `reference`
       like `{{input.email}}`.
    c. Map each workflow input (reference WITHOUT braces) to a source field.
       The source fields are the columns your query selects.
  Body:
    {
      "workflow_id": "",
      "field_mapping": { "input.email": "EMAIL", "input.name": "NAME" },
      "push_existing": true,
      "run": "all"
    }
  See concepts.txt §10 for the full sync semantics (source-agnostic).

The initial import runs asynchronously: the create call returns as soon as the
source is created and the import has started, not when rows have finished
arriving.

================================================================================
7. KEY NOTES
================================================================================

- Snowflake connection is mandatory. If the API-key user has no active
  Snowflake connection, every endpoint (preview, create, options) returns 424
  ("User is not connected to 'snowflake'."). Surface this as a "connect
  Snowflake first" CTA.

- `sql_query`, `database`, `warehouse`, `table`, and `role` are all required on
  preview and create. `schema` and `external_id` are optional.

- For an active (incremental) import, set `external_id` to the column that
  uniquely identifies a row, so re-runs don't re-import the same rows.
  `external_id` is ignored for static imports.

- Each selected column in `sql_query` becomes a field on the imported rows (and
  an option for `field_mapping` on sync).

- Credits are consumed when the source is created. If credit consumption fails
  (e.g. insufficient balance), the source is still created but its import can't
  start; the API returns 402 so you can top up and retry.

- Use PATCH /status to pause an active source and resume it later without
  recreating it.

================================================================================
8. WHERE IT FITS
================================================================================

UPSTREAM (in Snowflake)
  Rows live in the user's Snowflake account. The query, run under the chosen
  warehouse / database / schema / role, determines what gets imported.

THIS SOURCE
  Creates a Snowflake import source.
  On creation it runs the query and imports results (when `pull_existing`); an
  active source re-runs the query on the `schedule` cron until
  `expiration_date`, bringing in new rows incrementally (keyed by
  `external_id`).

DOWNSTREAM (in Floqer)
  Imported rows become available to your Floqer workflows: wire the source into
  a workflow to enrich, score, or run outreach on each row.

================================================================================
9. WHEN TO USE
================================================================================

- One-time pull of a Snowflake query result into a Floqer workflow:
  `pull_mode: static`.

- Keep a Snowflake query in sync incrementally on a schedule:
  `pull_mode: active`, `schedule: "0 12 * * *"`, `external_id: ""`.

When you instead want to push data OUT of Floqer, use the relevant action
templates inside a workflow; see the action catalog in llms-full.txt.

================================================================================
Last updated: 2026-06-02.
Reference: https://floqer.com/docs/reference