I have a scenario where 10K+ regular expressions are stored in a table along with various other columns, and this table needs to be joined against an incoming dataset. Initially I was using the Spark SQL "rlike" method as below, and it was able to hold the load as long as the incoming record count stayed under 50K.
PS: The regular expression reference data is a broadcasted dataset.
dataset.join(regexDataset.value, expr("input_column rlike regular_exp_column"))
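For context, the broadcast reference is set up roughly like this (a sketch only; the table name and session handle are assumptions, not my exact code):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder().getOrCreate()

// Hypothetical source table holding the unique ID, the regex column and the other
// reference columns needed in the final result.
val regexReference: DataFrame = spark.read.table("regex_reference")

// Broadcast the small reference DataFrame so it can be reused across the job.
val regexDataset: Broadcast[DataFrame] = spark.sparkContext.broadcast(regexReference)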
Then I wrote a custom UDF that does the matching with Scala's native regex search, as below.
- The val below collects the reference data as an Array of tuples.
import scala.util.matching.Regex

// Collect the reference data to the driver as (id, compiled regex) tuples.
val regexPreCalcArray: Array[(Int, Regex)] = {
  regexDataset.value
    .select("col_1", "regex_column")
    .collect
    .map(row => (row.get(0).asInstanceOf[Int], row.get(1).toString.r))
}
Implementation of the regex-matching UDF,
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

def findMatchingPatterns(regexDSArray: Array[(Int, Regex)]): UserDefinedFunction = {
  udf((input_column: String) => {
    for {
      text <- Option(input_column)  // guard against null input
      // keep every (id, regex) pair whose pattern occurs somewhere in the text
      matches = regexDSArray.filter { case (_, regex) => regex.findFirstIn(text).isDefined }
      if matches.nonEmpty
    } yield matches.map(_._1).min   // smallest matching id wins
  }, IntegerType)
}
The join is done as below: when multiple regexes match, the UDF returns the minimum unique ID from the reference data, and that ID is then joined back to the reference data to retrieve the other columns needed for the result,
dataset.withColumn("min_unique_id", findMatchingPatterns(regexPreCalcArray)($"input_column"))
.join(regexDataset.value, $"min_unique_id" === $"unique_id" , "left")
But this too gets very slow, with skew in execution (one executor task runs for a very long time), when the record count spikes to 1M+. That is expected, since the function loops over the whole pattern array: with 3K regexes, each input record is scanned against all 3,000 regular expressions in the worst case, so the total number of regex evaluations is roughly 3,000 times the input record count, and it only gets worse once the input grows past 1M.
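To put the cost in perspective, 1M records against 3,000 patterns is on the order of 3 billion findFirstIn calls per run. For illustration, an equivalent formulation of the same UDF (a sketch only; it still scans every pattern when nothing matches) sorts the array by ID once and stops at the first hit, since the minimum matching ID is simply the first match in ascending-ID order:

// Sketch: same semantics as findMatchingPatterns, but short-circuits on the first
// (i.e. minimum-ID) match instead of testing every pattern for every record.
val sortedRegexArray: Array[(Int, Regex)] = regexPreCalcArray.sortBy(_._1)

def findFirstMatchingPattern(sortedPatterns: Array[(Int, Regex)]): UserDefinedFunction = {
  udf((input_column: String) => {
    Option(input_column).flatMap { text =>
      sortedPatterns
        .find { case (_, regex) => regex.findFirstIn(text).isDefined } // stops at first hit
        .map(_._1)                                                     // its id is the minimum id
    }
  }, IntegerType)
}

Whether that actually helps depends on how often low-ID patterns match early, so the worst case is unchanged.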
Spark recommends avoiding UDFs because they degrade performance. Are there any other best practices I should apply here, is there a better API for Scala regex matching than what I've written, or is there a more efficient way to do this altogether? Any suggestions would be very helpful.