7  {SparkR} to {sparklyr}

7.1 Introduction

Beginning with Spark 4.x, {SparkR} will be deprecated. Going forward, {sparklyr} will be the recommended R package for working with Apache Spark. This guide is intended to help users understand the differences between {SparkR} and {sparklyr} across Spark APIs, and aid in code migration from one to the other. It combines basic concepts with specific function mappings where appropriate.

7.1.1 Overview of {SparkR} and {sparklyr}

{SparkR} and {sparklyr} are both R packages designed to work with Apache Spark, but differ significantly in design, syntax, and integration with the broader R ecosystem.

{SparkR} is developed as part of Apache Spark itself, and its design mirrors Spark’s core APIs. This makes it straightforward for those familiar with Spark’s other language interfaces, Scala and Python. However, it may be less intuitive for R users accustomed to the tidyverse.

In contrast, {sparklyr} is developed and maintained by Posit PBC with a focus on providing a more R-friendly experience. It leverages {dplyr} syntax, which is highly familiar to users of the {tidyverse}, enabling them to interact with Spark DataFrames using R-native verbs like select(), filter(), and mutate(). This makes {sparklyr} easier to learn for R users, especially those who are not familiar with Spark’s native API.

7.2 Environment setup

7.2.1 Installation

If working inside the Databricks Workspace, no installation is required; you can simply load {sparklyr} with library(sparklyr). To install {sparklyr} on a machine outside of Databricks, follow the installation instructions in the {sparklyr} documentation.

7.2.2 Connecting to Spark

When working inside the Databricks Workspace, you can connect to Spark with {sparklyr} using the following code:

library(sparklyr)
sc <- spark_connect(method = "databricks")

When connecting to Databricks remotely via Databricks Connect, a slightly different method is used:

sc <- spark_connect(method = "databricks_connect")

For more details and an extended tutorial on Databricks Connect with {sparklyr}, see the official documentation.

7.3 Reading & Writing Data

In contrast to the generic read.df() and write.df() functions in {SparkR}, {sparklyr} has a family of spark_read_*() and spark_write_*() functions to load and save data. There are also dedicated functions to create Spark DataFrames or Spark SQL temporary views from R data frames in memory.

7.3.1 TL;DR

Recommended function mapping

| Task | {SparkR} | {sparklyr} |
|---|---|---|
| Copy data to Spark | createDataFrame() | copy_to() |
| Create temporary view | createOrReplaceTempView() | Use invoke() with method directly |
| Write data to table | saveAsTable() | spark_write_table() |
| Write data to a specified format | write.df() | spark_write_<format>() |
| Read data from table | tableToDF() | tbl() (or spark_read_table() when it’s fixed) |
| Read data from a specified format | read.df() | spark_read_<format>() |

7.3.2 Loading Data

To convert an R data frame to a Spark DataFrame, or to create a temporary view from a DataFrame so that SQL can be applied to it:

SparkR

# create SparkDataFrame from R data frame
mtcars_df <- createDataFrame(mtcars)

sparklyr

# create SparkDataFrame and name temporary view 'mtcars_tmp'
mtcars_tbl <- copy_to(
  sc,
  df = mtcars,
  name = "mtcars_tmp", # (1)
  overwrite = TRUE,
  memory = FALSE # (2)
)

(1) copy_to() creates a temporary view of the data with the given name. You can use that name to reference the data when using SQL directly (e.g. with sdf_sql()).
(2) By default, copy_to() sets memory to TRUE, which caches the table. This helps when the data is read multiple times; it can be worth setting it to FALSE if the data is only read once.

7.3.3 Creating Views

SparkR

# create temporary view
createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")

sparklyr

# the direct equivalent of the SparkR call requires `invoke`
# usually redundant given `copy_to` already creates a temp view
spark_dataframe(mtcars_tbl) |>
  invoke("createOrReplaceTempView", "mtcars_tmp_view")

7.3.4 Writing Data

SparkR

# save SparkDataFrame to Unity Catalog
saveAsTable(
  mtcars_df,
  tableName = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# save DataFrame using delta format to local filesystem
write.df(
  mtcars_df,
  path = "file:/<path/to/save/delta/mtcars>",
  source = "delta", # (1)
  mode = "overwrite"
)

(1) write.df() supports other formats via the source parameter.

sparklyr

# save tbl_spark to Unity Catalog
spark_write_table(
  mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# save tbl_spark using delta format to local filesystem
spark_write_delta(
  mtcars_tbl,
  path = "file:/<path/to/save/delta/mtcars>",
  mode = "overwrite"
)

# Using {DBI}
library(DBI)
dbWriteTable(
  sc,
  value = mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  overwrite = TRUE
)

7.3.5 Reading Data

SparkR

# load Unity Catalog table as SparkDataFrame
tableToDF("<catalog>.<schema>.<table>")

# load csv file into SparkDataFrame
read.df(
  path = "file:/<path/to/read/csv/data.csv>",
  source = "csv",
  header = TRUE,
  inferSchema = TRUE
)

# load delta from local filesystem as SparkDataFrame
read.df(
  path = "file:/<path/to/read/delta/mtcars>",
  source = "delta"
)

# load data from a table using SQL
# recommended to use `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")

sparklyr

# currently has an issue if using Unity Catalog
# recommend using `tbl` (example below)
spark_read_table(sc, "<catalog>.<schema>.<table>", memory = FALSE)

# load table from Unity Catalog with {dplyr}
tbl(sc, "<catalog>.<schema>.<table>")

# or using `in_catalog` from {dbplyr}
tbl(sc, dbplyr::in_catalog("<catalog>", "<schema>", "<table>"))

# load csv from local filesystem as tbl_spark
spark_read_csv(
  sc,
  name = "mtcars_csv",
  path = "file:/<path/to/read/csv/data.csv>",
  header = TRUE,
  infer_schema = TRUE
)

# load delta from local filesystem as tbl_spark
spark_read_delta(
  sc,
  name = "mtcars_delta",
  path = "file:/<path/to/read/delta/mtcars>"
)

# using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")

7.4 Processing Data

7.4.1 Select, Filter

SparkR

# select specific columns
select(mtcars_df, "mpg", "cyl", "hp")

# filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)

sparklyr

# select specific columns
mtcars_tbl |>
  select(mpg, cyl, hp)

# filter rows where mpg > 20
mtcars_tbl |>
  filter(mpg > 20)

7.4.2 Adding Columns

SparkR

# add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)

sparklyr

# add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
  mutate(power_to_weight = hp / wt)

7.4.3 Grouping & Aggregation

SparkR

# calculate average mpg and hp by number of cylinders
mtcars_df |>
  groupBy("cyl") |>
  summarize(
    avg_mpg = avg(mtcars_df$mpg),
    avg_hp = avg(mtcars_df$hp)
  )

sparklyr

# calculate average mpg and hp by number of cylinders
mtcars_tbl |>
  group_by(cyl) |>
  summarize(
    avg_mpg = mean(mpg),
    avg_hp = mean(hp)
  )

7.4.4 Joins

Suppose we have another dataset with cylinder labels that we want to join to mtcars.

SparkR

# create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)

# join mtcars_df with cylinders_df
join(
  x = mtcars_df,
  y = cylinders_df,
  mtcars_df$cyl == cylinders_df$cyl,
  joinType = "inner"
)

sparklyr

# create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)

# join mtcars_tbl with cylinders_tbl
mtcars_tbl |>
  inner_join(cylinders_tbl, by = join_by(cyl))

7.5 User Defined Functions (UDFs)

Suppose we want to categorize horsepower into ‘High’ or ‘Low’ based on a threshold.

Note

This is an arbitrary example; in practice we would recommend case_when() combined with mutate().

# define custom function
categorize_hp <- function(df) {
  df$hp_category <- ifelse(df$hp > 150, "High", "Low")
  df
}
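
For comparison, the case_when() and mutate() approach recommended in the note above might look like the following minimal sketch. It runs against the local mtcars data frame so it can be tried without a cluster; the same verbs should apply unchanged to mtcars_tbl, where they are translated to Spark SQL. The names cars and cars_categorized are only illustrative.

```r
library(dplyr)

# local stand-in so the sketch runs without a cluster;
# with a Spark connection, start from `mtcars_tbl` instead
cars <- mtcars

# categorize horsepower without a UDF
cars_categorized <- cars |>
  mutate(
    hp_category = case_when(
      hp > 150 ~ "High",
      TRUE ~ "Low"
    )
  )
```

Because this stays within {dplyr} verbs, no R code has to be shipped to the workers, which is why it is generally preferable to a UDF for simple column logic.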

SparkR

UDFs in {SparkR} require an output schema, which we define first.

# define the schema for the output DataFrame
schema <- structType(
  structField("mpg", "double"),
  structField("cyl", "double"),
  structField("disp", "double"),
  structField("hp", "double"),
  structField("drat", "double"),
  structField("wt", "double"),
  structField("qsec", "double"),
  structField("vs", "double"),
  structField("am", "double"),
  structField("gear", "double"),
  structField("carb", "double"),
  structField("hp_category", "string")
)

To apply this function to each partition of a Spark DataFrame, we use dapply().

# apply function across partitions using dapply
dapply(
  mtcars_df,
  func = categorize_hp,
  schema = schema
)

To apply the same function to each group of a Spark DataFrame, we use gapply(). Note that the schema is still required.

# apply function across groups
# gapply passes the grouping key and the group's data.frame to the function
gapply(
  mtcars_df,
  cols = "hp",
  func = function(key, df) categorize_hp(df),
  schema = schema
)

sparklyr

Tip

The ‘Distributing R Computations’ guide in the {sparklyr} docs is highly recommended; it goes into much more detail on spark_apply().

Note

spark_apply() will do its best to derive the column names and schema of the output by sampling 10 rows. This adds overhead, which can be avoided by specifying the columns parameter.

# ensure that {arrow} is loaded, otherwise may encounter cryptic errors
library(arrow)

# apply the function over data 
# by default applies to each partition
mtcars_tbl |>
  spark_apply(f = categorize_hp)

# apply the function over data
# using `group_by` to apply the function over groups
mtcars_tbl |>
  spark_apply(
    f = categorize_hp,
    group_by = "hp" # (1)
  )

(1) In this example group_by doesn’t change the resulting output, as the function’s behaviour is applied to rows independently. Other functions that operate on a set of rows would behave differently (e.g. summary()).
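
The columns parameter mentioned in the note above could be prepared along the following lines. This is a sketch rather than a definitive recipe: it runs the function on a few local rows to learn the output columns, and it assumes that R class names such as "numeric" and "character" are accepted as column types. The names sample_out and output_columns are illustrative, and the spark_apply() call is left commented out because it needs a live connection and the mtcars_tbl created earlier.

```r
# custom function from above, repeated so the sketch is self-contained
categorize_hp <- function(df) {
  df$hp_category <- ifelse(df$hp > 150, "High", "Low")
  df
}

# run the function on a few local rows to learn the output columns and classes
sample_out <- categorize_hp(head(mtcars))
output_columns <- vapply(sample_out, function(col) class(col)[1], character(1))

# inspect the named vector of column types
print(output_columns)

# with a live connection, pass the vector to spark_apply() to skip the
# sampling step (not run here; assumes `mtcars_tbl` exists and that these
# type names are accepted)
# mtcars_tbl |>
#   spark_apply(f = categorize_hp, columns = output_columns)
```

Deriving the vector from the function's own output keeps the declared columns in step with the function if it later changes.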

SparkR::spark.lapply() is unique in that it applies to lists in R, as opposed to DataFrames. There is no exact equivalent in {sparklyr}. In many cases, similar behaviour can be achieved by using spark_apply() on a DataFrame with unique IDs and grouping by ID, or by writing a function that operates on a row-wise basis.

SparkR

# define a list of integers
numbers <- list(1, 2, 3, 4, 5)

# define a function to apply
square <- function(x) {
  x * x
}

# apply the function over list using spark
spark.lapply(numbers, square)

sparklyr

# create a spark DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)

# apply function to each partition of data.frame
spark_apply(sdf, f = nrow) # (1)

# apply function to each row (option 1)
spark_apply(sdf, f = nrow, group_by = "id") # (2)

# apply function to each row (option 2)
row_func <- function(df) { # (3)
  df |>
    dplyr::rowwise() |>
    dplyr::mutate(x = id * 2)
}
spark_apply(sdf, f = row_func)

(1) spark_apply() defaults to processing data by partition; in this case it will return a single row due to repartition = 1.
(2) To force behaviour like spark.lapply(), you can create a DataFrame with N rows and force grouping with group_by set to a unique row identifier (in this case the id column automatically generated by sdf_len()). This will return N rows.
(3) This requires writing a function that operates across rows of a data.frame; on some occasions this may be faster than (2). Specifying group_by is optional for this example. The example does not require rowwise(); it is included only to illustrate one way to force computations to happen for every row. Your function should take care to import required packages, etc.

7.6 Machine learning

Full examples for each package can be found in the official reference for {SparkR} and {sparklyr}, respectively.

If not using Spark MLlib it is recommended to use UDFs to train with the library of your choice (e.g. {xgboost}).
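
As a rough illustration of the UDF route, the sketch below defines a training function that takes a data.frame and returns a data.frame, which is the shape spark_apply() expects. It uses base R's lm() as a stand-in for whichever library you prefer; fit_lm and local_fit are hypothetical names, and the spark_apply() call is commented out because it needs a live connection and the mtcars_tbl from earlier.

```r
# hypothetical training function: fits a model on one data.frame
# (a partition or a group) and returns the coefficients as a data.frame
fit_lm <- function(df) {
  model <- stats::lm(mpg ~ hp + wt, data = df)
  coefs <- stats::coef(model)
  data.frame(
    term = names(coefs),
    estimate = unname(coefs),
    stringsAsFactors = FALSE
  )
}

# check the function locally before distributing it
local_fit <- fit_lm(mtcars)
print(local_fit)

# with a live connection, fit one model per group
# (not run here; assumes `mtcars_tbl` exists)
# mtcars_tbl |>
#   spark_apply(f = fit_lm, group_by = "cyl")
```

Testing the function on a local data.frame first makes errors far easier to diagnose than when they surface from a worker.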

7.6.1 Linear regression

SparkR

# select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")

# fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")

# view model summary
summary(linear_model)

sparklyr

# select features
training_tbl <- mtcars_tbl |>
  select(mpg, hp, wt)

# fit a linear regression model
linear_model <- training_tbl |>
  ml_linear_regression(response = "mpg", features = c("hp", "wt"))

# view model summary
summary(linear_model)

7.6.2 K-means clustering

SparkR

# apply KMeans clustering with 3 clusters using mpg and hp as features
# (the formula has no response variable for clustering)
kmeans_model <- spark.kmeans(mtcars_df, ~ mpg + hp, k = 3)

# get cluster predictions
predict(kmeans_model, mtcars_df) # (1)

(1) Predicting on the input data to keep the example simple.

sparklyr

# use mpg and hp as features
features_tbl <- mtcars_tbl |>
  select(mpg, hp)

# assemble features into a vector column
features_vector_tbl <- features_tbl |>
  ft_vector_assembler(
    input_cols = c("mpg", "hp"),
    output_col = "features"
  )

# apply K-Means clustering
kmeans_model <- features_vector_tbl |>
  ml_kmeans(features_col = "features", k = 3)

# get cluster predictions
ml_predict(kmeans_model, features_vector_tbl) # (1)

(1) Predicting on the input data to keep the example simple.

7.7 Performance and optimization

7.7.1 Collecting

Both {SparkR} and {sparklyr} use the same function name, collect(), to convert Spark DataFrames to R data frames. In general, only collect small amounts of data back to R data frames or the Spark driver will run out of memory, crashing your script (and you want to use Spark to accelerate workloads as much as possible!).

To prevent out of memory errors, {SparkR} has built-in optimizations in Databricks Runtime that help collect data or execute user-defined functions (which also require collecting data to workers). To ensure smooth performance with {sparklyr} for collecting data and UDFs, make sure to load the {arrow} package in your scripts.

# when on Databricks DBR 14.3 or higher {arrow} is pre-installed
library(arrow)

If you encounter issues collecting large datasets with {sparklyr}, the methods documented here may help. However, hitting this limit is typically an indicator that you should defer more work to Spark.

7.7.2 In-Memory Partitioning

SparkR

# repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)

# repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)

# coalesce the SparkDataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)

# get number of partitions
getNumPartitions(mtcars_df)

sparklyr

# repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")

# repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)

# coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)

# get number of partitions
sdf_num_partitions(mtcars_tbl)

7.7.3 Caching

SparkR

# cache the SparkDataFrame in memory
cache(mtcars_df)

sparklyr

# cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")