[VL] pyarrow UDF is broken #12280
Backend
VL (Velox)
Bug description
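Code:
```python
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

@udf(returnType='int', useArrow=True)  # An Arrow Python UDF
def arrow_slen(s):  # type: ignore[no-untyped-def]
    return len(s)

# `spark` is the active SparkSession (e.g. the one provided by the pyspark shell).
df = spark.read.parquet("s3a://workload/tpch_sf100_parquet_zstd/lineitem/part-00023.zstd.parquet")
df.select(arrow_slen("l_shipmode").alias("ship_len")).agg(F.max("ship_len")).show()
```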
The error message:
```
org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Error during calling Java code from native code: org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: Operator::getOutput failed for [operator: TableScan, plan node ID: value-stream:0]: Error during calling Java code from native code: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib64/python3.12/site-packages/pyarrow/ipc.py", line 187, in open_stream
    return RecordBatchStreamReader(source, options=options,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib64/python3.12/site-packages/pyarrow/ipc.py", line 52, in __init__
    self._open(source, options=options, memory_pool=memory_pool)
  File "pyarrow/ipc.pxi", line 1092, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
OSError: Invalid IPC stream: negative continuation token
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:581)
at org.apache.spark.api.python.ColumnarArrowPythonRunner$$anon$1.read(ColumnarArrowEvalPythonExec.scala:128)
at org.apache.spark.api.python.ColumnarArrowPythonRunner$$anon$1.read(ColumnarArrowEvalPythonExec.scala:78)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:823)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at org.apache.gluten.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:66)
at org.apache.gluten.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:38)
at org.apache.gluten.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:122)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at org.apache.gluten.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:154)
at org.apache.gluten.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:122)
at org.apache.gluten.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:66)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at org.apache.gluten.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:122)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at org.apache.gluten.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:154)
at org.apache.gluten.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:122)
at org.apache.gluten.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:66)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at scala.collection.convert.JavaCollectionWrappers$IteratorWrapper.hasNext(JavaCollectionWrappers.scala:32)
at org.apache.gluten.vectorized.ColumnarBatchInIterator.hasNext(ColumnarBatchInIterator.java:36)
at org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
at org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNext0(ColumnarBatchOutIterator.java:67)
at org.apache.gluten.iterator.ClosableIterator.hasNext(ClosableIterator.java:36)
at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
at org.apache.gluten.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:154)
at org.apache.gluten.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:66)
at org.apache.gluten.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:38)
at org.apache.gluten.iterator.IteratorsV1$LifeTimeAccumulator.hasNext(IteratorsV1.scala:95)
at org.apache.gluten.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:122)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at scala.collection.convert.JavaCollectionWrappers$IteratorWrapper.hasNext(JavaCollectionWrappers.scala:32)
at org.apache.gluten.vectorized.ColumnarBatchInIterator.hasNext(ColumnarBatchInIterator.java:36)
at org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
at org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNext0(ColumnarBatchOutIterator.java:67)
at org.apache.gluten.iterator.ClosableIterator.hasNext(ClosableIterator.java:36)
at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
at org.apache.gluten.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:154)
at org.apache.gluten.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:122)
at org.apache.gluten.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:66)
at org.apache.gluten.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:38)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:135)
at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:316)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
at org.apache.spark.scheduler.Task.run(Task.scala:147)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
```
Gluten version
main branch
Spark version
spark-4.0.x
Spark configurations
No response
System information
No response
Relevant logs