1. dataframe Method로 select,selectExpr,expr,col 이용하여 컬럼 출력하기(with 집계함수)
- expr을 사용할 때는 expr("") 따옴표안에 표현식이 위치
- selectExpr를 사용할 때도 selectExpr("") 따옴표안에 표현식이 위치
- select와 함수,컬럼 이름으로 쓰고싶을 때는 select(함수("컬럼 이름"))처럼 사용
- select와 함수,col을이용한 컬럼 이름을 명시하고 사용하는 방법은 select(함수(col("컬럼 이름"))) 처럼 사용
2. SQL에서의 조인을 Dataframe Method로 join구현하기
데이터프레임.select.join(조인할 데이터프레임,조인할 표현식,조인 방식).show()
SELECT FROM 처럼 조인할 테이블이 있으면 SELECT 다음에 join 위치
1. 조인 조건을 미리 객체에 저장하기
아래와 같이 joinExpression객체에 조건을 저장해두고 join해도 된다.
2. 조인 조건을 직접 입력하기
아래와 같이 조인 조건을 직접 적어도 된다.
select에 SQL의 table.column처럼 출력하지만, pyspark에서는 dataframe["column"], dataframe.column, col("column name"), column name 처럼 4가지의 방법으로 지정해서 출력할 수 있다.
아래의 링크 참고
2022.03.26 - [BigData/Spark & Spark Tuning] - [Spark] 스파크의 문법적 자유도, 스키마 조작, dummy 생성
데이터프레임.select.join(조인할 데이터프레임,조인할 표현식,조인 방식).where("조건식").show()
#spark에서도 SQL에서와 같이 SELECT FROM WHERE (SQL) 처럼 순서대로 위치
데이터프레임.select.join(조인할 데이터프레임,조인할 표현식,조인 방식).where("조건식").groupBy("열 이름").agg(expr(""),expr("")).show()
#spark에서는 WHERE 다음 GROUP BY 그리고 GROUP BY 뒤에는 AGG가 반드시 나와야 한다.
(그렇지 않으면 GroupedData 객체로만 나오고 .show() 메서드로 조회를 할 수 없다.)
(SQL에서 GROUP BY 를 집계하기위해 쓰듯이 SPARK의 dataframe method도 groupBy 다음에 agg가 나오게 되어있다.)
데이터프레임.join(~~)로 데이터프레임.select()없이 바로 넘어갈시 select(*)인 것 과 같다.
*만약, 조인한 것 중에서 원하는 컬럼만 조회하려면 join 뒤에 select를 하면된다.
3. SQL의 Inline view 에 해당 하는 것을 dataframe으로 구현하기
--SQL
SELECT *
FROM (SELECT DayofMonth,Origin,Dest,count(UniqueCarrier) as delaycount,
ROW_NUMBER () OVER (PARTITION BY DayofMonth ORDER BY count(UniqueCarrier) DESC) as rank FROM dfTable
WHERE ArrTime > CRSArrTime
GROUP BY DayofMonth,Origin,Dest
ORDER BY int(DayofMonth),rank)
WHERE rank < 6;
--뜻: 일자별, 출발지별,도착지별,지연건수가 많은 순으로 5개까지 출력하시오.
Dataframe Method
1. window function을 사용하기위해 관련 모듈을 import해주고, 각 객체(windowSpec,rownum)에 partitionby, row_number를 적용한 결과를 담는다.
2. df2라는 객체에 조건으로 걸러진(inline view)에 해당하는 데이터프레임을 담는다->df2라는 새로운 데이터프레임이 생긴다.
아래와 같이 하나의 코드로 통합 시켜도 상관 없다.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df2=\
df.where(F.col("ArrTime") > F.col("CRSArrTime"))\
.groupBy(F.col("DayofMonth", F.col("Origin"), F.col("Dest"))\
.agg(F.count(F.col("UniqueCarrier")).alias("delaycount")
,F.row_number().over(Window.partitionBy(F.col("DayofMonth")).orderBy(F.count(F.col("UniqueCarrier")))).alias("rank"))\
.orderBy(F.col("DayofMonth").cast("int"),F.col("rank"))
3. df2를 조건걸어서 출력
2022-03-29 수정 -글 포멧, 예시 다듬기
'BigData > Spark & Spark Tuning' 카테고리의 다른 글
HIVE QL(HQL) VS Pyspark (REGEXP_REPLACE 구문 차이정리) (0) | 2021.08.19 |
---|---|
[Spark] virtual box linux [ubuntu 18.04]에 스파크 설치,다운로드 5.ubuntu 에 spark(스파크) 다운로드,설치 (0) | 2021.05.16 |
[BigData] Spark 예제, Dataframe 특징, 설정 값, SQL (0) | 2021.02.17 |
[PySpark] Spark 환경 설정 with Anaconda(Jupyter Notebook) Python(Pyspark Standalone mode) on windows (0) | 2021.02.17 |
[BigData] Spark( RDD vs DataFrame vs Dataset) (0) | 2021.02.15 |
댓글