by Chathura Ekanayake

Developing agile ETL flows with Ballerina

feature
Aug 26, 202412 mins
Data IntegrationData ManagementSoftware Development

The extract, transform, and load phases of ETL typically involve multiple tasks, each of which can be executed independently. This means you can develop each task as a microservice.

Data stream. Bright, colorful background with bokeh effect
Credit: Shutterstock

Companies generate vast amounts of data daily from various business operations. For example, whenever a customer is checked out at a retail outlet, data such as the customer identifier, retail outlet identifier, time of check out, list of purchased items, and the total sales value can be captured in the point of sales (PoS) system. Similarly, field sales staff may record possible sales opportunities in spreadsheets. In addition, most business communications occur via emails, making emails a highly valuable data source. To maintain information consistent across an organization and to gain business insights from this data, it is crucial to extract necessary details from these scattered data sources and keep all relevant information centralized.

Extract, transform, and load (ETL) technologies focus on this problem of extracting data from multiple sources, transforming extracted data into required formats, and loading those into relevant data stores or systems. However, the landscape of ETL applications is also changing rapidly due to business and technological advancements. Some of these challenges would be:

  • Use of AI to extract information from natural language or unstructured data sources.
  • Use of AI to transform data.
  • Connecting with cloud-based systems to extract or load data.
  • Flexible deployment of ETL flows in hybrid-cloud environments.
  • Scalability of ETL flows.
  • Microservices-like agile and quick deployment of ETL flows.
  • Support for streaming ETL operations.
  • Low-cost ETL deployments for small use cases.

In the remainder of this article we will discuss an architecture for building such agile ETL flows and methods for rapidly deploying those ETL flows.

Architecture for building agile ETL flows

Each of the extract, transform, and load phases of ETL typically involves multiple tasks. For example, the extraction phase may have tasks for extracting data from CSV documents and emails. Similarly, the transformation phase may have tasks for removing entries with missing fields, concatenating fields, categorizing, and mapping data from one format to another. Finally, the loading phase can have tasks for loading to a data warehouse, updating entries in a database, or inserting data into different systems. Such an ETL flow is depicted in the below figure.

ETL Ballerina 00

WSO2

Each of these tasks can be executed independently once the raw data or the output of another task is provided. Therefore, it is possible to implement each of these tasks in suitable technologies and execute them as independently deployable and scalable clusters. This makes it possible to develop each task as a microservice.

Furthermore, there are dependencies between tasks. For example, the “Concatenate fields” task depends on the “Extract from file system” task. Multiple methods can be used to pass data between such dependent tasks. A simple approach would be to use REST API calls to communicate between these tasks. However, it is possible to increase the decoupling and improve reliability if a messaging system is used between tasks. Then each task consumes data from a topic in the messaging system and publishes output data to another topic once its processing is complete. This approach offers multiple advantages:

  • Each task can work at its own speed, without getting overloaded by requests from the preceding task.
  • Data won’t be lost if a task fails.
  • Additional tasks can be added to the ETL flow without affecting current tasks.

Such an architecture for implementing ETL tasks as microservices and facilitating their communications via a messaging layer is shown below.

ETL Ballerina 01

WSO2

Separating each ETL task into a microservice can be thought of as the logical architecture. In the actual implementation, it is possible to determine whether to implement an ETL task as a separate microservice or to combine multiple tasks into a single microservice based on factors such as scalability, development team, and anticipated extensibility requirements.

Implementing ETL tasks

The next step is to implement individual ETL tasks. Because each of these tasks is a microservice, any technology can be used for the implementation. ETL tasks generally involve three steps:

  1. Integrating with data stores and external endpoints available in both on-premises data centers and cloud.
  2. Processing large and complex data structures.
  3. Transferring data over multiple formats and protocols.

Many integration technologies that support microservices-style deployments can be used for implementing ETL tasks. A good candidate for this purpose is the Ballerina programming language, which is designed specifically for integrations. Ballerina has native support for service development, database connections, common protocols, data transformations, and data types such as JSON, XML, CSV, and EDI. In addition, it comes with a large collection of connectors to integrate with on-prem and SaaS systems. In the following sections, we will explore some example ETL task developments with Ballerina.

