Skip to content

Instantly share code, notes, and snippets.

@michan85
Created September 11, 2018 09:25
Show Gist options
  • Select an option

  • Save michan85/28dba73fbe0f1121d383bf1d4aa69c73 to your computer and use it in GitHub Desktop.

Select an option

Save michan85/28dba73fbe0f1121d383bf1d4aa69c73 to your computer and use it in GitHub Desktop.
A workaround that lets you set a default location for BigQuery queries when running on the DirectRunner.
class BigQueryReader(beam.io.gcp.bigquery.BigQueryReader):
    """BigQuery reader that falls back to a caller-supplied default location.

    The base reader's ``_get_source_table_location`` can return ``None``
    (presumably for query-based sources, whose tables it does not inspect --
    confirm against the installed Beam version). In that case this subclass
    substitutes the ``default_location`` attribute of its source. That
    attribute is set by the companion ``BigQuerySource`` in this file.
    """

    def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True,
                 flatten_results=True):
        """Forward all arguments unchanged to the base BigQuery reader.

        Args:
          source: the BigQuery source being read.
          test_bigquery_client: optional client override, passed through.
          use_legacy_sql: whether the query uses legacy SQL, passed through.
          flatten_results: whether results are flattened, passed through.
        """
        super(BigQueryReader, self).__init__(
            source,
            test_bigquery_client=test_bigquery_client,
            use_legacy_sql=use_legacy_sql,
            flatten_results=flatten_results)

    def _get_source_table_location(self):
        """Return the detected source-table location, or the source's default.

        Returns:
          The location reported by the base reader when it is not ``None``.
          Otherwise, the source's ``default_location`` attribute, or ``None``
          when the source has no such attribute or it is unset.
        """
        loc = super(BigQueryReader, self)._get_source_table_location()
        if loc is not None:
            # The original fell off the end on this path and returned None,
            # discarding a location the base class had successfully detected.
            return loc
        # getattr guards against sources that do not define default_location
        # (e.g. a stock beam.io.BigQuerySource); behave like the base then.
        return getattr(self.source, 'default_location', None)
class BigQuerySource(beam.io.BigQuerySource):
    """``beam.io.BigQuerySource`` extended with a ``default_location`` option.

    The extra value is stored on the instance. The ``BigQueryReader`` built
    by ``reader()`` reads it back when no source-table location is available.
    """

    def __init__(self, table=None, dataset=None, project=None, query=None,
                 validate=False, coder=None, use_standard_sql=False,
                 flatten_results=True, default_location=None):
        """Initialise the base source, then remember ``default_location``.

        Args:
          table, dataset, project, query, validate, coder, use_standard_sql,
          flatten_results: passed unchanged to ``beam.io.BigQuerySource``.
          default_location: location string (e.g. 'EU') to fall back to;
            ``None`` by default.
        """
        base_kwargs = dict(
            table=table,
            dataset=dataset,
            project=project,
            query=query,
            validate=validate,
            coder=coder,
            use_standard_sql=use_standard_sql,
            flatten_results=flatten_results,
        )
        super(BigQuerySource, self).__init__(**base_kwargs)
        # Consulted by BigQueryReader._get_source_table_location.
        self.default_location = default_location

    def reader(self, test_bigquery_client=None):
        """Build the location-aware ``BigQueryReader`` for this source.

        ``use_legacy_sql`` and ``flatten_results`` are read from attributes
        presumably populated by the base-class constructor.
        """
        return BigQueryReader(
            source=self,
            test_bigquery_client=test_bigquery_client,
            use_legacy_sql=self.use_legacy_sql,
            flatten_results=self.flatten_results,
        )
# Example usage (assumes ``p`` is an existing beam.Pipeline): read query
# results with the BigQuery location pinned to 'EU'.
p | beam.io.Read(
    BigQuerySource(
        query="Select * from eu_table",
        use_standard_sql=True,
        default_location='EU',
    )
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment