diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index d777784f64..1251e64fb0 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -34,6 +34,7 @@
     Union,
 )
 
+import google.api_core.exceptions
 import google.cloud.bigquery as bigquery
 import numpy
 import pandas
@@ -2508,7 +2509,14 @@ def to_gbq(
             )
             if_exists = "replace"
 
-        if "." not in destination_table:
+        table_parts = destination_table.split(".")
+        default_project = self._block.expr.session.bqclient.project
+
+        if len(table_parts) == 2:
+            destination_dataset = f"{default_project}.{table_parts[0]}"
+        elif len(table_parts) == 3:
+            destination_dataset = f"{table_parts[0]}.{table_parts[1]}"
+        else:
             raise ValueError(
                 f"Got invalid value for destination_table {repr(destination_table)}. "
                 "Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'."
@@ -2523,11 +2531,16 @@
                 f"Valid options include None or one of {dispositions.keys()}."
             )
 
+        try:
+            self._session.bqclient.get_dataset(destination_dataset)
+        except google.api_core.exceptions.NotFound:
+            self._session.bqclient.create_dataset(destination_dataset, exists_ok=True)
+
         job_config = bigquery.QueryJobConfig(
             write_disposition=dispositions[if_exists],
             destination=bigquery.table.TableReference.from_string(
                 destination_table,
-                default_project=self._block.expr.session.bqclient.project,
+                default_project=default_project,
             ),
         )
 
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 663a7ceb49..ab68543d91 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -3683,3 +3683,18 @@ def test_to_pandas_downsampling_option_override(session):
     total_memory_bytes = df.memory_usage(deep=True).sum()
     total_memory_mb = total_memory_bytes / (1024 * 1024)
     assert total_memory_mb == pytest.approx(download_size, rel=0.3)
+
+
+def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_created):
+    dataset_id = dataset_id_not_created
+    destination_table = f"{dataset_id}.scalars_df"
+
+    result_table = scalars_df_index.to_gbq(destination_table)
+    assert (
+        result_table == destination_table
+        if destination_table
+        else result_table is not None
+    )
+
+    loaded_scalars_df_index = session.read_gbq(result_table)
+    assert not loaded_scalars_df_index.empty