Data extractions

Business data can be in databases, CSV files, EDI documents, spreadsheets, or various enterprise systems such as ERP applications. Therefore, data extraction tasks need to connect with all of these data sources and read data using the format they expose. Below are some examples of data extraction from databases, CSV files, and EDI documents using Ballerina.

Reading databases


stream orders = dbClient->/orderdata;
check from var orderData in orders
   do {
       io:println(orderData);
   };

Reading CSV files


stream productDataStream = check io:fileReadCsvAsStream("product_data.csv");
check productDataStream.forEach(
   function(string[] productData) {
   io:println(productData);
});

Reading EDI documents


string ediText = check io:fileReadString("resources/purchase_order.edi");
PurchaseOrder simpleOrder = check fromEdiString(ediText);
io:println(string `Order Id: ${simpleOrder.header.orderId}`);

The data extraction phase may require extracting data from unstructured sources as well. A good example of this is the extraction of structured information from emails, comments, and reviews. The below example demonstrates the extraction of good points, bad points, and improvement points from reviews using Ballerina and OpenAI.


chat:CreateChatCompletionRequest request = {
   model: "gpt-3.5-turbo",
   messages: [
       {
           role: "user",
           content: string `
               Extract the following details in JSON from the reviews given.
                   {
                       good_points: string,
                       bad_points: string,
                       improvement_points: string
                   }
               The fields should contain points extracted from all reviews
               Here are the reviews:
               ${string:'join(",", ...summaryRequest.reviews)}
           `
       }
   ]
};
chat:CreateChatCompletionResponse summary = check openAiChat->/chat/completions.post(request);
if summary.choices.length() > 0 {
   string content = check summary.choices[0].message?.content.ensureType();
   io:println(content);
chat:CreateChatCompletionRequest request = {
   model: "gpt-3.5-turbo",
   messages: [
       {
           role: "user",
           content: string `
               Extract the following details in JSON from the reviews given.
                   {
                       good_points: string,
                       bad_points: string,
                       improvement_points: string
                   }
               The fields should contain points extracted from all reviews
               Here are the reviews:
               ${string:'join(",", ...summaryRequest.reviews)}
           `
       }
   ]
};
chat:CreateChatCompletionResponse summary = check openAiChat->/chat/completions.post(request);
if summary.choices.length() > 0 {
   string content = check summary.choices[0].message?.content.ensureType();
   io:println(content);
}

Data transformations

Extracted data may have originated from spreadsheets filled by employees, text scanned from handwritten documents, or data entered into systems by operators. As a result, this data can contain spelling mistakes, missing fields, duplicates, or invalid data. Therefore, the transformation phase must clean such data records before loading them to target systems. In addition, it may be necessary to combine related details from multiple sources to enrich data during the transformation phase. The below examples show the use of Ballerina for such tasks.

Removing duplicates


function removeDuplicates(SalesOrder[] orders) returns SalesOrder[] {
   return from var {itemId, customerId, itemName, quantity, date} in orders
       group by itemId, customerId, itemName
       select {
           itemId,
           customerId,
           itemName,
           quantity: [quantity][0],
           date: [date][0]
       };
}

Identifying invalid data entries


function isValidEmail(string inputString) returns boolean {
   string:RegExp emailPattern =  re `[A-Za-z0-9\._%+-]+@[A-Za-z0-9\.-]+\.[A-Za-z]{2,}`;
   return emailPattern.isFullMatch(inputString);
}

Data enrichment


CRMResponse response = check crmClient->/crm/api/customers/'json(customerId = customer.id);
if response.status == "OK" {
   customer.billingAddress = response.billingAddress;
   customer.primaryContact = response.telephone;
}

Often, extracted data needs to be transformed into a different format before storing in target systems. However, ETL tasks usually have to work with very large data structures consisting of hundreds of fields, which can make data mapping a tedious task. The visual data mapping capabilities of Ballerina can be used to simplify this as shown below.

ETF Ballerina 02

WSO2

Data loading

Finally, the tasks in the data loading phase need to connect with different target systems and send data over the required protocols. It is also important to establish secure connections to these target systems using technologies such as TLS and OAuth2. Ballerina has a large number of connectors and built-in support for all common security standards, making it easy to implement such data loading tasks. The below example shows the data insertion into Google BigQuery.


SalesData[] salesDataset = check io:fileReadCsv("./resources/sales_data.csv");
bigquery:TabledatainsertallrequestRows[] rows = from var salesData in salesDataset
   select {insertId: uuid:createType1AsString(), 'json: salesData};
bigquery:TableDataInsertAllRequest payload = {rows};
_ = check bigQueryClient->insertAllTableData(projectId, datasetId, tableId, payload);

Sometimes business users may want to examine certain data records such as entries with missing or invalid values. With a microservices architecture, the introduction of such an additional task is just a matter of adding a microservice to read from the relevant topic and load data into an end-user system such as a spreadsheet. An example of reading data from a topic and inserting them into Google Sheets is shown below.


sheets:Spreadsheet sheet = check spreadsheetClient->createSpreadsheet(sheetName);
_ = check spreadsheetClient->
   appendValue(sheet.spreadsheetId, ["Product", "Sales", "Date"], {sheetName: workSheetName});
foreach var {product, sales, date} in salesSummary {
   _ = check spreadsheetClient->
       appendValue(sheet.spreadsheetId, [product, sales, date], {sheetName: workSheetName});
}

Deploying and testing ETL flows

Developing individual ETL tasks as microservices allows the entire ETL flow to be deployed in a Kubernetes cluster. Each ETL task can be a pod in the Kubernetes deployment, making it possible to increase or decrease the number of pods of individual ETL tasks based on the load. However, organizations usually have multiple ETL flows, each with many tasks. Furthermore, these ETL flows can be owned by different teams. Therefore, it is crucial to have proper CI/CD pipelines, permission models, monitoring capabilities, and multiple environments for development, testing, performance validations, and production.

Ballerina can work with all common CI/CD, monitoring, and deployment technologies, making it seamless to integrate Ballerina-based ETL flows with an organization’s existing infrastructure. For example, Ballerina ETL source code can be maintained in GitHub, CI/CD actions can be implemented using Jenkins, ETL flows can be deployed on Amazon EKS, and the executions can be monitored using Prometheus and Grafana.

Another deployment option would be the Choreo platform, which offers all of these capabilities out-of-the-box. Because Choreo eliminates the need to build a platform, it is possible to start the ETL journey instantly by deploying a selected set of ETL flows, testing them, and moving them to production. Then changes to these ETL flows can be made or new ETL flows can be introduced in corresponding source repositories, which will be picked up by Choreo and deployed into the development environment.

Closing remarks

This article discussed the architecture and Ballerina language implementation of flexible, microservices-like ETL flows. Given that most business units produce data and have unique data requirements, the data processing capabilities, connectivity, and flexible deployment options offered by the Ballerina language can be transformative. The Ballerina team is currently working on improving the tools support to make it even simpler to build integration and ETL flows.

Chathura Ekanayake is an associate director/architect at WSO2. He is a part of the BPS team and focuses on overall BPM related aspects. He first joined WSO2 in 2006 and led the development of the WSO2 governance registry product in addition to working on WSO2 ESB. He pursued a Ph.D. at the Queensland University of Technology, Australia, and joined WSO2 upon completion of the Ph.D. in 2013. Chathura holds a first class honors degree in computer science and engineering from the University of Moratuwa, Sri Lanka.

Also by this author:

New Tech Forum provides a venue for technology leaders—including vendors and other outside contributors—to explore and discuss emerging enterprise technology in unprecedented depth and breadth. The selection is subjective, based on our pick of the technologies we believe to be important and of greatest interest to InfoWorld readers. InfoWorld does not accept marketing collateral for publication and reserves the right to edit all contributed content. Send all inquiries to doug_dineley@foundryco.com.