# Snowflake with Ray AIR
The Ray Snowflake connector reads data from Snowflake into Ray Datasets and writes it back, so all of the capabilities of Ray AIR can be used to build end-to-end machine learning applications on Snowflake data.

## What is Ray AIR?
Ray AI Runtime (AIR) is a scalable and unified toolkit for ML applications. AIR enables simple scaling of individual workloads, end-to-end workflows, and popular ecosystem frameworks, all in just Python.

![Ray AIR](../../images/ray-air.svg)

AIR builds on Ray’s best-in-class libraries for [Preprocessing](https://docs.ray.io/en/latest/data/dataset.html#datasets), [Training](https://docs.ray.io/en/latest/train/train.html#train-docs), [Tuning](https://docs.ray.io/en/latest/tune/index.html#tune-main), [Scoring](https://docs.ray.io/en/latest/ray-air/predictors.html#air-predictors), [Serving](https://docs.ray.io/en/latest/serve/index.html#rayserve), and [Reinforcement Learning](https://docs.ray.io/en/latest/rllib/index.html#rllib-index) to bring together an ecosystem of integrations.

## ML Compute, Simplified
Ray AIR aims to simplify the ecosystem of machine learning frameworks, platforms, and tools. It does this by leveraging Ray to provide a seamless, unified, and open experience for scalable ML:

![Why Ray AIR](../../images/why-air-2.svg)

1. **Seamless Dev to Prod:** AIR reduces friction going from development to production. With Ray and AIR, the same Python code scales seamlessly from a laptop to a large cluster.

2. **Unified ML API:** AIR’s unified ML API enables swapping between popular frameworks, such as XGBoost, PyTorch, and HuggingFace, with just a single class change in your code.

3. **Open and Extensible:** AIR and Ray are fully open-source and can run on any cluster, cloud, or Kubernetes. Build custom components and integrations on top of scalable developer APIs.

## Ray AIR unifies ML APIs
Below are some of the unified ML APIs with which Ray AIR enables scaling of end-to-end ML workflows, focusing on a few of the popular frameworks AIR integrates with (XGBoost, PyTorch, and TensorFlow).

![Ray AIR Workflow](../../images/why-air.svg)

## Snowflake with Ray AIR and LightGBM
For this example we will show how to train and tune a [distributed LightGBM](https://docs.ray.io/en/master/ray-air/examples/lightgbm_example.html) model with Ray AIR using Snowflake data. We will then show how to score data with the trained model and push the scored data back into another Snowflake table.

### Set up the connector
The first step is to create a connector. All the required properties should already be in the connection.yml file if you have followed the [connection settings guide](connection.ipynb).

In [1]:
import ray
from ray_db.provider.snowflake import SnowflakeConnector

# turn down logging to avoid cluttered output
if not ray.is_initialized():
    ray.init(logging_level='ERROR', runtime_env={'env_vars':{'RAY_DISABLE_PYARROW_VERSION_CHECK': '1'}})

# create the connector
connector = SnowflakeConnector.create()

### Training and Tuning
A typical training or tuning workload will have the following logic when working with tabular data in Snowflake:

![Ray Train with Snowflake](../../images/snowflake_train_with_air.png)

#### Step 1: Stage data in Snowflake
When working with databases, it is best to take advantage of the database's native join and aggregation features before ingesting data into Ray Datasets. Ray Datasets is designed to power machine learning workflows and does not provide some typical analytics capabilities, such as large joins. For these reasons, as a first step, the data required for training the model is staged to a table within Snowflake before it is read with the Ray Snowflake connector.
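
The stage-then-read pattern can be tried locally without a Snowflake account. The following is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in for Snowflake. The toy tables, column names, and values are assumptions made for illustration and are not part of the Snowflake sample data.

```python
import sqlite3

# Stand-in for Snowflake: an in-memory SQLite database with toy tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customer (customer_sk INTEGER PRIMARY KEY);
    CREATE TABLE store_sales (customer_sk INTEGER);
    CREATE TABLE store_returns (customer_sk INTEGER);
    INSERT INTO customer VALUES (1), (2), (3);
    INSERT INTO store_sales VALUES (1), (1), (1), (1), (2), (2);
    INSERT INTO store_returns VALUES (1), (2), (2);
""")

# Run the joins and aggregations inside the database and persist the
# result as a staging table, so the client only reads one flat table.
conn.execute("""
    CREATE TABLE customer_returns AS
    WITH sales AS (
        SELECT customer_sk, COUNT(*) AS n_sales
        FROM store_sales GROUP BY customer_sk),
    rtrns AS (
        SELECT customer_sk, COUNT(*) AS n_returns
        FROM store_returns GROUP BY customer_sk)
    SELECT c.customer_sk,
           COALESCE(s.n_sales, 0) AS n_sales,
           COALESCE(r.n_returns, 0) AS n_returns
    FROM customer AS c
    LEFT JOIN sales AS s ON c.customer_sk = s.customer_sk
    LEFT JOIN rtrns AS r ON c.customer_sk = r.customer_sk
""")

# The ML side then reads the staged table as-is, with no further joins.
staged_rows = conn.execute(
    "SELECT customer_sk, n_sales, n_returns "
    "FROM customer_returns ORDER BY customer_sk"
).fetchall()
print(staged_rows)
```

Customer 3 has no sales or returns, so the left joins produce nulls that `COALESCE` turns into zeros. This plays the role that `ZEROIFNULL` plays in the Snowflake query.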

The code below creates a dataset of customer returns data from several Snowflake sample tables. We will use this data throughout the training, tuning, and scoring process. We use the Ray Snowflake connector to run the DDL and DML, but you could just as easily use the Snowflake Python API. Using the Ray connector keeps the connection logic in a single API.

> Note: The dataset size is kept small to keep execution times short. If you would like to try a larger dataset, increase `SIZE` and be sure to have a large enough cluster defined.

> Note: This data may already have been loaded into the `CUSTOMER_RETURNS` table when setting up the sample data previously.

In [2]:
SIZE = 1000000
SRC = 'SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL'
print('creating table CUSTOMER_RETURNS')
with connector:
    connector.query(f"""
            CREATE TABLE IF NOT EXISTS CUSTOMER_RETURNS as (   
                WITH cstmrs as (
                        SELECT 
                            c_customer_sk as c_customer_sk, 
                            c_current_cdemo_sk as c_current_cdemo_sk
                        FROM {SRC}.customer LIMIT {SIZE}),
                    
                    sales as (
                        SELECT 
                            c_customer_sk, 
                            COUNT(c_customer_sk) as n_sales 
                        FROM cstmrs JOIN {SRC}.store_sales ON c_customer_sk = ss_customer_sk
                        GROUP BY c_customer_sk),
                    
                    rtrns as (
                        SELECT 
                            c_customer_sk, 
                            COUNT(c_customer_sk) as n_returns 
                        FROM cstmrs JOIN {SRC}.store_returns ON c_customer_sk = sr_customer_sk
                        GROUP BY c_customer_sk)
                        
                SELECT
                    cstmrs.c_customer_sk as customer_sk,
                    ZEROIFNULL(n_sales) as n_sales,
                    ZEROIFNULL(n_returns) as n_returns,
                    IFF(n_sales is null or n_sales = 0 or n_returns is null, 0, n_returns/n_sales) as return_probability,
                    demos.* 
                FROM cstmrs 
                JOIN {SRC}.customer_demographics as demos ON cstmrs.c_current_cdemo_sk = demos.cd_demo_sk
                LEFT OUTER JOIN sales on cstmrs.c_customer_sk = sales.c_customer_sk
                LEFT OUTER JOIN rtrns on cstmrs.c_customer_sk = rtrns.c_customer_sk
            )
    """)

creating table CUSTOMER_RETURNS
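
The `RETURN_PROBABILITY` column is the training target, so it helps to be explicit about how it is defined. The following is a small Python mirror of the `IFF(...)` expression in the query above. The function name is hypothetical and exists only for illustration.

```python
from typing import Optional


def return_probability(n_sales: Optional[int], n_returns: Optional[int]) -> float:
    """Mirror the IFF(...) expression: returns per sale, or 0 when undefined."""
    # NULL sales, zero sales, or NULL returns all map to a probability of 0.
    if n_sales is None or n_sales == 0 or n_returns is None:
        return 0.0
    return n_returns / n_sales


# 389 sales and 41 returns, as for the first customer in the sample output.
print(round(return_probability(389, 41), 6))
# A customer with no recorded sales gets a probability of 0.
print(return_probability(None, 5))
```

Because the target is derived directly from `N_SALES` and `N_RETURNS`, those two columns would leak the answer if they were left in as features.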


#### Step 2: Read data into Ray
Now that we have created the data, we can read it with the connector. If the table is stored in a different database and schema than the connector's defaults, pass the new values to the connector's `create` method.

In [3]:
with connector:
    ds = connector.read_table('CUSTOMER_RETURNS')
    
ds.limit(10).to_pandas()

Read progress: 100%|██████████| 1/1 [00:02<00:00,  2.75s/it]


| | CUSTOMER_SK | N_SALES | N_RETURNS | RETURN_PROBABILITY | CD_DEMO_SK | CD_GENDER | CD_MARITAL_STATUS | CD_EDUCATION_STATUS | CD_PURCHASE_ESTIMATE | CD_CREDIT_RATING | CD_DEP_COUNT | CD_DEP_EMPLOYED_COUNT | CD_DEP_COLLEGE_COUNT |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1650347 | 389 | 41 | 0.105398 | 187143 | M | S | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 1 | 34455966 | 824 | 74 | 0.089806 | 187144 | F | S | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 2 | 34435622 | 360 | 36 | 0.1 | 187145 | M | D | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 3 | 34491426 | 374 | 35 | 0.093583 | 187145 | M | D | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 4 | 1779004 | 375 | 42 | 0.112 | 187145 | M | D | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 5 | 7447500 | 380 | 51 | 0.134211 | 187146 | F | D | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 6 | 2446212 | 397 | 42 | 0.105793 | 187147 | M | W | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 7 | 7396410 | 369 | 46 | 0.124661 | 187149 | M | U | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 8 | 2411252 | 401 | 34 | 0.084788 | 187149 | M | U | 2 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |
| 9 | 34540383 | 367 | 36 | 0.098093 | 187151 | M | M | 4 yr Degree | 7000 | Low Risk | 5 | 4 | 0 |


#### Step 3: Train
Now that the data is read into a Ray dataset, we can use it to train or tune a LightGBM model.

**Prepare the data**

After reading the data, it is often necessary to do some simple manipulations, such as dropping columns and splitting the data into training and test sets.

In [4]:
DROP_COLUMNS = ['N_SALES', 'N_RETURNS', 'CD_DEMO_SK']

ds = ds.drop_columns(DROP_COLUMNS).repartition(100)
train_dataset, valid_dataset = ds.train_test_split(test_size=0.3)

Read->Map_Batches: 100%|██████████| 9/9 [00:03<00:00,  2.40it/s]
Repartition: 100%|██████████| 100/100 [00:02<00:00, 34.58it/s]
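
To make the effect of these two operations concrete, here is a minimal plain-Python sketch of dropping columns and making a 70/30 split. It only illustrates the semantics on a small list of rows. The helper functions are hypothetical, and the assumption that an unshuffled split keeps the first 70% of rows for training should be checked against the Ray version in use.

```python
def drop_columns(rows, columns):
    """Return copies of the rows without the named columns."""
    return [{k: v for k, v in row.items() if k not in columns} for row in rows]


def train_test_split(rows, test_size):
    """Split in order: the leading share trains, the trailing share validates."""
    n_test = round(len(rows) * test_size)
    cut = len(rows) - n_test
    return rows[:cut], rows[cut:]


# Toy rows standing in for the staged table.
rows = [
    {"CUSTOMER_SK": i, "N_SALES": 10 * i, "RETURN_PROBABILITY": 0.1}
    for i in range(10)
]

# N_SALES is used to compute the target, so it is removed before training.
rows = drop_columns(rows, {"N_SALES"})
train_rows, valid_rows = train_test_split(rows, test_size=0.3)
print(len(train_rows), len(valid_rows))
```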


**Create preprocessors**

In Ray AIR, all trainers, tuners, and predictors accept preprocessors. Preprocessors help featurize data by providing common operations such as one-hot encoding, categorizing, and scaling. For more on the available preprocessors, read the [Ray AIR docs](https://docs.ray.io/en/latest/ray-air/package-ref.html#preprocessor). The code below uses a chain of preprocessors. The `BatchMapper` drops the ID column so it won't be used when training. The `Categorizer` categorizes columns, and the `StandardScaler` scales columns. The preprocessing logic only modifies the data as it is passed into the training algorithm; the underlying dataset remains the same.

In [5]:
from ray.data.preprocessors import Chain, BatchMapper, Categorizer, StandardScaler

ID_COLUMN = 'CUSTOMER_SK'
CATEGORICAL_COLUMNS = ['CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS', 'CD_CREDIT_RATING']
SCALAR_COLUMNS = ['CD_PURCHASE_ESTIMATE', 'CD_DEP_COUNT', 'CD_DEP_EMPLOYED_COUNT', 'CD_DEP_COLLEGE_COUNT']

# Scale the numeric columns and categorize the categorical columns,
# allowing LightGBM to use its built-in categorical feature support
preprocessor = Chain(
    BatchMapper(lambda df: df.drop(ID_COLUMN, axis=1)),
    Categorizer(CATEGORICAL_COLUMNS), 
    StandardScaler(columns=SCALAR_COLUMNS)
)
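
As a rough illustration of what a fitted preprocessor chain does, the sketch below implements toy versions of categorizing and standard scaling in plain Python. The class names are hypothetical and this is not how Ray implements its preprocessors. It assumes that categories are encoded in sorted order and that scaling uses the population standard deviation.

```python
from statistics import mean, pstdev


class ToyCategorizer:
    """Map each distinct value in a column to an integer code."""

    def __init__(self, column):
        self.column = column
        self.codes = {}

    def fit(self, rows):
        values = sorted({row[self.column] for row in rows})
        self.codes = {value: code for code, value in enumerate(values)}
        return self

    def transform(self, rows):
        return [{**row, self.column: self.codes[row[self.column]]} for row in rows]


class ToyStandardScaler:
    """Shift a column to zero mean and scale it to unit standard deviation."""

    def __init__(self, column):
        self.column = column
        self.mean = 0.0
        self.std = 1.0

    def fit(self, rows):
        values = [row[self.column] for row in rows]
        self.mean = mean(values)
        self.std = pstdev(values) or 1.0  # guard against constant columns
        return self

    def transform(self, rows):
        return [
            {**row, self.column: (row[self.column] - self.mean) / self.std}
            for row in rows
        ]


def fit_transform_chain(steps, rows):
    """Fit each step on the output of the previous one, as a chain would."""
    for step in steps:
        rows = step.fit(rows).transform(rows)
    return rows


rows = [
    {"CD_GENDER": "M", "CD_DEP_COUNT": 1},
    {"CD_GENDER": "F", "CD_DEP_COUNT": 3},
    {"CD_GENDER": "M", "CD_DEP_COUNT": 5},
]
steps = [ToyCategorizer("CD_GENDER"), ToyStandardScaler("CD_DEP_COUNT")]
transformed = fit_transform_chain(steps, rows)
for row in transformed:
    print(row)
```

Each `transform` returns new rows, so the input `rows` are left untouched. This matches the point above that preprocessing does not modify the underlying dataset.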

**Configure scaling**

Training requires compute infrastructure, and specifying the right type and amount helps optimize training time and cost. When first starting out, it is best to begin with a small dataset and a small amount of compute to get things working, and then scale up data and compute together. Below we create a `ScalingConfig` that provides 10 workers for distributed training. This will likely keep training on a single instance. We also don't request GPUs.

In [6]:
from ray.air.config import ScalingConfig

# no trailing comma here: a trailing comma would turn the value into a tuple
scaling_config = ScalingConfig(num_workers=10, use_gpu=False)

**Create a trainer**

Now that we have everything required, we can create a trainer. In Ray AIR, the logic to create and fit a trainer is very similar across algorithms. The main differences are in the parameters passed to the algorithm. This makes it easy to swap out algorithms. For example, swapping LightGBM for XGBoost, or even PyTorch tabular, will typically take just a few lines of code. Note that the trainer below passes its own `ScalingConfig` with 2 workers, which is why the training log reports two machines.

In [7]:
from ray.train.lightgbm import LightGBMTrainer

TARGET_COLUMN = 'RETURN_PROBABILITY'

# LightGBM specific params
params = {
    "objective": "regression",
    "metric": ["rmse", "mae"],
}

trainer = LightGBMTrainer(
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    label_column=TARGET_COLUMN,
    params=params,
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
    num_boost_round=10
)

**Fit the model**

Now that the trainer is defined, all that is required is to call `fit` to begin the training process. The `fit` method returns a result object that contains the model checkpoint as well as the model training metrics.

In [8]:
result = trainer.fit()

| Trial name | status | loc | iter | total time (s) | train-rmse | train-l1 | valid-rmse |
|---|---|---|---|---|---|---|---|
| LightGBMTrainer_89c60_00000 | TERMINATED | 10.0.247.141:2203 | 11 | 12.5248 | 0.0269531 | 0.0183303 | 0.02693 |


(_RemoteRayLightGBMActor pid=2522, ip=10.0.247.141) [LightGBM] [Info] Trying to bind port 60181...
(_RemoteRayLightGBMActor pid=2522, ip=10.0.247.141) [LightGBM] [Info] Binding port 60181 succeeded
(_RemoteRayLightGBMActor pid=2522, ip=10.0.247.141) [LightGBM] [Info] Listening...
(_RemoteRayLightGBMActor pid=2522, ip=10.0.247.141) [LightGBM] [Info] Connected to rank 1
(_RemoteRayLightGBMActor pid=2522, ip=10.0.247.141) [LightGBM] [Info] Local rank: 0, total number of machines: 2
(_RemoteRayLightGBMActor pid=2523, ip=10.0.247.141) [LightGBM] [Info] Trying to bind port 58927...
(_RemoteRayLightGBMActor pid=2523, ip=10.0.247.141) [LightGBM] [Info] Binding port 58927 succeeded
(_RemoteRayLightGBMActor pid=2523, ip=10.0.247.141) [LightGBM] [Info] Listening...
(_RemoteRayLightGBMActor pid=2523, ip=10.0.247.141) [LightGBM] [Info] Connected to rank 0
(_RemoteRayLightGBMActor pid=2523, ip=10.0.247.141) [LightGBM] [Info] Local rank: 1, total number of machines: 2
Result for LightGBMTrainer_89c60
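
The `rmse` and `l1` columns in the trial table correspond to the `"rmse"` and `"mae"` metrics requested in `params` (LightGBM reports mean absolute error under the name `l1`). As a reminder of what these numbers measure, the following sketch computes both metrics in plain Python on made-up values. The sample targets and predictions are assumptions for illustration, not results from the training run.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: penalizes large errors more heavily."""
    squared = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))


def mae(y_true, y_pred):
    """Mean absolute error (l1): the average size of the error."""
    absolute = [abs(t - p) for t, p in zip(y_true, y_pred)]
    return sum(absolute) / len(absolute)


# Hypothetical return probabilities and a model that always predicts 0.10.
y_true = [0.10, 0.12, 0.08]
y_pred = [0.10, 0.10, 0.10]

print(f"rmse={rmse(y_true, y_pred):.6f} mae={mae(y_true, y_pred):.6f}")
```

Both metrics are in the same units as the target, so an RMSE of about 0.027 on `RETURN_PROBABILITY` means a typical error of a few percentage points.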

### Score a model
Once there is a trained model, we can use it to score data. The flows for training and scoring are similar: in both, data is staged in Snowflake and read into a Ray dataset with the connector. Once the data is read in, the previously created model checkpoint can be used to create a batch predictor for scoring. Scored data can then be written back into Snowflake with the connector.

The typical logical flow for batch scoring in Snowflake with Ray AIR is the following:

![Snowflake batch scoring](../../images/snowflake_score_with_air.png)

#### Steps 1-2: Stage and read data
Since the data has already been staged and loaded, we don't need any extra code to do that now. Typically, you will have one script for training and another for scoring, and they will be run independently. The staging and loading of data should be separated into a shared script that can be used by both of these workflows.

#### Step 3: Score the data
The previously trained checkpoint can be used to create a predictor. This predictor already contains the preprocessors used to train the model. All that is needed is to drop the target column before feeding the data into the model, to simulate a real dataset where we don't know the results.

> Note: Typically, model checkpoints will be stored in a model registry provided by Weights and Biases or MLflow, or in an object store like S3. Checkpoints are written and read using the [checkpoint API](https://docs.ray.io/en/latest/ray-air/package-ref.html#ray.air.checkpoint.Checkpoint).

In [9]:
from ray.train.batch_predictor import BatchPredictor
from ray.train.lightgbm import LightGBMPredictor

predictor = BatchPredictor.from_checkpoint(
    result.checkpoint, LightGBMPredictor
)

test_dataset = valid_dataset.drop_columns(TARGET_COLUMN)
predictions = predictor.predict(test_dataset, keep_columns=[ID_COLUMN])
predictions.limit(10).to_pandas()

Map_Batches: 100%|██████████| 30/30 [00:03<00:00,  8.67it/s]
Map Progress (1 actors 1 pending): 100%|██████████| 30/30 [00:04<00:00,  6.46it/s]


| | predictions | CUSTOMER_SK |
|---|---|---|
| 0 | 0.102189 | 1639066 |
| 1 | 0.102189 | 34453320 |
| 2 | 0.102316 | 7465318 |
| 3 | 0.102374 | 50443610 |
| 4 | 0.102374 | 2535927 |
| 5 | 0.102374 | 1663533 |
| 6 | 0.102374 | 7505486 |
| 7 | 0.102374 | 7398215 |
| 8 | 0.102374 | 2570281 |
| 9 | 0.10232 | 1725142 |
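
The `keep_columns` argument is what lets each prediction be matched back to its customer, even though the ID column is not used as a feature. The following plain-Python sketch illustrates that idea with a hypothetical stand-in model and a hypothetical helper function. It is not the `BatchPredictor` implementation.

```python
def predict_in_batches(rows, model, feature_columns, keep_columns, batch_size=2):
    """Score rows batch by batch, carrying selected input columns to the output."""
    scored = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # Only the feature columns are shown to the model.
        features = [[row[name] for name in feature_columns] for row in batch]
        for row, prediction in zip(batch, model(features)):
            kept = {name: row[name] for name in keep_columns}
            scored.append({"predictions": prediction, **kept})
    return scored


def toy_model(features):
    """Hypothetical model: a fixed linear function of the first feature."""
    return [0.1 + 0.01 * values[0] for values in features]


rows = [
    {"CUSTOMER_SK": 101, "CD_DEP_COUNT": 0},
    {"CUSTOMER_SK": 102, "CD_DEP_COUNT": 1},
    {"CUSTOMER_SK": 103, "CD_DEP_COUNT": 2},
]
scored = predict_in_batches(rows, toy_model, ["CD_DEP_COUNT"], ["CUSTOMER_SK"])
for row in scored:
    print(row)
```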


#### Step 4: Write data to Snowflake
Now that we have the predictions, we can write them into a Snowflake table.

In [10]:
import os
userid = os.getenv('ANYSCALE_EXPERIMENTAL_USERNAME')
table = 'CUSTOMER_PREDICTIONS_'+userid.upper()

# create a table
print('creating table '+table)
with connector:
    connector.query(f'''
        CREATE OR REPLACE TABLE {table} (
            PREDICTIONS FLOAT NOT NULL,
            CUSTOMER_SK VARCHAR(20) NOT NULL
        )
    ''')

# write to the table    
with connector:
    connector.write_table(predictions, table)
    
with connector:
    ds2 = connector.read_table(table)
    
ds2.limit(10).to_pandas()

creating table CUSTOMER_PREDICTIONS_ERIC_GREENE_A9C7590


Write Progress: 100%|██████████| 30/30 [00:05<00:00,  5.48it/s]
Read progress: 100%|██████████| 1/1 [00:00<00:00, 896.99it/s]


| | PREDICTIONS | CUSTOMER_SK |
|---|---|---|
| 0 | 0.102312 | 7496063 |
| 1 | 0.102312 | 7562373 |
| 2 | 0.102331 | 7532594 |
| 3 | 0.102384 | 7436509 |
| 4 | 0.102313 | 7555153 |
| 5 | 0.102235 | 7576147 |
| 6 | 0.102235 | 34472610 |
| 7 | 0.102311 | 7559201 |
| 8 | 0.102312 | 34549247 |
| 9 | 0.102312 | 1829286 |
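
The first rows read back differ from the first rows of the predictions shown earlier because a table read without an `ORDER BY` has no guaranteed row order. A simple way to check a write is to compare row counts and an order-independent view of the data. The sketch below shows that check using Python's built-in `sqlite3` module as a local stand-in for Snowflake. The table name is an assumption, and the three rows are copied from the predictions output above.

```python
import sqlite3

# A few (prediction, customer_sk) pairs taken from the predictions output.
predictions = [
    (0.102189, 1639066),
    (0.102189, 34453320),
    (0.102316, 7465318),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer_predictions ("
    "predictions REAL NOT NULL, customer_sk TEXT NOT NULL)"
)

# The ID column is declared as text in the target table, so convert on write.
conn.executemany(
    "INSERT INTO customer_predictions VALUES (?, ?)",
    [(score, str(customer_sk)) for score, customer_sk in predictions],
)
conn.commit()

# Validate the write without relying on row order.
written = conn.execute("SELECT COUNT(*) FROM customer_predictions").fetchone()[0]
read_back = conn.execute(
    "SELECT predictions, customer_sk FROM customer_predictions"
).fetchall()
expected = sorted((score, str(customer_sk)) for score, customer_sk in predictions)

print(written, sorted(read_back) == expected)
```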
