# UDF Speed Testing

When using Spark, you have a few different options for manipulating data, and some of them perform better than others.
In this notebook, we'll try out a few options and measure their performance.

Here are the methods we will be evaluating:
 - **Spark Commands**: PySpark is a wrapper around the Spark API.  By using PySpark, we can use Python code to send commands directly to Spark,
   and built-in functions such as `concat` and `substring` then run inside Spark's JVM.
 - **Spark SQL**: Similar to PySpark, Spark SQL lets us use SQL syntax to send the same commands directly to Spark.
 - **Scala UDF**: We can extend Spark by writing functions in Scala and registering them with Spark.  This allows Spark to run our custom function
   against its data.  Since the Scala code executes within the same JVM in which Spark runs, the data does not need to be
   serialized to a different process.  However, a UDF is opaque to Spark's query optimizer, and each value must be converted between
   Spark's internal format and Scala types, so some overhead remains.
 - **Python UDF**:  We can also extend Spark by writing functions in Python, leveraging packages from Python's vast ecosystem.  However, unlike
   a Scala UDF, this Python code is executed in a separate process (a Python runtime environment).  Therefore, all of the data needed by the
   function must be serialized and sent from the JVM to the Python process, and the results must be sent back.  This incurs a performance penalty.
   
With this understanding of how Spark works, we would expect the native Spark commands to be the most performant.  We would expect the Spark SQL
commands to have almost identical performance since they are functionally equivalent to the native commands.  A Scala UDF should perform well
but might be slightly slower than a native command.  Lastly, we would expect a Python UDF to be the slowest since it requires the data
to be serialized between processes.

For this test, we will perform an arbitrary string manipulation.  We will take a `publicationuuid`, which is a string of 64 characters,
take its last 16 characters, append a dash, and then append the first 48 characters.  For example: <br/>
`63dc895fe59caecedeae490969eee8a061fa1e02e5ea354fe6d1e353b7b8ece4`<br/>
becomes:<br />
`e6d1e353b7b8ece4-63dc895fe59caecedeae490969eee8a061fa1e02e5ea354f`<br />

We will implement this same transformation using the four techniques listed above.  We will apply each implementation to our data frame
and see how they perform.
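The two native implementations express these slices with Spark's `SUBSTRING`, which uses 1-based positions, while a negative position counts back from the end of the string. The plain-Python sketch below emulates those semantics for the cases used in this notebook and checks that the Spark-style expression matches Python slicing on the example above. `spark_substring` is a hypothetical helper written for illustration, not part of PySpark, and its clamping of out-of-range positions is our reading of Spark's behavior (it is not exercised here).

```python
def spark_substring(s: str, pos: int, length: int) -> str:
  # Hypothetical emulation of Spark SQL's SUBSTRING(s, pos, length):
  # pos > 0 is 1-based, pos < 0 counts back from the end, pos == 0 acts like 1
  if pos > 0:
    start = pos - 1
  elif pos < 0:
    start = len(s) + pos
  else:
    start = 0
  end = start + length
  start = max(start, 0)
  if start >= end:
    return ""
  return s[start:end]

example = "63dc895fe59caecedeae490969eee8a061fa1e02e5ea354fe6d1e353b7b8ece4"

# CONCAT(SUBSTRING(x, -16, 16), '-', SUBSTRING(x, 1, 48)), evaluated Spark-style
spark_style = spark_substring(example, -16, 16) + "-" + spark_substring(example, 1, 48)
# The same transformation written with Python slicing, as the UDFs do it
python_style = example[-16:] + "-" + example[:48]

print(spark_style)
print(spark_style == python_style)
```

The output string has 65 characters: the 16 moved characters, the dash, and the 48 leading characters.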

## Create Our UDF Functions

Now we can implement our UDFs in Python and in Scala.

In [0]:
from pyspark.sql import types as T

def chop_uuid(text: str) -> str:
  # Move the last 16 characters to the front, separated by a dash
  return text[-16:] + '-' + text[:48]

# Register the Python function so it can be called from SQL expressions
spark.udf.register("python_chop_uuid", chop_uuid, T.StringType())
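Spark passes SQL `NULL` values to a Python UDF as `None`, so `chop_uuid` as written would raise a `TypeError` on a null `publicationuuid`. If the column can contain nulls (an assumption; the notebook does not say), a null-safe variant such as the hypothetical `chop_uuid_safe` below returns `None` instead, which Spark stores as `NULL`.

```python
from typing import Optional

def chop_uuid_safe(text: Optional[str]) -> Optional[str]:
  # Spark hands SQL NULLs to Python UDFs as None; pass them through unchanged
  if text is None:
    return None
  # Move the last 16 characters to the front, separated by a dash
  return text[-16:] + '-' + text[:48]

print(chop_uuid_safe(None))
print(chop_uuid_safe("63dc895fe59caecedeae490969eee8a061fa1e02e5ea354fe6d1e353b7b8ece4"))
```

It could be registered in place of `chop_uuid` with the same `spark.udf.register(...)` call, at the cost of one extra comparison per row.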

In [0]:
%scala
import org.apache.spark.sql.functions.udf

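// Move the last 16 characters to the front, separated by a dash
val chop_uuid = udf((x: String) => x.takeRight(16) + "-" + x.substring(0, 48))
// Register the UDF so it can be called from SQL expressions, including F.expr in PySpark
spark.udf.register("scala_chop_uuid", chop_uuid)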

## Get Some Data

We just need a dataframe containing many 64-character "double UUID" strings to run our functions against.  Half a billion rows should be plenty.

In [0]:
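print("Loading data...")
# Keep only the column we need and cache it so every test reads from memory
df = spark.read.format("delta").load("/mnt/silver/patent/family").select("publicationuuid").cache()
# Run an action over every row to materialize the cache before timing anything
df.foreach(lambda x: True)
print(f"Number of Rows: {df.count():,}")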
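If the Delta table at `/mnt/silver/patent/family` is not available, any column of 64-character strings will do for this benchmark. A SHA-256 hex digest is exactly 64 hexadecimal characters, so one way to generate stand-in values locally is to hash sequential integers, as in this plain-Python sketch. `make_test_uuids` is a hypothetical helper; within Spark itself, the built-in `sha2(col, 256)` function produces the same kind of 64-character hex string without leaving the JVM.

```python
import hashlib

def make_test_uuids(n: int) -> list:
  # Hash each integer's decimal string; SHA-256 hex digests are 64 characters long
  return [hashlib.sha256(str(i).encode("utf-8")).hexdigest() for i in range(n)]

rows = make_test_uuids(5)
for value in rows:
  print(value, len(value))
```

A small list like this could be loaded with `spark.createDataFrame([(v,) for v in rows], ["publicationuuid"])`; for hundreds of millions of rows, generating the values inside Spark would be far more practical.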

## Run the Tests in an Automated Fashion

The code below loops over a shuffled list that contains each implementation 20 times, so every technique runs 20 times in random order.
We time each run and then compare the results across the different implementations of the transformation.

In [0]:
import random
import time
from pyspark.sql import functions as F

results = {}

# Each implementation appears 20 times; shuffling spreads warm-up and
# caching effects evenly across the implementations
run_types = ['spark', 'sql', 'scala', 'python'] * 20
random.shuffle(run_types)

for run_type in run_types:
  print("\n--------------------------------------------------\n")
  print(f"Trying '{run_type}'")

  # Add a column holding the transformed string. Spark's SUBSTRING is 1-based,
  # and a negative start position counts back from the end of the string.
  print("   Applying transformation...")
  if run_type == 'spark':
    df = df.withColumn("spark_command", F.concat(F.substring("publicationuuid", -16, 16), F.lit("-"), F.substring("publicationuuid", 1, 48)))

  elif run_type == 'sql':
    df = df.withColumn("sql_command", F.expr("CONCAT(SUBSTRING(publicationuuid, -16, 16), '-', SUBSTRING(publicationuuid, 1, 48))"))

  elif run_type == 'scala':
    df = df.withColumn("scala_udf", F.expr("scala_chop_uuid(publicationuuid)"))

  elif run_type == 'python':
    df = df.withColumn("python_udf", F.expr("python_chop_uuid(publicationuuid)"))

  # Transformations are lazy, so time an action that processes every row.
  # Note: foreach with a Python lambda sends each row to a Python worker,
  # so that overhead is included in every run, not only the Python UDF runs.
  print("   Triggering action...")
  start = time.perf_counter()
  df.foreach(lambda x: True)
  end = time.perf_counter()

  # Drop the column we just added so the next run starts from the same dataframe
  columns = df.columns
  print(f"   Columns: {columns}")
  df = df.drop(columns[-1])

  run_time = (end - start) / 60
  results.setdefault(run_type, []).append(run_time)
  print(f"   Completed in {run_time} minutes")
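The shuffled, interleaved schedule in the loop above is a general benchmarking pattern: running every implementation many times in random order keeps slow drift (cache warm-up, changing cluster load) from favoring whichever technique happens to run first. Here is a plain-Python sketch of the same pattern, decoupled from Spark. `benchmark` and the two toy workloads are hypothetical, and the `seed` parameter is an addition so that a schedule can be reproduced.

```python
import random
import time
from statistics import mean

def benchmark(implementations, repeats=20, seed=None):
  # implementations: dict mapping a name to a zero-argument callable
  schedule = list(implementations) * repeats
  random.Random(seed).shuffle(schedule)

  results = {name: [] for name in implementations}
  for name in schedule:
    start = time.perf_counter()
    implementations[name]()
    results[name].append(time.perf_counter() - start)
  return results

example = "63dc895fe59caecedeae490969eee8a061fa1e02e5ea354fe6d1e353b7b8ece4"
data = [example] * 10_000

timings = benchmark(
  {
    "slice": lambda: [s[-16:] + "-" + s[:48] for s in data],
    "join": lambda: ["-".join((s[-16:], s[:48])) for s in data],
  },
  repeats=5,
  seed=42,
)
for name, runs in timings.items():
  print(f"{name:>6}: {len(runs)} runs, mean {mean(runs) * 1000:.3f} ms")
```

Using `time.perf_counter()` rather than wall-clock timestamps gives a monotonic, high-resolution clock that is unaffected by system clock adjustments.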

## Results
The cell below prints each run's time, followed by the average for each implementation, as a table.

**NOTE**:  These results are expressed in **_minutes_**, not seconds.

In [0]:
from statistics import mean

keys = list(results.keys())

# Header row: one column per implementation
line = "   "
for k in keys:
  line = line + f"{k:>20}"

print(line)

# One row per run; every implementation ran the same number of times
num_runs = min(len(results[k]) for k in keys)
for i in range(num_runs):
  line = f"{i:>3d}"
  for k in keys:
    line = line + f"{results[k][i]:>20.2f}"

  print(line)

line = "---"
for k in keys:
  line = line + f"{'--------':>20}"

print(line)

# Footer row: average run time per implementation
line = "Avg"
for k in keys:
  line = line + f"{mean(results[k]):>20.2f}"

print(line)

 - Scala UDF:  0.66 minutes
 - Python UDF: 0.90 minutes

---

 - Python UDF takes about 36% longer than Scala UDF (0.90 / 0.66 ≈ 1.36)
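The 36% figure is the relative difference of the two averages: (0.90 - 0.66) / 0.66 ≈ 0.36. With 20 runs per technique, the median and standard deviation are also worth reporting, since a single slow run can skew a mean. The sketch below assumes a `results` dict shaped like the one built by the test loop (name to list of run times in minutes); `summarize` is a hypothetical helper, and the `reported` example simply reuses the two averages above as single runs for illustration.

```python
from statistics import mean, median, stdev

def summarize(results):
  # results: dict mapping implementation name -> list of run times (minutes)
  averages = {name: mean(runs) for name, runs in results.items()}
  fastest = min(averages.values())
  summary = {}
  for name, runs in results.items():
    summary[name] = {
      "mean": averages[name],
      "median": median(runs),
      "stdev": stdev(runs) if len(runs) > 1 else 0.0,
      # How much longer this implementation takes than the fastest one, in percent
      "slower_pct": (averages[name] / fastest - 1) * 100,
    }
  return summary

# Illustration only: the reported averages, each treated as a single run
reported = {"scala": [0.66], "python": [0.90]}
for name, stats in summarize(reported).items():
  print(f"{name:>7}: mean {stats['mean']:.2f} min, {stats['slower_pct']:.0f}% slower than fastest")
```

In the notebook, `summarize(results)` would run directly on the dict filled by the test loop.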