Compartir a través de


How to use revoscalepy in a Spark compute context

Important

This content is being retired and may not be updated in the future. The support for Machine Learning Server will end on July 1, 2022. For more information, see What's happening to Machine Learning Server?

This article introduces Python functions in a revoscalepy package with Apache Spark (Spark) running on a Hadoop cluster. Within a Spark cluster, Machine Learning Server leverages these components:

  • Hadoop distributed file system for finding and accessing data.
  • Yarn for job scheduling and management.
  • Spark as the processing framework (versions 2.0-2.1).

The revoscalepy library provides cluster-aware Python functions for data management, predictive analytics, and visualization. When you set the compute context to rx-spark-connect, revoscalepy f automatically distributes the workload across all the data nodes. There is no overhead in managing jobs or the queue, or tracking the physical location of data in HDFS; Spark does both for you.

Note

For installation instructions, see Install Machine Learning Server for Hadoop.

Start Python

On your cluster's edge node, start a session by typing mlserver-python at the command line.

Local compute context on Spark

By default, the local compute context is the implicit computing environment. All mlserver-python code runs here until you specify a remote compute context.

Remote compute context on Spark

From the edge node, you can push computations to the data layer by creating a remote Spark compute context. In this context, execution is on all data nodes.

The following example shows how to set a remote compute context to clustered data nodes, execute functions in the Spark compute context, switch back to a local compute context, and disconnect from the server.

# Load the functions
from revoscalepy import RxOrcData, rx_spark_connect, rx_spark_list_data, rx_lin_mod, rx_spark_cache_data

# Create a remote compute contenxt 
cc = rx_spark_connect()

# Create a col_info object specfiying the factors
col_info = {"DayOfWeek": {"type": "factor"}}

# Load data, factored and cached.
df = RxOrcData(file = "/share/sample_data/AirlineDemoSmallOrc", column_info = col_info)
df = rx_spark_cache_data(df, True)

# After the first run, a Spark data object is added into the list
rx_lin_mod("ArrDelay ~ DayOfWeek", data = df)
rx_spark_list_data(True)

# Disconnect. Switches back to a local compute context.
rx_spark_disconnect(cc)
rx_get_compute_context()

Specify a data source and location

As part of execution in Spark, your data source must be a file format that Spark understands, such as text, Hive, Orc, and Parquet. You can also create and consume .xdf files, a data file format native to Machine Learning Server that you can read or write to from both Python and R script.

Data source objects provided by revoscalepy in a Spark compute context include RxTextData RxXdfData, and the RxSparkData with derivatives for RxHiveData, RxOrcData, RxParquetData and RxSparkDataFrame.

Create a data source

The following example illustrates an Xdf data source object that pulls data from a local sample directory created when you install Machine Learning Server. The "sampleDataDir" argument is a reference to the sampleDataDir folder, known to revoscalepy.

import os
import revoscalepy

sample_data_path = revoscalepy.RxOptions.get_option("sampleDataDir")
d_s = revoscalepy.RxXdfData(os.path.join(sample_data_path, "AirlineDemoSmall.xdf"))

Import data into a data frame

Data is automatically loaded into a data frame even without rx_import, but you can load it explicitly using the rx_import, which is useful if you want to include parameters.

In mlserver-python, you can use head and tail functions, similar to R, to return the first or last part of the data set.

airlinedata = rx_import(input_data = d_s, outFile="/tmp/airExample.xdf")
airlinedata.head()

Summarize data

To quickly understand fundamental characteristics of your data, use the rx_summary function to return basic statistical descriptors. Mean, standard deviation, and min-max values. A count of total observations, missing observations, and valid observations is included.

A minimum specification of the rx_summmary function consists of a valid data source object and a formula giving the fields to summarize. The formula is symbolic, providing variables used in the model. and typically does not contain a response variable. It should be of the form of ~ terms.

Tip

Get the term list from a data source to see what is available: revoscalepy.rx_get_var_names(data_source)

import os
from revoscalepy import rx_summary, RxOptions, RxXdfData
sample_data_path = RxOptions.get_option("sampleDataDir")
ds = RxXdfData(os.path.join(sample_data_path, "AirlineDemoSmall.xdf"))
summary = rx_summary("ArrDelay+DayOfWeek", ds)
print(summary)

Create models

The following example produces a linear regression, followed by predicted values for the linear regression model.

# Linear regression
import os
import tempfile
from revoscalepy import RxOptions, RxXdfData, rx_lin_mod

sample_data_path = RxOptions.get_option("sampleDataDir")
in_mort_ds = RxXdfData(os.path.join(sample_data_path, "mortDefaultSmall.xdf"))

lin_mod = rx_lin_mod("creditScore ~ yearsEmploy", in_mort_ds)
print(lin_mod)
# Add predicted values
import os
from revoscalepy import RxOptions, RxXdfData, rx_lin_mod, rx_predict, rx_data_step

sample_data_path = RxOptions.get_option("sampleDataDir")
mort_ds = RxXdfData(os.path.join(sample_data_path, "mortDefaultSmall.xdf"))
mort_df = rx_data_step(mort_ds)

lin_mod = rx_lin_mod("creditScore ~ yearsEmploy", mort_df)
pred = rx_predict(lin_mod, data = mort_df)
print(pred.head())

See Also