Applied Predictive Maintenance

Part 6 of 6: "Continuous Applied Predictive Maintenance with Snowflake and Snowpark"

Author: Josh Patterson
Date: May 24th, 2022

Introduction

In the previous article (Part 5) in this series, we validated that the best model did meet the business requirements set by the business unit.

In this article we put our pilot project into production with Snowpark to scalably deliver predicted machine failures to the maintenance teams each day.

Key Takeaways:

  1. Introduction to the Snowpark API
  2. Building and running Snowpark DataFrame code that executes inside Snowflake
  3. Using a PMML Model to score machine sensor data inside Snowflake

In Part 4 of this series we built an XGBoost model and saved it as PMML. In this article we'll take the saved PMML model and run it inside the Snowflake cloud database to score data in the cloud. We'll then build a quick dashboard that displays the 18 machines most likely to fail each day.

With that out of the way, let's dig into Snowpark.

What is Snowpark

Snowpark is an API created by Snowflake that is based on data frames and provides its own classes and methods. These classes and methods can be used to assist in transferring and working with your data within the Snowflake database. In order to do this, the API provides language constructs for building SQL statements that will be used on the server. For example, rather than writing ‘select column_name’ as a string to select a column of data, you can simply use a select method provided by the Snowpark API. You may still use the string statement to work with your data, if you wish to do so, but by using the constructs Snowpark provides, you also get the added benefit of Snowpark’s intelligent code completion, which may help when first getting to know Snowpark’s syntax.

By using the Snowpark API, client applications can process data within the Snowflake database without needing to move their data to the same location as the client application. They can simply connect to the Snowpark API by adding the API libraries to their client code, which is great for those wanting to process large sets of data for machine learning or artificial intelligence projects.

Once you connect to the Snowpark API, you will be able to construct a Snowpark DataFrame to store the data you want to use for the project. Snowpark DataFrames are executed lazily, so Snowpark will only retrieve and operate on the defined range of data when you actively choose to run the client code. Lazy execution is a great perk of this service as it saves time and money when you want to use and pay for Snowflake’s resources on a smaller section of the data rather than the entire dataset. The DataFrame class also provides a set of methods you can use on the created DataFrame, giving the user the flexibility to use built-in functions or create their own user-defined functions (UDFs) to upload to Snowflake, which we will briefly cover below.
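
To make this concrete, here is a minimal sketch (not from this article's repository) contrasting a raw SQL string with the DataFrame constructs, and showing that nothing executes until an action such as show() or collect() is called. The table name matches the one we use later in this article; everything else is illustrative:

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object SnowparkSketch {
  def main(args: Array[String]): Unit = {

    val session = Session.builder.configFile("conn.properties").create

    // Option 1: send a raw SQL string to Snowflake.
    val viaSql = session.sql("select AIR_TEMPERATURE, TORQUE from SUMMARY_SENSOR_DATA_TEST")

    // Option 2: build the same query with the DataFrame constructs.
    val viaDataFrame = session
      .table("SUMMARY_SENSOR_DATA_TEST")
      .select(col("AIR_TEMPERATURE"), col("TORQUE"))

    // Both are lazy -- nothing runs in Snowflake until an action such as show() or collect().
    viaSql.show()
    viaDataFrame.show()

    session.close()
  }
}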

If you have a specific task you need to perform on your data that doesn’t have a Snowpark equivalent, the Snowpark API also supports the use of UDFs in the programming language of your choice to create custom functions you need for your project. Snowflake explains how to perform this process from a lambda or function in Scala in this section of their documentation. You don’t need to transfer the data back to your client from Snowflake. Simply write the function on your client, push the code to the server through Snowflake, and use it to work with your data in Snowflake.

Snowpark supports anonymous UDFs, which you assign to a variable and call through that variable, and named UDFs, which you register and call by a custom name.
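
As a rough illustration (again, not from this article's repository), the snippet below shows both styles; the throwaway table values and the doubling logic are placeholders:

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object UdfStyles {
  def main(args: Array[String]): Unit = {

    val session = Session.builder.configFile("conn.properties").create

    // A throwaway single-column DataFrame just to demonstrate the two UDF styles.
    val df = session.sql("select * from values (1), (2), (3) as t(A)")

    // Anonymous UDF: held in a variable and applied directly to a column.
    val doubleIt = udf((x: Int) => x * 2)
    df.select(doubleIt(col("A"))).show()

    // Named UDF: registered under a custom name and invoked with callUDF.
    session.udf.registerTemporary("double_it", (x: Int) => x * 2)
    df.select(callUDF("double_it", col("A"))).show()

    session.close()
  }
}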

Deploying Our XGBoost Model to Snowflake with Snowpark

The steps to use our model to score new data in a Snowflake database are:

  • Save model as PMML
  • Set up Snowpark API
  • Write UDF code for Snowpark to use PMML model
  • Run job via Snowpark Session and save predictions

In this section we'll walk through all of the above while referencing our Snowpark code specifically.

If you'd like to try out the code for yourself check out the following repository:

git clone https://github.com/pattersonconsulting/predictive_maintenance.git

Now let's dig into how to use the Snowpark API to work with data in Snowflake.

Using the Snowpark API to Process Data in Snowflake

Our functional goal is to use the model to make a machine failure prediction for each row in the table SUMMARY_SENSOR_DATA_TEST. To do this we use the Snowpark API: we'll create a Snowpark session and a DataFrame for the table SUMMARY_SENSOR_DATA_TEST.

We then use pmml4s (a PMML implementation for Scala) to load our PMML model file. Once we have our Snowpark table data represented by a Snowpark DataFrame, we can apply a UDF in a functional-programming style that makes a prediction for each record in the table. The results are stored back in a new table linking the machine ID to the failure score predicted by the model.
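
Before wiring the model into a UDF, it's worth sanity-checking the PMML file locally with pmml4s. The sketch below is not part of the repository; the file path and the probability(1) output field simply mirror what the UDF code later in this article expects, and the input values come from the first row of the test table shown further down:

import org.pmml4s.model.Model

object LocalPmmlCheck {
  def main(args: Array[String]): Unit = {

    // Load the exported PMML pipeline from disk (the same file we later pack into a jar).
    val model = Model.fromFile("pmml/pm_sklearn_xgb.pmml")

    // Score one record; the pipeline baked into the PMML handles the feature transforms.
    val result = model.predict(Map(
      "TYPE" -> "M",
      "AIR_TEMPERATURE" -> 298.1,
      "PROCESS_TEMPERATURE" -> 308.6,
      "ROTATIONAL_SPEED" -> 1551.0,
      "TORQUE" -> 42.8,
      "TOOL_WEAR" -> 0.0
    ))

    // The failure probability is exposed as the "probability(1)" output field.
    println(result.get("probability(1)"))
  }
}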

Scaling Up Batch Inference with UDFs

DataFrames processed with UDFs in this functional style scale well as the input table grows, because the UDF executes inside Snowflake and the work is distributed across the virtual warehouse rather than pulled back to the client.

Building a Scala PMML Wrapper for our Model

Previously we saved the XGBoost model as a PMML file. Let's now take that file and package it in a jar file so that Snowflake can upload it as a dependency.

jar cf pm_pmml_model.jar pmml/pm_sklearn_xgb.pmml

Writing Snowpark Code to Run Our Model on Snowflake

Below we can see the main function of our scala code that will run inside Snowflake via Snowpark:

  def main(args: Array[String]): Unit = {

    Console.println("\n=== Creating the session ===\n")

    // Connect to Snowflake using the settings in conn.properties.
    val session = Session.builder.configFile("conn.properties").create

    // Build a DataFrame over the summarized sensor table; show() triggers execution.
    val df_raw_device_data_test = session.table("SUMMARY_SENSOR_DATA_TEST")
    df_raw_device_data_test.show()

    val libPath = new java.io.File("").getAbsolutePath
    println(libPath)

    // Upload the jars the UDF needs: pmml4s and its dependencies, plus our model jar.
    session.addDependency(s"$libPath/lib/pmml4s_2.12-0.9.11.jar")
    session.addDependency(s"$libPath/lib/spray-json_2.12-1.3.5.jar")
    session.addDependency(s"$libPath/lib/scala-xml_2.12-1.2.0.jar")
    // have to wrap pmml in a jar for this to work
    session.addDependency(s"$libPath/lib/pm_pmml_model.jar")

    // Register the scoring UDF under a name that Snowflake can call from SQL.
    session.sql("drop function if exists model_PM_UDF_predict(DOUBLE);").show()
    session.udf.registerTemporary("model_PM_UDF_predict", UDFCode.predictFailureScoreUDF)
    session.sql("SHOW USER FUNCTIONS;").show()

    // Score every row with the UDF and persist the results to DAILY_SCORED_MACHINES.
    val df_machines_scored = df_raw_device_data_test.withColumn("FAILURE_SCORE", callUDF("model_PM_UDF_predict", col("TYPE"), col("AIR_TEMPERATURE"), col("PROCESS_TEMPERATURE"), col("ROTATIONAL_SPEED"), col("TORQUE"), col("TOOL_WEAR")))
    df_machines_scored.show()
    df_machines_scored.write.mode(SaveMode.Overwrite).saveAsTable("DAILY_SCORED_MACHINES")

    Console.println("\n=== CLOSING the session ===\n")

    session.close()

  }

The above code section represents a "typical" Snowpark API main() entry point to drive your analysis code. In the following line, you can see where we register our specific UDF with the Snowpark API:

session.udf.registerTemporary("model_PM_UDF_predict", UDFCode.predictFailureScoreUDF)    

Once we have this UDF registered with Snowpark, we can use it in our Snowpark API dataframe code:

val df_machines_scored = df_raw_device_data_test.withColumn("FAILURE_SCORE", callUDF("model_PM_UDF_predict", col("TYPE"), col("AIR_TEMPERATURE"), col("PROCESS_TEMPERATURE"), col("ROTATIONAL_SPEED"), col("TORQUE"), col("TOOL_WEAR"))) 

The callUDF("model_PM_UDF_predict", ...) call in the DataFrame code invokes our UDF and passes in the column data. We can see our UDF code below.


  object UDFCode extends Serializable {

    lazy val udf_model = {

      import java.io._

      val resourceName = "/pmml/pm_sklearn_xgb.pmml" // this is the path of the pmml in the .jar
      val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
      Model.fromInputStream(inputStream)

    }

    val predictFailureScoreUDF = (TYPE_col: String, AIR_TEMPERATURE: Double, PROCESS_TEMPERATURE: Double, ROTATIONAL_SPEED: Double, TORQUE: Double, TOOL_WEAR: Double) => {
      udf_model.predict(Map("TYPE" -> TYPE_col, "AIR_TEMPERATURE" -> AIR_TEMPERATURE, "PROCESS_TEMPERATURE" -> PROCESS_TEMPERATURE, "ROTATIONAL_SPEED" -> ROTATIONAL_SPEED, "TORQUE" -> TORQUE, "TOOL_WEAR" -> TOOL_WEAR)).get("probability(1)").get.asInstanceOf[Double]
    }

  }

Our UDFCode object needs to extend Serializable, and because we only want the UDF to instantiate the PMML model once, we tag the udf_model variable with the lazy keyword.

As we can see, this code loads the PMML file from the jar we packed it into for transport to Snowflake.

The function predictFailureScoreUDF is where we call .predict() on our PMML model, passing in the column data from the DataFrame and returning the model's output as the output of the UDF.

Advantages of Using PMMLPipeline

Back in Part 4 of this series, at the end of the Jupyter notebook we saved the model as a PMML file. As part of this process, we wrapped the XGBoost model in a PMMLPipeline object (from sklearn2pmml) that includes the DataFrameMapper instance (from sklearn_pandas) used to transform the training and test data.

This all comes in handy at this point in our Scala UDF, because we can now feed the model raw column data and let our PMML model (with its built-in data transforms) take care of the vector transforms for us before it makes the prediction.

Once we have our code the way we want it, we need to connect to Snowflake with the Snowpark API.

Running the Snowpark Code

To connect to Snowflake via the Snowpark API with Scala, we need to create a session:

    val session = Session.builder.configFile("conn.properties").create

If we take a look at the contents of the conn.properties file we see:

URL = https://nna57244.us-east-1.snowflakecomputing.com
USER = your_user_name
PASSWORD = your_password
#
# Note: To use key-pair authentication instead of a password,
# comment out (or remove) PASSWORD. Then, uncomment PRIVATE_KEY_FILE
# and set it to the path to your private key file.
# PRIVATE_KEY_FILE = 
#
# If your private key is encrypted, you must also uncomment
# PRIVATE_KEY_FILE_PWD and set it to the passphrase for decrypting the key.
# "PRIVATE_KEY_FILE_PWD" -> "",
#
ROLE = SYSADMIN
WAREHOUSE = ANALYTICS_WH
DB = PREDICTIVE_MAINTENANCE
SCHEMA = PUBLIC

There are multiple ways to run Scala code locally, and one of the most popular is SBT.

Snowpark Dependencies

Snowpark supports multiple languages, but if you are going to use SBT with Scala, add Snowpark as a dependency in build.sbt. Your versions may vary, but below are the dependencies we used with this version of the code.


scalaVersion := "2.12.13"

libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2"

libraryDependencies += "com.snowflake" % "snowpark" % "0.11.0"

libraryDependencies += "org.pmml4s" %%  "pmml4s" % "0.9.11"

Once the above code is compiled by SBT (or your favorite tool) and run locally, Snowpark connects to Snowflake and translates the DataFrame operations into SQL that Snowflake executes natively, as shown in the query log below.

[run-main-0] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: 01a0dc6d-0501-7279-004e-7f83000691ae] CREATE OR REPLACE TABLE DAILY_SCORED_MACHINES AS SELECT * FROM ( SELECT "UDI", "PRODUCT_ID", "TYPE", "AIR_TEMPERATURE", "PROCESS_TEMPERATURE", "ROTATIONAL_SPEED", "TORQUE", "TOOL_WEAR", "MACHINE_FAILURE", "TWF", "HDF", "PWF", "OSF", "RNF", model_PM_UDF_predict("TYPE", "AIR_TEMPERATURE", "PROCESS_TEMPERATURE", "ROTATIONAL_SPEED", "TORQUE", "TOOL_WEAR") AS "FAILURE_SCORE" FROM ( SELECT * FROM (SUMMARY_SENSOR_DATA_TEST)))

Note the model_PM_UDF_predict function referenced inline in the SQL. This is our Scala UDF, which Snowpark uploaded to a Snowflake stage along with its dependency jars.

We can see the results of the Snowpark UDF operation in the new table DAILY_SCORED_MACHINES, as shown below.

---------------------------------------------------------------------------------------------------------------------------------------------------
|"UDI" |"PRODUCT_ID" |"TYPE" |"AIR_TEMPERATURE" |"PROCESS_TEMPERATURE" |"ROTATIONAL_SPEED" |"TORQUE" |"TOOL_WEAR" |"MACHINE_FAILURE" |"TWF" |"HDF" |"PWF" |"OSF" |"RNF" |"FAILURE_SCORE" |
---------------------------------------------------------------------------------------------------------------------------------------------------
|1 |M14860 |M |298.1 |308.6 |1551.0 |42.8 |0.0 |0 |0 |0 |0 |0 |0 |0.0016097500313497588 |
|2 |L47181 |L |298.2 |308.7 |1408.0 |46.3 |3.0 |0 |0 |0 |0 |0 |0 |0.002551409878500267 |
|3 |L47182 |L |298.1 |308.5 |1498.0 |49.4 |5.0 |0 |0 |0 |0 |0 |0 |0.0020588162930308294 |
|4 |L47183 |L |298.2 |308.6 |1433.0 |39.5 |7.0 |0 |0 |0 |0 |0 |0 |0.002986053042197225 |
|5 |L47184 |L |298.2 |308.7 |1408.0 |40.0 |9.0 |0 |0 |0 |0 |0 |0 |0.002335747891485407 |
|6 |M14865 |M |298.1 |308.6 |1425.0 |41.9 |11.0 |0 |0 |0 |0 |0 |0 |0.0027666157713178356 |
|7 |L47186 |L |298.1 |308.6 |1558.0 |42.4 |14.0 |0 |0 |0 |0 |0 |0 |0.0016097500313497588 |
|8 |L47187 |L |298.1 |308.6 |1527.0 |40.2 |16.0 |0 |0 |0 |0 |0 |0 |0.0016097500313497588 |
|9 |M14868 |M |298.3 |308.7 |1667.0 |28.6 |18.0 |0 |0 |0 |0 |0 |0 |0.0025132880609650995 |
|10 |M14869 |M |298.5 |309.0 |1741.0 |28.0 |21.0 |0 |0 |0 |0 |0 |0 |0.0019359489653722097 |
---------------------------------------------------------------------------------------------------------------------------------------------------

We'd want to run this table scan at the end of each workday to update the FAILURE_SCORE field for each machine being tracked. This gives the maintenance crew a list of machines they need to focus on for their nightly maintenance run.

We should not have to rebuild the model for this population of machines unless new types or brands of machines are added to the manufacturing lines (potentially changing the distribution of the data). However, checking the distributions with some EDA work would not be that hard.
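
As a rough sketch of what that check could look like (not part of the repository), Snowpark can compute summary statistics server-side; comparing these against the statistics of the training data from Part 4 would flag an obvious shift. The column choices and aliases here are illustrative:

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object SensorDriftCheck {
  def main(args: Array[String]): Unit = {

    val session = Session.builder.configFile("conn.properties").create

    // Per-column mean and standard deviation, computed inside Snowflake.
    session
      .table("SUMMARY_SENSOR_DATA_TEST")
      .select(
        avg(col("AIR_TEMPERATURE")).as("AVG_AIR_TEMP"),
        stddev(col("AIR_TEMPERATURE")).as("STD_AIR_TEMP"),
        avg(col("TORQUE")).as("AVG_TORQUE"),
        stddev(col("TORQUE")).as("STD_TORQUE")
      )
      .show()

    session.close()
  }
}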

Let's now move on to providing the maintenance team with the report on the 18 machines most likely to fail.

Wiring Up a Daily Machine Scan

To round out operationalizing this pilot program, we'd likely schedule a cron job to execute the Scala code each day. This would have the Snowpark code take the most current PMML model and scan the summarized sensor data. In this setup, as the minutes per machine accumulate each day, the model would pick up the changes and make updated predictions about which machines are most likely to fail.

Deploying a Notebook-Based Dashboard for the Predictive Maintenance Team

To execute the predictive maintenance program, the maintenance team just needs a simple daily report listing the machine numbers to go service.

The notebook the data science team provides is shown below.

We can see the top 4 rows from the embedded notebook above in the table below:

UDI  | FAILURE_SCORE
4463 | 0.988231
4418 | 0.985929
1017 | 0.984222
4422 | 0.984187
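
The notebook itself isn't reproduced here, but the query behind a report like this is short. Below is a minimal Snowpark sketch (not the team's actual notebook code) that pulls the top 18 machines by failure score from the DAILY_SCORED_MACHINES table created above:

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object DailyFailureReport {
  def main(args: Array[String]): Unit = {

    val session = Session.builder.configFile("conn.properties").create

    // The 18 machines with the highest predicted failure score for the day.
    session
      .table("DAILY_SCORED_MACHINES")
      .select(col("UDI"), col("FAILURE_SCORE"))
      .sort(col("FAILURE_SCORE").desc)
      .limit(18)
      .show(18)

    session.close()
  }
}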

Looking for Analytics Help?

Our team can help -- we work with companies on Snowflake analytics.

Summary and Next Steps

In this series we analyzed the core business case for ACME manufacturing and how predictive modeling affected the financial health of their business under different scenarios.

Further, we analyzed the raw data, built a predictive model, and deployed it to the cloud to provide the line of business with operational intelligence.

If you would like to get a free report on the potential value of a specific machine learning use for your business, please reach out and contact us.