# Working With pandas Dataframes in Databricks
<img src='https://pandas.pydata.org/static/img/pandas.svg' width="400" />

In Databricks, we typically work with Spark dataframes.  They are great for distributed computing on a cluster.
However, many Python libraries are not designed for distributed computing.  They use dataframes from the [pandas](https://pandas.pydata.org/)
library.  These dataframes are designed to be stored in the memory of a single machine, not distributed across a cluster.

*Is there a way we can distribute pandas-based Python code across a cluster?*

Yes, there is!  Spark 2.3 added a feature called "[Pandas User-Defined Functions](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)"
(or "pandas UDFs" for short, also known as vectorized UDFs).  It lets you define low-overhead, high-performance UDFs entirely in Python.

Here's how it works:
 - Write a Python function that accepts a pandas dataframe as the input
 - Divide your Spark dataframe into groups using the `groupBy` method
 - Each group is converted to a pandas dataframe and passed to your function. The calls run in parallel, because each group can be processed on a different executor.
 
Let's walk through an example!
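
As a warm-up, the three steps above can be mimicked on a single machine with pandas alone. This is only a sketch of the idea: the sample values and the `add_group_mean` helper are made up for illustration, and the loop runs sequentially, whereas Spark would hand each group to an executor.

```python
import pandas as pd

# Tiny hypothetical sample; the values are made up for illustration.
flights = pd.DataFrame(
    {
        "origin": ["ABE", "ABE", "ATL", "ATL", "ATL"],
        "delay": [6, -8, 10, 20, 30],
    }
)


def add_group_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    """Receive one group as a pandas dataframe and return an enriched copy."""
    out = pdf.copy()
    out["group_mean_delay"] = out["delay"].mean()
    return out


# Split: one pandas dataframe per distinct origin.
groups = [group for _, group in flights.groupby("origin")]

# Apply: call the function once per group (Spark would do this in parallel).
results = [add_group_mean(group) for group in groups]

# Combine: stitch the per-group results back into a single dataframe.
combined = pd.concat(results, ignore_index=True)
print(combined)
```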

## Load Some Data Into Spark
First, we'll make a Spark dataframe.

In [0]:
# Read in sample data from a CSV file and infer the schema
df = spark\
      .read\
      .option("inferSchema", "true")\
      .option("header", "true")\
      .csv("/databricks-datasets/flights/departuredelays.csv")

display(df)

date,delay,distance,origin,destination
1011245,6,602,ABE,ATL
1020600,-8,369,ABE,DTW
1021245,-2,602,ABE,ATL
1020605,-4,602,ABE,ATL
1031245,-4,602,ABE,ATL
1030605,0,602,ABE,ATL
1041243,10,602,ABE,ATL
1040605,28,602,ABE,ATL
1051245,88,602,ABE,ATL
1050605,9,602,ABE,ATL


In [0]:
# In this dataset, flights originated from 255 unique airports
df.select("origin").distinct().count()

Out[2]: 255

In [0]:
# How many flights originated from each airport?
display(df.groupBy("origin").count().sort("origin"))

origin,count
ABE,448
ABI,706
ABQ,5739
ABY,252
ACT,437
ACV,767
ADQ,132
AEX,878
AGS,724
ALB,1959


## Create a Python Function

Now we want to create a user-defined function that performs our calculations.  There are two types of pandas UDFs in Spark:  Scalar and Grouped-Map.
Scalar functions accept one or more pandas *series* (i.e. individual columns) and return a series of the same length.  Grouped-map functions accept a whole pandas dataframe, and they can operate on all of the
data in that dataframe.  We'll be using a grouped-map function for our example.
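
The difference between the two shapes is easiest to see in plain pandas, before any Spark decorator is involved. The function names below are hypothetical, and treating `delay` as minutes is an assumption made only for this illustration.

```python
import pandas as pd


def delay_in_hours(delay: pd.Series) -> pd.Series:
    """Scalar style: a column goes in, a column of the same length comes out."""
    # Assumes the delay is expressed in minutes (illustrative only).
    return delay / 60.0


def center_delays(pdf: pd.DataFrame) -> pd.DataFrame:
    """Grouped-map style: a whole group goes in, a dataframe comes out."""
    out = pdf.copy()
    # This needs every row of the group at once, which a scalar function cannot see.
    out["delay_vs_group_mean"] = out["delay"] - out["delay"].mean()
    return out


sample = pd.DataFrame({"origin": ["ABE"] * 3, "delay": [30, 60, 90]})

hours = delay_in_hours(sample["delay"])
centered = center_delays(sample)
print(hours.tolist())
print(centered)
```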

Grouped-map pandas UDFs are designed for the "split-apply-combine" pattern of data analysis, and they operate on all the data for some group
(e.g., "for each airport, apply this operation").  Grouped-map pandas UDFs first split a Spark DataFrame into groups based on the conditions specified
in the `groupBy` operator, then apply a user-defined function (pandas.DataFrame -> pandas.DataFrame) to each group, and finally combine all of the results
into a new Spark DataFrame.

### Declaring the Output Schema
When you declare your Python function as a pandas UDF, you must provide the schema of the new dataframe that the function returns.  In our case,
we're going to use the structure of the existing dataframe and just add some new columns.  So we can start with the schema of the Spark dataframe and append the
definitions of our new columns.

In [0]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Start with the fields already in the Spark dataframe
output_schema_fields = list(df.schema.fields)

# Add the new fields that we will be creating in our UDF
output_schema_fields.append(StructField("run_id", StringType(), True))
output_schema_fields.append(StructField("hostname", StringType(), True))
output_schema_fields.append(StructField("airport_average_delay", DoubleType(), True))

# Convert the list of fields to a Spark "StructType" schema
output_schema = StructType(output_schema_fields)
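
As far as I know, `pandas_udf` also accepts the return type as a DDL-formatted string instead of a `StructType`. The sketch below builds such a string in plain Python. The `to_ddl` helper is hypothetical, and the types listed for the five existing columns are an assumption about what `inferSchema` produces for this file, so check them against `df.printSchema()` before relying on them.

```python
# (name, Spark SQL type) pairs. The first five types are an assumption about
# what inferSchema produced for this CSV file; verify with df.printSchema().
existing_fields = [
    ("date", "int"),
    ("delay", "int"),
    ("distance", "int"),
    ("origin", "string"),
    ("destination", "string"),
]

# The three columns that the UDF adds.
new_fields = [
    ("run_id", "string"),
    ("hostname", "string"),
    ("airport_average_delay", "double"),
]


def to_ddl(fields):
    """Join (name, type) pairs into a DDL string, quoting names with backticks."""
    return ", ".join(f"`{name}` {type_name}" for name, type_name in fields)


output_schema_ddl = to_ddl(existing_fields + new_fields)
print(output_schema_ddl)
```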

### Declaring the Function
When you declare your Python function as a pandas UDF, you must use the `pandas_udf` decorator.  This decorator accepts two parameters: the schema of the output
Spark dataframe (which we just created) and the type of pandas UDF (in our case, "GROUPED_MAP").

Inside the function, you work with the pandas dataframe just like you would in straight Python. You can add or remove columns or rows.  You can even
synthesize the data into a new pandas dataframe with a totally different structure and return that back to Spark.
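
To illustrate the "totally different structure" case, here is a minimal pandas-only sketch of a function that collapses each group to a single summary row. The name `summarize_airport` and its output columns are hypothetical. If it were registered as a grouped-map UDF, the declared output schema would have to describe these three columns rather than the original ones.

```python
import pandas as pd


def summarize_airport(pdf: pd.DataFrame) -> pd.DataFrame:
    """Return one summary row for the group instead of the original rows."""
    return pd.DataFrame(
        {
            "origin": [pdf["origin"].iloc[0]],
            "flight_count": [len(pdf)],
            "average_delay": [pdf["delay"].mean()],
        }
    )


# A hand-made group standing in for what Spark would pass to the function.
one_group = pd.DataFrame(
    {
        "origin": ["ABE"] * 4,
        "delay": [6, -8, -2, -4],
        "distance": [602, 369, 602, 602],
    }
)

summary = summarize_airport(one_group)
print(summary)
```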

In our example below, we will do three things:
  1. Compute a new unique string and add it as a new column to all rows in the group.  This will show that each grouping is processed separately.
  2. Add a new column with the hostname of the computer that is doing the processing.  This will show that the computation has been distributed to different machines in the cluster.
  3. Compute the average delay time for the airport as a demonstration of how you can compute using a pandas dataframe.
  
Let's write our function!

In [0]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
import uuid
import socket

@pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def my_pandas_function(pdf):
  # 1. Compute a guid and assign it as a column to all rows in the pandas dataframe
  my_run_id = str(uuid.uuid4())
  pdf["run_id"] = my_run_id
  
  # 2. Add a new column that contains the name of the computer on which this code is being executed
  pdf["hostname"] = socket.gethostname()
  
  # 3. Compute the average delay for the airport in this grouping and add that as a column to the output pandas dataframe
  pdf["airport_average_delay"] = pdf["delay"].mean()

  return pdf
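
Because the body of the function is ordinary pandas code, the same logic can be tried on a small pandas dataframe before it is sent to the cluster. The sketch below repeats the three steps in an undecorated function with the hypothetical name `my_pandas_logic` and checks that the returned columns line up with the declared output schema. The three sample rows are taken from the data shown earlier.

```python
import socket
import uuid

import pandas as pd


def my_pandas_logic(pdf: pd.DataFrame) -> pd.DataFrame:
    """Same three steps as the UDF, without the Spark decorator."""
    out = pdf.copy()
    out["run_id"] = str(uuid.uuid4())
    out["hostname"] = socket.gethostname()
    out["airport_average_delay"] = out["delay"].mean()
    return out


# Three rows copied from the sample output of the flights dataset.
sample = pd.DataFrame(
    {
        "date": [1011245, 1020600, 1021245],
        "delay": [6, -8, -2],
        "distance": [602, 369, 602],
        "origin": ["ABE", "ABE", "ABE"],
        "destination": ["ATL", "DTW", "ATL"],
    }
)

result = my_pandas_logic(sample)

# The column order should match the fields of the declared output schema.
expected_columns = [
    "date", "delay", "distance", "origin", "destination",
    "run_id", "hostname", "airport_average_delay",
]
assert list(result.columns) == expected_columns
assert result["run_id"].nunique() == 1
print(result)
```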

Now that we have our pandas UDF, we just perform a `groupBy` on our Spark dataframe and then `apply` the UDF to the groupings.  This will run the UDF on each grouping,
collect all the results, and then combine them into a *new* Spark dataframe.

In [0]:
new_df = df.groupBy("origin").apply(my_pandas_function)
display(new_df)



date,delay,distance,origin,destination,run_id,hostname,airport_average_delay
2010635,0,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2011305,9,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2020635,-1,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2021305,-7,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2030635,-11,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2031305,0,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2040635,-1,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2041305,1,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2050635,-7,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
2051305,2,311,BTM,SLC,e0a6b587-aef5-4354-97bc-b427643989cd,0522-135927-tjhdgp5d-10-139-64-8,-0.7666666666666667
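
On Spark 3.0 and later, the documentation recommends `applyInPandas` over the `GROUPED_MAP` decorator for this pattern. With that API the function stays a plain Python function and the schema is passed at the call site. The sketch below is written under that assumption. The names `add_airport_average` and `apply_per_origin` are hypothetical, and the block only defines the functions, because running it requires a live Spark dataframe.

```python
import pandas as pd


def add_airport_average(pdf: pd.DataFrame) -> pd.DataFrame:
    """Plain Python function; no pandas_udf decorator is needed here."""
    out = pdf.copy()
    out["airport_average_delay"] = out["delay"].mean()
    return out


def apply_per_origin(spark_df, schema):
    """Group a Spark dataframe by origin and run the function once per group.

    Assumes spark_df is a pyspark.sql.DataFrame on Spark 3.0 or later, and that
    schema (a StructType or a DDL string) lists every column that
    add_airport_average returns.
    """
    return spark_df.groupBy("origin").applyInPandas(add_airport_average, schema=schema)
```

In the notebook this might be called as `apply_per_origin(df, some_schema)`, where `some_schema` holds the original columns plus `airport_average_delay`. The `output_schema` built earlier would not fit as-is, because it also declares `run_id` and `hostname`, which this function does not produce.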


Now let's run some checks against `new_df` to make sure Spark is processing the data as we expected.  All of the rows for an airport should have been processed by
a single call to the `my_pandas_function` UDF.  Therefore, we would expect all of the rows for each airport to have the same `run_id` value and to have the same
`hostname` value.  We'll use a distinct count to make sure that each airport has only one value for those columns.

In [0]:
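# Import only the Spark functions used in the checks below.
# Note: max and min shadow the Python built-ins for the rest of the notebook.
from pyspark.sql.functions import avg, count, countDistinct, max, min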

new_df\
  .groupBy("origin")\
  .agg(
    count("*").alias("row_count"),
    countDistinct("run_id").alias("run_id_count"),
    countDistinct("hostname").alias("hostname_count")
  ).show()

+------+---------+------------+--------------+
|origin|row_count|run_id_count|hostname_count|
+------+---------+------------+--------------+
|   MSY|    10277|           1|             1|
|   GEG|     2044|           1|             1|
|   BUR|     5079|           1|             1|
|   SNA|     9411|           1|             1|
|   GRB|     1109|           1|             1|
|   GTF|      425|           1|             1|
|   IDA|      654|           1|             1|
|   GRR|     2585|           1|             1|
|   PSG|      180|           1|             1|
|   EUG|     1271|           1|             1|
|   PVD|     2895|           1|             1|
|   GSO|     1902|           1|             1|
|   MYR|      383|           1|             1|
|   OAK|    10026|           1|             1|
|   MQT|       77|           1|             1|
|   FAR|     1219|           1|             1|
|   FSM|      600|           1|             1|
|   MSN|     2359|           1|             1|
|   COD|     

Next, since each airport's group was processed separately, we would expect the number of unique `run_id` values to be the same as the number of unique airport codes.
Let's check!

In [0]:
print(new_df.select("origin").distinct().count())
print(new_df.select("run_id").distinct().count())

255
255


We also expect the code to be executed across different machines in the cluster.  If we group by `hostname`, we should see one row for each worker node that processed data.  Also, the number of rows processed by each node should be roughly balanced.

In [0]:
display(new_df.groupBy("hostname").count())

hostname,count
0522-135927-tjhdgp5d-10-139-64-4,287311
0522-135927-tjhdgp5d-10-139-64-8,345022
0522-135927-tjhdgp5d-10-139-64-6,345628
0522-135927-tjhdgp5d-10-139-64-7,413617
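
To put a number on "roughly balanced", the counts above can be compared with their mean. This is a small arithmetic sketch using the four values from the output. The ratio of the busiest node to the mean is just one possible measure of skew, chosen here for illustration.

```python
# Row counts per host, copied from the output above.
rows_per_host = {
    "0522-135927-tjhdgp5d-10-139-64-4": 287311,
    "0522-135927-tjhdgp5d-10-139-64-8": 345022,
    "0522-135927-tjhdgp5d-10-139-64-6": 345628,
    "0522-135927-tjhdgp5d-10-139-64-7": 413617,
}

total_rows = sum(rows_per_host.values())
mean_rows = total_rows / len(rows_per_host)

# Find the host that processed the most rows and compare it with the mean.
busiest_host, busiest_rows = max(rows_per_host.items(), key=lambda item: item[1])
imbalance = busiest_rows / mean_rows

print(f"total rows: {total_rows}")
print(f"{busiest_host} handled {imbalance:.2f}x the mean")
```

With these counts the busiest node handled roughly 19% more rows than the average. Because whole airports are assigned to nodes and airports differ widely in size, a perfectly even split is not expected.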


Lastly, the `airport_average_delay` value should be the same for every row for a given airport.  We can also use Spark to check our math and make sure that the
average was calculated correctly.

In [0]:
new_df\
  .groupBy("origin")\
  .agg(
    min("airport_average_delay").alias("min_avg_delay"),
    max("airport_average_delay").alias("max_avg_delay"),
    avg("delay").alias("avg_computed_by_spark")
  ).show()

+------+-------------------+-------------------+---------------------+
|origin|      min_avg_delay|      max_avg_delay|avg_computed_by_spark|
+------+-------------------+-------------------+---------------------+
|   CDC|0.33116883116883117|0.33116883116883117|  0.33116883116883117|
|   ACT|  0.897025171624714|  0.897025171624714|    0.897025171624714|
|   HPN| 14.065278559369725| 14.065278559369725|   14.065278559369725|
|   RDD| 13.059259259259258| 13.059259259259258|   13.059259259259258|
|   AUS| 10.835627368841013| 10.835627368841013|   10.835627368841013|
|   MLI| 13.699604743083004| 13.699604743083004|   13.699604743083004|
|   SJU| 14.286952688803996| 14.286952688803996|   14.286952688803996|
|   ATW| 14.175652173913043| 14.175652173913043|   14.175652173913043|
|   DHN|   6.95575221238938|   6.95575221238938|     6.95575221238938|
|   AVL|  8.158119658119658|  8.158119658119658|    8.158119658119658|
|   GJT| 2.4332939787485244| 2.4332939787485244|   2.4332939787485244|
|   LG

So there you have it.  Spark allows you to distribute execution of your pure Python functions across a cluster by dividing the data into groups, converting each group
to a pandas dataframe, and then processing the groups in parallel.