
I'm trying to create a Dataflow job that mirrors a view in my PostgreSQL DB to a BigQuery table in my Google Cloud project. I created the job using the "Job builder", which generated the following YAML:

pipeline:
  transforms:
    - name: otto-bq-sbx-postgresql-source
      type: ReadFromPostgres
      config:
        jdbc_url: 'jdbc:postgresql://host:5432/otto'
        read_query: SELECT * FROM public."Carlos_PowerBi"
        username: user
        password: pass
        type: postgres
    - name: otto-bq-sbx-bigquery-update-param
      type: ReadFromBigQuery
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        fields:
          - lastMessage
    - name: otto-bq-sbx-incremental-transform
      type: Sql
      config:
        query: |-
          SELECT *
          FROM
            input1 AS source
          WHERE
            source."lastMessage" > (
              SELECT COALESCE(MAX(lastMessage), TIMESTAMP '1970-01-01')
              FROM input0
            )
      input:
        input0: otto-bq-sbx-bigquery-update-param
        input1: otto-bq-sbx-postgresql-source
    - name: otto-bq-sbx-sink
      type: WriteToBigQuery
      input: otto-bq-sbx-incremental-transform
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        num_streams: 1

So, in summary, I want the job to incrementally update the BQ table's data based on the lastMessage column.

Another point is that I'm connecting to the DB through a VPC, which apparently is working fine (I've correctly configured the connection in the job parameters).

However, when I run the job, it fails with an error indicating that the JDBC source couldn't figure out the correct type of a field:

ValueError: Failed to decode schema due to an issue with Field proto:
"name: \"proposalSent\"
type {
  nullable: true
  logical_type {
    urn: \"beam:logical_type:javasdk_bit:v1\"
    payload: \"\\202SNAPPY\\000\\000\\000\\000\\001\\000\\000\\000\\001\\000\\000\\002a\\301\\007\\360@\\254\\355\\000\\005sr\\000*org.apache.beam.sdk.io.jdbc.LogicalTypes$1%\\263\\224\\246%\\031\\260\\355\\002\\000\\000xr\\000?N9\\000 schemas.l\\t9Dtypes.PassThroughL\\t\\030\\001Q\\270\\210\\324\\331\\211\\313P\\033\\263\\002\\000\\004L\\000\\010argumentt\\000\\022Ljava/lang/Object;L\\000\\014a\\r \\001:\\034t\\000.Lorg/\\t\\266\\000/\\001\\266\\020/sdk/\\r}\\004/S\\005\\205\\024$Field\\0010\\020;L\\000\\tf\\021\\rDq\\000~\\000\\003L\\000\\nidentifier6s\\000\u003cString;xpt\\000\\000sr\\0006n\\346\\000$AutoValue_\\ts\\000_\\025sh9\\304m\\364S\\243\\227P\\002\\000\\010L\\000\\025collectionEle\\001\\346\\001\\226\\r\\211\\000\\013-+\\001\\023\\010t\\0000\\216\\331\\000\\000L9E$;L\\000\\nmapKey\\001@\\rS\\014\\014map\\005\\227\\035\\024,\\010metadatat\\000\\017)aXutil/Map;L\\000\\010nullablet\\000\\023\\t\\035%~\\030Boolean!?\\010row\\t\\343\\010t\\000$\\212\\243\\000\\001T!\\374\\030Namet\\000-\\2122\\000\\000$\\001\\254\\001/\\020;xr\\000,nu\\001\\t\\2109\\3360\\013PLl[\\357\\3103\\002\\000\\000xp\\001\\001\\014sr\\000\\036AC\\000.\\001\\342\\004.C5|Ds$EmptyMapY6\\024\\205Z\\334\\347\\320\\0053\\014sr\\000\\021\\005/\\020lang.\\r\\3648\\315 r\\200\\325\\234\\372\\356\\002\\000\\001Z\\000\\005v!\\344\\034xp\\000p~r\\000+\\212\\234\\000\\021\\314\\000\\000\\r\\001\\000\\022e1\\000\\016\\031f\\014Enum\\r\\034\\005\\035(pt\\000\\006STRINGs!\\304\\020\\007pppp\\001\\t\\000\\020\\001\\005\\010\\022p~\\001\\007@\\023t\\000\\007BOOLEANt\\000\\003BIT\"
   representation {
     atomic_type: BOOLEAN
   }
   argument_type {
     atomic_type: STRING
   }
   argument {
     atomic_value {
       string: \"\"
     }
   }
 }
}

I've created the BQ table with types matching those of my PostgreSQL view, so I have no clue why this is happening. Also, my BQ table is currently empty; could this be a problem too?

Just to make sure I'm not missing anything, here are the view types:

[Screenshot: view column types]

And here are my BQ table types, matching the ones of the view:

[Screenshot: BQ table column types]

Here is the YAML of my most recent attempt:

pipeline:
  transforms:
    - name: otto-bq-sbx-postgresql-source
      type: ReadFromPostgres
      config:
        jdbc_url: 'jdbc:postgresql://host:5432/otto'
        read_query: >-
          SELECT
            "idConversa"::INTEGER AS "idConversa",
            "dataConversa"::TIMESTAMP AS "dataConversa",
            "estadoFunil"::TEXT AS "estadoFunil",
            "tags"::TEXT AS "tags",
            "propostaEnviada"::BOOLEAN AS "propostaEnviada",
            "termoEnviado"::BOOLEAN AS "termoEnviado",
            "termoAssinado"::BOOLEAN AS "termoAssinado",
            "idCanal"::INTEGER AS "idCanal",
            "nomeCanal"::TEXT AS "nomeCanal",
            "nomeCliente"::TEXT AS "nomeCliente",
            "numeroCliente"::TEXT  AS "numeroCliente",
            "ultimaMensagem"::TIMESTAMP AS "ultimaMensagem",
            "infoExtra"::TEXT AS "infoExtra"
          FROM
            public."Carlos_PowerBi"
        username: user
        password: pass
        type: postgres
    - name: otto-bq-sbx-bigquery-update-param
      type: ReadFromBigQuery
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        fields:
          - ultimaMensagem
    - name: otto-bq-sbx-incremental-transform
      type: Sql
      config:
        query: >-
          SELECT
            source."idConversa"::INTEGER AS "idConversa",
            source."dataConversa"::TIMESTAMP AS "dataConversa",
            source."estadoFunil"::TEXT AS "estadoFunil",
            source."tags"::TEXT AS "tags",
            source."propostaEnviada"::BOOLEAN AS "propostaEnviada",
            source."termoEnviado"::BOOLEAN AS "termoEnviado",
            source."termoAssinado"::BOOLEAN AS "termoAssinado",
            source."idCanal"::INTEGER AS "idCanal",
            source."nomeCanal"::TEXT AS "nomeCanal",
            source."nomeCliente"::TEXT AS "nomeCliente",
            source."numeroCliente"::TEXT AS "numeroCliente",
            source."ultimaMensagem"::TIMESTAMP AS "ultimaMensagem",
            source."infoExtra"::TEXT AS "infoExtra"
          FROM
            input1 AS source
          WHERE
            source."ultimaMensagem" > (
              SELECT 
                COALESCE(MAX("ultimaMensagem"), TIMESTAMP '1970-01-01')
              FROM
                input0
            )
      input:
        input0: otto-bq-sbx-bigquery-update-param
        input1: otto-bq-sbx-postgresql-source
    - name: otto-bq-sbx-sink
      type: WriteToBigQuery
      input: otto-bq-sbx-incremental-transform
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        num_streams: 1

It's still a mystery to me why the job struggles with the types, since they are all explicitly cast.

1 Answer


You have this error:

ValueError: Failed to decode schema due to an issue with Field proto: ...
logical_type: urn: "beam:logical_type:javasdk_bit:v1" ...

This means that Dataflow's Beam pipeline cannot map the PostgreSQL type (BIT) sent by the JDBC source to a Beam schema type. Can you check whether the Postgres field is BOOLEAN rather than BIT?
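One way to verify is to ask Postgres for the column's declared type via `information_schema` (a sketch; `proposalSent` is the field named in the error, and the schema/table names are taken from the question's query):

```sql
-- Postgres: 'bit' or 'bit varying' here (instead of 'boolean')
-- would explain the javasdk_bit logical type in the error.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'Carlos_PowerBi'
  AND column_name = 'proposalSent';
```

Note that `information_schema.columns` covers views as well as tables, so this works even though `Carlos_PowerBi` is a view.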

Try changing your SELECT query to:

SELECT "proposalSent"::BOOLEAN AS "proposalSent", … FROM …

Your pipeline may be sending the wrong data type, which is why you are getting the error. Also, BigQuery doesn't have a BIT type, only BOOLEAN.
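To double-check the BigQuery side, the dataset's `INFORMATION_SCHEMA` can confirm the target columns are declared as `BOOL` (a sketch; dataset and table names are taken from the question's YAML):

```sql
-- BigQuery: confirm the sink table's column types.
SELECT column_name, data_type
FROM otto_bq_sbx.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'Carlos_PowerBi';
```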


1 Comment

I've enforced the BOOLEAN type in all the places I could: in the source query against the PostgreSQL DB, in the transformation query, and in the view definition itself. However, I still get the same error. I'll add my most recent YAML file as an edit to the post.
