Skip to main content

RDBMS Destination

The RDBMS destination allows Flowlyze to connect to any relational database accessible publicly via the Internet, without dependencies on a specific vendor. It is designed to implement data synchronization, migration, consolidation, or bidirectional integration scenarios between systems.

Connection Parameters

To establish the database connection, Flowlyze requires the following parameters:

  • Server URL
  • Port
  • Username
  • Password

The connection correctness can be verified directly through the test function available in the configuration screen.

Writing Query Definition

Flowlyze offers very granular management of SQL operations thanks to three types of queries that cover the entire processing cycle:

SQL Processing Cycle

The three phases (PREPARE → INSERT → FINALIZE) allow implementing complex logic:

  • PREPARE: environment preparation (temporary tables, cleanup)
  • INSERT: data writing for each record
  • FINALIZE: consolidation and final operations (upsert, stored procedures)

INSERT STATEMENT

Executed for each row of the dataset. Allows using a parametric SQL statement, where record fields are accessible via:

  • @fieldname
  • @fieldname_subfield for nested objects

Generally this statement is used to execute INSERT, but it can be any valid SQL command.

PREPARE STATEMENT

Executed before processing a batch.
Allows preparatory operations such as:

  • creating temporary tables,
  • cleaning or initializing data structures,
  • generating intermediate environments for writing.

FINALIZE STATEMENT

Executed after inserting all rows of the batch.
Typically used for:

  • consolidating data into final tables,
  • executing upserts,
  • calling stored procedures,
  • performing recalculations or normalization operations.

Example 1 — Writing in insert or update mode

This example illustrates a customer synchronization flow where a JSON message is transformed into a row of the relational table **CUSTOMER**. Flowlyze executes the query for each record in the queue, mapping message fields to table fields. The goal is to demonstrate how a simple integration can still support more complex logic such as upsert (insert or update).

Data Structure (Mermaid diagram)

In this model:

  • The customer is represented by a single CUSTOMER table.
  • Address data is flattened into dedicated columns (ADDRESSSTREET, ADDRESSCITY, etc.).
  • All demographic and preference information is centralized in a single record to simplify management.

Intent and Motivations of the Example

The goal of the example is to show how a single JSON message:

{
"customer_id": "CUST-482917",
"name": "Jordan Mitchell",
"date_of_birth": "1987-03-12",
"email": "jordan.mitchell@examplemail.com",
"phone": "(415) 555-2894",
"address": {
"street": "742 Market Street",
"unit": "Apt 5B",
"city": "San Francisco",
"state": "CA",
"zip": "94103"
},
"account_status": "Active",
"signup_date": "2023-08-19",
"preferred_contact_method": "Email",
"marketing_opt_in": true,
"notes": "Customer prefers weekend delivery windows."
}

is converted into a row of the CUSTOMER table via the following SQL:

INSERT INTO CUSTOMER (
CUSTOMERID,
NAME,
DATEOFBIRTH,
EMAIL,
PHONE,
ADDRESSSTREET,
ADDRESSUNIT,
ADDRESSCITY,
ADDRESSSTATE,
ADDRESSZIP,
ACCOUNTSTATUS,
SIGNUPDATE,
PREFERREDCONTACTMETHOD,
MARKETINGOPTIN,
NOTES
) VALUES (
@customer_id,
@name,
@date_of_birth,
@email,
@phone,
@address__street,
@address__unit,
@address__city,
@address__state,
@address__zip,
@account_status,
@signup_date,
@preferred_contact_method,
@marketing_opt_in,
@notes
);

Key Mapping Points:

  • Message fields are mapped 1:1 to table columns.
  • Nested values are referenced via dot notation (@address__street).
  • The "flat" model allows representing all the most relevant data in a single entity.
Nested Object Notation

To access nested fields in JSON, use the double underscore notation: @object__field. For example, @address__street accesses address.street in the JSON message.

Example 2 — Writing in Transactional Mode

This example shows an order synchronization flow where a single order message is decomposed into multiple relational entities (customer, addresses, order header, order lines) and written in a coherent and transactional manner. The goal is to ensure that all data related to an order is inserted/updated together, or all discarded in case of error.

