Passa al contenuto principale

Destinazione RDBMS

La destinazione RDBMS consente a Flowlyze di collegarsi a qualunque database relazionale accessibile pubblicamente via Internet, senza dipendenze rispetto a uno specifico vendor. È pensata per implementare scenari di sincronizzazione dati, migrazione, consolidamento o integrazione bidirezionale tra sistemi.

Parametri di connessione

Per stabilire la connessione al database, Flowlyze richiede i seguenti parametri:

  • URL del server
  • Porta
  • Username
  • Password

La correttezza della connessione può essere verificata direttamente tramite l'apposita funzione di test disponibile nella schermata di configurazione.

Definizione delle query di scrittura

Flowlyze offre una gestione molto granulare delle operazioni SQL grazie a tre tipologie di query che coprono l'intero ciclo di elaborazione:

Ciclo di elaborazione SQL

Le tre fasi (PREPARE → INSERT → FINALIZE) permettono di implementare logiche complesse:

  • PREPARE: preparazione ambiente (tabelle temporanee, pulizia)
  • INSERT: scrittura dati per ogni record
  • FINALIZE: consolidamento e operazioni finali (upsert, stored procedure)

INSERT STATEMENT

Eseguito per ogni riga del set di dati. Consente di utilizzare uno statement SQL parametrico, dove i campi del record sono accessibili tramite:

  • @nomecampo
  • @nomecampo_sottocampo per oggetti annidati

Generalmente questo statement è usato per eseguire INSERT, ma può essere qualunque comando SQL valido.

PREPARE STATEMENT

Eseguito prima dell'elaborazione di un batch.
Permette operazioni preparatorie come:

  • creazione di tabelle temporanee,
  • pulizia o inizializzazione di strutture dati,
  • generazione di ambienti intermedi per la scrittura.

FINALIZE STATEMENT

Eseguito dopo l'inserimento di tutte le righe del batch.
È tipicamente utilizzato per:

  • consolidare i dati nelle tabelle definitive,
  • eseguire upsert,
  • richiamare stored procedure,
  • effettuare ricalcoli o operazioni di normalizzazione.

Esempio 1 — Scrittura in modalità insert or update

Questo esempio illustra un flusso di sincronizzazione clienti in cui un messaggio JSON viene trasformato in una riga della tabella relazionale **CUSTOMER**. Flowlyze esegue la query per ogni record in coda, mappando i campi del messaggio sui campi della tabella. L'obiettivo è dimostrare come un'integrazione semplice possa comunque supportare logiche più complesse come l'upsert (insert or update).

Struttura dati (diagramma Mermaid)

In questo modello:

  • Il cliente è rappresentato da un'unica tabella CUSTOMER.
  • I dati dell'indirizzo sono appiattiti in colonne dedicate (ADDRESSSTREET, ADDRESSCITY, ecc.).
  • Tutte le informazioni anagrafiche e di preferenza sono centralizzate in un unico record per semplificare la gestione.

Intento e motivazioni dell'esempio

L'obiettivo dell'esempio è mostrare come un singolo messaggio JSON:

{
"customer_id": "CUST-482917",
"name": "Jordan Mitchell",
"date_of_birth": "1987-03-12",
"email": "jordan.mitchell@examplemail.com",
"phone": "(415) 555-2894",
"address": {
"street": "742 Market Street",
"unit": "Apt 5B",
"city": "San Francisco",
"state": "CA",
"zip": "94103"
},
"account_status": "Active",
"signup_date": "2023-08-19",
"preferred_contact_method": "Email",
"marketing_opt_in": true,
"notes": "Customer prefers weekend delivery windows."
}

venga convertito in una riga della tabella CUSTOMER tramite il seguente SQL:

INSERT INTO CUSTOMER (
CUSTOMERID,
NAME,
DATEOFBIRTH,
EMAIL,
PHONE,
ADDRESSSTREET,
ADDRESSUNIT,
ADDRESSCITY,
ADDRESSSTATE,
ADDRESSZIP,
ACCOUNTSTATUS,
SIGNUPDATE,
PREFERREDCONTACTMETHOD,
MARKETINGOPTIN,
NOTES
) VALUES (
@customer_id,
@name,
@date_of_birth,
@email,
@phone,
@address__street,
@address__unit,
@address__city,
@address__state,
@address__zip,
@account_status,
@signup_date,
@preferred_contact_method,
@marketing_opt_in,
@notes
);

Punti chiave della mappatura:

  • I campi del messaggio sono mappati 1:1 sulle colonne della tabella.
  • I valori annidati vengono referenziati tramite notazione a dot (@address__street).
  • Il modello "flat" permette di rappresentare tutti i dati più rilevanti in una sola entità.
Notazione per oggetti annidati

Per accedere a campi annidati nel JSON, usa la notazione con doppio underscore: @oggetto__campo. Ad esempio, @address__street accede a address.street nel messaggio JSON.

