I'm trying to create a Dataflow job that mirrors a view in my PostgreSQL database to a BigQuery table in my Google Cloud project. I created the job using the "Job builder", which generated the following YAML:
pipeline:
  transforms:
    - name: otto-bq-sbx-postgresql-source
      type: ReadFromPostgres
      config:
        jdbc_url: 'jdbc:postgresql://host:5432/otto'
        read_query: SELECT * FROM public."Carlos_PowerBi"
        username: user
        password: pass
        type: postgres
    - name: otto-bq-sbx-bigquery-update-param
      type: ReadFromBigQuery
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        fields:
          - lastMessage
    - name: otto-bq-sbx-incremental-transform
      type: Sql
      config:
        query: |-
          SELECT *
          FROM
            input1 AS source
          WHERE
            source."lastMessage" > (
              SELECT COALESCE(MAX(lastMessage), TIMESTAMP '1970-01-01')
              FROM input0
            )
      input:
        input0: otto-bq-sbx-bigquery-update-param
        input1: otto-bq-sbx-postgresql-source
    - name: otto-bq-sbx-sink
      type: WriteToBigQuery
      input: otto-bq-sbx-incremental-transform
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        num_streams: 1
In summary, I want the job to update the BQ table incrementally, based on the lastMessage column.
Another point is that I'm connecting to the database through a VPC, which appears to be working fine (I've correctly configured the connection in the job parameters).
However, when I run the job it fails with an error saying it couldn't decode the type of a field coming from JDBC:
ValueError: Failed to decode schema due to an issue with Field proto:
"name: \"proposalSent\"
type {
nullable: true
logical_type {
urn: \"beam:logical_type:javasdk_bit:v1\"
payload: \"\\202SNAPPY\\000\\000\\000\\000\\001\\000\\000\\000\\001\\000\\000\\002a\\301\\007\\360@\\254\\355\\000\\005sr\\000*org.apache.beam.sdk.io.jdbc.LogicalTypes$1%\\263\\224\\246%\\031\\260\\355\\002\\000\\000xr\\000?N9\\000 schemas.l\\t9Dtypes.PassThroughL\\t\\030\\001Q\\270\\210\\324\\331\\211\\313P\\033\\263\\002\\000\\004L\\000\\010argumentt\\000\\022Ljava/lang/Object;L\\000\\014a\\r \\001:\\034t\\000.Lorg/\\t\\266\\000/\\001\\266\\020/sdk/\\r}\\004/S\\005\\205\\024$Field\\0010\\020;L\\000\\tf\\021\\rDq\\000~\\000\\003L\\000\\nidentifier6s\\000\u003cString;xpt\\000\\000sr\\0006n\\346\\000$AutoValue_\\ts\\000_\\025sh9\\304m\\364S\\243\\227P\\002\\000\\010L\\000\\025collectionEle\\001\\346\\001\\226\\r\\211\\000\\013-+\\001\\023\\010t\\0000\\216\\331\\000\\000L9E$;L\\000\\nmapKey\\001@\\rS\\014\\014map\\005\\227\\035\\024,\\010metadatat\\000\\017)aXutil/Map;L\\000\\010nullablet\\000\\023\\t\\035%~\\030Boolean!?\\010row\\t\\343\\010t\\000$\\212\\243\\000\\001T!\\374\\030Namet\\000-\\2122\\000\\000$\\001\\254\\001/\\020;xr\\000,nu\\001\\t\\2109\\3360\\013PLl[\\357\\3103\\002\\000\\000xp\\001\\001\\014sr\\000\\036AC\\000.\\001\\342\\004.C5|Ds$EmptyMapY6\\024\\205Z\\334\\347\\320\\0053\\014sr\\000\\021\\005/\\020lang.\\r\\3648\\315 r\\200\\325\\234\\372\\356\\002\\000\\001Z\\000\\005v!\\344\\034xp\\000p~r\\000+\\212\\234\\000\\021\\314\\000\\000\\r\\001\\000\\022e1\\000\\016\\031f\\014Enum\\r\\034\\005\\035(pt\\000\\006STRINGs!\\304\\020\\007pppp\\001\\t\\000\\020\\001\\005\\010\\022p~\\001\\007@\\023t\\000\\007BOOLEANt\\000\\003BIT\"
representation {
atomic_type: BOOLEAN
}
argument_type {
atomic_type: STRING
}
argument {
atomic_value {
string: \"\"
}
}
}
}
I created the BQ table with types matching those of my PostgreSQL view, so I have no clue why this is happening. Also, my BQ table is currently empty; could that be a problem too?
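The error mentions a `beam:logical_type:javasdk_bit:v1` for the `proposalSent` column, i.e. the JDBC driver is reporting it as a BIT. One way to check which Postgres type actually backs each column of the view (e.g. whether it is `boolean` or `bit(1)`) is a standard information_schema query:

```sql
-- Show the underlying type of each column of the view;
-- udt_name distinguishes 'bool' from 'bit'.
SELECT column_name, data_type, udt_name
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'Carlos_PowerBi'
ORDER BY ordinal_position;
```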
Just to make sure I'm not missing anything, here are the view types:
And here are my BQ table types, matching the ones of the view:
Here is my most recent attempt at the YAML:
pipeline:
  transforms:
    - name: otto-bq-sbx-postgresql-source
      type: ReadFromPostgres
      config:
        jdbc_url: 'jdbc:postgresql://host:5432/otto'
        read_query: >-
          SELECT
            "idConversa"::INTEGER AS "idConversa",
            "dataConversa"::TIMESTAMP AS "dataConversa",
            "estadoFunil"::TEXT AS "estadoFunil",
            "tags"::TEXT AS "tags",
            "propostaEnviada"::BOOLEAN AS "propostaEnviada",
            "termoEnviado"::BOOLEAN AS "termoEnviado",
            "termoAssinado"::BOOLEAN AS "termoAssinado",
            "idCanal"::INTEGER AS "idCanal",
            "nomeCanal"::TEXT AS "nomeCanal",
            "nomeCliente"::TEXT AS "nomeCliente",
            "numeroCliente"::TEXT AS "numeroCliente",
            "ultimaMensagem"::TIMESTAMP AS "ultimaMensagem",
            "infoExtra"::TEXT AS "infoExtra"
          FROM
            public."Carlos_PowerBi"
        username: user
        password: pass
        type: postgres
    - name: otto-bq-sbx-bigquery-update-param
      type: ReadFromBigQuery
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        fields:
          - ultimaMensagem
    - name: otto-bq-sbx-incremental-transform
      type: Sql
      config:
        query: >-
          SELECT
            source."idConversa"::INTEGER AS "idConversa",
            source."dataConversa"::TIMESTAMP AS "dataConversa",
            source."estadoFunil"::TEXT AS "estadoFunil",
            source."tags"::TEXT AS "tags",
            source."propostaEnviada"::BOOLEAN AS "propostaEnviada",
            source."termoEnviado"::BOOLEAN AS "termoEnviado",
            source."termoAssinado"::BOOLEAN AS "termoAssinado",
            source."idCanal"::INTEGER AS "idCanal",
            source."nomeCanal"::TEXT AS "nomeCanal",
            source."nomeCliente"::TEXT AS "nomeCliente",
            source."numeroCliente"::TEXT AS "numeroCliente",
            source."ultimaMensagem"::TIMESTAMP AS "ultimaMensagem",
            source."infoExtra"::TEXT AS "infoExtra"
          FROM
            input1 AS source
          WHERE
            source."ultimaMensagem" > (
              SELECT
                COALESCE(MAX("ultimaMensagem"), TIMESTAMP '1970-01-01')
              FROM
                input0
            )
      input:
        input0: otto-bq-sbx-bigquery-update-param
        input1: otto-bq-sbx-postgresql-source
    - name: otto-bq-sbx-sink
      type: WriteToBigQuery
      input: otto-bq-sbx-incremental-transform
      config:
        table: 'migracao-zrp:otto_bq_sbx.Carlos_PowerBi'
        num_streams: 1
It's still a mystery to me why the job struggles with the types, since every column is now explicitly cast.
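One workaround I'm considering (untested, just a sketch): sidestep the JDBC BIT mapping entirely by shipping the boolean columns as integers from Postgres and converting them back in the Sql transform. In read_query, the three boolean columns would become:

```sql
-- read_query side: send booleans as 0/1 so the JDBC driver
-- reports an integer type instead of BIT (other columns unchanged)
"propostaEnviada"::INTEGER AS "propostaEnviada",
"termoEnviado"::INTEGER AS "termoEnviado",
"termoAssinado"::INTEGER AS "termoAssinado",
```

and in the Sql transform a comparison would restore them to booleans, e.g. `(source."propostaEnviada" = 1) AS "propostaEnviada"`.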

