Companies are always looking for better ways to manage and process their data. In this blog post, we’ll explore a proof of concept developed for a leading beauty brand, demonstrating how Mage Pro can turn complicated data tasks into smooth operations. Whether you’re a data engineer, analyst, or simply interested in data management, this article will offer valuable insights into building effective data pipelines using Mage Pro, Azure Blob Storage, and Snowflake.
Table of contents
Introduction
Understanding the challenge
Solution overview
Step-by-step implementation
Loading data from Azure Blob Storage
Splitting data into chunks
Transforming data in chunks
Combining chunks while maintaining order
Exporting data to Snowflake
Conclusion
Understanding the challenge
The global leader in beauty products needed to verify that Mage Pro, as a data pipeline tool, could handle the following:
Ingest Data: Retrieve files from Azure Blob Storage.
Process in Chunks: Split large files into manageable chunks for efficient processing.
Maintain Order: Ensure that the original order of data is preserved throughout processing.
Export Data: Send the processed data to a generic exporter, such as Snowflake, maintaining data integrity and order.
This POC aimed to demonstrate Mage Pro’s capability to handle these requirements.
Solution overview
The solution involved building a data pipeline with the following steps:
Load Data: Fetch the file from Azure Blob Storage.
Split Data: Divide 20,000 rows into 10 chunks of 2,000 rows each using dynamic blocks.
Transform Data: Apply necessary transformations to each chunk.
Export Data: Transfer the data to Snowflake in its original order using Mage Pro’s exporter block.
What are dynamic blocks?
Dynamic blocks in Mage are a special type of block that can create multiple downstream blocks at runtime. This feature allows for incredible flexibility in pipeline design, enabling data engineers to create workflows that adapt to the data they’re processing. The power of dynamic blocks lies in their ability to generate a variable number of blocks based on the output of an upstream block. This means your pipeline can scale and adjust itself depending on the data it receives, without requiring manual intervention or redesign. Dynamic blocks run in parallel, reducing the processing time and improving the efficiency of your data pipelines.
How dynamic blocks work
Let’s break down the mechanics of dynamic blocks:
Output Structure: A dynamic block must return a list of two lists of dictionaries. The first list contains the data that will be passed to downstream blocks, while the second list contains metadata for each dynamically created block (see the sketch after this list).
Downstream Block Creation: The number of downstream blocks created is equal to the number of items in the output data multiplied by the number of direct downstream blocks.
Data Flow: Each dynamically created block receives a portion of the data from the dynamic block, allowing for parallel processing of different data subsets.
Metadata: The metadata provided by the dynamic block is used to uniquely identify each dynamically created block, ensuring proper data routing and execution.
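As a concrete illustration, here is a minimal sketch of a dynamic block; the user records and block_uuid values are hypothetical, not taken from the POC.

```python
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_users(*args, **kwargs):
    # First list: the data, one item per dynamically created downstream block run.
    users = [
        {'id': 1, 'name': 'user_a'},
        {'id': 2, 'name': 'user_b'},
    ]
    # Second list: metadata used to uniquely identify each dynamically created block.
    metadata = [
        {'block_uuid': 'user_1'},
        {'block_uuid': 'user_2'},
    ]
    return [users, metadata]
```

With one direct downstream block, this block would produce two child runs (2 items × 1 downstream block), each receiving one of the user dictionaries.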
Step-by-step implementation
Let’s explore each step in more detail:
1. Loading data from Azure Blob Storage
The first step involved fetching data from Azure Blob Storage. Mage Pro connects to a plethora of different data sources through its data_loader decorator, and many data loader block templates are available in Mage Pro that make extracting data from different sources more efficient.
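The loader used in the POC isn’t reproduced here, but a minimal sketch of such a block might look like the following. The secret name (azure_connection_string), container name (raw-data), and blob prefix (exports/) are illustrative placeholders.

```python
import io

import pandas as pd
from azure.storage.blob import BlobServiceClient
from mage_ai.data_preparation.shared.secrets import get_secret_value

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_from_azure_blob(*args, **kwargs) -> pd.DataFrame:
    # Retrieve the connection string from the Mage Pro secrets manager.
    connection_string = get_secret_value('azure_connection_string')
    service_client = BlobServiceClient.from_connection_string(connection_string)

    # List blobs in the container that match the prefix, then pick the latest by name.
    container_client = service_client.get_container_client('raw-data')
    blobs = container_client.list_blobs(name_starts_with='exports/')
    latest_blob = sorted(blobs, key=lambda blob: blob.name)[-1]

    # Download the CSV blob and read it into a pandas DataFrame.
    csv_bytes = container_client.download_blob(latest_blob.name).readall()
    return pd.read_csv(io.BytesIO(csv_bytes))
```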
Explanation of code:
Connection: The script connects to Azure Blob Storage using a secure connection string saved in and retrieved from the Mage Pro secrets manager.
Blob Selection: It accesses a specific container and lists blobs with a given prefix, selecting the latest one based on sorting.
Data Loading: The selected CSV file is downloaded and read into a pandas DataFrame for further processing.
2. Splitting data into chunks
Processing large datasets can be resource-intensive. To optimize performance, the data is split into smaller chunks using dynamic blocks: here, 20,000 rows are divided into 10 chunks of 2,000 rows each. To mark the block as dynamic, click the three dots in the top right of the pipeline block and select “Set block as dynamic parent.” Setting the block as dynamic creates 10 runs of the data based on a chunk size of 2,000, and those runs execute in parallel with each other.
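A sketch of what this dynamic parent block might look like is below. The two-list return value follows the dynamic-block convention described earlier, and the block_uuid values are illustrative.

```python
import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

CHUNK_SIZE = 2_000


@transformer
def split_into_chunks(df: pd.DataFrame, *args, **kwargs):
    # Nothing to fan out if the upstream DataFrame is empty.
    if df.empty:
        return [[], []]

    total_rows = len(df)

    # Slice the DataFrame into 2,000-row chunks with iloc, preserving row order.
    chunks = [
        df.iloc[start:start + CHUNK_SIZE]
        for start in range(0, total_rows, CHUNK_SIZE)
    ]

    # Ordered metadata uniquely identifies each dynamically created child block.
    metadata = [{'block_uuid': f'chunk_{index:02d}'} for index in range(len(chunks))]
    return [chunks, metadata]
```

For the 20,000-row file this yields 10 chunks, and the numbered block_uuid values make it straightforward to preserve the original order downstream.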
Explanation of code:
Chunking Logic: The function checks if the DataFrame is empty. If not, it calculates the total number of rows and splits the DataFrame into chunks of 2,000 rows each using pandas’ iloc indexing.
Return Structure: The function returns a list of DataFrames, each representing a chunk of the original data.
3. Transforming data dynamically
Each data chunk undergoes transformation to enhance or modify its structure. In this POC, a new column risk_score_2 is added based on the existing cardiac_risk_score. Because the previous block is set as dynamic, this block runs as a dynamic child of it, and all subsequent blocks remain dynamic child blocks until the dynamic output is reduced.
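A per-chunk sketch of the transformer is shown below; the score thresholds and category labels are hypothetical, since the POC’s exact business rules aren’t spelled out here.

```python
import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def add_risk_score(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
    # Copy the chunk so the original DataFrame passed between blocks is not mutated.
    chunk = df.copy()

    # Cast to string so missing or non-numeric values can be handled gracefully.
    chunk['cardiac_risk_score'] = chunk['cardiac_risk_score'].astype(str)

    def assign_risk_score(row: pd.Series) -> str:
        # Hypothetical categorization rules; invalid or missing scores fall back to 'unknown'.
        try:
            score = float(row['cardiac_risk_score'])
        except (TypeError, ValueError):
            return 'unknown'
        if pd.isna(score):
            return 'unknown'
        if score >= 7:
            return 'high'
        if score >= 4:
            return 'medium'
        return 'low'

    # Apply the categorization to each row, producing the new risk_score_2 column.
    chunk['risk_score_2'] = chunk.apply(assign_risk_score, axis=1)
    return chunk
```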
Explanation of code:
Data Copy: To prevent altering the original DataFrame, a copy is made.
Data Type Handling: The cardiac_risk_score column is converted to a string to handle any non-numeric values gracefully.
Transformation Logic: A nested function assign_risk_score categorizes the risk based on the cardiac_risk_score. It handles various scenarios, including invalid or missing scores.
Applying Transformation: The transformation is applied to each row using pandas’ apply method, and the transformed chunks are collected into a new list.
4. Exporting data to Snowflake
The final step involves exporting the processed data to Snowflake, a cloud-based data warehousing solution. The data is exported in 10 chunks to the data warehouse, maintaining the original order of the extracted file.
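A sketch based on Mage Pro’s Snowflake exporter template is below; the table, database, and schema names are placeholders rather than values from the POC.

```python
from os import path

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from mage_ai.settings.repo import get_repo_path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_to_snowflake(df, **kwargs) -> None:
    # Connection settings live in io_config.yaml rather than in the block itself.
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            'PATIENT_RISK_SCORES',   # placeholder table name
            'ANALYTICS',             # placeholder database
            'PUBLIC',                # placeholder schema
            if_exists='replace',     # replace existing entries with the new data
        )
```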
Explanation of code:
Configuration: The exporter reads configuration settings from an io_config.yaml file, ensuring secure and flexible connection parameters.
Export Logic: The Snowflake loader handles the data export, specifying the target table, database, and schema. The if_exists='replace' parameter ensures that the table is updated with the new data, replacing any existing entries.
Scalability: This setup allows for easy scalability and integration with other data warehouses or destinations by modifying the exporter configuration.
Conclusion
This proof of concept for a leading beauty company shows how Mage Pro simplifies the management of complex data pipelines. Thanks to easy-to-use features like dynamic blocks, the company can process large amounts of data efficiently and with minimal hassle.
If you’re aiming to build a similar pipeline or improve your current data processes, Mage Pro provides the tools and flexibility to make it happen. Take advantage of Mage Pro to streamline your data operations and keep things running smoothly.
Cole Freeman
Oct 31, 2024