import { Component } from '@angular/core'
import { AppModeService } from '../../app-mode.service'

@Component({
    template: `
    <div id="main-content">
    <table class="mission-statement-table">
        <tr>
            <td class="mission-statement-column1">
            </td>  
        </tr>
        <td class="mission-statement-column2">
                <h2><strong>Spark SQL</strong></h2>
                <div class="author-date-item">Jagan Lakshmipathy</div>
                <div class="author-date-item">October 12, 2019</div>
                <p></p>
                <p>Spark supports structured data processing through Spark SQL. The Spark SQL documentation offers a lot of insight into how data is structured, computed, and optimized under the hood. Regardless of how you interact with it (refer to the Spark SQL documentation <A href="https://spark.apache.org/docs/latest/sql-programming-guide.html">here</A>), the Spark SQL engine executes queries in a unified way. Spark exposes its SQL interfaces through DataFrames and DataSets. In this page, we briefly explore these two interfaces.</p>
                <p>Spark's Resilient Distributed Dataset (RDD), as its name suggests, is a distributed data interface, and it is typically accessed through SparkContext. Unlike RDDs, the Spark SQL interfaces (DataFrame and DataSet) are accessed from SparkSession. SparkSession provides several variants of createDataFrame that return a DataFrame. SparkSession also provides a read method that returns a DataFrameReader, which in turn provides methods for reading different formats and returning the data as a DataFrame. Consider the following fictitious JSON record, in which a JSON object "Account" is nested inside an outer JSON object.</p>
                <pre>&#123;
  "ID":1,
  "Account":&#123;
      "Number":"123-ABC-789",
      "FirstName":"Jay",
      "LastName":"Smith"
  &#125;,
  "Date":"1/1/2015",
  "Amount":1.23,
  "Description":"Drug Store"
&#125;</pre>
<p>The following Scala snippet reads a JSON file into a DataFrame. We first create a SparkSession and then create the DataFrame through the DataFrameReader returned by spark.read.</p>
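<pre>import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local")
  .appName("Fraud Detector")
  .config("spark.driver.memory", "2g") // no effect once the driver JVM is running; prefer --driver-memory
  .enableHiveSupport()                  // requires the Hive classes on the classpath
  .getOrCreate()

import spark.implicits._
// By default spark.read.json expects one JSON object per line; multiLine handles pretty-printed files like the sample
val df = spark.read.option("multiLine", true).json("your/data/myjson.json")</pre>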
                <p>Similarly, a DataSet can be read from a JSON file, as in the snippet below. Note how Spark uses the types of the underlying data when it assigns them to the DataSet. We define case classes (e.g., Transaction with a nested Account), and when we convert the data to type Transaction with as[Transaction], Spark not only reads the data into the nested classes automatically but also maps each column to the field with the matching name and type. For this to work, every field of the case class must have a matching column in the data with a compatible type (column names are matched case-insensitively unless spark.sql.caseSensitive is enabled).</p>
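<pre>// Field lists elided ("...."); each field must match a column of the JSON data
case class Account(number: String, ....)
case class Transaction(id: Long, account: Account, ....)

import spark.implicits._  // brings the Encoder for Transaction into scope
val financesDS = spark.read.option("multiLine", true).json("your/data/myjson.json")
  .as[Transaction]
</pre>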
                <p>We will look at a few more snippets to understand DataFrames and then do the same for DataSets afterwards. For now, we focus on DataFrames.</p>

<p>Windowing lets us partition the data and compute an aggregate function over a window of rows, which enables data analysts to observe trends across that window. Window.partitionBy() returns a WindowSpec instance. The rowsBetween() function of WindowSpec specifies the window frame relative to the current row. With rowsBetween(-4, 0), each row is aggregated together with up to four preceding rows of its partition, in the order given by orderBy(), so the window slides forward one row at a time. The functions object in the org.apache.spark.sql package provides several aggregate functions that return a Column object. In the first two lines of the snippet below, we create a window spec. We then pass it to the over() method of the Column returned by an aggregate function (avg() in our case) to create another Column object.</p>

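<pre>import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// Per account, ordered by date: the current row and up to 4 rows before it
val accountNumberWindowSpec = Window.partitionBy($"AccountNumber").orderBy($"Date").rowsBetween(-4, 0)
val rollingAvg = avg($"Amount").over(accountNumberWindowSpec)
</pre>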
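<p>To make the frame semantics concrete, the following is a minimal plain-Scala sketch (Scala 3 syntax, no Spark required) of what avg over rowsBetween(-4, 0) computes. Rows are grouped by account and sorted by date, and each row is averaged together with up to four preceding rows of the same account. The Txn case class, the rollingAverage function, and the sample data are hypothetical. Dates are simplified to ISO strings so that they sort correctly as text. The sketch mirrors the window semantics only; Spark evaluates windows in a very different, distributed way.</p>
<pre>// Hypothetical record type for illustration: one transaction per row
case class Txn(accountNumber: String, date: String, amount: Double)

// Mimics avg(Amount) over partitionBy(AccountNumber).orderBy(Date).rowsBetween(-preceding, 0)
def rollingAverage(rows: Seq[Txn], preceding: Int = 4): Seq[(Txn, Double)] =
  // partitionBy: one group per account
  rows.groupBy(_.accountNumber).values.toSeq.flatMap(partition =>
    // orderBy: ISO date strings sort chronologically
    val ordered = partition.sortBy(_.date)
    ordered.indices.map(i =>
      // rowsBetween(-preceding, 0): the current row plus up to "preceding" earlier rows
      val frame = ordered.slice(math.max(0, i - preceding), i + 1)
      (ordered(i), frame.map(_.amount).sum / frame.size)
    )
  )

object RollingAverageDemo:
  def main(args: Array[String]): Unit =
    // Hypothetical data: six transactions for one account with amounts 1.0 to 6.0
    val txns = (1 to 6).map(d => Txn("123-ABC-789", "2015-01-0" + d, d.toDouble))
    rollingAverage(txns).foreach((t, avg) => println(t.date + " " + avg))</pre>
<p>For these six amounts the demo prints the averages 1.0, 1.5, 2.0, 2.5, 3.0, and 4.0. The frame grows until it holds five rows and then slides forward one row at a time, which is the window size and sliding offset that rowsBetween controls.</p>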
<p>In the next step, we query the DataFrame by selecting a set of columns and adding the rollingAvg column created above from the WindowSpec. DataFrame provides an na method that returns DataFrameNaFunctions, which offers several variants of drop, fill, and replace <A href="https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions">functions</A>; they all return a DataFrame. Like a typical SQL interface, DataFrame provides select, selectExpr, and where functions. The withColumn() function adds a new column to the DataFrame, or replaces an existing column with the same name.</p>
<pre>df.na.drop("all", Seq("ID","Account","Amount","Description","Date")) // drop rows where all of these columns are null
  .na.fill("Unknown", Seq("Description"))                            // replace a null Description with "Unknown"
  .where($"Amount" =!= 0 || $"Description" === "Unknown")
  .selectExpr("Account.Number as AccountNumber", "Amount",
              "to_date(CAST(unix_timestamp(Date, 'MM/dd/yyyy') AS TIMESTAMP)) AS Date") // parse the string date
  .withColumn("RollingAverage", rollingAvg)                          // add the windowed average</pre>
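<p>The how argument of na.drop is easy to misread, so here is a small plain-Scala sketch (Scala 3 syntax, no Spark required) of its two modes together with na.fill. It models a row as a map from column name to an optional value, with None standing for null. With "all", a row is dropped only when every listed column is null; with "any", it is dropped when at least one of them is. The Record type and the isNull, dropNulls, and fillNulls helpers are hypothetical. Unlike this sketch, Spark's na.drop also treats NaN as null, and na.fill with a string value only affects string columns.</p>
<pre>// Hypothetical row model: column name to optional value (None plays the role of null)
type Record = Map[String, Option[Any]]

// A missing column is treated like a null value
def isNull(row: Record, col: String): Boolean = row.getOrElse(col, None).isEmpty

// Like df.na.drop(how, cols)
def dropNulls(rows: Seq[Record], how: String, cols: Seq[String]): Seq[Record] =
  how match
    case "all" => rows.filterNot(row => cols.forall(c => isNull(row, c)))
    case "any" => rows.filterNot(row => cols.exists(c => isNull(row, c)))
    case other => throw IllegalArgumentException("Unsupported how: " + other)

// Like df.na.fill(value, cols): replace nulls in the listed columns with value
def fillNulls(rows: Seq[Record], value: String, cols: Seq[String]): Seq[Record] =
  rows.map(row => cols.foldLeft(row)((r, c) => if isNull(r, c) then r.updated(c, Some(value)) else r))

object NaDemo:
  def main(args: Array[String]): Unit =
    val rows: Seq[Record] = Seq(
      Map("Amount" -> Some(1.23), "Description" -> None), // kept; Description gets filled
      Map("Amount" -> None, "Description" -> None)        // dropped: every listed column is null
    )
    val cleaned = fillNulls(dropNulls(rows, "all", Seq("Amount", "Description")), "Unknown", Seq("Description"))
    cleaned.foreach(println)</pre>
<p>With "all", only the second sample row is removed, and fillNulls then sets the first row's Description to Some(Unknown). Passing "any" instead would drop both rows, because each has at least one null among the listed columns.</p>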
  <p>In the following snippet, everything is self-explanatory except for coalesce. coalesce is a method of the Dataset class. It takes an integer argument numPartitions and returns a new Dataset that has exactly numPartitions partitions when fewer partitions than the current number are requested. If a larger number of partitions is requested, the Dataset stays at its current number of partitions. Unlike repartition, coalesce does not trigger a full shuffle.</p>
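  <pre>import org.apache.spark.sql.functions.&#123;concat, lit&#125;

// financesDF is assumed to be the DataFrame read from the JSON file (df above)
financesDF.select(concat($"Account.FirstName", lit(" "), $"Account.LastName").as("FullName"), $"Account.Number".as("AccountNumber"))
      .distinct
      .coalesce(5)  // at most 5 partitions, merged without a full shuffle</pre>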
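<p>To illustrate why coalesce never increases the partition count and needs no shuffle, here is a simplified plain-Scala model (Scala 3 syntax, no Spark required). A partitioned dataset is represented as a vector of partitions, and coalesce merges contiguous runs of existing partitions into min(numPartitions, current) output partitions. The coalesce function below is a hypothetical stand-in. Its even split by position only roughly resembles what Spark does when partitions carry no locality preferences; the real implementation also takes data locality into account.</p>
<pre>// Simplified model of Dataset.coalesce(numPartitions): contiguous input partitions
// are merged into min(numPartitions, current) outputs; rows never move between groups
def coalesce[A](partitions: Vector[Vector[A]], numPartitions: Int): Vector[Vector[A]] =
  require(numPartitions > 0, "numPartitions must be positive")
  // Asking for more partitions than exist keeps the current count
  val target = math.min(numPartitions, partitions.size)
  Vector.tabulate(target)(i =>
    // Output partition i takes the input partitions in [start, end)
    val start = i * partitions.size / target
    val end = (i + 1) * partitions.size / target
    partitions.slice(start, end).flatten
  )

object CoalesceDemo:
  def main(args: Array[String]): Unit =
    // Hypothetical data: 8 partitions holding two numbers each (0 to 15)
    val parts = Vector.tabulate(8)(p => Vector(2 * p, 2 * p + 1))
    println(coalesce(parts, 5).map(_.size))
    println(coalesce(parts, 20).size)</pre>
<p>The demo first prints Vector(2, 4, 2, 4, 4), which is five partitions holding all 16 values. It then prints 8, because requesting 20 partitions leaves the original eight untouched.</p>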

                    
                <pre>1. DataFrame is subsumed in DataSet (DataFrame = Dataset[Row]).
2. SQLContext and HiveContext are collapsed into SparkSession
3. SparkSession wraps SparkContext
4. Spark 2.x changes:
    4.a Uses the Tungsten 2.0 engine
    4.b Whole-stage code generation: Spark is moving towards acting as a compiler
    4.c Extended SQL support
    4.d Simplified accumulator API
    4.e DataFrame-focused machine learning</pre>
        </td>
    </table>
    </div>
    `
})
export class SparkSqlComponent {
   
    constructor(private modeService: AppModeService){
        this.modeService.displaySidebar()
    }
}