User Guide#

This user guide walks through the basics of reading and writing data with the Ray Snowflake connector.

Before you begin#

How it works#

The Ray connector for Snowflake holds the connection properties required to establish a connection with Snowflake. The connector implements the same interface as a DBAPI2Connector, exposing the standard query methods as well as read_table and write_table methods optimized for large-scale parallel data exchange with Snowflake.

Connector factory#

The SnowflakeConnector class has a factory create method that is used to create the connector. The create method can be called with any connection properties, as well as an optional path to a connection property file.
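As a sketch of how such a factory might resolve its inputs (the file format, section name, and precedence shown here are assumptions for illustration, not taken from the connector's documentation), explicit keyword arguments typically override values loaded from a property file:

```python
import configparser
import os
import tempfile

def load_properties(path=None, **overrides):
    """Merge properties from an optional INI-style file with explicit
    keyword arguments; explicit arguments win (assumed precedence)."""
    props = {}
    if path:
        parser = configparser.ConfigParser()
        parser.read(path)
        props.update(parser['connection'])
    props.update(overrides)
    return props

# Write a small property file, then merge it with an explicit override.
with tempfile.NamedTemporaryFile('w', suffix='.ini', delete=False) as f:
    f.write('[connection]\naccount = my_account\nwarehouse = small_wh\n')
    path = f.name

merged = load_properties(path, warehouse='large_wh')
os.unlink(path)
print(merged)  # the explicit warehouse overrides the file's value
```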

Connections and transactions#

The connector is a Python context manager, so a with block can define when a connection is established, when database operations are committed, and when the connection is closed.

Transaction context

The code below reads from a sample table, using the connector to manage the connection.

from ray_db.provider.snowflake import SnowflakeConnector
import ray

if not ray.is_initialized():
    ray.init(logging_level='ERROR', runtime_env={'env_vars':{'RAY_DISABLE_PYARROW_VERSION_CHECK': '1'}})

SOURCE_TABLE = 'CUSTOMER'

connector = SnowflakeConnector.create()
with connector:
    count = connector.query_value(f'SELECT COUNT(*) FROM {SOURCE_TABLE}')
    
print(count)
150000

Alternatively, you can use a try block with the connector’s open, commit, and close methods.

try:
    connector.open()
    count = connector.query_value(f'SELECT COUNT(*) FROM {SOURCE_TABLE}')
    
    # if this was a write operation:
    # connector.commit()
finally:
    connector.close()
    
print(count)
150000

Read from tables#

In order to read an entire table into a Ray cluster, the connector provides a read_table method. Under the hood, Ray uses Snowflake optimizations that allow query results to be read in parallel, enabling the connector to read gigabytes of data per second into a Ray cluster. The Snowflake API has a read_batch method that enables this optimization. Ray datasets are composed of PyArrow tables spread across the entire Ray cluster, allowing the distributed operations required in machine learning.
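The idea can be illustrated schematically (fetch_batch and the batch list here are stand-ins, not the Snowflake API): each partition of the query result is fetched by a separate worker and becomes one block of the dataset:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a partitioned Snowflake result set: each entry
# represents one batch that a worker can fetch independently.
batches = [range(0, 5), range(5, 10), range(10, 15)]

def fetch_batch(batch):
    # Each worker materializes its batch into one in-memory block
    # (in the real connector, a PyArrow table on some Ray worker).
    return list(batch)

# Fetch all batches in parallel; the resulting blocks together
# form the distributed dataset.
with ThreadPoolExecutor(max_workers=3) as pool:
    blocks = list(pool.map(fetch_batch, batches))

total_rows = sum(len(b) for b in blocks)
```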

Snowflake read table

The code below reads a sample table from the Snowflake sample database. The fully_executed method triggers the entire dataset to load; if it is not called, the data is loaded lazily, only when a dataset operation requires it.

with connector:
    ds = connector.read_table(SOURCE_TABLE).fully_executed()
        
ds.limit(3).to_pandas()
Read progress: 100%|████████████████████| 8/8 [00:03<00:00,  2.05it/s]

C_CUSTKEY C_NAME C_ADDRESS C_NATIONKEY C_PHONE C_ACCTBAL C_MKTSEGMENT C_COMMENT
0 30001 Customer#000030001 Ui1b,3Q71CiLTJn4MbVp,,YCZARIaNTelfst 4 14-526-204-4500 8848.47 MACHINERY frays wake blithely enticingly ironic asymptote
1 30002 Customer#000030002 UVBoMtILkQu1J3v 11 21-340-653-9800 5221.81 MACHINERY he slyly ironic pinto beans wake slyly above t...
2 30003 Customer#000030003 CuGi9fwKn8JdR 21 31-757-493-7525 3014.89 BUILDING e furiously alongside of the requests. evenly ...

DML and DDL#

The connector can also be used for any DDL or DML operations you would normally execute through the Snowflake Python API. These operations just pass through to the underlying Snowflake API.

The code below creates the objects needed for writing to tables. Note that a commit is issued between the queries so that the first DDL statement executes before the statement that depends on it. An alternative is to use two with blocks to define the transaction boundaries.

DEST_DATABASE = 'ANYSCALE_SAMPLE'
DEST_TABLE = 'CUSTOMER_COPY'

with connector:
    connector.query(f'CREATE DATABASE IF NOT EXISTS {DEST_DATABASE}')
    connector.commit()
    connector.query(f'CREATE OR REPLACE TABLE {DEST_TABLE} LIKE {SOURCE_TABLE}')

Write to tables#

In order to write a dataset into a database table, the connector provides a write_table method. Under the hood, the connector uses PUT operations to copy the Ray dataset’s PyArrow tables to a Snowflake table stage. This happens in parallel: every Ray worker that holds a PyArrow table for the dataset writes to Snowflake at the same time. After the parallel PUT phase, the connector issues a single COPY command, which triggers the Snowflake warehouse to begin moving the raw data into the production table. Because the write is divided into two phases, one executed in parallel and the second as a single operation, it behaves like a two-phase commit: if any of the initial PUT operations fail, the final COPY operation will fail.
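The two-phase pattern can be sketched independently of Snowflake (put_block and two_phase_write are illustrative names, not the connector's internals): stage every block in parallel, and only issue the single commit step if the entire staging phase succeeded:

```python
from concurrent.futures import ThreadPoolExecutor

def put_block(block):
    # Phase 1: each worker uploads its block to the table stage.
    # A failure here raises, which aborts the whole write below.
    if block is None:
        raise ValueError('PUT failed for a block')
    return f'staged:{block}'

def two_phase_write(blocks):
    # Stage all blocks in parallel; any exception propagates out of
    # pool.map, so phase 2 (the single COPY) never runs on partial data.
    with ThreadPoolExecutor() as pool:
        staged = list(pool.map(put_block, blocks))
    # Phase 2: one commit step covering all staged files.
    return {'copied': staged}

result = two_phase_write(['block-0', 'block-1', 'block-2'])
```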

Snowflake write table

The code below writes the previously read dataset into the newly created table.

with connector:
    connector.write_table(ds, DEST_TABLE)
Write Progress: 100%|████████████████████| 8/8 [00:04<00:00,  1.68it/s]

The table can now be read.

with connector:
    ds2 = connector.read_table(DEST_TABLE).fully_executed()

ds2.limit(3).to_pandas()
Read progress: 100%|████████████████████| 33/33 [00:03<00:00, 10.05it/s]

C_CUSTKEY C_NAME C_ADDRESS C_NATIONKEY C_PHONE C_ACCTBAL C_MKTSEGMENT C_COMMENT
0 30666 Customer#000030666 qRng78JcCsiJzYUiEcLkgtG8YAbccNW 18 28-289-466-3224 9309.24 FURNITURE efore the blithely unusual theodoli
1 30667 Customer#000030667 MYWmfMCltgljXo8JYQ3eihvg 5 15-727-495-2134 5434.90 BUILDING requests sleep quickly. carefully even reques...
2 30668 Customer#000030668 9OSUZXVkHfu,,E15s 19 29-168-834-3012 9431.54 BUILDING ons eat. blithely final accounts sleep despite...