spark scala scala.MatchError класса scala.коллекция.неизменный.$colon$colon

Я строю карту столбца Struct, запрашивая и объединяя столбцы таблицы hive. Позже я сгруппирую эти записи в столбце id, чтобы построить связанные карты для этих ID. Это будет позже присоединено к другим Dataframe, прежде чем записать его обратно в таблицу hive.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, MapType, ArrayType, LongType}
import scala.collection.Map
import scala.collection.JavaConversions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD

val eschema = new StructType(Array(StructField("id", LongType, nullable = false), StructField("DEFINITION", MapType(StringType, StructType(List(StructField("a",LongType,true), StructField("b", StringType, true), StructField("c",StringType,true), StructField("d",StringType,true), StructField("e",StringType,true), StructField("f",StringType,true), StructField("g",StringType,true), StructField("h",StringType,true), StructField("i",StringType,true), StructField("j",StringType,true), StructField("k",StringType,true))))) ))
val etrans = sqlContext.sql("""select id, map(table.col1, named_struct("a", table.col2, "b", table.col3, "c", table.col4, "d", table.col5, "e", table.col6, "f", table.col7, "g", table.col8, "h", table.col9, "i", table.col10, "j", table.col11, "k", table.col12)) AS DEFINITION from table""")
val aggregatedRdd: RDD[Row] = etrans.rdd.groupBy(r => r.getAs[Long]("id")).map(row => Row(row._1, row._2.map(_.getAs[Map[String, List[(String, Any)]]]("DEFINITION")).toList))
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, eschema)
aggregatedDf.registerTempTable("event")
aggregatedDf.printSchema()
aggregatedDf.show()             

Я сталкиваюсь со следующей ошибкой матча

ERROR Executor: Exception in task 0.0 in stage 83.0 (TID 3652)
scala.MatchError: List(Map(qwe -> [204,,abc,,positive,False,everywhere,always_record,counter,xyz,disabled]), Map(N/A -> [20,,something,,null,null,null,null,null,null,null]), Map(xyz -> [220,,something,,positive,False,everywhere,always_record,counter,xyz,enabled])) (of class scala.collection.immutable.$colon$colon)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toCatalystImpl(CatalystTypeConverters.scala:201)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toCatalystImpl(CatalystTypeConverters.scala:193)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)

1 ответ

  1. Класс В MatchError:

    class scala.collection.immutable.$colon$colon
    

    Это тип, реализующий ArrayType:

    Я думаю, что проблема в том, что ArrayType не классифицирует приведение к списку [(String, Any)]:

    _.getAs[Map[String, List[(String, Any)]]]("DEFINITION"))
    

    Код, используемый в getAs (), не рекурсивный, а просто asInstanceOf[Map[…. :

    Это определение getMap (), которое может работать лучше:

    def getMap[K, V](i: Int): scala.collection.Map[K, V] = getAs[Map[K, V]](i)
    

    Затем вы могли бы иметь вторичный класс cast, который пробегает по значениям карты и приводит от неизвестного V к списку[(String, Any)].