Source

Flowlyze supports two ways to ingest data from sources: active (pull) and passive (push). Both let you acquire, normalize, and process data consistently across the platform.

Overview

There are two ingestion modes:

  • Active (pull): Flowlyze fetches data from sources on a schedule or on demand.
  • Passive (push): sources send events/messages to endpoints exposed by Flowlyze.

In both cases, data is acquired, normalized, and sent through the same transformation and validation pipelines.

Active Mode (pull)

On a schedule or on demand (via API or manual action in the UI), Flowlyze pulls data from the source and loads it into the internal database. Subsequent processing follows the rules defined in the flow (pipelines, transformations, validations).

Typical sources

  • Standard API connectors (REST/JSON)
  • Relational/NoSQL databases
  • Flat files (e.g., CSV) from local storage or FTP/SFTP

Use cases

  • Every night at 00:00: import a CSV file from an FTP location
  • Every hour: read a table from an operational database

When configuring a Source, you can define a global recurrence using a cron expression.
This lets you precisely schedule how often the source should run (every minute, hourly, daily, or at specific times).

You can also specify the integration type used by Flowlyze (e.g., REST API, database, file system, webhook) so that the Source behavior fits the flow requirements.

All Cron Expression times are in UTC (independent of user locale or tenant default time zone).

The following image shows a source scheduled every minute.

Source template

This configuration means an HTTP Source (outbound API call) will run every minute of every hour, every day.

What is a Cron Expression

A cron expression defines a recurring schedule.
It consists of five or six fields (depending on the system), each representing a time component:

| Field | Description | Allowed values |
|---|---|---|
| 1 | Minutes | 0–59 |
| 2 | Hours | 0–23 |
| 3 | Day of month | 1–31 |
| 4 | Month | 1–12 or names (JAN–DEC) |
| 5 | Day of week | 0–6 or names (SUN–SAT) |

Cron expression examples

| Cron expression | Meaning |
|---|---|
| `* * * * *` | Every minute |
| `0 * * * *` | Hourly at minute 0 |
| `0 0 * * *` | Every day at midnight |
| `0 9 * * 1-5` | Weekdays at 09:00 |
| `*/15 * * * *` | Every 15 minutes |
| `0 0 1 * *` | First day of the month at midnight |
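
To sanity-check a schedule before saving it, you can preview its next fire times locally. A minimal sketch using the third-party croniter package (not part of Flowlyze); remember that Flowlyze evaluates these expressions in UTC.

```python
# Preview the next fire times of a cron expression (pip install croniter).
# Illustration only: Flowlyze evaluates the schedule server-side, in UTC.
from datetime import datetime, timezone
from croniter import croniter

expr = "0 9 * * 1-5"  # weekdays at 09:00 UTC
itr = croniter(expr, datetime.now(timezone.utc))
for _ in range(3):
    print(itr.get_next(datetime).isoformat())
```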

Incremental reads

Flowlyze provides a built-in memory that enables incremental reads, improving flow performance and reducing the amount of processed data.

Incremental

Choose a discriminating field (timestamp/order). After each run, Flowlyze stores the max processed value; at the next run it only reads records after that value. This significantly reduces load on sources and flows.

The mechanism is based on identifying a discriminating field in the source (e.g., a database column or an HTTP payload field) that represents sequence or time order.

On each execution, Flowlyze stores the maximum (or another aggregate) value observed for that field.
On the next execution, that value is used as a reference to request only new or updated records.

Example behavior

Assume the source has a last_update field.
During the first run, Flowlyze processes all available data and stores the aggregate value (typically the maximum last_update).
On the next run, Flowlyze queries only records with last_update later than the stored value.

This avoids re-reading already processed data, ensuring efficiency, consistency, and scalability.
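
Conceptually, the bookkeeping looks like the sketch below; `fetch_since` and the module-level `stored_value` are hypothetical stand-ins for the source query and the value Flowlyze persists in its built-in memory.

```python
# Conceptual sketch of the incremental-read loop described above.
stored_value = None  # persisted between runs by Flowlyze

def run_once(fetch_since):
    global stored_value
    records = fetch_since(stored_value)  # only records newer than stored_value
    if records:
        # aggregator "max": remember the largest last_update processed
        stored_value = max(r["last_update"] for r in records)
    return records

# First run reads everything; later runs only see newer records.
run_once(lambda since: [{"last_update": "2025-10-01"}, {"last_update": "2025-10-02"}])
print(stored_value)  # 2025-10-02
```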

Configuration parameters

| Field | Description |
|---|---|
| Variable name | Identifies the variable bound to the incremental field. Can be referenced using `{{variable_name}}`. |
| Message JSON Path | JSONPath expression pointing to the discriminating field to use for incremental reading. |
| Aggregator | Aggregate used to select the value to store (e.g., max, min). Most commonly max. |
| Current value | The current stored value; can be set/overridden manually to restart from a specific point. |
Configuration example

The following image shows a configuration that uses last_update as the incremental field.

Source incremental

Passive Mode (push)

Flowlyze exposes an endpoint that external systems can call to send data (single records or batches). This is ideal for near real-time event ingestion.

Webhook Security

Protect the endpoint with an API Key and scope its path per tenant. Consider rate limits and message signatures when available.

Use case

  • An e-commerce platform sends an order update via webhook to the Flowlyze REST endpoint

Below is an example of enabling an endpoint with path “test” and API key “my-secret-apikey”.

Source template
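
On the client side, pushing an event to that endpoint amounts to a single authenticated POST. A sketch in Python with the requests package; the payload fields are illustrative:

```python
# Hypothetical client pushing an order update to the endpoint configured
# above (tenant flowlyze-demo, path "test", API key "my-secret-apikey").
import requests

resp = requests.post(
    "https://in.flowlyze.com/api/wh/flowlyze-demo/test",
    headers={"x-api-key": "my-secret-apikey"},
    json={"order_id": 123, "status": "shipped"},  # illustrative fields
    timeout=10,
)
resp.raise_for_status()
```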

Combined modes

Both modes can coexist. The same source can:

  • send updates in push (webhook), and
  • be synchronized in pull periodically to ensure alignment and consistency.

Unified process

Regardless of the ingress mode (active or passive), all data:

  1. Is acquired into Flowlyze’s internal database
  2. Is aggregated and normalized (schema, formats, encodings)
  3. Follows the same flow of transformation, enrichment, validation, and delivery to destinations

This ensures operational uniformity, traceability, and scalability.

HTTP Ingestion (Push)

Endpoint V2

The V2 endpoint allows you to define the data model on the client side, removing the need to conform to a preconfigured schema. Any client with the URL and API key can choose the payload format among the supported options.

The parser is automatically chosen based on the Content-Type header:

  • JSON: application/json
  • XML: application/xml, text/xml
  • Form: application/x-www-form-urlencoded, multipart/form-data
JSON payload

If the x-fl-selector query parameter is present, the endpoint applies the given JSONPath expression to select the sub-object (or list) to process.
If the parameter is missing, the root object of the JSON body is used. The endpoint automatically detects whether the payload is a single message (object) or a batch (array). JSONPath expressions can select nested elements and lists.

Examples (JSON)

| # | Input (schematic) | Content-Type | x-fl-selector | Interpretation |
|---|---|---|---|---|
| 1 | `[{}, {}, {}]` | application/json | (omitted) | The whole array is a batch: 3 messages. |
| 2 | `{ "data": [ {}, {}, {} ] }` | application/json | `data` | Selects data: batch of 3 messages. |
| 3 | `[ { "children": [ {}, {}, {} ] }, { "children": [ {}, {}, {} ] }, { "children": [ {}, {}, {} ] } ]` | application/json | `$.*.children[*]` | Selects and flattens all children: 9 messages. |

Note: examples are compact and omit business fields; each {} represents a complete record/object in production.

If payload parsing fails, Flowlyze still creates a fallback message containing a content field with the original payload for traceability and potential downstream retries.
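
To reason about how a selector will be interpreted, you can replay it locally with a JSONPath library. A sketch for example 2 using the third-party jsonpath-ng package; the endpoint performs this selection server-side, and this only mirrors the described logic.

```python
# Replaying example 2 locally (pip install jsonpath-ng).
from jsonpath_ng import parse

payload = {"data": [{}, {}, {}]}

selected = [m.value for m in parse("data").find(payload)][0]
# Auto-detection: an array is a batch, an object is a single message.
messages = selected if isinstance(selected, list) else [selected]
print(len(messages))  # 3 -> batch of 3 messages
```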

Endpoint V1

The V1 endpoint (being phased out) relies on a predefined schema. The flow must be configured in advance to correctly interpret the incoming payload.
Within the flow you must specify:

Deprecated

V1 is deprecated and being retired. Migrate to V2 for flexibility and robustness.

  • Consider as single object: whether the endpoint receives a single object
  • path: unique URL tail (last part of https://in.flowlyze.com/api/wh/{tenant_id}/{path})
  • apikey: key passed either as query string or header x-api-key

Example: https://in.flowlyze.com/api/wh/flowlyze-demo/test for tenant flowlyze-demo and path test.

Required parameters in the flow
Consider as single object

Whether the endpoint receives a single JSON object per request (true) or a list of objects (array).

  • true → payload is a single record
  • false → payload is a list of records
Path

Unique tail at the end of the URL identifying the webhook instance within the tenant.
Example: in /flowlyze-demo/test, the path is test.

API Key

Unique key required for authentication:

  • Query string: ?x-api-key=<KEY>
  • HTTP header: x-api-key: <KEY>
URL examples

For tenant flowlyze-demo and path test: https://in.flowlyze.com/api/wh/flowlyze-demo/test

Authentication via query string: https://in.flowlyze.com/api/wh/flowlyze-demo/test?x-api-key=<KEY>

Authentication via HTTP header (recommended), curl example:

curl -X POST \
  https://in.flowlyze.com/api/wh/flowlyze-demo/test \
  -H "x-api-key: <KEY>" \
  -H "Content-Type: application/json" \
  -d '{ "example": "value" }'

Lifecycle and migration

V1 is being phased out. For schema flexibility (selectors, formats, batch autodiscovery), migrate to V2, which lets clients define the data model and supports JSONPath selection.

HTTP

An HTTP Source in Flowlyze reads and acquires data from remote endpoints over HTTP/HTTPS, typically exposed by web services, REST APIs, or microservices.
Data is processed as JSON, the standard for structured inter-system exchange.

Advanced HTTP Source configurations let you flexibly control many aspects of integration.

Incremental reading (Delta Reading)

Configure the HTTP source to perform incremental reads, i.e., only new or changed data since the last run.
This uses Flowlyze’s memory and reduces transferred data volume.

Payload parsing

Define parsing and transformation rules for incoming JSON using JSONPath expressions to identify fields, extract portions of data, or reshape the message for the downstream flow.

Supported authentication modes

OAuth2 (Bearer Token)

What it does: obtains an access token from an Authorization Server and sends it as Authorization: Bearer <token>.

When to use: modern enterprise/public APIs (OpenAPI), strong security, token expiry/rotation, scopes/permissions.

Typical Flowlyze config

  • Grant type: commonly Client Credentials for server-to-server (or Authorization Code for interactive users)
  • Token URL: OAuth2 endpoint (e.g., https://auth.example.com/oauth/token)
  • Client ID / Client Secret
  • Scope (optional)
  • Outbound header: Authorization: Bearer {{access_token}} (inserted automatically)
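
For reference, the token exchange Flowlyze performs is roughly equivalent to the sketch below (token URL, credentials, and scope are placeholders):

```python
# Rough equivalent of the Client Credentials exchange (pip install requests).
import requests

token = requests.post(
    "https://auth.example.com/oauth/token",
    data={
        "grant_type": "client_credentials",
        "client_id": "<CLIENT_ID>",
        "client_secret": "<CLIENT_SECRET>",
        "scope": "read:data",  # optional
    },
    timeout=10,
).json()["access_token"]

# The Bearer header is then added automatically on every outbound call:
resp = requests.get(
    "https://api.example.com/data",
    headers={"Authorization": f"Bearer {token}"},
    timeout=10,
)
```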

Custom JWT Bearer

What it does: builds a signed JWT (typically RS256) with agreed claims and either sends it directly as Bearer or exchanges it for an access token at a custom endpoint.

When to use: proprietary APIs requiring a signed JWT instead of a standard OAuth token, or a custom “JWT → access token” flow.

Typical Flowlyze config

  • Algorithm: RS256/ES256/HS256 (usually RS256)
  • Private key / Key ID (kid)
  • Claims: iss, sub, aud, iat, exp, plus custom claims
  • Token emission:
    • Direct Bearer: send JWT as Authorization: Bearer <jwt>
    • Exchange: send JWT to an endpoint to obtain an access token, then use as Bearer

Final header example (direct bearer)

Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
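
A sketch of the direct-Bearer variant with the third-party PyJWT package; the throwaway key, kid, and claim values are placeholders standing in for the configured ones.

```python
# Illustrative "direct Bearer" JWT (pip install pyjwt cryptography).
import time
import jwt  # PyJWT
from cryptography.hazmat.primitives.asymmetric import rsa

# Throwaway key for the sketch; in practice the configured private key is used.
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

claims = {
    "iss": "flowlyze-client",       # agreed issuer (placeholder)
    "sub": "integration-1",
    "aud": "https://api.example.com",
    "iat": int(time.time()),
    "exp": int(time.time()) + 300,  # short-lived token
}
token = jwt.encode(claims, private_key, algorithm="RS256",
                   headers={"kid": "my-key-id"})
print(f"Authorization: Bearer {token[:20]}...")
```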

Basic Auth

What it does: sends username:password Base64-encoded in the Authorization header.

When to use: legacy/internal services exposing Basic Auth over HTTPS.

Typical Flowlyze config

  • Username
  • Password
  • Outbound header (automatic)

Note: Always use HTTPS; otherwise Basic Auth credentials are exposed.
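
For reference, the automatically generated header amounts to this:

```python
# What the automatic Basic Auth header amounts to (sketch).
import base64

credentials = base64.b64encode(b"username:password").decode()
headers = {"Authorization": f"Basic {credentials}"}
# Equivalent with requests: requests.get(url, auth=("username", "password"))
print(headers)
```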

API Key

What it does: sends a static key as header or query string.

When to use: simple/public services where OAuth is not required.

Typical Flowlyze config

  • Key value
  • Location: Header (e.g., x-api-key: <key>) or Query (e.g., ?apikey=<key>)

Complete example

https://api.example.com/data?filter=update_date>{{last_update}}

The {{last_update}} placeholder is set by Flowlyze with the last stored incremental value (e.g., max update_date).

If the API returns this structure:

{
  "data": [{}, {}, {}]
}

Use data as the selector for records to ingest.
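
Putting the pieces together, one incremental run is roughly equivalent to the sketch below; the update_date field inside each record is an assumption implied by the filter above.

```python
# Rough equivalent of one incremental run against the example API.
import requests

last_update = "2025-10-01T00:00:00Z"  # value from Flowlyze's memory
url = f"https://api.example.com/data?filter=update_date>{last_update}"

body = requests.get(url, timeout=10).json()
records = body["data"]                                 # selector: data
last_update = max(r["update_date"] for r in records)   # aggregator: max
```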

RDBMS

An RDBMS (SQL) Source connects to relational databases and extracts data using a custom SQL query.
Use it when data resides in enterprise databases and must be read incrementally, parametrically, or transactionally.

The query can be:

  • a simple SELECT, with joins and predicates
  • a stored procedure that handles business logic server-side

You can use global variables or incremental parameters (e.g., {{last_update}}) resolved dynamically by Flowlyze.

Connection

| Field | Description |
|---|---|
| DB Engine | Database engine (MySQL, PostgreSQL, Oracle, MSSQL). Drivers and SQL dialect adapt automatically. |
| Host | Database hostname or IP. |
| Port | Port (3306 MySQL, 5432 PostgreSQL, 1433 MSSQL, 1521 Oracle). |
| Database | Database/schema name. |
| Username | User with read (and procedure) permissions. |
| Password | Use secrets for security. |

Data selection query

Any statement supported by the database is allowed. Flowlyze executes the query as a read-only statement or as a procedure call, depending on the syntax.

Use:

  • global variables ({{variable}})
  • incremental fields (e.g., {{last_update}})
  • limits/pagination to optimize data reading

Examples

-- 1. Import all orders
SELECT * FROM orders;

-- 2. Only orders updated after the last incremental value
SELECT *
FROM orders
JOIN customers ON orders.customer_id = customers.id
WHERE order_update > {{last_update}}
LIMIT 1000;

-- 3. Stored procedure returning and acknowledging data
SELECT * FROM getOrdersAndAcknowledge();

💡 Variables like {{last_update}} are resolved at runtime, enabling incremental/contextual logic.
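
For illustration, here is how such a placeholder resolves into an ordinary bound parameter at runtime (self-contained sqlite3 stand-in; Flowlyze targets the configured engine and dialect):

```python
# Minimal, self-contained illustration of {{last_update}} becoming a bound
# parameter. sqlite3 stands in for the configured database engine.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER, order_update TEXT);
    INSERT INTO orders VALUES (1, '2025-09-30'), (2, '2025-10-02');
""")

last_update = "2025-10-01"  # resolved from Flowlyze's memory
rows = conn.execute(
    "SELECT * FROM orders WHERE order_update > ? LIMIT 1000",
    (last_update,),
).fetchall()
print(rows)  # [(2, '2025-10-02')]
```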

Acknowledge query (TBD)

You can configure an acknowledge query executed only after the data has been successfully received and confirmed within Flowlyze.

It receives the processed set (fetchedData) and can:

  • update processed flags (e.g., processed = true)
  • log synchronization
  • run custom confirmation procedures

Example:

{{#set "ids"}}{{#each fetchedData}}{{this.id}}{{#unless @last}},{{/unless}}{{/each}}{{/set}}

UPDATE orders
SET processed = TRUE
WHERE id IN ({{ids}});
-- e.g., WHERE id in (1,2,3,4)

This ensures records are marked “read” only after the end-to-end processing, avoiding loss or duplication.

Flat File

A Flat File Source reads and parses structured files in multiple formats (CSV, positional, XML, JSON) from various acquisition channels (FTP, HTTP, S3, Azure Blob Storage).

Each row or logical unit becomes an independent message processed asynchronously for maximum scalability and flexibility.

Acquisition channels

| Channel | Description |
|---|---|
| FTP / FTPS / SFTP | Securely download files from an FTP server. Auth via user/password or SSH key; paths, filename patterns, retention rules supported. |
| HTTP (TBD) | Download from HTTP/HTTPS with possible authentication (API key, Basic, Bearer). |
| Amazon S3 (TBD) | Connect to a bucket and filter by prefix or filename. |
| Azure Blob Storage (TBD) | Access a container; auth via connection string or service principal. |
| Azure Blob Storage RFE | Extended variant for multi/coordination processing (in development). |

Reading protocols

Flowlyze supports two reading protocols:

Coordinated

Flowlyze uses a working directory with read/write permissions:

  1. Copy file from source directory to a temporary processing directory
  2. Read and parse contents
  3. Move file to a completion directory on finish

Advantages:

  • Visibility on pending files
  • Error inspection
  • Full traceability

Directory cleanup and artifact handling are the user’s responsibility.
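
The three steps map onto plain file operations; a local sketch with illustrative directory names, the remote source replaced by a local folder:

```python
# Local sketch of the coordinated protocol; directory names are illustrative.
import shutil
from pathlib import Path

src, work, done = Path("incoming"), Path("processing"), Path("completed")
for d in (src, work, done):
    d.mkdir(exist_ok=True)

for f in src.glob("*.csv"):
    tmp = work / f.name
    shutil.copy2(f, tmp)                   # 1. copy to the processing directory
    content = tmp.read_text()              # 2. read and parse the contents
    print(f"{f.name}: {len(content)} chars")
    shutil.move(str(tmp), done / f.name)   # 3. move to the completion directory
```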

Simple (TBD)

Flowlyze downloads and processes the file directly, without local copies.
On errors, Flowlyze emits a flow error message with details but does not keep a copy of the original file.

Supported formats

| Format | Status | Description |
|---|---|---|
| CSV | Implemented | Delimited files with advanced parsing (header, quoting, culture, custom delimiters). |
| Positional | TBD | Fixed-width files with explicit column positions. |
| XML | TBD | XML with XPath selectors, node mapping. |
| JSON | TBD | Complex JSON with JSONPath entity selection. |

CSV configuration

| Parameter | Description |
|---|---|
| Line Delimiter | Usually `\n` or `\r\n`. |
| Column Delimiter | Default `,`; can be changed (e.g., `;`). |
| Quote Character | Default `"`. |
| Culture | Number/date culture (e.g., it-IT, en-US). |
| Has Header | First row contains column names (true/false). |
| Grouping Column | Column used to group multiple rows into a single message. |

Row Grouping

Flowlyze can group multiple source rows into one logical message using a grouping column.
This is useful for hierarchical relations or multiple versions of the same record in a file.

During parsing, Flowlyze evaluates the grouping column (e.g., product_id, record_id).
Rows with the same value are aggregated into one JSON object.
Each group yields a single message containing common fields and a nested array with grouped rows.
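
The grouping step itself is straightforward; a sketch using the data of Example 1 below (the variants key is the name used there):

```python
# Sketch of row grouping: rows sharing the grouping column are folded
# into one message with a nested array (cf. Example 1 below).
from collections import defaultdict

rows = [
    {"product_id": 1001, "variant_id": 1, "color": "red",   "size": "M", "price": 29.90},
    {"product_id": 1001, "variant_id": 2, "color": "blue",  "size": "L", "price": 31.50},
    {"product_id": 1002, "variant_id": 3, "color": "black", "size": "S", "price": 28.00},
]

groups = defaultdict(list)
for row in rows:
    key = row.pop("product_id")  # the grouping column
    groups[key].append(row)

messages = [{"product_id": k, "variants": v} for k, v in groups.items()]
print(len(messages))  # 2 messages, matching the aggregated output below
```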

Example 1 – Variants grouped by main product

CSV with product variants (size, color, price) associated with a main product identified by product_id.

Source file

| product_id | variant_id | color | size | price |
|---|---|---|---|---|
| 1001 | 1 | red | M | 29.90 |
| 1001 | 2 | blue | L | 31.50 |
| 1002 | 3 | black | S | 28.00 |

Grouping column: product_id

Aggregated output

{
  "product_id": 1001,
  "variants": [
    { "variant_id": 1, "color": "red", "size": "M", "price": 29.90 },
    { "variant_id": 2, "color": "blue", "size": "L", "price": 31.50 }
  ]
}

{
  "product_id": 1002,
  "variants": [
    { "variant_id": 3, "color": "black", "size": "S", "price": 28.00 }
  ]
}

Example 2 – Grouping multiple saves (journaling tables)

A journaling/audit table records multiple versions of the same record. The flow can consolidate them into one logical message.

Source file

| record_id | update_time | field | old_value | new_value |
|---|---|---|---|---|
| 501 | 2025-10-01 10:30:00 | status | draft | pending |
| 501 | 2025-10-02 11:45:00 | status | pending | approved |
| 501 | 2025-10-03 09:20:00 | note | null | "OK" |

Grouping column: record_id

Aggregated output

{
  "record_id": 501,
  "data": [
    {
      "update_time": "2025-10-01T10:30:00Z",
      "field": "status",
      "old_value": "draft",
      "new_value": "pending"
    },
    {
      "update_time": "2025-10-02T11:45:00Z",
      "field": "status",
      "old_value": "pending",
      "new_value": "approved"
    },
    {
      "update_time": "2025-10-03T09:20:00Z",
      "field": "note",
      "old_value": null,
      "new_value": "OK"
    }
  ]
}

In this example, Flowlyze emits one message per record_id, containing the complete change history in chronological order. This consolidates versions into a coherent representation for downstream systems (e.g., data lake, CRM, or auditing service).