In the first article of this two-part series about streaming ETL and the implementation of ADWEKO CSF (Configurable Streaming ETL Framework for FSDP), we discussed the fundamentals of Apache Kafka and how it can be used for streaming ETL to FSDP.
This second part discusses the typical ETL challenges of data lineage, error handling and data quality, with a special focus on the specifics that have to be considered in a streaming approach. It also provides an overview of our solutions to these challenges in CSF and shows how batch reporting requirements can be reconciled with streaming ETL. The series concludes with an overview of testing streaming ETL systems and a summary.
Especially in large and complex enterprise ETL systems, it is crucial to understand the lineage and flow of data through the different layers. A good overview of data lineage enables the functional department to easily understand where data originates, ensures compliance with regulations such as the GDPR, and supports change management by providing a direct view of the dependent processes affected by a change.
Kafka as an ETL platform natively provides a foundation for a reliable and conceptually simple approach to data lineage. This leverages the structure of Kafka and of the ETL processes built on top of it. In a streaming ETL architecture, data is read from Kafka topics, processed in an application, and the result is written back to another Kafka topic. By persisting intermediate transformation results in Kafka topics, this architecture naturally defines a processing graph: messages in topics are the edges and each processing step is a vertex.
CSF takes advantage of this to provide complete data lineage. Each edge of the graph represents a message in a Kafka topic and carries the Kafka topic, partition and offset. Since this information uniquely identifies a message in Kafka, the data lineage can be used to retrieve the exact messages each processing step consumed to produce a given outcome. The name of the application processing a message, together with its specific version, is saved with the vertices. This allows a user to retrieve the exact code version that led to a specific processing outcome.
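To make the edge and vertex information concrete, the following is a minimal sketch of what a lineage entry could look like. The class and field names are purely illustrative and not CSF's actual schema; plain Java records stand in for whatever serialized form the lineage store uses.

```java
import java.util.List;

// Illustrative sketch of a lineage entry; not CSF's actual schema.
public class LineageSketch {

    // An edge of the processing graph: topic, partition and offset
    // together uniquely identify one message in Kafka.
    record MessageRef(String topic, int partition, long offset) {}

    // A vertex: the processing application with its exact version,
    // linking the input messages it read to the output it wrote.
    record ProcessingStep(String applicationId, String version,
                          List<MessageRef> inputs, MessageRef output) {}

    // Summarize one step so a given outcome can be traced back
    // to the code version and input messages that produced it.
    static String describe(ProcessingStep step) {
        return step.applicationId() + " v" + step.version()
                + " read " + step.inputs().size() + " message(s), wrote "
                + step.output().topic() + "-" + step.output().partition()
                + "@" + step.output().offset();
    }

    public static void main(String[] args) {
        MessageRef in = new MessageRef("source.partner", 0, 42L);
        MessageRef out = new MessageRef("mapped.partner", 0, 17L);
        ProcessingStep step = new ProcessingStep("partner-mapper", "1.3.0",
                List.of(in), out);
        System.out.println(describe(step));
    }
}
```

Because the offset triple is unique per message, storing these records per processing step is enough to walk the graph in either direction.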
Figure 1 shows a visualization in SAP Analytics Cloud (SAC) that was built from the CSF data lineage information for a business partner. The vertices of the graph, representing Kafka applications, are visualized by the cogwheel icons. Each of them contains the application id of the processing application, its version, and the time when the processing was done. The edges, which represent the messages in the two Kafka topics, are visualized by the cylinders. They contain the topic name as well as the message's partition and offset.
The component that loads data into FSDP has the option to write this information into a database while loading and linking it with the data in the FSDP table to provide an end-to-end overview.
Error Handling and Data Quality Assurance
When designing ETL systems, one key question is how to handle errors. In an ETL process integrating sophisticated data systems, multiple classes of errors can occur. The ETL process can uncover data quality problems in the source systems. Problems can also arise from the mapping between data models, for example when matched fields have different lengths in the source and target system and the data does not fit. For dealing with these problems, it is crucial to have an error handling mechanism that also ensures no data is lost in the ETL process.
Multiple strategies for error handling exist (see for example https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/). Erroneous messages can simply be ignored. Because this leads to data loss on errors, it is not a feasible option in the context of financial data. Another option is to fully stop the application when faulty data appears. Since it is hard to ensure that there are no data quality problems in the source data, this is usually not a good idea either.
A third approach is to use a dedicated topic, also called a dead-letter queue, where erroneous data is saved for further processing. Data quality experts can then investigate these messages and resolve the problems either manually or by triggering an error resolution process. CSF implements this concept: in every pipeline step, faulty data is written to an error topic when it is detected.
As visualized in figure 2, errors can possibly occur in all stages in the ETL process:
- Errors in the mapping step (e.g. a missing required field in the source system data)
- Errors while loading data (e.g. a value is too large for the column of the target table)
- Errors in additional steps added to the ETL process (e.g. a step that checks the data quality can write messages not fulfilling the data quality constraints to the error topic)
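The dead-letter routing described above can be sketched as follows. Lists stand in for Kafka producers, and the mapping rule (a required `partnerId` field and a 40-character limit on `name`) is a hypothetical example, not an actual CSF mapping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of dead-letter routing: valid records go to the output "topic",
// faulty ones to the error "topic". Plain lists stand in for Kafka producers.
public class DeadLetterSketch {

    static final List<Map<String, String>> outputTopic = new ArrayList<>();
    static final List<Map<String, String>> errorTopic = new ArrayList<>();

    // Hypothetical mapping step: requires the field "partnerId" and a
    // "name" no longer than 40 characters (the assumed target column width).
    // Returns true if the message was routed to the output topic.
    static boolean process(Map<String, String> message) {
        String id = message.get("partnerId");
        String name = message.getOrDefault("name", "");
        if (id == null || name.length() > 40) {
            errorTopic.add(message);   // dead-letter queue for later analysis
            return false;
        }
        outputTopic.add(message);      // mapped result for the next step
        return true;
    }

    public static void main(String[] args) {
        process(Map.of("partnerId", "BP-1", "name", "ACME Bank"));
        process(Map.of("name", "missing id"));  // required field absent
        System.out.println(outputTopic.size() + " ok, "
                + errorTopic.size() + " in the dead-letter queue");
    }
}
```

The key property is that the faulty message is preserved verbatim in the error topic, so an error handling process can later inspect, correct and re-inject it.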
The error messages written to the error topic can then be processed by an error handling application, which could for example write corrected data back to the input topic. The enforcement of additional integrity constraints, like a check of referential integrity, can be useful to ensure high data quality in the target system, FSDP in our case. One option for doing this is to use ADWEKO's HANA data warehouse manager as a staging area. This tool automatically checks a pre-defined set of constraints for FSDP; the user can modify them and add new ones. It also gives the user a graphical interface for correcting erroneous data that violates these constraints, and it automatically loads the data into the target tables as soon as all constraints are fulfilled.
Another option is to include additional components in the CSF pipeline that perform specific data quality checks. To ensure referential integrity, for example, an additional step can be added that stores all foreign keys of the target table in a Kafka state store and checks, before mapped data is written into FSDP, whether the referenced key exists. These components can again use the error handling mechanism to escalate non-resolvable errors.
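The core of such a referential integrity step can be sketched like this. A plain in-memory set stands in for the Kafka state store, and the names (`onPartner`, `referenceExists`, the business partner example) are illustrative, not CSF API names.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of a referential-integrity step: the set of known keys stands in
// for a Kafka state store that is continuously fed from the referenced topic.
public class IntegritySketch {

    private final Set<String> knownPartnerIds = new HashSet<>();

    // Called for every message of the referenced entity
    // (e.g. a business partner) to keep the key set up to date.
    void onPartner(String partnerId) {
        knownPartnerIds.add(partnerId);
    }

    // Called for every dependent message before it is written to FSDP:
    // true = the reference resolves, false = escalate to the error topic.
    boolean referenceExists(String partnerId) {
        return knownPartnerIds.contains(partnerId);
    }
}
```

In a real Kafka Streams application, the set would be a fault-tolerant state store backed by a changelog topic, so the check survives restarts and rebalances.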
End of Day Determination
Classical ETL processes often use batch loading: all data for a certain day is loaded by an overnight process to the target system. If a batch process completed successfully, it can be assumed that all data for the key date has arrived in the analytical system. In contrast, streaming ETL works on a continual basis, using the event stream as an abstraction and loading data in real time.
For use cases like reporting, it is often crucial to know when all data for a certain day has arrived in the analytical systems, making sure that the data was loaded as intended. In the context of streaming ETL, this is challenging: Kafka itself cannot know whether a source system will still write further messages for a given day or not. Using Kafka topic and consumer offsets, it is however possible to see which messages have already been processed. So if it can be ensured that all messages for a certain day have arrived in Kafka, Kafka's offsets can be used to check whether the data has arrived in the target systems.
CSF has an end-of-day determination component that does exactly this. The component has a definition of all pipeline steps and the dependencies between them. It is triggered by a message in a specific topic indicating that all data for a certain day has arrived in the source topics. This message can be written by the data extraction component. Another possible approach to create this message would be, assuming data is written in sequence to the source topic, to check when a message from the next day arrives and then write the end-of-day message for the last day.
The control flow for a simplified case without parallel processing steps in the pipeline is visualized in figure 3. After the message indicating that all data has been written has arrived, the current offsets of the source topics are retrieved for each topic's partitions. The application then waits until it can determine that the pipeline step has processed these messages; for this purpose it uses the Kafka AdminClient, which can retrieve the committed offsets of applications. This check is applied recursively to all following pipeline steps until the whole pipeline is covered. Afterwards, an end-of-day message is written, confirming that at least all data that was present in the source topics when the end-of-day process started has been processed by the pipeline.
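The offset comparison at the heart of this check can be sketched as follows. In CSF the two offset maps would come from Kafka's AdminClient; here they are plain maps from partition number to offset, and the method name is illustrative.

```java
import java.util.Map;

// Sketch of the end-of-day check for one pipeline step: the step has
// caught up when, for every partition of its input topic, the committed
// consumer offset has reached the end offset snapshotted at the moment
// the end-of-day trigger message arrived.
public class EndOfDaySketch {

    static boolean stepHasCaughtUp(Map<Integer, Long> snapshotEndOffsets,
                                   Map<Integer, Long> committedOffsets) {
        for (Map.Entry<Integer, Long> partition : snapshotEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(partition.getKey(), 0L);
            if (committed < partition.getValue()) {
                return false; // this partition still has unprocessed messages
            }
        }
        return true; // every partition processed at least up to the snapshot
    }

    public static void main(String[] args) {
        Map<Integer, Long> snapshot = Map.of(0, 120L, 1, 95L);
        // Partition 1 is still 5 messages behind the snapshot:
        System.out.println(stepHasCaughtUp(snapshot, Map.of(0, 120L, 1, 90L)));
        // Both partitions have reached the snapshot:
        System.out.println(stepHasCaughtUp(snapshot, Map.of(0, 121L, 1, 95L)));
    }
}
```

Note that comparing against a snapshot is what makes the guarantee "at least all data present at trigger time": messages arriving later simply push the end offsets past the snapshot and are not waited for.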
Tests and Verification
There are multiple ways in which an ETL system can be tested. Regression tests can be re-run after software changes; given sufficient test coverage, this provides high confidence that the system still works as intended. Load tests can be used to check whether the system is able to process a high data load in an acceptable time.
Regression tests are important to ensure that changes do not break existing functionality. One type of regression test works on a component basis. Another type is the integration test, which tests the interaction of different components. Finally, a system test can test the interaction of the system under test with neighbouring systems.
There are two fundamental ways of performing regression tests: automatically or manually. Manual tests have the disadvantage that thorough testing takes a lot of time every time the tests are repeated. It is also not possible to run manual tests continually to find errors right after they are introduced while the ETL solution is expanded or adjusted. Therefore, it is highly advisable to automate tests whenever possible.
In the Kafka streaming ETL case, the system can be divided into the individual pipeline steps, where each step is a component of the system. For writing automated component tests, Kafka Streams provides the Kafka Streams test utils, which make it possible to test a Kafka Streams topology. Messages for the input topics are specified, the test utils then simulate the same Kafka Streams topology the application uses and create messages for the output topics, and these output messages can be compared to the expected results. The test utils were used to write tests for the mapping component of CSF.
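The test pattern can be shown in miniature. In a real component test, the `TopologyTestDriver` from kafka-streams-test-utils drives the actual topology; in this stdlib-only sketch a plain function stands in for the topology, and the uppercase mapping is purely illustrative.

```java
import java.util.List;
import java.util.function.Function;

// The component-test pattern in miniature: feed input records through the
// transformation under test and compare against the expected output.
// A plain function stands in for a Kafka Streams topology here; real tests
// would use TopologyTestDriver from kafka-streams-test-utils instead.
public class ComponentTestSketch {

    // Stand-in for one pipeline step's topology: uppercase each value.
    static final Function<String, String> mappingStep = String::toUpperCase;

    static List<String> runPipelineStep(List<String> inputTopicMessages) {
        return inputTopicMessages.stream().map(mappingStep).toList();
    }

    public static void main(String[] args) {
        List<String> actual = runPipelineStep(List.of("bp-1", "bp-2"));
        List<String> expected = List.of("BP-1", "BP-2");
        System.out.println(actual.equals(expected) ? "test passed" : "test failed");
    }
}
```

The value of the real test driver is that it exercises the exact topology the application deploys, including serdes and state stores, without starting a broker.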
For writing integration tests that cover the interaction of multiple Kafka applications, a whole Kafka cluster can be simulated in a test. For this purpose, the EmbeddedKafkaCluster, which is part of Kafka, exists. It runs in memory and enables the tests to simulate multiple brokers in a cluster.
System tests in the CSF case would include Kafka, the source system and FSDP as the target system. To realize them, a test environment should exist where CSF, an instance of the source system and FSDP are running. Test cases can then be defined by specifying source data, which is inserted into the source system, and checking against a specification of the expected content of the FSDP tables whether the data arrives in FSDP as required.
An ETL system must be able to handle the data load of its real-world use case. Especially in the context of financial data, consistency is crucial and no data loss can be tolerated. For performance testing and for checking that new data arrives completely in the target system, CSF includes a test data generation component that can be used for load tests.
This generation component can inject a large amount of generated data into the source topics. It is configured using JSON files that specify the schema of the source system data. To generate data for the specified fields, methods of the embedded Java Faker library can be referenced directly; these are then called in the generation process. For fields that need more specific content, or whose content depends on other fields, Java methods implemented in the test data creation component can be specified.
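The schema-driven generation idea can be sketched as follows. CSF reads the schema from JSON files and delegates to Java Faker; in this sketch a map from field name to generator function and `java.util.Random` stand in for both, and all names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;

// Sketch of schema-driven test data generation: each field of the "schema"
// maps to a generator function that produces a value for that field.
public class TestDataSketch {

    static Map<String, String> generateRecord(Map<String, Supplier<String>> schema) {
        Map<String, String> record = new LinkedHashMap<>();
        schema.forEach((field, generator) -> record.put(field, generator.get()));
        return record;
    }

    public static void main(String[] args) {
        Random random = new Random();
        Map<String, Supplier<String>> schema = new LinkedHashMap<>();
        schema.put("partnerId", () -> "BP-" + random.nextInt(10_000));
        schema.put("country", () -> random.nextBoolean() ? "DE" : "FR");
        System.out.println(generateRecord(schema));
    }
}
```

Generating faulty records for error handling tests then only requires swapping in a generator that deliberately violates a constraint, for example one that omits a required field.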
It is also possible to generate faulty data to test error handling. Such faulty data is specified in the same way as non-faulty test data. When the test data generation component is started, the user specifies the number of messages and the number of faulty messages that will be generated for each target Kafka topic. The load test component also gives the user the opportunity to test what happens if there are application failures by, for example, stopping an application in the pipeline.
In this blog post series, we gave an introduction to the streaming platform Kafka. Afterwards, we explored the implementation of the streaming ETL framework CSF, which provides the functionality to deliver data from Kafka to SAP's new analytical platform FSDP. We have seen how CSF solves a diverse set of challenges that occur in streaming ETL. Because Kafka decouples the applications separated by topics, CSF can naturally be extended with further streaming applications. And because messages are stored in Kafka, errors can easily be analysed when they occur, so the user can trace where a problem lies.
FSDP is SAP’s new, standardized central data platform for financial data. Integrating such a scalable and future-proof analytical platform into a modern enterprise, and taking full advantage of it, requires real-time data delivery, which is enabled by streaming ETL. Kafka, as the state-of-the-art streaming platform, is the right tool for realizing this. The streaming ETL approach offers many benefits and enables new use cases, providing real-time insights into the current state of a company. It is also an approach that comes with many challenges, both in itself and in its interplay with the surrounding systems.
As we have seen in this blog post series, CSF is a flexible, easily configurable, scalable and extendable system that already solves many ETL challenges, making it a perfect fit for integrating FSDP into the IT landscape of a financial company on top of Kafka.