# User Guide
This user guide walks through the basics of reading and writing data with the Ray Snowflake connector.

## Before you begin
- [Install](install.ipynb) the connector.
- [Define](connection.ipynb) connection properties.
- [Set up](setup.ipynb) database objects.

## How it works
The Ray connector for Snowflake holds the connection properties required to establish a connection with Snowflake. The connector implements the same interface as a DB-API 2 connector: it has the standard `query` methods, plus `read_table` and `write_table` methods optimized for large-scale parallel data exchange with Snowflake.
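As a quick orientation, the connector surface used throughout this guide can be summarized as a `typing.Protocol`. This is a hypothetical sketch: the method names come from the examples in this guide, but `ConnectorLike`, the parameter lists, and the return types are assumptions, not the library's actual signatures.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ConnectorLike(Protocol):
    """Hypothetical summary of the connector methods used in this guide.

    Method names are taken from the guide's examples; parameters and return
    types are assumptions, not the library's published signatures.
    """

    # Connection lifecycle (used directly or through `with`).
    def open(self) -> None: ...
    def commit(self) -> None: ...
    def close(self) -> None: ...

    # DB-API style pass-through queries.
    def query(self, sql: str) -> Any: ...
    def query_value(self, sql: str) -> Any: ...

    # Parallel bulk transfer between Snowflake and Ray datasets.
    def read_table(self, table: str) -> Any: ...
    def write_table(self, dataset: Any, table: str) -> None: ...

    # Context-manager support.
    def __enter__(self) -> "ConnectorLike": ...
    def __exit__(self, *exc_info: Any) -> None: ...
```

Because the protocol is `runtime_checkable`, `isinstance` only checks that these methods exist; it does not validate their signatures.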

### Connector factory
The `SnowflakeConnector` class has a factory method, `create`, that builds the connector. `create` accepts any connection properties, along with an optional path to a connection property file. See [Define connection properties](connection.ipynb) for details on how connection properties are read.
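The general idea of a factory that combines a property file with directly passed properties can be sketched as below. This is a hypothetical illustration, not the connector's code: the function name `resolve_connection_properties`, the JSON file format, and the rule that keyword arguments override file values are all assumptions. The actual rules are described in [connection.ipynb](connection.ipynb).

```python
import json
from pathlib import Path
from typing import Any, Optional


def resolve_connection_properties(
    properties_file: Optional[str] = None, **properties: Any
) -> dict:
    """Hypothetical merge of file-based and keyword connection properties.

    Assumptions: the file is JSON, and explicit keyword arguments take
    precedence over values read from the file.
    """
    resolved: dict = {}
    if properties_file is not None:
        # Load base properties from the optional file first.
        resolved.update(json.loads(Path(properties_file).read_text()))
    # Explicit keyword arguments override anything read from the file.
    resolved.update(properties)
    return resolved
```

Keeping secrets in a file while overriding a single value (for example, a warehouse) per call is the kind of flexibility this pattern is meant to provide.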

### Connections and transactions
The connector is a Python context manager, so it can be used in a `with` block to define when a connection is established, when database operations are committed, and when the connection is closed.

![Transaction context](../../images/snowflake_transactions.png)
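The lifecycle in the diagram can be sketched with a small stand-in class whose `__enter__` and `__exit__` call `open`, `commit`, and `close`. This is a hypothetical illustration, not the `SnowflakeConnector` implementation; in particular, `ManagedConnection` and the `rollback` call on error are assumptions added to show one common design.

```python
from types import TracebackType
from typing import List, Optional, Type


class ManagedConnection:
    """Hypothetical stand-in showing how `with` can map onto open/commit/close.

    Not the SnowflakeConnector implementation; rolling back when the block
    raises is an assumption made for illustration.
    """

    def __init__(self) -> None:
        self.events: List[str] = []  # records lifecycle calls for inspection

    def open(self) -> None:
        self.events.append("open")

    def commit(self) -> None:
        self.events.append("commit")

    def rollback(self) -> None:
        self.events.append("rollback")

    def close(self) -> None:
        self.events.append("close")

    def __enter__(self) -> "ManagedConnection":
        self.open()  # the connection is established when the block starts
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        try:
            if exc_type is None:
                self.commit()  # clean exit: persist the block's work
            else:
                self.rollback()  # assumed behavior when the block raises
        finally:
            self.close()  # always release the connection
        return False  # never swallow the caller's exception
```

A clean block records `open`, `commit`, `close`; a block that raises records `open`, `rollback`, `close` and the exception still propagates.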

The code below counts the rows in a sample table, using the connector to manage the connection.

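```python
from ray_db.provider.snowflake import SnowflakeConnector
import ray

# Start Ray if needed; the env var disables Ray's PyArrow version check.
if not ray.is_initialized():
    ray.init(logging_level='ERROR', runtime_env={'env_vars': {'RAY_DISABLE_PYARROW_VERSION_CHECK': '1'}})

SOURCE_TABLE = 'CUSTOMER'

# Connection properties are resolved by the factory (see connection.ipynb).
connector = SnowflakeConnector.create()

# The connection is opened on entry, then committed and closed on exit.
with connector:
    count = connector.query_value(f'SELECT COUNT(*) FROM {SOURCE_TABLE}')

print(count)
```

```text
150000
```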


Alternatively, you can use a `try`/`finally` block with the connector's `open`, `commit`, and `close` methods.

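```python
# Open before the try block so close() only runs on an opened connection.
connector.open()
try:
    count = connector.query_value(f'SELECT COUNT(*) FROM {SOURCE_TABLE}')

    # For a write operation, commit before closing:
    # connector.commit()
finally:
    # Always close the connection, even if the query raises.
    connector.close()

print(count)
```

```text
150000
```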


## Read from tables
To read an entire table into a Ray cluster, the connector provides a `read_table` method. Under the hood, the connector uses a Snowflake optimization that allows query results to be read in parallel, which lets it load gigabytes of data per second into a Ray cluster. The Snowflake Python connector exposes this optimization through the cursor's `get_result_batches` method, which splits a query result into batches that can be fetched independently. Ray datasets are composed of PyArrow tables spread across the entire Ray cluster, which enables the distributed operations required for machine learning.

![Snowflake read table](../../images/snowflake_read_table.png)
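The parallel read pattern in the diagram, where independent slices of one result are fetched concurrently and reassembled in order, can be sketched on a single machine with threads. This is an analogy under stated assumptions, not the connector's code: each callable stands in for one independently fetchable batch, the function name `read_batches_in_parallel` is hypothetical, and in the connector the equivalent work runs on Ray workers rather than a thread pool.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

Row = tuple
Batch = Callable[[], List[Row]]  # a deferred fetch of one result slice


def read_batches_in_parallel(
    batches: Sequence[Batch], max_workers: int = 4
) -> List[List[Row]]:
    """Fetch every batch concurrently and return them in their original order.

    Hypothetical single-machine analogy of a parallel table read; the real
    connector distributes this work across a Ray cluster.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order even if batches finish out of order.
        return list(pool.map(lambda fetch: fetch(), batches))
```

The key property is that no batch depends on another, so throughput scales with the number of workers fetching at once.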

The code below reads a table from the Snowflake sample database. Calling `fully_executed` forces the entire dataset to load; without it, data is loaded only when a dataset operation needs it.

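```python
with connector:
    # Force the whole table to load while the connection is open.
    ds = connector.read_table(SOURCE_TABLE).fully_executed()

# Show the first three rows as a pandas DataFrame.
ds.limit(3).to_pandas()
```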

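The difference between lazy and eager loading described above can be illustrated with a toy class. This is a hypothetical sketch, not Ray's `Dataset`: `LazyTable` and its `load_count` counter are invented for illustration, and its `limit` returns a plain list rather than a dataset.

```python
from typing import Callable, List, Optional


class LazyTable:
    """Hypothetical illustration of deferred loading (not Ray's Dataset).

    Rows are produced only when first needed; `fully_executed` forces the
    load up front, mirroring the behavior described for Ray datasets.
    """

    def __init__(self, loader: Callable[[], List[int]]) -> None:
        self._loader = loader
        self._rows: Optional[List[int]] = None
        self.load_count = 0  # how many times the loader actually ran

    def _materialize(self) -> List[int]:
        # Run the loader at most once and cache the result.
        if self._rows is None:
            self._rows = self._loader()
            self.load_count += 1
        return self._rows

    def fully_executed(self) -> "LazyTable":
        self._materialize()  # eager: load everything now
        return self

    def limit(self, n: int) -> List[int]:
        return self._materialize()[:n]  # lazy: loads on first use
```

Loading eagerly inside the `with connector:` block, as the example above does, ensures the read completes while the connection is still open.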

## DML and DDL
The connector can also run any DDL or DML operation you would normally execute through the Snowflake Python API. These operations pass straight through to the underlying Snowflake API.

The code below creates the objects needed for writing to tables. A commit is issued between the two statements so that the first DDL operation completes before the dependent one runs. Alternatively, use two `with` blocks to define the transaction boundaries.

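```python
DEST_DATABASE = 'ANYSCALE_SAMPLE'
DEST_TABLE = 'CUSTOMER_COPY'

with connector:
    connector.query(f'CREATE DATABASE IF NOT EXISTS {DEST_DATABASE}')
    # Commit so the first statement completes before the dependent one runs.
    connector.commit()
    # Create an empty table with the same columns as the source table.
    connector.query(f'CREATE OR REPLACE TABLE {DEST_TABLE} LIKE {SOURCE_TABLE}')
```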
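The "two `with` blocks" alternative can be demonstrated with Python's built-in `sqlite3` module, which is also a DB-API 2 driver. This is an analogy only, not the Snowflake connector: `sqlite3` stands in so the example runs locally, the table names are illustrative, and `CREATE TABLE ... AS SELECT ... WHERE 0` plays the role of Snowflake's `CREATE TABLE ... LIKE`.

```python
import sqlite3

# Analogy only: sqlite3 (a standard-library DB-API 2 driver) shows how
# separate `with` blocks define transaction boundaries.
conn = sqlite3.connect(":memory:")

# First transaction: create and seed the source table.
with conn:  # sqlite3 commits on success and rolls back on error
    conn.execute("CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany("INSERT INTO customer (name) VALUES (?)", [("a",), ("b",)])

# Second transaction: the dependent DDL runs only after the first committed.
with conn:
    # Copy the column layout without copying any rows.
    conn.execute("CREATE TABLE customer_copy AS SELECT * FROM customer WHERE 0")

copy_columns = [row[1] for row in conn.execute("PRAGMA table_info(customer_copy)")]

# Unlike the Ray connector's `with`, sqlite3's `with` does not close the connection.
conn.close()
```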

## Write to tables
To write a dataset into a database table, the connector provides a `write_table` method. Under the hood, the connector uses `PUT` operations to copy the dataset's PyArrow tables to the Snowflake table's stage. This happens in parallel: every Ray worker that holds a PyArrow table for the dataset writes to Snowflake at the same time.

After the parallel `PUT` operations, the connector issues a single `COPY` command, which triggers the Snowflake warehouse to begin moving the raw data into the production table. Because the write is divided into two operations, the first run in parallel and the second as a single command, it behaves like a two-phase commit: if any of the initial `PUT` operations fail, the final `COPY` operation fails too.

![Snowflake write table](../../images/snowflake_write_table.png)
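The PUT-then-COPY flow in the diagram can be sketched as follows. This is a hypothetical orchestration, not the connector's implementation: `write_table_two_phase`, the `execute` callable, local Parquet part files, and the exact `COPY INTO` options are all assumptions. The `PUT ... @%<table>` (table stage) and `COPY INTO` statements use standard Snowflake syntax.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def write_table_two_phase(
    part_files: Sequence[str],
    table: str,
    execute: Callable[[str], None],
    max_workers: int = 4,
) -> List[str]:
    """Hypothetical sketch of a parallel PUT phase followed by one COPY.

    Phase 1 uploads every part file to the table stage (@%<table>) in
    parallel. Phase 2 issues a single COPY INTO, reached only if every PUT
    succeeded. The statements and options the real connector uses may differ.
    """
    put_statements = [f"PUT file://{path} @%{table}" for path in part_files]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() re-raises the first PUT failure, which skips phase 2.
        list(pool.map(execute, put_statements))
    copy_statement = (
        f"COPY INTO {table} FROM @%{table} "
        "FILE_FORMAT = (TYPE = PARQUET) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
    )
    execute(copy_statement)  # single statement that loads all staged files
    return put_statements + [copy_statement]
```

If one upload fails, the other uploads may already have reached the stage, but no `COPY` is issued, so the production table is never partially loaded.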

The code below writes the previously read dataset into the newly created table.

```python
with connector:
    # Parallel PUT of the dataset's blocks to the table stage, then one COPY.
    connector.write_table(ds, DEST_TABLE)
```

The table can now be read.

```python
with connector:
    ds2 = connector.read_table(DEST_TABLE).fully_executed()

ds2.limit(3).to_pandas()
```
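To sanity-check the copy, you could compare row counts of the source and destination tables with the `query_value` method used earlier. `row_counts_match` is a hypothetical helper, not part of the connector; it interpolates table names directly into SQL, so pass only trusted identifiers.

```python
from typing import Any, Protocol


class SupportsQueryValue(Protocol):
    """Anything with a `query_value(sql)` method, such as the connector."""

    def query_value(self, sql: str) -> Any: ...


def row_counts_match(connector: SupportsQueryValue, source: str, dest: str) -> bool:
    """Hypothetical check that two tables hold the same number of rows.

    Call it inside a `with connector:` block so a connection is open.
    Table names are interpolated as-is, so use trusted identifiers only.
    """
    source_count = connector.query_value(f"SELECT COUNT(*) FROM {source}")
    dest_count = connector.query_value(f"SELECT COUNT(*) FROM {dest}")
    return source_count == dest_count
```

For example, `with connector: assert row_counts_match(connector, SOURCE_TABLE, DEST_TABLE)`.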