import dask
import dask.dataframe as dd
import pandas as pd
import numpy as np
from pandas.tseries.holiday import USFederalHolidayCalendar
import os
import time
import pyarrow.dataset as ds
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes
import dask_cudf

cluster = LocalCUDACluster()
client = Client(cluster)
cluster.scale(2)

cluster

taxi_parquet_path = "gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.12.parquet"

npartitions = len(client.has_what().keys())
print(npartitions)
2
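Here the partition count is derived from `client.has_what()`, so there is one partition per GPU worker. As a side note, the scheduler's worker registry gives the same count; a minimal equivalent using only the public `Client.scheduler_info` API:

# Equivalent worker count via the scheduler's worker registry.
n_workers = len(client.scheduler_info()["workers"])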
taxi_df = dask_cudf.read_parquet(taxi_parquet_path, npartitions=npartitions)

with dask.annotate(workers=set(client.has_what().keys())):
    taxi_df = client.persist(taxi_df)
wait(taxi_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-349d4269f1fa93859fff5e4a928c999b', 0)>}, not_done=set())
client.who_has(taxi_df)
{"('read-parquet-349d4269f1fa93859fff5e4a928c999b', 0)": ('tcp://127.0.0.1:39893',)}
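As a convenience, this placement lookup can be wrapped in a small helper. The sketch below is hypothetical (`partition_placement` is not a Dask API); it only repackages the `Client.who_has` call used above.

def partition_placement(client, df):
    # Print and return the worker address(es) holding each persisted
    # partition of `df`; a thin wrapper over Client.who_has.
    placement = client.who_has(df)
    for key, workers in placement.items():
        print(f"{key} -> {workers}")
    return placement

partition_placement(client, taxi_df)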
Check whether repartitioning into two partitions, writing to disk, and then reading back from disk still maintains two partitions.
taxi_df = taxi_df.repartition(npartitions=2)
taxi_df.to_parquet("test.parquet")
new_df = dask_cudf.read_parquet("test.parquet")
new_df

Dask DataFrame Structure:
| | tpep_pickup_datetime | VendorID | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=2 | |||||||||||||||||||
| 0 | datetime64[ns] | int64 | datetime64[ns] | int64 | float64 | float64 | float64 | int64 | object | float64 | float64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| 403876 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 807751 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
Dask Name: read-parquet, 2 tasks
with dask.annotate(workers=set(client.has_what().keys())):
    new_df = client.persist(new_df)
wait(new_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 0)>, <Future: finished, type: cudf.DataFrame, key: ('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 1)>}, not_done=set())
client.who_has(new_df)
{"('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 0)": ('tcp://127.0.0.1:35709',),
"('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 1)": ('tcp://127.0.0.1:39893',)}
Check whether a data frame with one partition, written to disk and read back without repartitioning, still has a single partition, and whether Dask persists it onto a single worker.
one_partition = taxi_df.repartition(npartitions=1)
one_partition

Dask DataFrame Structure:
| | tpep_pickup_datetime | VendorID | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=1 | |||||||||||||||||||
| | datetime64[ns] | int64 | datetime64[ns] | int64 | float64 | float64 | float64 | int64 | object | float64 | float64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
Dask Name: repartition, 5 tasks
one_partition.to_parquet("test_single.parquet")  # has a single partition
new_single_partition_df = dask_cudf.read_parquet("test_single.parquet", npartitions=2)
new_single_partition_df

Dask DataFrame Structure:
| | tpep_pickup_datetime | VendorID | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=1 | |||||||||||||||||||
| 0 | datetime64[ns] | int64 | datetime64[ns] | int64 | float64 | float64 | float64 | int64 | object | float64 | float64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| 807751 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
Dask Name: read-parquet, 1 tasks
with dask.annotate(workers=set(client.has_what().keys())):
    new_single_partition_df = client.persist(new_single_partition_df)
wait(new_single_partition_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-5832fa7d00a750180cacf3b4810f6d85', 0)>}, not_done=set())
client.who_has(new_single_partition_df)
{"('read-parquet-5832fa7d00a750180cacf3b4810f6d85', 0)": ('tcp://127.0.0.1:35709',)}
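As the output shows, the file written from a single partition comes back as a single partition even with the `npartitions=2` hint on the read, and it is persisted onto exactly one worker. A sanity check mirroring the one above:

# Sanity check: still one partition after the round trip, held by one worker.
assert new_single_partition_df.npartitions == 1
workers_used = {ws[0] for ws in client.who_has(new_single_partition_df).values()}
assert len(workers_used) == 1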
client.has_what().keys()
dict_keys(['tcp://127.0.0.1:35709', 'tcp://127.0.0.1:39893'])
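When the experiment is finished, it is good practice to remove the test output and shut down the cluster; `Client.close` and the cluster's `close` method are standard distributed/dask-cuda APIs.

import shutil

# Remove the test Parquet directories written above, then shut down
# the client and the GPU cluster.
shutil.rmtree("test.parquet", ignore_errors=True)
shutil.rmtree("test_single.parquet", ignore_errors=True)
client.close()
cluster.close()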