@@ -77,6 +77,12 @@ def poll_for_job_completion(runner, job_id):
7777
7878 last_error_rank = float ('-inf' )
7979 last_error_msg = None
80+ last_job_state = None
81+ # How long to wait after pipeline failure for the error
82+ # message to show up giving the reason for the failure.
83+ # It typically takes about 30 seconds.
84+ final_countdown_timer_secs = 50.0
85+ sleep_secs = 5.0
8086 # Try to prioritize the user-level traceback, if any.
8187 def rank_error (msg ):
8288 if 'work item was attempted' in msg :
@@ -91,11 +97,24 @@ def rank_error(msg):
9197 # If get() is called very soon after Create() the response may not contain
9298 # an initialized 'currentState' field.
9399 if response .currentState is not None :
94- logging .info ('Job %s is in state %s.' , job_id ,
95- str (response .currentState ))
100+ if response .currentState != last_job_state :
101+ logging .info ('Job %s is in state %s' , job_id , response .currentState )
102+ last_job_state = response .currentState
96103 if str (response .currentState ) != 'JOB_STATE_RUNNING' :
97- break
98- time .sleep (5.0 )
104+ # Stop checking for new messages on timeout, explanatory
105+ # message received, success, or a terminal job state caused
106+ # by the user that therefore doesn't require explanation.
107+ if (final_countdown_timer_secs <= 0.0
108+ or last_error_msg is not None
109+ or str (response .currentState ) == 'JOB_STATE_DONE'
110+ or str (response .currentState ) == 'JOB_STATE_CANCELLED'
111+ or str (response .currentState ) == 'JOB_STATE_UPDATED'
112+ or str (response .currentState ) == 'JOB_STATE_DRAINED' ):
113+ break
114+ # The job has failed; ensure we see any final error messages.
115+ sleep_secs = 1.0 # poll faster during the final countdown
116+ final_countdown_timer_secs -= sleep_secs
117+ time .sleep (sleep_secs )
99118
100119 # Get all messages since beginning of the job run or since last message.
101120 page_token = None
@@ -463,7 +482,7 @@ def run_Read(self, transform_node):
463482 standard_options = (
464483 transform_node .inputs [0 ].pipeline .options .view_as (StandardOptions ))
465484 if not standard_options .streaming :
466- raise ValueError ('PubSubSource is currently only available for use in '
485+ raise ValueError ('PubSubSource is currently available for use only in '
467486 'streaming pipelines.' )
468487 step .add_property (PropertyNames .PUBSUB_TOPIC , transform .source .topic )
469488 if transform .source .subscription :
@@ -534,7 +553,7 @@ def run__NativeWrite(self, transform_node):
534553 standard_options = (
535554 transform_node .inputs [0 ].pipeline .options .view_as (StandardOptions ))
536555 if not standard_options .streaming :
537- raise ValueError ('PubSubSink is currently only available for use in '
556+ raise ValueError ('PubSubSink is currently available for use only in '
538557 'streaming pipelines.' )
539558 step .add_property (PropertyNames .PUBSUB_TOPIC , transform .sink .topic )
540559 else :
0 commit comments