Developing Databricks Ingestion Locally

Posted on January 10, 2023 by slowder

Spark engines like Databricks are optimized for dealing with many small-ish files that have already been loaded into your Hadoop-compatible file system. To process data from external sources, you’ll want to extract that data into files and store them in the Azure Data Lake Storage (ADLS) account attached to your Databricks Workspace. In this demo, we will build a data pipeline to work with the New York City Taxi & Limousine Commission data.

Structuring Our Solution

First and foremost, I advocate setting up a repository to work in. That way, if you get into trouble, you can roll back changes line by line if necessary. Use whatever backend system you want to support this repository. In this case, I’ll spin up a repository named Databricks_IDE_Training in my sandbox Azure DevOps account.

Our solution will consist of a single Python package that contains several modules. The first module we’re going to build is our ingestion module. We’ll need to create files and folders to begin this solution. First, we’ll add a folder to our local development folder named nyctlc. In that folder, we’ll add ingest.py. This is where our ingestion code will live.

Next, we’ll add a file to our local development folder called pytest.ini. This is where we’ll configure our project so we can run tests. Adding the following code to pytest.ini tells pytest that all our tests will be stored in files whose names match test_*.py.

[pytest]
python_files = test_*.py

Finally, we’ll add a folder for our tests and a file named test_ingest.py. This is where we’ll write our tests for our ingestion code.
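Until we develop the real tests in a later post, a single placeholder test is enough to confirm that pytest discovers the new file. The test below is purely illustrative and asserts nothing about the ingestion code yet.

# tests/test_ingest.py -- placeholder so pytest has something to discover.
# The real ingestion tests are developed alongside the code in the next post.


def test_placeholder():
    # Replace this with real assertions once the Ingest class exists.
    assert True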

We’re also going to need a local folder to store data. This will be a stand-in for ADLS. I created a folder named LocalStorage and added it to my workspace to quickly access data files while developing the solution.

You’re ready to start writing code when your solution looks like the screenshot.
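In outline form (assuming the test folder is simply named tests), the layout looks roughly like this:

Databricks_IDE_Training/
├── nyctlc/
│   └── ingest.py
├── tests/
│   └── test_ingest.py
├── LocalStorage/
└── pytest.ini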

Extract and Load Trip Record Data

Step one in our solution is downloading the trip data. If you visit https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page, you’ll learn a few essential details for building our first function. The source files are now in Parquet format instead of CSV. Also, if you hover over each link for the four source file types, you’ll discover they follow a pattern.

  • Yellow Taxi trip records — https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_<YYYY>-<MM>.parquet
  • Green Taxi trip records — https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_<YYYY>-<MM>.parquet
  • For-Hire Vehicle trip records — https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_<YYYY>-<MM>.parquet
  • High Volume For-Hire Vehicle trip records — https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_<YYYY>-<MM>.parquet

Databricks is best at ELT. That means some other process will move these source files to our bronze (raw) zone. I usually use Azure Data Factory to orchestrate these Extract-Load workloads. In this case, we’re just going to download these files to our LocalStorage/bronze folder to emulate that work.

Remember that in data lakes, all files in a given folder are expected to have compatible schemas. So in our case, we load each file into a folder where more files of that schema can land. For example, all fhv_tripdata files go into the folder /bronze/fhv.
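Here’s a minimal sketch of that download step, assuming the LocalStorage/bronze/<dataset> layout described above; the helper name and the example month are illustrative, not part of the TLC documentation.

# Sketch: download one month of trip data into its matching bronze folder.
import os
import urllib.request

BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
LOCAL_BRONZE = os.path.join("LocalStorage", "bronze")


def download_trip_data(dataset, year, month):
    # Build the file name following the pattern above, e.g. yellow_tripdata_2023-01.parquet.
    file_name = f"{dataset}_tripdata_{year}-{month:02d}.parquet"
    target_folder = os.path.join(LOCAL_BRONZE, dataset)
    os.makedirs(target_folder, exist_ok=True)
    target_path = os.path.join(target_folder, file_name)
    urllib.request.urlretrieve(f"{BASE_URL}/{file_name}", target_path)
    return target_path


# Example: pull January 2023 yellow taxi data into LocalStorage/bronze/yellow.
download_trip_data("yellow", 2023, 1)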

Transform Trip Record Data

The Parquet file format is nice: it’s compressed and optimized for high row volumes. The only thing that could be better is Delta. With Delta, we can add schema enforcement, ACID compliance, and audit history to our data store. Next, we’re going to write some code to transform our data from Parquet to Delta files and tables.

In the code snippets below, I’m excluding the in-code documentation for clarity. I follow the default PyLint suggestions for documentation, so every package, module, class, and method is commented. I also suggest comments for obscure code inside a module. Obscure means any code a junior programmer couldn’t understand just by reading it.

Let’s add two files to our nyctlc folder: __init__.py and ingest.py. In the init file, insert the following code.

"""
Import helper, list all nyctlc modules here as they're developed.
"""
from nyctlc import ingest

__all__ = ['ingest']

This file helps when you’re importing your new package. I’ve found pytest struggles to resolve packages and modules without it. For now, this file ensures we can import the ingestion module we’re about to create. To build Python solutions around Delta, we need to import two modules. We installed these requirements in the previous article.

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

Next, we’ll stub out a class in our ingest module called Ingest. My practice is to name classes after the module file name. A module file can contain more than one class; in that case, the classes are named <ModuleName>_<Functionality>. So in our ingestion case, if I needed a second class to handle ingesting SQL Server sources, I’d name it Ingest_SqlServer. Class names are PascalCase.

class Ingest:

Since our ingestion module uses Spark and Delta functionality, and using those requires a SparkSession object, I set up two private attributes: first the SparkBuilder, the object that lets you configure the SparkSession, and then the SparkSession instance itself. Private attribute names begin with an underscore.

    _builder = SparkSession.builder.appName("nyctlc.Ingest") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    _spark = configure_spark_with_delta_pip(_builder).getOrCreate()

Now we’re ready to add our first method. This method reads a source folder full of Parquet files, then writes that data back out in Delta format. This transformation code is meant to illustrate the pattern. Our method only requires a source_path and a destination_folder. The error handler is simplified at this point; in a real case, we’d build in as many error handlers as we can uncover during development.

    def load_parquet_to_delta(self, source_path, destination_folder) -> bool:
        try:
            source_df = self._spark.read.parquet(source_path)
            source_df.write.format("delta").mode("append").save(destination_folder)
            return True
        except BaseException as e:
            print(e)
            return False
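Calling the method from a local script might look like the following; the bronze and silver paths are just examples that match the LocalStorage layout from earlier.

# Example usage: convert the yellow taxi parquet files to a local Delta table.
from nyctlc.ingest import Ingest

ingest = Ingest()
succeeded = ingest.load_parquet_to_delta(
    "LocalStorage/bronze/yellow",   # parquet files downloaded earlier
    "LocalStorage/silver/yellow")   # illustrative destination for the Delta output
print(f"Load succeeded: {succeeded}")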

Here’s the code as a whole.
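Assembled from the snippets above (still without the in-code documentation), ingest.py looks like this:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession


class Ingest:

    _builder = SparkSession.builder.appName("nyctlc.Ingest") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    _spark = configure_spark_with_delta_pip(_builder).getOrCreate()

    def load_parquet_to_delta(self, source_path, destination_folder) -> bool:
        try:
            source_df = self._spark.read.parquet(source_path)
            source_df.write.format("delta").mode("append").save(destination_folder)
            return True
        except BaseException as e:
            print(e)
            return False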

Next Time

Next time, I’ll walk you through my approach to simultaneous code and test development. By taking this approach, I can work through coding issues as I go, and when I’m done, I have tests the deployment process can use to check my code before release!

If you have any questions, let me know.
