# DBUtils in Parallel

The `fs` tools of `dbutils` are super handy for all sorts of things when working with files.  However, by default, the tools run only on the driver.
This means files are processed one at a time.  If we are working with many files, this will take a long time.

But we have a whole cluster of computers at our command!  Can we parallelize `dbutils` and divide the work among all the computers in the cluster?

Reference:  [How to list and delete files faster in Azure Databricks](https://docs.microsoft.com/en-us/azure/databricks/kb/data/list-delete-files-faster)

## Create Some Sample Files

For this experiment, we need a bunch of files to move around... so let's make some files!  We'll also use this as an opportunity to show one technique for using `dbutils` in parallel.

But first we need to do some housekeeping and create the directories where we'll put our files.

In [0]:
import random
import string

def make_random_string(length: int) -> str:
  # Build a string of random lowercase letters, used here for throwaway directory names.
  return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))

dir_a = "/tmp/" + make_random_string(5)
dir_b = "/tmp/" + make_random_string(5)

dbutils.fs.mkdirs(dir_a)
dbutils.fs.mkdirs(dir_b)

In [0]:
%scala
import org.apache.spark.sql.functions.udf
val r = new scala.util.Random();
val make_random_string = udf((length: Int) => r.alphanumeric.take(length).mkString)
spark.udf.register("make_random_string", make_random_string.asNondeterministic())
spark.sql("SELECT make_random_string(20)").show()

In [0]:
import pyspark.sql.functions as pyf

# This dataframe contains the names and contents of the 10,000 gibberish files that we will make.
# spark.range excludes its upper bound, so 10001 is needed to get ids 1 through 10,000.
df = spark.range(1, 10001) \
        .withColumn("file_name", pyf.expr("make_random_string(30)")) \
        .withColumn("prefix", pyf.substring(pyf.concat(pyf.lit("0" * 10), pyf.col("id")), -8, 8)) \
        .withColumn("full_name", pyf.concat(pyf.lit(dir_a + "/"), pyf.col("prefix"), pyf.lit("_"), pyf.col("file_name"))) \
        .drop("file_name", "prefix") \
        .withColumn("length", pyf.round(250 + pyf.rand() * 1000)) \
        .withColumn("content", pyf.expr("make_random_string(length)"))

df.createOrReplaceTempView("file_content")
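The `prefix` and `full_name` columns above can be easier to follow in plain Python. The sketch below mirrors the `concat` / `substring` expressions for a single row; the helper names `zero_padded_prefix` and `full_name` are made up for illustration and are not part of the notebook.

```python
def zero_padded_prefix(file_id: int, width: int = 8) -> str:
    """Mimic substring(concat('0000000000', id), -8, 8) for one id."""
    padded = "0" * 10 + str(file_id)  # concat(lit("0" * 10), col("id"))
    return padded[-width:]            # keep only the last 8 characters


def full_name(directory: str, file_id: int, random_part: str) -> str:
    """Mimic concat(dir + "/", prefix, "_", file_name) for one row."""
    return f"{directory}/{zero_padded_prefix(file_id)}_{random_part}"


print(zero_padded_prefix(1))
print(full_name("/tmp/abcde", 42, "xyz"))
```

The fixed-width prefix makes the file names sort in id order, which is why the listing later in this notebook starts at `00000001`.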

One way (and perhaps the easiest way) to run code in parallel is to use the `foreach` method on a dataframe.  This will serialize the function passed to the `foreach` method to each node and then run that function against each row that resides on that node.

Note, however, that the Python version of `dbutils` cannot be serialized.  🙁  Therefore, this technique is only available in Scala.

In [0]:
%scala
spark.table("file_content").foreach{r =>
  val result = dbutils.fs.put(file = r(1).toString, contents = r(3).toString, overwrite = true)
}
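To see what "cannot be serialized" means on the Python side, here is a small analogy that runs anywhere. It does not use `dbutils` at all: `DriverBoundTool` is a hypothetical stand-in for any object that holds a resource living only in the driver process, and the assumption is that Python `dbutils` fails for a broadly similar reason.

```python
import pickle
import threading


class DriverBoundTool:
    """Hypothetical stand-in for an object tied to a driver-side resource."""

    def __init__(self):
        # A lock only has meaning inside this process, so pickle refuses it.
        self._lock = threading.Lock()


def can_pickle(obj) -> bool:
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


print(can_pickle({"path": "/tmp/example"}))  # plain data can be shipped to workers
print(can_pickle(DriverBoundTool()))         # the process-bound object cannot
```

PySpark has to pickle whatever a function passed to `foreach` refers to, so anything in that category stops the job before it starts.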

## Move Files - Directory

We're going to compare a few different techniques for moving files and see how they perform.  The first one is to use `dbutils` to just move one directory to another.

In [0]:
dbutils.fs.mv(dir_a, dir_b, recurse = True)

## Move Files - One at a Time

The technique above works if all of the files we want to move are already conveniently located in a directory.  If we need to pick and choose the files to move, then we would have to iterate over a list of files and move them one at a time.

In [0]:
files = dbutils.fs.ls(dir_b)
for f in files:
  dbutils.fs.mv(f.path, f"{dir_a}/{f.name}")
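The "pick and choose" step can be sketched separately from the move itself. In the sketch below, `FileInfo` is a hypothetical stand-in for the entries returned by `dbutils.fs.ls` (only the `path`, `name` and `size` fields are modelled), and `plan_moves` is a made-up helper that builds the list of source and destination pairs.

```python
from collections import namedtuple

# Hypothetical stand-in for the entries returned by dbutils.fs.ls.
FileInfo = namedtuple("FileInfo", ["path", "name", "size"])


def plan_moves(files, destination_dir, keep):
    """Return (source, destination) pairs for the files accepted by `keep`."""
    return [(f.path, f"{destination_dir}/{f.name}") for f in files if keep(f)]


listing = [
    FileInfo("dbfs:/tmp/src/00000001_a", "00000001_a", 975),
    FileInfo("dbfs:/tmp/src/00000002_b", "00000002_b", 292),
]

# Example rule: only move files larger than 500 bytes.
moves = plan_moves(listing, "/tmp/dst", keep=lambda f: f.size > 500)
print(moves)
```

In the notebook, each resulting pair would then be passed to `dbutils.fs.mv`, still one at a time.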

## Move Files - Parallel!

The techniques above get the job done... but they move the files one at a time.  Can we use Spark to perform the individual file moves in parallel, thus completing the job faster?

When we created the files, we demonstrated that we can use the `foreach` method to call `dbutils` in parallel.  But that requires the `foreach` method to be called from Scala.
In a project that is based on Python, we might want to control the move process from Python code.  To accomplish this, we will perform two steps:
  1. First, we will create a Spark UDF with Scala.  This will simply be a wrapper around `dbutils.fs.mv`.
  1. Next, we will create a Spark dataframe with Python that lists all of the files.  Then we will use Python to call the Spark UDF that moves the files.

Note that we mark the UDF with [`asNondeterministic`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.udf.html).  This tells Spark's optimizer that repeated calls are not interchangeable, so it should not duplicate the call while rewriting the query plan.  Otherwise, Spark's optimization engine may try to invoke the UDF multiple times for the same row, which is a problem for a function with side effects such as a file move.
But be aware that because of this, the query may lose some of the auto-magic optimization that Spark does... so queries that use non-deterministic UDFs may need to be optimized by hand (*e.g.* make sure filtering is applied in the right order).

In [0]:
%scala
import org.apache.spark.sql.functions.udf

val move_file = udf((from: String, to: String) => dbutils.fs.mv(from, to)).asNondeterministic()
spark.udf.register("move_file", move_file)
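Why does a second invocation matter? A move is not repeatable: once the source is gone, the same call cannot succeed again. The sketch below shows this on the local filesystem with `shutil.move` as a stand-in for `dbutils.fs.mv`; how `dbutils.fs.mv` reports a missing source is not shown here and may differ.

```python
import os
import shutil
import tempfile


def move_file(src: str, dst: str) -> bool:
    """Local stand-in for the move UDF: move one file and report success."""
    shutil.move(src, dst)
    return True


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "a.txt")
    dst = os.path.join(tmp, "b.txt")
    with open(src, "w") as fh:
        fh.write("gibberish")

    first = move_file(src, dst)       # the source exists, so this succeeds
    try:
        second = move_file(src, dst)  # same arguments again: the source is gone
    except FileNotFoundError:
        second = False

print(first, second)
```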

In [0]:
files_list = dbutils.fs.ls(dir_a)

df = spark.createDataFrame(files_list) \
        .withColumn("destination", pyf.concat(pyf.lit(dir_b ), pyf.lit("/"), pyf.col("name")))

display(df)

path,name,size,modificationTime,destination
dbfs:/tmp/mqfzs/00000001_jPa6CRa0kVA5XD92u8bGLiTlOh6vj3,00000001_jPa6CRa0kVA5XD92u8bGLiTlOh6vj3,975,1651151060000,/tmp/dtzlb/00000001_jPa6CRa0kVA5XD92u8bGLiTlOh6vj3
dbfs:/tmp/mqfzs/00000002_kXgVGXTcwHg2H14SP032O7d7sMMGD0,00000002_kXgVGXTcwHg2H14SP032O7d7sMMGD0,292,1651151061000,/tmp/dtzlb/00000002_kXgVGXTcwHg2H14SP032O7d7sMMGD0
dbfs:/tmp/mqfzs/00000003_uyWqfk9aEIAkq2na86UuKMwKW813OQ,00000003_uyWqfk9aEIAkq2na86UuKMwKW813OQ,971,1651151061000,/tmp/dtzlb/00000003_uyWqfk9aEIAkq2na86UuKMwKW813OQ
dbfs:/tmp/mqfzs/00000004_PfTzNitDEoZxy7ucdH4gsaiat6D99d,00000004_PfTzNitDEoZxy7ucdH4gsaiat6D99d,686,1651151061000,/tmp/dtzlb/00000004_PfTzNitDEoZxy7ucdH4gsaiat6D99d
dbfs:/tmp/mqfzs/00000005_YkVouFAH40cwSfVFr977WRbC2uN4Q8,00000005_YkVouFAH40cwSfVFr977WRbC2uN4Q8,1004,1651151061000,/tmp/dtzlb/00000005_YkVouFAH40cwSfVFr977WRbC2uN4Q8
dbfs:/tmp/mqfzs/00000006_nfNDaOv1akJyl5QmNrQkeUUD3mKjPN,00000006_nfNDaOv1akJyl5QmNrQkeUUD3mKjPN,988,1651151061000,/tmp/dtzlb/00000006_nfNDaOv1akJyl5QmNrQkeUUD3mKjPN
dbfs:/tmp/mqfzs/00000007_nmIy7TmDqHJkNtEEEFoEsDsazDB0Ee,00000007_nmIy7TmDqHJkNtEEEFoEsDsazDB0Ee,508,1651151061000,/tmp/dtzlb/00000007_nmIy7TmDqHJkNtEEEFoEsDsazDB0Ee
dbfs:/tmp/mqfzs/00000008_jnqvtd5j0R0NAbWGh8qrrGSGfsf5lt,00000008_jnqvtd5j0R0NAbWGh8qrrGSGfsf5lt,1086,1651151061000,/tmp/dtzlb/00000008_jnqvtd5j0R0NAbWGh8qrrGSGfsf5lt
dbfs:/tmp/mqfzs/00000009_kVqkHFXSHwYAVh7S4fLKsG8hcuJR9E,00000009_kVqkHFXSHwYAVh7S4fLKsG8hcuJR9E,476,1651151062000,/tmp/dtzlb/00000009_kVqkHFXSHwYAVh7S4fLKsG8hcuJR9E
dbfs:/tmp/mqfzs/00000010_LLhCxujFbo3hvrvf6J44zOaIzN4cpX,00000010_LLhCxujFbo3hvrvf6J44zOaIzN4cpX,461,1651151062000,/tmp/dtzlb/00000010_LLhCxujFbo3hvrvf6J44zOaIzN4cpX


In [0]:
# Now we call the Spark UDF!
df = df.withColumn("move_result", pyf.expr("move_file(path, destination)"))

# ... and then trigger an action so that Spark actually runs the UDF for every row
df.groupBy("move_result").count().show()

In [0]:
print(len(dbutils.fs.ls(dir_a)))
print(len(dbutils.fs.ls(dir_b)))

## Clean Up
Get rid of the temporary directories and files we created.

In [0]:
dbutils.fs.rm(dir_a, recurse = True)
dbutils.fs.rm(dir_b, recurse = True)
dbutils.fs.ls("/tmp")

## Conclusion

Calling `dbutils` directly from the driver processes all of the files serially.  For our 10,000 test files, this takes around 30 minutes.  But when we run in parallel, we get a *huge* speed boost.  On a cluster with 8 worker nodes, it takes only 1.17 minutes to move 10,000 files!
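To put the two timings side by side, the arithmetic can be worked through as follows. The inputs are the figures quoted above; the serial time is only "around 30 minutes", so the results are rough.

```python
files = 10_000
serial_minutes = 30.0    # approximate serial time quoted above
parallel_minutes = 1.17  # parallel time on 8 worker nodes quoted above

speedup = serial_minutes / parallel_minutes
serial_rate = files / (serial_minutes * 60)      # files per second
parallel_rate = files / (parallel_minutes * 60)  # files per second

print(f"speedup: {speedup:.1f}x")
print(f"serial: {serial_rate:.1f} files/s, parallel: {parallel_rate:.1f} files/s")
```

That works out to a speedup of roughly 25x, or about 5.6 files per second serially against about 142 files per second in parallel.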

## But What About a Native Python Solution?

Since we can't serialize `dbutils`, can we instantiate it on each node instead?  No: the attempt fails with the error message `You cannot use dbutils within a spark job or otherwise pickle it.`
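One Python-only alternative, which was not part of the experiment above, is to stay on the driver and issue the moves from a pool of threads. Nothing gets pickled this way, but the work is not spread across the cluster either. The sketch below is a minimal version that runs locally, with `shutil.move` and temporary directories standing in for `dbutils.fs.mv`, `dir_a` and `dir_b`. Passing `dbutils.fs.mv` as `move_fn` in a notebook is an assumption that has not been measured here, and `move_all` is a made-up helper name.

```python
import os
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor


def move_all(pairs, move_fn=shutil.move, max_workers=8):
    """Run move_fn(src, dst) for every pair on a pool of driver-side threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(move_fn, src, dst) for src, dst in pairs]
        # result() re-raises any exception that occurred in a worker thread.
        return [f.result() for f in futures]


# Local demonstration: temporary directories stand in for dir_a and dir_b.
src_dir = tempfile.mkdtemp()
dst_dir = tempfile.mkdtemp()
for i in range(20):
    with open(os.path.join(src_dir, f"{i:08d}_file"), "w") as fh:
        fh.write("gibberish")

pairs = [(os.path.join(src_dir, n), os.path.join(dst_dir, n))
         for n in os.listdir(src_dir)]
move_all(pairs)

remaining, moved = len(os.listdir(src_dir)), len(os.listdir(dst_dir))
print(remaining, moved)

# Clean up the temporary directories.
shutil.rmtree(src_dir)
shutil.rmtree(dst_dir)
```

Threads suit this kind of work because each move mostly waits on storage rather than computing, but the achievable speedup is bounded by a single machine, unlike the UDF approach.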