File tree Expand file tree Collapse file tree
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics Expand file tree Collapse file tree Original file line number Diff line number Diff line change 3030import org .apache .beam .sdk .lineage .LineageOptions ;
3131import org .apache .beam .sdk .metrics .Metrics .MetricsFlag ;
3232import org .apache .beam .sdk .options .PipelineOptions ;
33+ import org .apache .beam .sdk .options .PipelineOptionsFactory ;
3334import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .annotations .VisibleForTesting ;
3435import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Splitter ;
3536import org .checkerframework .checker .nullness .qual .Nullable ;
@@ -122,16 +123,24 @@ private static Lineage createLineage(PipelineOptions options, LineageDirection d
122123
123124 /** {@link Lineage} representing sources and optionally side inputs. */
124125 public static Lineage getSources () {
125- return checkNotNull (
126- sources ,
127- "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first." );
126+ Lineage localSources = sources ;
127+ if (localSources == null ) {
128+ return createDefaultLineage (LineageDirection .SOURCE );
129+ }
130+ return localSources ;
128131 }
129132
130133 /** {@link Lineage} representing sinks. */
131134 public static Lineage getSinks () {
132- return checkNotNull (
133- sinks ,
134- "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first." );
135+ Lineage localSinks = sinks ;
136+ if (localSinks == null ) {
137+ return createDefaultLineage (LineageDirection .SINK );
138+ }
139+ return localSinks ;
140+ }
141+
142+ private static Lineage createDefaultLineage (LineageDirection direction ) {
143+ return createLineage (PipelineOptionsFactory .create (), direction );
135144 }
136145
137146 @ VisibleForTesting
You can’t perform that action at this time.
0 commit comments