Esempio 2 — Scrittura in modalità transazionale

Questo esempio mostra un flusso di sincronizzazione ordini in cui un singolo messaggio di ordine viene scomposto in più entità relazionali (cliente, indirizzi, testata ordine, righe ordine) e scritto in modo coerente e transazionale. L'obiettivo è garantire che tutti i dati correlati a un ordine vengano inseriti/aggiornati insieme, oppure tutti scartati in caso di errore.

Transazionalità

Tutte le operazioni vengono eseguite all'interno di un'unica transazione del database. Se un qualsiasi passaggio fallisce, l'intera transazione viene rollbackata, garantendo la coerenza dei dati.

Struttura dati

Come leggere il modello:

  • CUSTOMER rappresenta il cliente che effettua l'ordine (dati anagrafici e preferenze di contatto).
  • ORDER_HEADER è la testata dell'ordine (data, stato, pagamenti, importi).
  • ORDER_ITEM rappresenta le singole righe dell'ordine (articoli acquistati).
  • ADDRESS contiene gli indirizzi collegati all'ordine (spedizione e fatturazione), associati sia al cliente sia all'ordine.

Le tabelle TMP_CUSTOMER, TMP_ORDER_HEADER, TMP_ORDER_ITEM e TMP_ADDRESS viste nello SQL sono la versione temporanea di queste stesse entità: fungono da staging area intermedia, usata per costruire un set di dati coerente prima del consolidamento sulle tabelle principali (CUSTOMER, ORDER_HEADER, ORDER_ITEM, ADDRESS).

Esempio di ordine (messaggio in ingresso)

{
"order_id": "ORD-984532",
"order_date": "2025-02-11T14:23:00Z",
"customer": {
"customer_id": "CUST-482917",
"first_name": "Jordan",
"last_name": "Mitchell",
"email": "jordan.mitchell@examplemail.com",
"phone": "+1-415-555-2894",
"alt_phone": "+1-415-555-7331",
"contact_preferences": {
"preferred_method": "Email",
"allow_sms": true,
"allow_calls": false
}
},
"items": [
{
"item_id": "SKU-10482",
"description": "Wireless Bluetooth Headphones",
"quantity": 1,
"unit_price": 89.99
},
{
"item_id": "SKU-55720",
"description": "USB-C Charging Cable 2m",
"quantity": 2,
"unit_price": 12.49
},
{
"item_id": "SKU-88410",
"description": "Laptop Sleeve 13-inch",
"quantity": 1,
"unit_price": 24.99
}
],
"shipping_address": {
"street": "742 Market Street",
"unit": "Apt 5B",
"city": "San Francisco",
"state": "CA",
"zip": "94103"
},
"billing_address": {
"street": "742 Market Street",
"unit": "Apt 5B",
"city": "San Francisco",
"state": "CA",
"zip": "94103"
},
"payment_method": "Credit Card",
"payment_status": "Paid",
"order_status": "Processing",
"currency": "USD",
"shipping_cost": 7.99,
"total_amount": 147.96,
"notes": "Leave the package at the building concierge."
}

Intento e motivazioni dell'esempio (lettura dell'SQL)

L'SQL fornito implementa una scrittura transazionale a più step, allineata con il modello di PREPARE / INSERT / FINALIZE della destinazione RDBMS.

Creazione delle tabelle temporanee (fase PREPARE)

CREATE TEMPORARY TABLE TMP_ADDRESS (...);
CREATE TEMPORARY TABLE TMP_CUSTOMER (...);
CREATE TEMPORARY TABLE TMP_ORDER_HEADER (...);
CREATE TEMPORARY TABLE TMP_ORDER_ITEM (...);

Motivazioni:

  • creare una staging area transazionale, isolata dal modello dati finale;
  • permettere di caricare e validare i dati dell'ordine (inclusi vincoli di chiave esterna tra TMP_*) prima del consolidamento;
  • mantenere il batch "autocontenuto": ogni ordine (o gruppi di ordini) viene caricato in un set di tabelle temporanee coerenti.

In termini Flowlyze, questa parte appartiene alla fase PREPARE STATEMENT.

Scrittura nelle tabelle temporanee (fase INSERT STATEMENT)

Gli INSERT INTO TMP_* mappano direttamente il JSON sulle tabelle temporanee.

Indirizzi di spedizione e fatturazione

INSERT INTO TMP_ADDRESS (...) VALUES (
@shipping_address.id,
@shipping_address.street,
...
@customer__customer_id,
@order_id
);

INSERT INTO TMP_ADDRESS (...) VALUES (
@billing_address__id,
@billing_address__street,
...
@customer__customer_id,
@order_id
);
  • Dal messaggio si estraggono due insiemi di dati (spedizione e fatturazione).
  • Entrambi gli indirizzi sono collegati allo stesso CUSTOMER_ID e ORDER_ID.
  • I placeholder (es. @shipping_address.street) rappresentano i campi del messaggio agganciati alla colonna corrispondente.