Transactional Guarantees

All operations are executed within a single database transaction. If any step fails, the entire transaction is rolled back, ensuring data consistency.

Data Structure

How to Read the Model:

  • CUSTOMER represents the customer placing the order (demographic data and contact preferences).
  • ORDER_HEADER is the order header (date, status, payments, amounts).
  • ORDER_ITEM represents individual order lines (purchased items).
  • ADDRESS contains addresses linked to the order (shipping and billing), associated with both the customer and the order.

The TMP_CUSTOMER, TMP_ORDER_HEADER, TMP_ORDER_ITEM, and TMP_ADDRESS tables seen in the SQL are the temporary version of these same entities: they serve as an intermediate staging area, used to build a coherent dataset before consolidation into the main tables (CUSTOMER, ORDER_HEADER, ORDER_ITEM, ADDRESS).

Order Example (Input Message)

{
"order_id": "ORD-984532",
"order_date": "2025-02-11T14:23:00Z",
"customer": {
"customer_id": "CUST-482917",
"first_name": "Jordan",
"last_name": "Mitchell",
"email": "jordan.mitchell@examplemail.com",
"phone": "+1-415-555-2894",
"alt_phone": "+1-415-555-7331",
"contact_preferences": {
"preferred_method": "Email",
"allow_sms": true,
"allow_calls": false
}
},
"items": [
{
"item_id": "SKU-10482",
"description": "Wireless Bluetooth Headphones",
"quantity": 1,
"unit_price": 89.99
},
{
"item_id": "SKU-55720",
"description": "USB-C Charging Cable 2m",
"quantity": 2,
"unit_price": 12.49
},
{
"item_id": "SKU-88410",
"description": "Laptop Sleeve 13-inch",
"quantity": 1,
"unit_price": 24.99
}
],
"shipping_address": {
"street": "742 Market Street",
"unit": "Apt 5B",
"city": "San Francisco",
"state": "CA",
"zip": "94103"
},
"billing_address": {
"street": "742 Market Street",
"unit": "Apt 5B",
"city": "San Francisco",
"state": "CA",
"zip": "94103"
},
"payment_method": "Credit Card",
"payment_status": "Paid",
"order_status": "Processing",
"currency": "USD",
"shipping_cost": 7.99,
"total_amount": 147.96,
"notes": "Leave the package at the building concierge."
}

Intent and Motivations of the Example (SQL Reading)

The provided SQL implements a multi-step transactional write, aligned with the PREPARE / INSERT / FINALIZE model of the RDBMS destination.

Temporary Table Creation (PREPARE Phase)

CREATE TEMPORARY TABLE TMP_ADDRESS (...);
CREATE TEMPORARY TABLE TMP_CUSTOMER (...);
CREATE TEMPORARY TABLE TMP_ORDER_HEADER (...);
CREATE TEMPORARY TABLE TMP_ORDER_ITEM (...);

Motivations:

  • create a transactional staging area, isolated from the final data model;
  • allow loading and validating order data (including foreign key constraints between TMP_*) before consolidation;
  • keep the batch "self-contained": each order (or groups of orders) is loaded into a coherent set of temporary tables.

In Flowlyze terms, this part belongs to the PREPARE STATEMENT phase.

Writing to Temporary Tables (INSERT STATEMENT Phase)

The INSERT INTO TMP_* statements directly map the JSON to temporary tables.

Shipping and Billing Addresses

INSERT INTO TMP_ADDRESS (...) VALUES (
@shipping_address.id,
@shipping_address.street,
...
@customer__customer_id,
@order_id
);

INSERT INTO TMP_ADDRESS (...) VALUES (
@billing_address__id,
@billing_address__street,
...
@customer__customer_id,
@order_id
);
  • Two sets of data (shipping and billing) are extracted from the message.
  • Both addresses are linked to the same CUSTOMER_ID and ORDER_ID.
  • Placeholders (e.g. @shipping_address.street) represent message fields linked to the corresponding column.
