본문 바로가기
BigData/Spark & Spark Tuning

[BigData] Spark 예제, Dataframe 특징, 설정 값, SQL

by 스파이디웹 2021. 2. 17.
728x90

파일 읽어오기

csv, json, parquet 등등

df = spark.read.format("json").option("header","true").load("주소") //option("header","true")는 해당 열의 이름을 표기
df = spark.read.format("csv").option("inferSchema","true").load("주소")
df = spark.read.format("parquet").load("주소")

option 종류

  • sep: 구분자
    • default: ,
    • Spark 1.6 방식에서는 delimiter를 사용해야 한다
  • encoding
    • default: UTF-8
  • quote: value가 큰 따옴표로 묶인 경우 "를 지정
    • defualt: "
  • escape: 구분자가 value안에 사용된 경우 escape를 처리할 문자
    • default: \
  • charToEscapeQuoteEscaping
    • default: escape or \0
  • comment: 코멘트의 시작을 알리는 문자
    • default: #
  • header: 첫 줄이 data가 아닌 헤더인 경우 "true"로 설정
    • default: false
  • enforceSchema:
    • defalut: true
  • inferSchema: schema를 Spark이 자동으로 알아내는 경우 사용
    • default: schema
  • samplingRatio: schema inferring 시에 data의 얼마를 샘플링할지
    • default: 1.0
  • ignoreLeadingWhiteSpace: value의 앞에 있는 공백을 제거할지 여부
    • default: false
  • ignoreTrailingWhiteSpace: value의 뒤에 있는 공백을 제거할지 여부
    • default: false
  • nullValue: null을 표현하는 문자열
    • default: empty string
  • emptyValue: 공백을 표현하는 문자열
    • default: empty string
  • nanValue: non-number를 표현하는 문자열
    • default: NaN
  • positiveInf: “양의 무한대”를 표현하는 문자열
    • default: Inf
  • negativeInf: “음의 무한대”를 표현하는 문자열
    • default: -Inf
  • dateFormat: 날자 포맷을 지정하는 문자열. java.text.SimpleDateFormat에서 사용하는한 포맷을 지정할 수 있다. date 타입인 필드에서 사용된다
    • default: yyyy-MM-dd
  • timestampFormat: dateFormat과 유사하며, timestamp 필드에서 사용된다
    • default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX
  • maxColumns: 최대 필드 개수
    • default: 20480
  • maxCharsPerColumn: 1개 필드에서 최대 문자열의 길이
    • default: -1 (즉, 제한없음)
  • mode: 에러 처리에 관련된 옵션. 개인적으로 csv를 다를 때 가장 중요한 옵션이라 생각한다
    • PERMISSIVE (default): 잘못된 포맷의 line을 만나면 해당 문자열을 columnNameOfCorruptRecord에서 지정한 필드에 저장하고, 나머지 필드는 모두 null로 설정한다
    • DROPMALFORMED: 잘못된 문자열의 레코드는 무시한다
    • FAILFAST: 잘못된 문자열을 레코드를 만나면 에러를 출력한다
  • columnNameOfCorruptRecord
  • multiLine: 1개 레코드가 여러 line에 걸쳐있는지 지정하는 옵션

DataFrame에 관한 정보

  • df는 sparksession으로 불러온 것이기 때문에 dataframe으로 생성된다.
  • df로 불러올떄 마지막이 load()가 아닌 .show()같은 것으로 끝날 때, 불러온 파일은 dataframe타입이 아닌 nonetype으로 설정이된다.(이유는 모름)
  • inferschema옵션은 default가 true이고 설정을 false로 하지않는 이상 sparksession이 자동으로 스키마를 유추해본다.
  • (false로 할경우 모두 string으로 할당)
  • df.printSchema() (스키마를 따로 설정하지않는 한 자동으로 할당된다.)
  • df.show() 데이터프레임의 데이터들을 조회한다.
  • df.createOrReplaceTempView("dfTable") //객체 df에 불러온 파일을 임시뷰테이블인 이름 dfTable로 변환한다(만든다)
  • (임시 뷰테이블로 만들면 spark.sql("SQL문")으로 즉, SQL로 질의 및 조작이 가능하다)
  • df.write.save("주소") 로 데이터파일을 저장

dataframe 형식으로 변환하기

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
   StructField("some", StringType(), True),
   StructField("col", StringType(), True),
   StructField("names", LongType(), False)])

myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()


"""
+-----+----+-----+

| some| col |names|

+-----+----+-----+

| Hello| null | 1     |

+-----+----+-----+
"""

RDD를 schema를 설정하여 DataFrame으로 변환

from pyspark.sql import Row

schema= df.schema
newRows = [
  Row("a","b",1),
  Row("c","d",2)
]
parallelizedRows =spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
newDF.show()
print(type(paralleliedRows), type(newDF))