Cliente
INSERT INTO TMP_CUSTOMER (...) VALUES (
@customer.customer_id,
@customer.first_name,
@customer.last_name,
@customer.email,
@customer.phone,
@customer.alt_phone,
@customer.contact_preferences.preferred_method,
@customer.contact_preferences__allow_sms,
@customer.contact_preferences__allow_calls
);
  • Si estrae l'oggetto customer e lo si normalizza in una riga di TMP_CUSTOMER.
  • Le preferenze di contatto vengono salvate in colonne dedicate (PREFERRED_METHOD, ALLOW_SMS, ALLOW_CALLS).
Testata ordine
INSERT INTO TMP_ORDER_HEADER (...) VALUES (
@order_id,
@order_date,
@customer.customer_id,
@payment_method,
@payment_status,
@order_status,
@currency,
@shipping_cost,
@total_amount,
@notes
);
  • Collega l'ordine al cliente (CUSTOMER_ID) e contiene tutte le informazioni di pagamento e stato.
Righe ordine (loop sugli items)
{{#each items}}

INSERT INTO TMP_ORDER_ITEM (...) VALUES (
@items[{{@index}}].order_item_id,
@order_id,
@items[{{@index}}].item_id,
@items[{{@index}}].description,
@items[{{@index}}].quantity,
@items[{{@index}}].unit_price
);

{{/each}}
  • Per ogni elemento dell'array items viene generato un INSERT in TMP_ORDER_ITEM.
  • Il costrutto {{#each items}} indica un template iterativo: Flowlyze genera tanti INSERT quanti sono gli articoli nell'ordine.
  • Tutte le righe sono legate allo stesso ORDER_ID, preservando la relazione 1–N tra ordine e righe.

Questa parte costituisce la fase INSERT STATEMENT della destinazione RDBMS, eseguita per ogni messaggio.

Consolidamento nelle tabelle finali (fase FINALIZE STATEMENT)

Dopo aver popolato le tabelle temporanee, l'SQL esegue il consolidamento verso le tabelle definitive (CUSTOMER, ORDER_HEADER, ADDRESS, ORDER_ITEM) usando INSERT ... SELECT ... ON DUPLICATE KEY UPDATE.

Esempio per CUSTOMER:

INSERT INTO CUSTOMER (...columns...)
SELECT
c.CUSTOMER_ID AS EXT_ID,
c.CUSTOMER_ID,
...
FROM TMP_CUSTOMER c
ON DUPLICATE KEY UPDATE
CUSTOMER_ID = VALUES(CUSTOMER_ID),
FIRST_NAME = VALUES(FIRST_NAME),
...
;

Stesso schema per:

  • ORDER_HEADER (tabella degli ordini),
  • ADDRESS (indirizzi collegati a cliente/ordine),
  • ORDER_ITEM (righe ordine).
Motivazioni chiave:
  1. Upsert idempotente:

    • Se il record non esiste → INSERT.
    • Se il record esiste (stesso PK / chiave univoca) → UPDATE.
      Questo consente di rieseguire il flusso sullo stesso ordine senza creare duplicati, allineando la base dati allo stato più recente del sistema sorgente.
  2. Atomicità transazionale

    • Tutte le operazioni (scrittura su TMP_*, consolidamento su tabelle finali) possono essere eseguite all'interno di un'unica transazione del database.
    • Se un qualsiasi passaggio fallisce (es. errore di vincolo, dato incoerente), l'intera transazione può essere rollbackata, mantenendo i dati in uno stato consistente.
  3. Separazione tra modellazione logica e origine dati

    • Il messaggio JSON ha una struttura "orientata all'evento".
    • Il database ha una struttura relazionale normalizzata.
    • Le tabelle temporanee fungono da ponte: permettono trasformazioni, controlli, arricchimenti e gestione di vincoli prima dell'upsert definitivo.
  4. Portabilità tra RDBMS

    • La sintassi ON DUPLICATE KEY UPDATE è tipica di MySQL, ma la logica di insert or update è replicabile su altri database:

      • ON CONFLICT DO UPDATE in PostgreSQL,
      • MERGE INTO ... WHEN MATCHED / NOT MATCHED in SQL Server (MSSQL) e Oracle.
    • In Flowlyze, la struttura PREPARE / INSERT / FINALIZE resta la stessa; cambia solo il dialetto SQL usato nel FINALIZE per adattarsi al RDBMS.

Portabilità SQL

La sintassi ON DUPLICATE KEY UPDATE è specifica di MySQL. Per altri database:

  • PostgreSQL: usa ON CONFLICT DO UPDATE
  • SQL Server / Oracle: usa MERGE INTO ... WHEN MATCHED / NOT MATCHED