Skip to content

Commit d547c76

Browse files
aglinxinyuan and kunwp1 authored
feat: add File Scan From Input operator (#4369)
### What changes were proposed in this PR?

This PR adds a new File Scan From Input operator that reads file path strings from upstream tuples and uses them as source input at execution time. Both the File Scan From Input operator and the File Scan operator were moved to the "file" subfolder.

<img width="1093" height="799" alt="image" src="https://github.com/user-attachments/assets/ffb9cc99-a268-4dce-8bec-27a30d81991d" />

### Any related issues, documentation, discussions?

Closes #4368

### How was this PR tested?

Tested manually, and a new test case was added.

### Was this PR authored or co-authored using generative AI tooling?

No.

---------

Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Co-authored-by: Kunwoo (Chris) <143021053+kunwp1@users.noreply.github.com>
1 parent ea1e9d1 commit d547c76

11 files changed

Lines changed: 385 additions & 58 deletions

File tree

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ import org.apache.texera.amber.operator.source.apis.twitter.v2.{
7777
}
7878
import org.apache.texera.amber.operator.source.dataset.FileListerSourceOpDesc
7979
import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
80-
import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
8180
import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
8281
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
8382
import org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
@@ -140,6 +139,7 @@ import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallCh
140139
import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
141140
import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder}
142141
import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
142+
import org.apache.texera.amber.operator.source.scan.file.{FileScanOpDesc, FileScanSourceOpDesc}
143143
import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc
144144

145145
import java.util.UUID
@@ -165,6 +165,7 @@ trait StateTransferFunc
165165
// new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = "ParallelCSVFileScan"),
166166
new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
167167
new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
168+
new Type(value = classOf[FileScanOpDesc], name = "FileScanOp"),
168169
new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
169170
new Type(
170171
value = classOf[TwitterFullArchiveSearchSourceOpDesc],
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.texera.amber.operator.source.scan.file
21+
22+
import com.fasterxml.jackson.annotation.JsonProperty
23+
import com.kjetland.jackson.jsonSchema.annotations.{
24+
JsonSchemaInject,
25+
JsonSchemaString,
26+
JsonSchemaTitle
27+
}
28+
import org.apache.texera.amber.core.executor.OpExecWithClassName
29+
import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
30+
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
31+
import org.apache.texera.amber.core.workflow.{
32+
InputPort,
33+
OutputPort,
34+
PhysicalOp,
35+
SchemaPropagationFunc
36+
}
37+
import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
38+
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
39+
import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
40+
import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
41+
import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
42+
import org.apache.texera.amber.util.JSONUtils.objectMapper
43+
44+
/**
  * Logical descriptor of the "File Scan From Input" operator.
  *
  * The operator declares a single input port named "Filename" and a single
  * output port. Its executor (`FileScanOpExec`) is instantiated by class name
  * and receives this descriptor serialized as JSON.
  *
  * `attributeName` and `attributeType` used below are inherited members; they
  * are not declared in this class.
  */
class FileScanOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {

  /** Character encoding used when decoding file content; defaults to UTF-8. */
  @JsonProperty(defaultValue = "UTF_8", required = true)
  @JsonSchemaTitle("Encoding")
  var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8

  /** "Extract" option exposed in the operator's property schema; off by default. */
  @JsonProperty(defaultValue = "false")
  @JsonSchemaTitle("Extract")
  val extract: Boolean = false

  /** When true, the output schema starts with an extra "filename" string column. */
  @JsonProperty(defaultValue = "false")
  @JsonSchemaTitle("Include Filename")
  var outputFileName: Boolean = false

  /**
    * Builds the physical operator for this descriptor.
    *
    * @param workflowId  identity of the enclosing workflow
    * @param executionId identity of the current execution
    * @return a physical operator wired with this operator's input/output ports
    *         and a schema propagation function that yields [[sourceSchema]]
    *         for the first output port
    */
  override def getPhysicalOp(
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity
  ): PhysicalOp = {
    // The executor is located by its fully-qualified class name and is handed
    // this descriptor as a JSON string.
    val executorSpec = OpExecWithClassName(
      "org.apache.texera.amber.operator.source.scan.file.FileScanOpExec",
      objectMapper.writeValueAsString(this)
    )
    // The output schema does not depend on the input schemas, hence the ignored argument.
    val propagateSchema = SchemaPropagationFunc(_ =>
      Map(operatorInfo.outputPorts.head.id -> sourceSchema())
    )

    PhysicalOp
      .sourcePhysicalOp(workflowId, executionId, operatorIdentifier, executorSpec)
      .withInputPorts(operatorInfo.inputPorts)
      .withOutputPorts(operatorInfo.outputPorts)
      .withPropagateSchema(propagateSchema)
  }

  /**
    * Output schema: an optional leading "filename" STRING column (only when
    * `outputFileName` is set) followed by the configured content attribute.
    */
  override def sourceSchema(): Schema = {
    val prefix =
      if (outputFileName) Schema().add("filename", AttributeType.STRING)
      else Schema()
    prefix.add(attributeName, attributeType.getType)
  }

  /** Static metadata: display name, description, group, and port layout. */
  override def operatorInfo: OperatorInfo =
    OperatorInfo(
      userFriendlyName = "File Scan From Input",
      operatorDescription = "Scan data from file paths provided by input tuples",
      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
      inputPorts = List(InputPort(displayName = "Filename")),
      outputPorts = List(OutputPort())
    )
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.texera.amber.operator.source.scan.file
21+
22+
import org.apache.texera.amber.core.executor.OperatorExecutor
23+
import org.apache.texera.amber.core.storage.FileResolver
24+
import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
25+
import org.apache.texera.amber.util.JSONUtils.objectMapper
26+
27+
/**
  * Executor for the "File Scan From Input" operator.
  *
  * For every incoming tuple it takes the first field whose runtime value is a
  * `String`, treats it as a file path, resolves it through `FileResolver`, and
  * delegates the actual reading to `FileScanUtils.createTuplesFromFile`.
  *
  * @param descString JSON-serialized [[FileScanOpDesc]] carrying the operator configuration
  */
class FileScanOpExec private[scan] (
    descString: String
) extends OperatorExecutor {
  private val desc: FileScanOpDesc =
    objectMapper.readValue(descString, classOf[FileScanOpDesc])

  /**
    * Scans the file named by the input tuple.
    *
    * @param tuple input tuple; must contain at least one `String` field
    * @param port  input port index (not used by this operator)
    * @return the tuples produced by `FileScanUtils.createTuplesFromFile`
    * @throws NoSuchElementException if the tuple has no `String` field
    */
  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
    // Fail with an actionable message instead of an opaque `None.get` when the
    // upstream tuple carries no string field. The exception type is unchanged.
    val originalFileName = tuple.getFields
      .collectFirst { case s: String => s }
      .getOrElse(
        throw new NoSuchElementException(
          "File Scan From Input expects an input tuple with at least one string field " +
            "containing a file path, but none was found"
        )
      )
    val fileName = FileResolver.resolve(originalFileName).toASCIIString
    FileScanUtils.createTuplesFromFile(
      fileName = fileName,
      // Pass the unresolved name separately as the display name.
      displayFileName = originalFileName,
      attributeType = desc.attributeType,
      fileEncoding = desc.fileEncoding,
      extract = desc.extract,
      outputFileName = desc.outputFileName,
      fileScanOffset = desc.fileScanOffset,
      fileScanLimit = desc.fileScanLimit
    )
  }

}

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala renamed to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.texera.amber.operator.source.scan
20+
package org.apache.texera.amber.operator.source.scan.file
2121

2222
import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonProperty}
2323
import com.kjetland.jackson.jsonSchema.annotations.{
@@ -31,6 +31,7 @@ import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, Workflow
3131
import org.apache.texera.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc}
3232
import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
3333
import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
34+
import org.apache.texera.amber.operator.source.scan.{FileDecodingMethod, ScanSourceOpDesc}
3435
import org.apache.texera.amber.util.JSONUtils.objectMapper
3536

