@@ -217,14 +217,68 @@ def row_to_singer_message(stream, row, version, columns, time_extracted, md_map,
217217 version = version ,
218218 time_extracted = time_extracted )
219219
220- def consume_message (streams , state , msg , time_extracted , conn_info , end_lsn ):
221- payload = json .loads (msg .payload )
222- lsn = msg .data_start
def consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn):
    """Turn one wal2json format-version 2 payload into at most one record message.

    This is a generator so that it exposes the same iteration API as the
    format-version 1 consumer. It yields exactly one item per call:

    * ``None`` when the payload's action is not an insert/update/delete, or
      when the payload's table is not one of the streams in ``streams_lookup``;
    * otherwise the value returned by ``row_to_singer_message``.

    Args:
        payload: decoded wal2json message; read keys are ``action``,
            ``schema``, ``table`` and, depending on the action, ``columns``
            (insert/update) or ``identity`` and ``timestamp`` (delete).
        conn_info: connection settings; ``dbname`` is required and a truthy
            ``debug_lsn`` adds an ``_sdc_lsn`` column to the record.
        streams_lookup: mapping of tap_stream_id -> stream catalog entry.
        state: bookmark state passed to ``singer.write_bookmark``.
        time_extracted: forwarded unchanged to ``row_to_singer_message``.
        lsn: log sequence number of the message being consumed.

    Note:
        The ``lsn`` bookmark is written *after* the record has been yielded,
        i.e. only once the caller resumes the generator. The value returned by
        ``singer.write_bookmark`` is bound to a local name only, so the caller
        sees the new bookmark only if ``write_bookmark`` updates ``state`` in
        place (NOTE(review): presumably it does -- confirm against singer-python).
    """
    ## Action Types:
    # I = Insert
    # U = Update
    # D = Delete
    # B = Begin Transaction
    # C = Commit Transaction
    # M = Message
    # T = Truncate
    action = payload['action']
    if action not in ('U', 'I', 'D'):
        LOGGER.debug("Skipping message of type %s", action)
        yield None
        return

    tap_stream_id = post_db.compute_tap_stream_id(conn_info['dbname'], payload['schema'], payload['table'])
    target_stream = streams_lookup.get(tap_stream_id)
    if target_stream is None:
        # Change belongs to a table that was not selected for replication.
        yield None
        return

    stream_version = get_stream_version(target_stream['tap_stream_id'], state)
    stream_md_map = metadata.to_map(target_stream['metadata'])

    # Built once as a set so the per-column membership test below is O(1).
    desired_columns = {col for col in target_stream['schema']['properties']
                       if sync_common.should_sync_column(stream_md_map, col)}

    # Inserts/updates carry the row under 'columns'; deletes only carry the
    # values listed under 'identity'.
    is_delete = action == 'D'
    source_columns = payload['identity'] if is_delete else payload['columns']

    col_names = []
    col_vals = []
    for column in source_columns:
        if column['name'] in desired_columns:
            col_names.append(column['name'])
            col_vals.append(column['value'])

    # _sdc_deleted_at is always emitted: the payload timestamp (normalised to
    # UTC) for deletes, None otherwise.
    if is_delete:
        deleted_at = singer.utils.strftime(singer.utils.strptime_to_utc(payload['timestamp']))
    else:
        deleted_at = None
    col_names.append('_sdc_deleted_at')
    col_vals.append(deleted_at)

    if conn_info.get('debug_lsn'):
        col_names.append('_sdc_lsn')
        col_vals.append(str(lsn))

    # Yield 1 record to match the API of V1
    yield row_to_singer_message(target_stream, col_vals, stream_version, col_names, time_extracted, stream_md_map, conn_info)

    state = singer.write_bookmark(state,
                                  target_stream['tap_stream_id'],
                                  'lsn',
                                  lsn)
