Prepare sample data#

In order to run the sample code in the user guide and ray air guide, sample data is required to be created if the Snowflake sample database is not available, a small set of data files can be used to manually create the tables in Snowflake. The code below only needs to be run once per database.

Download sample data#

Run the below commands to download the sample data.

! wget https://github.com/anyscale/ray_db/raw/main/sample_data/customer.parquet -P /tmp
! wget https://github.com/anyscale/ray_db/raw/main/sample_data/customer_returns.parquet -P /tmp

Create tables in Snowflake#

First, create the tables in Snowflake that will hold the new data.

import os, ray, pandas
from ray_db.provider.snowflake import SnowflakeConnector

if not ray.is_initialized():
    ray.init(logging_level='ERROR')

# add custom connect properties here like schema or database
connector = SnowflakeConnector.create()

path = os.path.abspath('/tmp/customer.parquet')
print('loading:' + path)
ds = ray.data.from_pandas(pandas.read_parquet(path))
#ds = ray.data.from_arrow_refs(ds.to_arrow_refs())

table = f'CUSTOMER'
with connector:
    print('creating table:' + path)
    connector.query(f'''
        CREATE OR REPLACE TABLE {table} (
	        C_CUSTKEY NUMBER(38,0) NOT NULL,
	        C_NAME VARCHAR(25) NOT NULL,
	        C_ADDRESS VARCHAR(40) NOT NULL,
	        C_NATIONKEY NUMBER(38,0) NOT NULL,
	        C_PHONE VARCHAR(15) NOT NULL,
	        C_ACCTBAL NUMBER(12,2) NOT NULL,
	        C_MKTSEGMENT VARCHAR(10),
	        C_COMMENT VARCHAR(117)
    )
    ''')
    print('writing table:' + table)
    connector.write_table(ds, table)
path = os.path.abspath('/tmp/customer_returns.parquet')
print('loading:' + path)
ds = ray.data.from_pandas(pandas.read_parquet(path))

table = f'CUSTOMER_RETURNS'
with connector:
    print('creating table:' + path)
    connector.query(f'''
        CREATE OR REPLACE TABLE {table} (
			CUSTOMER_SK NUMBER(38,0),
			N_SALES NUMBER(18,0),
			N_RETURNS NUMBER(18,0),
			RETURN_PROBABILITY NUMBER(24,6),
			CD_DEMO_SK NUMBER(38,0),
			CD_GENDER VARCHAR(1),
			CD_MARITAL_STATUS VARCHAR(1),
			CD_EDUCATION_STATUS VARCHAR(20),
			CD_PURCHASE_ESTIMATE NUMBER(38,0),
			CD_CREDIT_RATING VARCHAR(10),
			CD_DEP_COUNT NUMBER(38,0),
			CD_DEP_EMPLOYED_COUNT NUMBER(38,0),
			CD_DEP_COLLEGE_COUNT NUMBER(38,0)
		)
    ''')
    print('writing table:' + table)
    connector.write_table(ds, table)

Create local data from Snowflake Samples#

The code below was used to create local parquet files that can be used to create Snowflake tables.

Note: This step is not neeeded unless you wish to recreate the local data used to setup the database. You will need access to Snowflake sample tables to run this.

from ray_db.provider.snowflake import SnowflakeConnector
import os

path = os.path.abspath('/tmp')
os.makedirs(path, exist_ok=True) 

filepath = os.path.join(path, 'customer.parquet')
with connector:
    print('reading table '+table)
    ds = connector.read_table('SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER')
    ds.to_pandas().to_parquet(filepath)
    print('writing '+ filepath)
table = f'CUSTOMER_RETURNS_1000000'
src = 'SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL'
print(f'creating table {table}')
with connector:
        connector.query(f"""
            CREATE TABLE IF NOT EXISTS {table} as (   
                WITH cstmrs as (
                        SELECT 
                            c_customer_sk as c_customer_sk, 
                            c_current_cdemo_sk as c_current_cdemo_sk
                        FROM {src}.customer LIMIT 1000000),
                    
                    sales as (
                        SELECT 
                            c_customer_sk, 
                            COUNT(c_customer_sk) as n_sales 
                        FROM cstmrs JOIN {src}.store_sales ON c_customer_sk = ss_customer_sk
                        GROUP BY c_customer_sk),
                    
                    rtrns as (
                        SELECT 
                            c_customer_sk, 
                            COUNT(c_customer_sk) as n_returns 
                        FROM cstmrs JOIN {src}.store_returns ON c_customer_sk = sr_customer_sk
                        GROUP BY c_customer_sk)
                        
                SELECT
                    cstmrs.c_customer_sk as customer_sk,
                    ZEROIFNULL(n_sales) as n_sales,
                    ZEROIFNULL(n_returns) as n_returns,
                    IFF(n_sales is null or n_sales = 0 or n_returns is null, 0, n_returns/n_sales) as return_probability,
                    demos.* 
                FROM cstmrs 
                JOIN {src}.customer_demographics as demos ON cstmrs.c_current_cdemo_sk = demos.cd_demo_sk
                LEFT OUTER JOIN sales on cstmrs.c_customer_sk = sales.c_customer_sk
                LEFT OUTER JOIN rtrns on cstmrs.c_customer_sk = rtrns.c_customer_sk
            )
    """)
filepath = os.path.join(path, 'customer_returns.parquet')
with connector:
    ds = connector.read_table(table)
    print('reading table '+table)
    ds.to_pandas().to_parquet(filepath)
    print('writing '+ filepath)