Customer
INSERT INTO TMP_CUSTOMER (...) VALUES (
@customer.customer_id,
@customer.first_name,
@customer.last_name,
@customer.email,
@customer.phone,
@customer.alt_phone,
@customer.contact_preferences.preferred_method,
@customer.contact_preferences__allow_sms,
@customer.contact_preferences__allow_calls
);
  • The customer object is extracted and normalized into a row of TMP_CUSTOMER.
  • Contact preferences are saved in dedicated columns (PREFERRED_METHOD, ALLOW_SMS, ALLOW_CALLS).
Order Header
INSERT INTO TMP_ORDER_HEADER (...) VALUES (
@order_id,
@order_date,
@customer.customer_id,
@payment_method,
@payment_status,
@order_status,
@currency,
@shipping_cost,
@total_amount,
@notes
);
  • Links the order to the customer (CUSTOMER_ID) and contains all payment and status information.
Order Lines (Loop on Items)
{{#each items}}

INSERT INTO TMP_ORDER_ITEM (...) VALUES (
@items[{{@index}}].order_item_id,
@order_id,
@items[{{@index}}].item_id,
@items[{{@index}}].description,
@items[{{@index}}].quantity,
@items[{{@index}}].unit_price
);

{{/each}}
  • For each element in the items array, an INSERT into TMP_ORDER_ITEM is generated.
  • The {{#each items}} construct indicates an iterative template: Flowlyze generates as many INSERT statements as there are items in the order.
  • All lines are linked to the same ORDER_ID, preserving the 1–N relationship between order and lines.

This part constitutes the INSERT STATEMENT phase of the RDBMS destination, executed for each message.

Consolidation into Final Tables (FINALIZE STATEMENT Phase)

After populating the temporary tables, the SQL executes consolidation to the final tables (CUSTOMER, ORDER_HEADER, ADDRESS, ORDER_ITEM) using INSERT ... SELECT ... ON DUPLICATE KEY UPDATE.

Example for CUSTOMER:

INSERT INTO CUSTOMER (...columns...)
SELECT
c.CUSTOMER_ID AS EXT_ID,
c.CUSTOMER_ID,
...
FROM TMP_CUSTOMER c
ON DUPLICATE KEY UPDATE
CUSTOMER_ID = VALUES(CUSTOMER_ID),
FIRST_NAME = VALUES(FIRST_NAME),
...
;

Same schema for:

  • ORDER_HEADER (orders table),
  • ADDRESS (addresses linked to customer/order),
  • ORDER_ITEM (order lines).
Key Motivations:
  1. Idempotent Upsert:

    • If the record does not exist → INSERT.
    • If the record exists (same PK / unique key) → UPDATE.
      This allows re-running the flow on the same order without creating duplicates, aligning the database with the most recent state of the source system.
  2. Transactional Atomicity

    • All operations (writing to TMP_*, consolidation to final tables) can be executed within a single database transaction.
    • If any step fails (e.g. constraint error, inconsistent data), the entire transaction can be rolled back, keeping data in a consistent state.
  3. Separation between Logical Modeling and Data Source

    • The JSON message has an "event-oriented" structure.
    • The database has a normalized relational structure.
    • Temporary tables serve as a bridge: they allow transformations, validations, enrichments, and constraint management before the final upsert.
  4. Portability across RDBMS

    • The ON DUPLICATE KEY UPDATE syntax is typical of MySQL, but the insert or update logic is replicable on other databases:

      • ON CONFLICT DO UPDATE in PostgreSQL,
      • MERGE INTO ... WHEN MATCHED / NOT MATCHED in SQL Server (MSSQL) and Oracle.
    • In Flowlyze, the PREPARE / INSERT / FINALIZE structure remains the same; only the SQL dialect used in FINALIZE changes to adapt to the RDBMS.

SQL Portability

The ON DUPLICATE KEY UPDATE syntax is specific to MySQL. For other databases:

  • PostgreSQL: use ON CONFLICT DO UPDATE
  • SQL Server / Oracle: use MERGE INTO ... WHEN MATCHED / NOT MATCHED