
I have a Dataflow job that first reads from a BigQuery query (in standard SQL). It works perfectly with the direct runner. However, when I ran the same pipeline with the Dataflow runner, I encountered this error:

response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Thu, 24 Dec 2020 09:28:21 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '470', '-content-encoding': 'gzip'}>, content <{ "error": { "code": 400, "message": "Querying tables partitioned on a field is not supported in Legacy SQL: 551608533331:GoogleSearchConsole.search_query_analytics_log.", "errors": [ { "message": "Querying tables partitioned on a field is not supported in Legacy SQL: 551608533331:GoogleSearchConsole.search_query_analytics_log.", "domain": "global", "reason": "invalid" } ], "status": "INVALID_ARGUMENT" } } >

Apparently the use_standard_sql parameter doesn't take effect in Dataflow runner mode: the error message shows the query was executed as legacy SQL, which cannot query tables partitioned on a field. Versions: apache-beam 2.24.0, Python 3.8.

last_update_date = pipeline | 'Read last update date' >> beam.io.Read(beam.io.BigQuerySource(
    query='''
        SELECT
            MAX(date) AS date
        FROM
            GoogleSearchConsole.search_query_analytics_log
    ''',
    use_standard_sql=True
))
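
For reference, below is a minimal sketch of the same read using the newer beam.io.ReadFromBigQuery transform, which is available from apache-beam 2.25.0 and is the usual replacement for BigQuerySource. This is a possible workaround, not a confirmed fix; the GCS bucket name is a hypothetical placeholder.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()  # fill in runner/project options as needed

with beam.Pipeline(options=pipeline_options) as pipeline:
    last_update_date = pipeline | 'Read last update date' >> beam.io.ReadFromBigQuery(
        query='''
            SELECT
                MAX(date) AS date
            FROM
                GoogleSearchConsole.search_query_analytics_log
        ''',
        use_standard_sql=True,
        # ReadFromBigQuery exports results through GCS, so it needs a
        # temp location on the Dataflow runner (hypothetical bucket):
        gcs_location='gs://your-temp-bucket/bq_export')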

1 Answer


Try the following code, which reads data from BigQuery and writes it back to BigQuery. The code targets the Apache Beam Dataflow runner:

#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse, logging
from apache_beam.options.pipeline_options import SetupOptions

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxx'
#------------Pipeline definition-----------------------#

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
          '--cur_suffix',
          dest='cur_suffix',
          help='Input table suffix to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)


    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)


    logging.info('***********')
    logging.info(known_args.cur_suffix)
    data_loading = (
        p1
        | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(
            query='''SELECT SUBSTR(_time, 1, 19) AS _time, dest
                     FROM `project.dataset.table`''',
            use_standard_sql=True))
    )

    project_id = "xxxxxxx"
    dataset_id = 'AAAAAAAA'
    table_schema_Audit = ('_time:DATETIME, dest:STRING')

#---------------------Type = audit----------------------------------------------------------------------------------------------------------------------
    result = (
        data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
            table='YYYYYYY',
            dataset=dataset_id,
            project=project_id,
            schema=table_schema_Audit,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))



    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  path_service_account = 'ABGFDfc927.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()
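
For completeness, here is a minimal sketch of the pipeline options you would pass to target the Dataflow runner when running the script above; all project, region, and bucket values are hypothetical placeholders.

from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical values; replace with your own project, region and bucket.
pipeline_args = [
    '--runner=DataflowRunner',
    '--project=my-gcp-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
    '--staging_location=gs://my-bucket/staging',
    '--job_name=bq-to-bq-example',
]
pipeline_options = PipelineOptions(pipeline_args)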


2 Comments

Thanks for your reply. I still don't understand why it failed in Dataflow runner mode. Could you help explain?
Unfortunately, I can't see the complete code in your description. It would be better if you could share the complete code so I can investigate the issue. The section of code you have given in the description looks good to me; I am assuming there may be another problem.