279+
280+ # message-format v1
281+ def consume_message_format_1 (payload , conn_info , streams_lookup , state , time_extracted , lsn ):
228282 for c in payload ['change' ]:
229283 tap_stream_id = post_db .compute_tap_stream_id (conn_info ['dbname' ], c ['schema' ], c ['table' ])
230284 if streams_lookup .get (tap_stream_id ) is None :
@@ -288,15 +342,33 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
288342 raise Exception ("unrecognized replication operation: {}" .format (c ['kind' ]))
289343
290344
291- singer . write_message ( record_message )
345+ yield record_message
292346 state = singer .write_bookmark (state ,
293347 target_stream ['tap_stream_id' ],
294348 'lsn' ,
295349 lsn )
350+
351+
352+ def consume_message (streams , state , msg , time_extracted , conn_info , end_lsn , message_format = "1" ):
353+ payload = json .loads (msg .payload )
354+ lsn = msg .data_start
355+
356+ streams_lookup = {s ['tap_stream_id' ]: s for s in streams }
357+
358+ if message_format == "1" :
359+ records = consume_message_format_1 (payload , conn_info , streams_lookup , state , time_extracted , lsn )
360+ elif message_format == "2" :
361+ records = consume_message_format_2 (payload , conn_info , streams_lookup , state , time_extracted , lsn )
362+ else :
363+ raise Exception ("Unknown wal2json message format version: {}" .format (message_format ))
364+
365+ for record_message in records :
366+ if record_message :
367+ singer .write_message (record_message )
368+ # Pulled out of refactor so we send a keep-alive per-record
296369 LOGGER .debug ("sending feedback to server with NO flush_lsn. just a keep-alive" )
297370 msg .cursor .send_feedback ()
298371
299-
300372 LOGGER .debug ("sending feedback to server. flush_lsn = %s" , msg .data_start )
301373 if msg .data_start > end_lsn :
302374 raise Exception ("incorrectly attempting to flush an lsn({}) > end_lsn({})" .format (msg .data_start , end_lsn ))
@@ -339,8 +411,17 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
339411 with post_db .open_connection (conn_info , True ) as conn :
340412 with conn .cursor () as cur :
341413 LOGGER .info ("Starting Logical Replication for %s(%s): %s -> %s. poll_total_seconds: %s" , list (map (lambda s : s ['tap_stream_id' ], logical_streams )), slot , start_lsn , end_lsn , poll_total_seconds )
414+
415+ replication_params = {"slot_name" : slot ,
416+ "decode" : True ,
417+ "start_lsn" : start_lsn }
418+ message_format = conn_info .get ("wal2json_message_format" ) or "1"
419+ if message_format == "2" :
420+ LOGGER .info ("Using wal2json format-version 2" )
421+ replication_params ["options" ] = {"format-version" : 2 , "include-timestamp" : True }
422+
342423 try :
343- cur .start_replication (slot_name = slot , decode = True , start_lsn = start_lsn )
424+ cur .start_replication (** replication_params )
344425 except psycopg2 .ProgrammingError :
345426 raise Exception ("unable to start replication with logical replication slot {}" .format (slot ))
346427
@@ -358,13 +439,13 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
358439 LOGGER .info ("gone past end_lsn %s for run. breaking" , end_lsn )
359440 break
360441
361- state = consume_message (logical_streams , state , msg , time_extracted , conn_info , end_lsn )
442+ state = consume_message (logical_streams , state , msg , time_extracted ,
443+ conn_info , end_lsn , message_format = message_format )
362444 #msg has been consumed. it has been processed
363445 last_lsn_processed = msg .data_start
364446 rows_saved = rows_saved + 1
365447 if rows_saved % UPDATE_BOOKMARK_PERIOD == 0 :
366448 singer .write_message (singer .StateMessage (value = copy .deepcopy (state )))
367-
368449 else :
369450 now = datetime .datetime .now ()
370451 timeout = keep_alive_time - (now - cur .io_timestamp ).total_seconds ()
0 commit comments