Apache Spark + Scala To Become a Big Data Balla

turbin3


First off, let me start by saying this is directed primarily at those who operate with extremely large and/or complex data sets and are attempting to derive valuable, actionable insights from them. In essence, you have tens of millions, hundreds of millions, or maybe billions of rows of data... but maybe you're still stuck using spreadsheet programs to try to pull something of value out of the whole incoherent mess. Operating at that scale really demands a programming language, and might also benefit from a functional one. At the end of the day, the name of the game is automating or semi-automating as much of the busy work as possible, so you end up with actionable insights you can make money with. Time is money.

Also, one caveat: I pieced a lot of this back together from some of my code that has since evolved. Some of it may not be 100% correct, but worst case it should still be 95%+. If anyone sees problems and/or has corrections, please feel free to list them so we can get the right code out there!


Introducing Apache Spark. Some of you have probably heard of things like Apache Hadoop and Hadoop's MapReduce. Spark is MapReduce on crack. It's an engine for large-scale data processing and cluster computing that runs jobs in a parallel, fault-tolerant way. In short, if you need to process massive data sets to derive insights or summarize data, instead of performing all of that computing work on one system, you can program against and utilize the processing power of an entire cluster for far greater performance. It can run standalone if you have your own serious hardware infrastructure. It can run on AWS EC2. You can also use third-party managed platforms that run on AWS, such as Databricks (highly recommended), so that you're mostly paying only for what you use, when you use it. It takes money to make money, and spending a few hundred bucks a month on massive processing power can let you more efficiently derive the insights necessary to make a whole lot more money!

The nice thing about Spark is that it can work with a variety of languages, such as Java, Scala, Python, R, and SQL, and it can use a variety of data sources. I've gotten good results using largely AWS infrastructure, such as S3 to store the data, since it's an inexpensive store that's also easily and quickly accessible from Spark (especially if you're using something like Databricks, where you might have an EC2 spot instance running, so S3 is naturally quick to access).


Introducing Scala. Scala is what's considered a "functional programming" language. It seems to be a favorite, or at least has grown significantly in popularity, in the big data world. I'm not going to go into detail on exactly what Scala is and all of its components, as I'm still a relative novice with the language. I'll primarily be showing you some practical examples of using it to perform useful functions. Although I'm more experienced with, and generally prefer, Python for most things, I've found great success in beginning to learn Scala. As I've learned it, it has helped me make significant leaps in the code I'm writing, as well as in the speed and efficiency of that code. It does, as any functional programming language does, take a real paradigm shift in how you think about coding. For instance, there are significantly less efficient ways you can write code with Scala, especially if you're used to coding in a fairly "linear" manner like I have been. I tend to want to address a problem line by line, dealing with issues in order. With Scala, you can often group a lot of that effort into a few dense expressions and potentially do a whole lot more work in less time, with less effort. An easy way to comprehend Scala, and functional programming in general, is that it's like code Inception: code inside of code, twisted around itself and manipulated, ultimately becoming a force multiplier toward getting things done more efficiently.
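
To make that paradigm shift a little more concrete, here's a throwaway sketch (the numbers and names are hypothetical, not from any real dataset). The "linear" habit is to build up intermediate state yourself; the functional habit is to chain transformations into one expression and let the language do the bookkeeping:

Code:
// Hypothetical page load times in milliseconds; we want the average of the "slow" ones, in seconds
val loadTimesMs = List(420, 1850, 730, 2600, 3100, 980)

// Linear/imperative habit: manage the intermediate state yourself
var slowTotal = 0.0
var slowCount = 0
for (t <- loadTimesMs) {
    if (t > 1000) { slowTotal += t / 1000.0; slowCount += 1 }
}
val slowAvgImperative = slowTotal / slowCount

// Functional habit: compose transformations into one expression
val slow = loadTimesMs.filter(_ > 1000).map(_ / 1000.0)
val slowAvgFunctional = slow.sum / slow.size

Both give the same answer; the second version just has a lot less bookkeeping for you to get wrong.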


Introducing Parquet. I'm briefly mentioning Apache Parquet, as it's a storage format I find myself using frequently because it can be highly efficient. Parquet is a columnar storage format designed to allow more efficient compression and encoding of data, as well as complex data structures. Once you start getting into the Apache Hadoop ecosystem, or pretty much anything related to Hadoop or its evolutions/derivations (Spark), Parquet is something you're going to see pretty frequently. What I'll typically use Parquet for is this: I'll access a dataset, filter it, derive some kind of desired data or summary data from it, and then parquet that data out to storage (S3). For further processing, I can then access that data on S3, run whatever analysis across it, and process it at MUCH greater speed. For example, if I wanted to analyze a bunch of complex traffic and conversion data to determine what on-page optimizations I might need for a complex ecommerce site, I would first define the primary variables I wanted to pay attention to. I would then parse and filter the original dataset, and parquet out a summary of the data I actually care about. I would then determine the insights I wanted to derive from that data (bounce rate vs. load times for an A/B test across groups of pages/categories with different layouts, month over month, etc.), and decide what manner of visualization might help me see what I need to see. Line graph? Scatter plot? What about a distribution graph? You could use something like ggplot or matplotlib for Python, or maybe the JS D3 library if you want to get real creative with visualizations. Bottom line, regardless of how you use it, developing the habit of parqueting out efficient summaries and segments of your datasets can be a real time saver.

On to the examples. I'll first show you how to access a dataset (using S3 in this example), define some variables, create some filters, and then parquet out a more efficient form of that dataset to do most of your work with.

First, as with most programming languages, we need to import Spark libraries that we'll need to use:

Code:
import java.time._
import java.time.format._
import java.time.temporal._
import java.sql.Timestamp
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

Next, let's define a value for our data source. One thing to note with Scala: variables come in two flavors, "var" and "val". The easy way to remember the difference is that a "var" can vary, whereas a "val" is constant and cannot be reassigned. For example, you might use "var" if you're defining a baseline for something that will later be modified by various calculations. In this case, we don't need our data source location to change, so we'll use "val":

Code:
val data = s"/mnt/directory/example.log"

Next, we'll want to define the different elements present in our dataset, so we can easily parse and extract what we need. Let's say our dataset is rows of comma-separated data in 5 columns. We'll first define an immutable value (val) that describes the structure of our data. There are probably more efficient ways, but I'll keep this rudimentary. We'll use a regex that captures the five comma-separated fields (hence the .r on the end, which turns the string into a Regex):

Code:
val dataStructure = """([^,]*),([^,]*),([^,]*),([^,]*),([^,]*)""".r
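
If you want to sanity-check that pattern before pointing it at real data, a quick match against a made-up row (the values below are obviously hypothetical) will tell you whether the five capture groups line up:

Code:
val sampleRow = "101,42,7,2016-03-01 14:05:09,productPage"
sampleRow match {
    case dataStructure(one, two, three, four, five) => println(s"parsed: $one | $two | $three | $four | $five")
    case _ => println("row didn't match the expected structure")
}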

We'll then define a small case class describing the row shape, plus a function that breaks the raw lines down into it. The function reads the file as strings, pattern matches each line against our structure, and turns the result into a DataFrame of named columns (let's say the data is actually a mix of Strings and Ints, aka integers). A DataFrame is basically a relational table that can be much more efficient to process:

Code:
// One parsed row: a mix of Ints and Strings
case class ParsedRow(dataOne: Int, dataTwo: Int, dataThree: Int, dataFour: String, dataFive: String)

def parseDataStructure(path: String): DataFrame = {
    import sqlContext.implicits._  // needed for .toDF() (sqlContext is predefined in spark-shell/Databricks)

    val dataStrings = sc.textFile(path)

    // Keep only lines that match the expected structure, convert the numeric
    // columns to Int, and silently drop anything malformed
    val rows = dataStrings.flatMap {
        case dataStructure(one, two, three, four, five) =>
            Some(ParsedRow(one.toInt, two.toInt, three.toInt, four, five))
        case _ => None
    }

    rows.toDF()
}

I'm not 100% sure about that code above ^ as I had to hack this back together based on a more functional and highly modified version of something. If it's not 100%, it should be pretty close to working.

Next, we call that function on our data source and store the resulting DataFrame:

Code:
val outputData = parseDataStructure(data)


We'll now write that data out to parquet, for later use.

Code:
outputData.write.parquet(s"/mnt/output-directory/output.parquet")

Again, that ^ might be a little off, as I have almost every component defined as separate variables, and I had to piece this back together in a simplified form.

Let's now access that parquet and do something with it! First, we'll create a variable to access the parquet:

Code:
val readData = sqlContext.read.parquet(s"/mnt/output-directory/output.parquet")

We'll then create a temp table for quick access:

Code:
readData.registerTempTable("read_data")

Now, for those of you who have some experience with SQL, you'll like the fact that you can use most of the same SQL syntax in Spark. What I like to do is a lot of work "behind the scenes" to come up with efficient data summaries that I can then query in a simple, familiar manner. For example, I'll take the data types I want, filter out those I don't, convert the desired columns to Ints where possible for maximum speed and efficiency, and then parquet out a summary of that data. During a recent project, I was able to reduce the size of a particular dataset by over 90%, cutting processing and analysis time by 99%. Freakin' awesome.
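
As a rough sketch of what that behind-the-scenes work can look like (the filter condition is hypothetical, and I'm assuming, hypothetically, that dataFive is numeric data stored as a string): drop the rows you don't care about, force the string column down to an Int, keep only the columns you actually query, and write the slimmed-down summary back out:

Code:
val slim = readData
    .filter(col("dataOne") > 0)                          // drop rows we don't care about
    .withColumn("dataFive", col("dataFive").cast("int")) // cheaper type if it's really just numbers
    .select("dataOne", "dataFour", "dataFive")           // keep only the columns we actually query

slim.write.parquet(s"/mnt/output-directory/summary.parquet")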

So let's say the data source was 30 days of data, and we want to see what the overall volume of the "dataOne" column was for the month:

Code:
sqlContext.sql("SELECT dataOne,count(*) FROM read_data")

Now let's say the "dataFour" column was actually a timestamp column, and we wanted to query based on dating. We would have needed to add some code to "val strings" up above, to properly parse it as a timestamp, but we'll just pretend we already did that, so as to not complicate things too much:

Code:
display(sqlContext.sql("SELECT dataOne,dataFour,count(*) FROM read_data GROUP BY dataFour ORDER BY dataFour"))

This would give us a table (or a visualization, if we were using one) ordered by timestamp, so we could see the progression of our data over the past 30 days. I won't go into too much more detail on the SQL side, as I'm sure quite a few of you already have plenty of experience in that area. A lot of it is much the same in Spark, with a few minor differences in how you call it from Scala, Python, etc.
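
For what it's worth, if you'd rather not build SQL strings, the same kind of aggregation can be expressed directly with the DataFrame API in Scala. A rough equivalent of the query above (same hypothetical columns) would be:

Code:
display(
    readData
        .groupBy("dataFour")
        .agg(count("dataOne").alias("dataOneCount"))
        .orderBy("dataFour")
)

Both run the same way under the hood; it mostly comes down to whether you prefer SQL strings or chaining methods.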

Looks like I've run out of time tonight. When I have some more time, I'll try to detail a bit more about how you can optimize this process by filtering data out of your summaries, or simply converting less efficient data types (Strings, for example) to more efficient types (Ints), both of which can cut down on storage size as well as processing time.
 
Pretty neat. Another good tool that I like for large data sets (specifically logs and event-based data) is Splunk. It has a pretty decent visualization suite attached to it, and its API is pretty friendly as well. I haven't worked with any of the above, but I am familiar with Spark and Scala.
 
I just noticed today that Coursera has a specialization involving Scala. Pretty neat.
 