Существует два способа преобразования между DataFrame и RDD в Spark SQL:
DataFrame Информация о структуре данных в Scheme
(Условия использования)известныйдобрыйиз Схема, использование этого метода на основе отражения сделает код более кратким и эффект будет лучше.существовать Scala в, использовать case class Тип импорта RDD и преобразован в DataFrame, через case class создавать Schema,case class Имена параметров будут использоваться в качестве имен столбцов с использованием механизма отражения. случай class Могут быть вложены и объединены в Sequences или Множество. Этот вид RDD можно эффективно преобразовать в DataFrame и зарегистрирован как таблица.
Во-вторых, при необходимости RDD и DFS или DS Для работы между ними необходимо ввести import sqlContext.implicits._
SparkSession да Spark 2.0 представил концепции, которые инкапсулируют SQLContext и HiveContext。
package sparksql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrametoRDDofReflection {
def main(args: Array[String]): Unit = {
}
def method1():Unit = {
val sparkConf = new SparkConf().setAppName("DataFrametoRDDofReflection").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
// представлять sqlContext.implicits._
import sqlContext.implicits._
// Воля RDD Преобразовать в DataFrame
/*val people = sc.textFile("people.txt").toDF()*/
val people = sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
people.show()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
// DataFrame Преобразовать в RDD Выполнить операцию: получить значение на основе индексного номера
teenagers.map(t=>"Name:" + t(0)).collect().foreach(println)
// DataFrame Преобразовать в RDD Выполните операцию: получите значение по имени поля
teenagers.map(t=>"Name:" + t.getAs[String]("name")).collect().foreach(println)
// DataFrame Преобразовать в RDD Выполнить операцию: вернуть сразу несколько столбцов значений
teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println)
sc.stop()
}
/**
* определение Person добрый
* @param name Имя
* @param age возраст
*/
case class Person(name:String,age:Int)
}
проходить Spark SQL изинтерфейссоздавать RDD из Schema,Такой подход делает код более многословным. Преимущества этого метода,существует знает данные из столбца и столбца из хорошего типа из регистра во время выполнения,Схемы могут генерироваться динамически. Для создания DataFrame вы можете выполнить следующие три шага:
package sparksql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrametoRDDofInterface {
def main(args: Array[String]): Unit = {
method2()
}
def method2(): Unit = {
val sparkConf = new SparkConf().setAppName("DataFrametoRDDofInterface").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val people = sc.textFile("people.txt")
// в строке определения пути DataFrame из Schema информация
val schemaString = "name age"
// Требуется импорт издобрый
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}
// По строке с момента определенияиз schema информацияпроизводить DataFrame из Schema
val schema = StructType(
schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
// Воля RDD преобразован в Row
val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
// Воля Schema Действуйте RDD начальство
val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)
// Воля DataFrame Зарегистрировать как временную таблицу
peopleDataFrame.registerTempTable("people")
// получать name Поле значения
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name" + t(0)).collect().foreach(println)
sc.stop()
}
}