"""
+------------------------+----------------------------+-----+

|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|

+------------------------+----------------------------+-----+

| New Country            | Other Country               | 1    |

| New Country 2         | Other Country 3             | 1    |

+------------------------+----------------------------+-----+
"""


#<class 'pyspark.rdd.RDD'>, <class 'pyspark.sql.dataframe.DataFrame'> 출력

DataFrame 열 추가(concatenate 개념), 열 이름 수정

df.withColumn("추가할 열의 이름", expr("표현식")).show()
  • 기존 df의 열에 열이 추가되어 보여진다.
  • df.show() 를 하게되면 기존의 파일만 보여진다.(lazy한 spark의 특징)
df.withColumnRenamed("기존의 열이름","수정할 열이름").show()
  • 기존의 열 이름이 바뀐 열이름으로 보이게된다. SQL의 alias개념

select, selectexpr는 dataframe을 sql쿼리과 동등하게 질의할 수 있게하는 기능

ex)

df.select("열 이름").show()
df.select("열 이름1, 열이름2").show()

DataFrame select, col, expr, selectExpr

import pyspark.sql import pyspark.sql.functions

df.select(
   expr("열 이름"),
   col("열 이름"),
   column("열 이름")).show()
#모두 같은 열이 출력된다.


df.select(expr("DEST_COUNTRY_NAME AS destination")).show()
#expr로 SQL에서의 alias처럼 별명을 붙일 수 있다

df.select(expr("DEST_COUNTRY_NAME").alias("destiny").show()
#expr + .alias로도 별명을 붙일 수 있다.


df.selectExpr("DEST_COUNTRY_NAME" as newColumnName", "DEST_COUNTRY_NAME").show()
#select과 expr을 결합한형태로도 표현 할 수 있다.

df.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show()
#모든 열+withinCountry로 본국과 도착지가 같은 열에 대한 boolean값 출력


df.withColumn("열 이름","값").show()
#열 추가하기
ex)
df.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show()


df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
#열 이름 별명붙여서 출력(실제로 변하진 않았음. lazy한 spark특징)

dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")
.show()
#열 이름에 띄어쓰기가 가능할려면 열에 들어가는 문구를 `(backtick)로 감싸줘야한다.

DataFrame 열 제거, 열 필터, 열 조건절

df.drop("열 이름").columns
#열을 삭제하고 열목록을 보여준다.


df.drop("열 이름").show()
#열을 삭제하고 열,데이터를 보여준다.


df.filter(col("count") < 2).show()
#count라는 속성이 2미만인것만 보여준다.


df.where("count < 2").show()
#count라는 속성이 2미만인것만 보여준다.

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "CROATIA").show()
#count가 2미만이면서 본국적이 크로아티아가 아닌 데이터 출력(조건절이 AND문이다)



df.select("열 이름").distinct().count()
#중복을 제거한 열의 갯수 출력

df.select("열 이름").distinct().show()
#중복을 제거한 열의 데이터 출력

DataFrame에 row를 추가시키려면??

dataframe은 immutable(변경 불가능하다)한 특징,

dataframe에 row(데이터)를 추가시키려면?

-> dataframe을 연결(union, concatenate)해줘야한다. 여기서 2개의 dataframe의 스키마는 동일해야하며, 그렇지 않은경우 오류가난다.

from pyspark.sql import Row

shcema = df.schema
newRows =[
   Row=("New country","Other country", 1),
   Row=("New country2","Other country2",1)
]

parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)


df.union(newDF).where("count = 1").where(col("ORIGIN_COUNTRY_NAME") != "United States").show()
#df에 newDF 데이터프레임을 합치는데 조건은 count가 1이고, 본국적이 미국이 아닌 데이터를 보여준다.



df.union(newDF).createOrReplaceTempView("테이블 이름1")
#유니온된 데이터프레임을 임시뷰테이블에 저장한다.

spark.sql("SELECT * FROM 테이블 이름1").show()
#유니온된 데이터프레임이 저장된 임시뷰테이블을 조회한다.

DataFrame의 정렬

데이터프레임의 데이터들을 정렬하기 위해선 sort, orderby를 사용한다(둘다 완전히 동일하다)(오름차순이 기본값)

df.sort("열 이름").show()
df.orderBy("열 이름1","열 이름2").show()
df.orderBy(col("열 이름1"), col("열 이름2")).show()



from pyspark.sql.functions import desc,asc

df.orderBy(expr("열 이름 desc")).show()
df.orderBy(col("열 이름1").desc(), col("열 이름2").asc()).show()
#expr 와 col의 차이는 expr는 따옴표 안에 desc까지 들어가지만 col은 열 만들어가고 함수를 따로 붙여준다.


df.sortWithinPartitions("열 이름")
#파티션내에 정렬도 가능하다.


df.orderBy(expr("열 이름 desc").limit(10).show()
#열 이름을 기준으로 내림차순하여 10개만 보여준다.

 

 

728x90

댓글