In this blogpost we are generating our first large dummy dataset and migrating it to AWS S3 using the multipart uploading concept.
April 8, 2022
Is the thought of having to process a terabyte of data keeping you up at night? Are you having nightmares about an error after the process has been running for a week? Look no further! We are able to create and move a seemingly immovable object with the unstoppable force called Spark on Databricks in a matter of minutes.
This blog-post is in part inspired by the following blogpost from 2008: New York Times - Article. The New York times had just digitized their entire history and wanted to process all their articles from 1851 to 1922. The digitized form of the newspapers from this era would contain large TIFF-images, some metadata and text captured using optical character recognition. This added up to terabytes of data to process. Doing this sequentially would result in days upon weeks of processing. This is where MapReduce came in and helped with processing the information in parallel. Due to the ability to process data in parallel using MapReduce they were able to cut down the processing time to a mere 36 hours. For more information on MapReduce, we refer you to Wikipedia: MapReduce -Wikipedia.
It is now 14 years later, and we have seen the emergence of Spark and Databricks as a natural evolution of MapReduce.
Spark has become the market leader in big data processing, and for good reason. Things that set Spark apart from MapReduce is its comparative ease of use and flexibility.(MapReduce was notorious for being hard to master for developers). Spark allows you to write your code in java, Scala, python, SQL or R whilst MapReduce only allows you to use Java.
In a technical sense, the main difference is that Spark uses memory to store data whilst processing. MapReduce does this on regular disk storage. Reading and writing from memory is significantly faster than from disk and with the relatively low cost of resources compared to how it was a couple of years ago, the overall computing price is less of an issue. (Memory is more expensive than disk)
Databricks is a managed data and analytics platform developed by the people responsible for creating Spark. It can be used to harness the power of the Spark architecture and provides you with a wide array of tools to clean, transform, analyze and predict. Databricks provides an all-in-one solution for basically all your data processing needs!
Even writing and reading from storage has become comparatively quick with Spark over the last few years. Spark is now 100X faster in memory and 10X faster in storage than MapReduce!
Let’s begin with a nice little use-case that shows you how to harness the power of Spark by processing a large file with data that we’ll create ourselves. We will use the multipart upload approach to then migrate that data from our local storage to Amazon AWS. You will see that the numbers from that initial New York times use-case will appear like rookie numbers when using Spark.
We will teach you about the following concepts:
Using Spark as a parallel processing engine
Using Python packages in a Databricks notebook
AWS Multipart uploads
In this post we’re going to look at how we can generate a large dataset with dummy data for testing purposes. We will do this using the Datagen library (Datagen - Github) that we will import into our Databricks environment. After the creation of this big dataset, we will try to upload it to a bucket in S3 on AWS. We will use a multipart upload approach to make this process more efficient but more on that later. For a move of this magnitude, we will need a big cluster set-up if we want to process it in a reasonable amount of time. You could play with this a little yourself, it depends on how much money you want to spend and how much time you have. The following picture gives you a brief summary of the cluster we're using:
Let’s begin our coding endeavor with an empty notebook!
First off, we will install the Datagen library using the pip install command. You provide the location of the git repository as a parameter preceded by ‘git+’. In this case it’ll be: ‘git+https://github.com/Databrickslabs/dbldatagen’
Now we have the library installed, we can use it after we import the Datagen library onto our notebook. With the as-statement we give the library an alias. By doing so we can call all the functionalities within this library by starting with ‘dg’.
Through using the help-command, we can get a sense of what the Datagen library has to offer. For this blogpost our focus is going to be on the data_generator function in this package.
We import the os library to make it easier to interact with files on our local Databricks file storage. We use this to store our data intermittently before migrating it to an S3 bucket. The boto3 library will allow us to interact with the bucket that we will use to store our output at the end of this post.
In the next command, we generate a dataframe using the DataGenerator library. We do this by calling the DataGenerator API using a set of parameters. These parameters are: - A name for our dataset. - The number of rows that need to be generated. - The number of columns that need to be generated. - The number of partitions the data is distributed to.
We create some variables that allow us to easily increase the number of rows, partitions and number of columns. By using a multiplier for both the number of rows and the number of partitions, we make sure that the datasets on each partition don't get too big. To achieve optimal performance for this use-case we’ll aim for partition sizes between 100 and 300 mb.
The reason we define a number of partitions is because that allows spark to process the data in parallel across the worker nodes on the cluster. You can calculate the amount of tasks spark can do in parallel by using the "spark.sparkContext.defaultParallelism"-command. When you run this, you will see that the amount will be the same as the amount of workers multiplied by the amount of cores.
Feel free to change up these variables and the multiplier to either increase or decrease the size of the dataset. More data in this dataset might require you to increase the size of your cluster and the amount of workers. You can play around with this.
The following snippet generates an array using a string concatenation combined with a range from zero to a hundred. With python you can use an f-string to easily insert variables.
The expression command allows us to randomly generate some data (based on a small snippet of code) in the columns we built dynamically using the column_count variable.
After running the above snippet of code, we have created a reasonably large dataset with a variety of different datatypes and a small variety of different values. Hopefully this will mimic the dataset you're working with. If we perform a simple display command with our dataframe as a parameter, we can get a little preview.
Bonus: considering the amount of columns in this dataframe, the “.show()”-command is not a great way to preview. You can give it a go and see for yourself!
The next thing we are going to do is to store this big dataset as a CSV to our local Databricks file system.
The number of files we generate is going to be based on the number of partitions we provided in the build. In this case it's 50. We can check this by running the following snippet:
We can use the dbutils library in Databricks to interact with files on the Databricks file system. The command “dbutils.fs.ls” provides us with a list of all the files in the folder that we provide as a parameter.
The reason we loop through the files to only get the ones that have the CSV-format is because Databricks will often create a set of files containing some metadata about the creation of the set of files. These files are prepended with an underscore, the _SUCCESS file indicates that the job to create the set of files has succeeded.
Hopefully, the display-command will show you all the 50 file-paths we just created!
In this step we're going to change the file-location to a FUSE format. This allows us to interact with the files as if they were local files. We also add an index that starts at 1 because we need a part-identifier for our eventual multipart upload to AWS. If you want to know more about file-interaction on Databricks you can go here.
We then calculate the size of all the CSV files in the folder by doing a sum of the fourth attribute (a list index starts at zero) in our list, the file size. The list will provide us with the size in bytes. To make it a little bit more readable, we calculate the size in GB and show that.
Python assert commands are an easy way of checking your work and do some error-handling. In this case we will assert the number of files and the file size. We have to make sure that we abide by the AWS throttling, hence the maximum of 10.000 pars.
Currently, we are lacking a header and a CSV dataset requires one. We will append the header by creating a string (line) from the columns on the dataframe. We have a cool snippet to create a comma delimited line from the list of columns:
Next, we will append this string to the first CSV. We created a little function to prepend a line to a file. We use the os library to interact with files on local storage more easily. We will first get the size of the file size before we prepend our header line. We want to get the first file from our multidimensional list.
The function to prepend a line to our file can be defined as follows:
We can execute the function we just created with the location of our first file and our newly created header string. We then print the size of our file form before and after appending the line. You will see that the byte size is different. Makes sense, right?
When uploading the parts to our S3 storage we need to provide the right file sizes. Our item list now still contains the file size without our header. We want to remedy this by adding the byte size of our header to the file size attribute in our multidimensional list. The +1 is for the new-line character.
You will need to set up your own AWS environment first with an IAM user that has the full rights to amazon S3 (AmazonS3FullAccess). On top of that you need the AWS CLI installed as well. Here’s a guide on how to do that: How to Setup Your Development Environment for AWS. After you’ve configured your AWS, we can create a bucket to write our dataset to. You do this by using the following command:
For the region, preferably pick the area that you’re residing in. If your bucket is in the same region as your cluster, this should mean that the upload process is a bit faster.
In order to make a connection to the newly created bucket, we will use the secret access key ID and secret access key value to make a connection to that bucket from Databricks. We will store both values in a secret scope we create using the Databricks-cli. How you install the Databricks-cli can be found here: Databricks CLI | Databricks on AWS.
We create the scope with the following command in the command line console.
If this goes well, the console will return nothing. We can check whether our secret scope exists by listing all the secret scopes in our Databricks environment.
Is the newly created scope on there? Great!
We can then start putting our secrets in. We do that using the following set of commands:
We can get our secrets in Databricks by using the dbutils library.
For our use-case we will put them in a variable that we will use to make a connection with AWS.
We will convert our list to a dataframe using the following statement:
We will now set up a client which will provide us a low-level interface with the bucket we created. We will use the credentials stored in our variables.
We will further initialize the multipart upload by defining the bucket we've created and the upload key which is going to be the name of the single file we create from the set of files we'll be loading in the next step. The id we get returned is the unique identifier for the upload.
Next up, we're creating a function that takes in a FUSE-path, our access key and secret key. The function opens the file and uses the 'upload_part'-function from the client to migrate the file to the bucket.
The 'upload-part'-function requires a set of parameters of it's own: -Body --> opened file -Bucket --> our bucket name -ContentLength --> length of the opened file -Key --> our upload key -PartNumber --> The indexed we defined at step 8 -UploadId --> the unique identifier for the multipart upload
More information on the boto3 client can be found here: boto3 client.
You can see that the function also requires a correct file size. This is why it was important to adjust the file size with the header length earlier on in this blogpost. It looks a little bit like this:
The function returns us back some of the file information that was ingested along with the ETag, which is an identifier for the newly created file-part in our bucket.
We want to call the function above for all of the items in our dataframe. We will convert the dataframe to an RDD so we can use map to perform our newly created function for every row (the dataframe API doesn't have the map-functionality).
Here is some valuable background information for you: RDD stands for resilient distributed dataset and it underlies all of the dataframes we've been working with. The distributed nature of the RDD allows Spark to process the data in parallel. A dataframe is an RDD organized into named columns.
We catch the output from this process in the RDD we created from this snippet called ETags.
We are almost there, the only thing left for us to do is to combine the files into the one big CSV we created initially. For combining the files we just uploaded into one, we will use the 'complete_multipart_upload'-function from the client. Most of the parameters required for this process, we already have. What we don't yet have, is the correctly formatted list of objects representing the parts. The format we require is an array with JSON-objects containing both the ETag and the PartNumber attribute in a proper sequential order. We create this data by doing the following to the RDD we created in the last step.
We can now use the 'complete_multipart_upload'-function.
Now the proof is in eating the pudding, so let's check if the file is in the bucket we created....
It looks pretty good on our end, hopefully it worked for you!
Even though the use-case is relatively simple, we touched on a fair amount of important concepts in Spark. Hopefully, you now have the tools to:
generate your own datasets that you can use to test the performance of your pipelines.
interact with files in Databricks in a few different ways
upload your files to AWS
And maybe you learned a little bit more about how you can harness the power of the spark architecture to swiftly process your data in parallel. I certainly hope that you enjoyed going through this little use-case and maybe it has inspired you to delve deeper into the world of Spark and Databricks.