Running benchmarks#

Benchmarks are performed with a 20-node m5.3xlarge Ray cluster and a 4X-Large Snowflake warehouse. To replicate this example, configure your Ray cluster with a maximum of 20 m5.3xlarge worker instances, and create a 4X-Large warehouse in your Snowflake account.
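
Before launching the run, you can sanity-check that the Ray cluster has scaled to the expected size. A minimal sketch, assuming it is executed from a node inside the cluster:

import ray

# Connect to the running cluster rather than starting a local one.
ray.init(address='auto')

# Count live nodes and total CPUs; with 20 workers plus the head node,
# roughly 21 alive nodes should appear once autoscaling settles.
alive_nodes = [n for n in ray.nodes() if n['Alive']]
print(f'Alive nodes: {len(alive_nodes)}')
print(f"Total CPUs: {ray.cluster_resources().get('CPU', 0)}")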

The code below runs the read and write benchmarks.

from ray_db.provider.dbapi2 import run_benchmarks
from ray_db.provider.snowflake import SnowflakeConnector

# TPC-H CUSTOMER tables at increasing scale factors; restore the full
# range to benchmark across table sizes.
customer_size_range = [1]  # [1, 10, 100, 1000]
source_tables = [f'SNOWFLAKE_SAMPLE_DATA.TPCH_SF{s}.CUSTOMER' for s in customer_size_range]
destination_tables = [f'ANYSCALE_SAMPLE.PUBLIC.CUSTOMER_{s}' for s in customer_size_range]

def prepare_snowflake_table(connector, operation, source_table, destination_table) -> None:
    # Before each write benchmark, recreate the destination table with the
    # same schema as the source so every run starts from an empty table.
    if operation == 'write':
        with connector:
            connector.query(f'CREATE OR REPLACE TABLE {destination_table} LIKE {source_table}')

connector = SnowflakeConnector.create(warehouse='X4LARGE')
read, write = run_benchmarks(connector, source_tables, destination_tables, prepare_snowflake_table)
read, write = read.to_pandas(), write.to_pandas()
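
The results come back as datasets that convert to pandas DataFrames. The summary helpers below assume per-run columns named n_rows, elapsed_time, n_bytes, and throughput; a quick inspection verifies they are present:

# Column names here are an assumption carried over from the summary
# helpers in the next section.
print(read.columns.tolist())
print(read.head())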

Read benchmarks#

def to_summary(df):
    # Aggregate the per-run results by table size: mean elapsed time,
    # mean bytes transferred (MB), and max observed throughput (MB/s).
    return df.groupby(['n_rows']).agg({
        'elapsed_time': 'mean',
        'n_bytes': lambda x: x.mean() / 1_000_000,
        'throughput': lambda x: x.max() / 1_000_000
    }).reset_index()

def to_markdown(df, header=['rows', 'time (s)', 'bytes (MB)', 'max throughput (MB/s)']):
    # Render the summary DataFrame as a right-aligned markdown table.
    md = '| ' + ' | '.join(header) + ' |\n'
    md += '| -----: ' * len(header) + '|\n'
    for i, row in df.iterrows():
        md += '| ' + ' | '.join([f'{row[c]:.2f}' for c in df.columns]) + ' |\n'
    return md
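
Before applying these helpers to the benchmark results, a quick smoke test with synthetic numbers (made up purely to exercise the formatting) shows the table they produce:

import pandas as pd

# Two fake runs against the same table size; values are illustrative only.
sample = pd.DataFrame({
    'n_rows': [150000, 150000],
    'elapsed_time': [1.2, 1.4],               # seconds
    'n_bytes': [25_000_000, 26_000_000],      # bytes per run
    'throughput': [20_800_000, 18_600_000],   # bytes per second
})
print(to_markdown(to_summary(sample)))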

print(to_markdown(to_summary(read)))

Write benchmarks#

print(to_markdown(to_summary(write)))