Metadata-driven data ingestion from Salesforce to Data Lake using Azure Synapse Pipelines

Charith Ekanayake
4 min readSep 11, 2021

--

In this article, I’ll walk you through how to ingest data from Salesforce into Azure Data Lake Storage Gen2 in the parquet format. I would be using Azure Synapse pipelines to accomplish the task in a metadata-driven approach.

Supporting scripts on GitHub

Pre-requisites

Azure Synapse Analytics

Salesforce Account with Bulk API read access

Azure Data Lake Storage Gen 2 Account

AzureSQL Database S0 tier

a.Creating a Linked Service to Salesforce

b.Creating a Linked Service to AzureSQL DB

c.Creating a Linked Service to Azure Data Lake Storage

Overview of the Pipeline

This pipeline consists of a LookUp, For-Each, Internal LookUp, Copy Data, and a Stored procedure activity.

By iterating through the metadata table on the database we can programmatically consume the source objects. This makes life easier for the integration developer in turn to maintain and monitor ingestion pipelines. Rather than maintaining N number of individual pipelines you could maintain a single pipeline instead.

Now let us go through each activity and learn how this could be achieved.

1.LookupMet Activity

The external-lookup activity will retrieve the controller table metadata on AzureSQL DB.

Settings of external-lookup activity

2.Copy_Iterator For-Each activity settings

Tick the Sequential property which in turn allows iterating each item in the controller table in a sequential fashion.

@activity('LookupMet').output.value

3. LookupColumnMapping activity settings

This internal lookup activity creates a dynamic JSON mapping which is later referred inside the Copy activity.

Note

Inside the for loop, external lookup values can be accessed by referring

item().<column_name>

DECLARE @json_header varchar(MAX) = '{"type": "TabularTranslator", "mappings": ';
DECLARE @json VARCHAR(MAX);
SET @json = (
SELECT [source.name],[sink.name]
FROM dbo.ColumnMapper
WHERE tablename ='@{item().sourceobj}'
FOR JSON PATH );SELECT CONCAT(@json_header,@json,'}') AS json_string;

The source and sink column mapping is maintained in a metadata table on AzureSQL.

4. Copy data to ADLS activity

Source Dataset

Source Tab properties

This query retrieves data from Salesforce.

@concat(item().query1,' from ', item().sourceobj,' ',item().whereclause,'>=',formatDateTime(item().last_updt_dt_time,'yyyy-MM-ddTHH:mm:ssZ'))

Since Sink Dataset

Sink Dataset needs to be dynamic as well. It has to be parameterized in the below fashion. adbny is the container to which the parquet file is written in the data lake.

Sink Tab properties

@concat(item().targetlocation,'iter_data_',utcnow())

Mapping Tab properties

@json(activity('LookupColumnMapping').output.firstRow.json_string)

4. UpdateSync Stored Procedure activity settings

This will update the controller table last_updt_dt_time column against the particular table with the current timestamp as the final step of the pipeline.

Since we have finished building the pipeline let’s have a look at the final result of the execution.

Output

These pipelines can be scheduled using a schedule trigger based on the business requirement.

Hope this article provides you with a good starting point to create meta-driven pipelines with Azure Synapse.

--

--