3637
@JsonIgnoreProperties(value = Array("limit", "offset", "fileEncoding"))
@@ -73,7 +74,7 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with TextSourceOpDesc {
7374
executionId,
7475
operatorIdentifier,
7576
OpExecWithClassName(
76-
"org.apache.texera.amber.operator.source.scan.FileScanSourceOpExec",
77+
"org.apache.texera.amber.operator.source.scan.file.FileScanSourceOpExec",
7778
objectMapper.writeValueAsString(this)
7879
)
7980
)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.texera.amber.operator.source.scan.file
21+
22+
import org.apache.texera.amber.core.executor.SourceOperatorExecutor
23+
import org.apache.texera.amber.core.tuple.TupleLike
24+
import org.apache.texera.amber.util.JSONUtils.objectMapper
25+
26+
import java.io.IOException
27+
28+
/**
  * Executor for the File Scan source operator.
  *
  * Reads the file configured on the descriptor and delegates tuple creation
  * to `FileScanUtils.createTuplesFromFile`.
  *
  * @param descString JSON-serialized [[FileScanSourceOpDesc]] carrying the operator configuration
  */
class FileScanSourceOpExec private[scan] (
    descString: String
) extends SourceOperatorExecutor {
  private val desc: FileScanSourceOpDesc =
    objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])

  /**
    * Produces the tuples for the configured file.
    *
    * @return the tuples produced by `FileScanUtils.createTuplesFromFile`
    * @throws NoSuchElementException if no file name is configured
    */
  @throws[IOException]
  override def produceTuple(): Iterator[TupleLike] = {
    // NOTE(review): assumes `desc.fileName` is an Option (it is declared outside
    // this file's view). A missing value now fails with a descriptive message
    // rather than `None.get`; the exception type is unchanged.
    val configuredFileName = desc.fileName.getOrElse(
      throw new NoSuchElementException(
        "File Scan requires a file name, but none was configured on the operator"
      )
    )
    FileScanUtils.createTuplesFromFile(
      fileName = configuredFileName,
      attributeType = desc.attributeType,
      fileEncoding = desc.fileEncoding,
      extract = desc.extract,
      outputFileName = desc.outputFileName,
      fileScanOffset = desc.fileScanOffset,
      fileScanLimit = desc.fileScanLimit
    )
  }
}

0 commit comments

Comments
 (0)