DataFrame 은 Row 타입의 레코드와 여러 컬럼으로 구성된다.
스키마는 각 컬럼명과 데이터 타입을 정의 한다.
DataFrame 의 파티셔닝은 DataFrame 이 클러스터에서 물리적으로 배치 되는 형태를 정의한다.
파티셔닝 스키마는 파티션을 배치하는 방법을 정의함
- 스키마
스키마는 여러개의 StructFiedld 타입 필드로 구성된 StructType 객체다
스키마를 StructType 으로 직접 정의할 수 있는데,
스키마의 데이터 타입과 데이터가 일치하지 않으면 런타임에서 에러를 발생시킨다.
스파크는 자체 데이터 타입을 사용하므로 프로그래밍 언어의 데이터 타입을
스파크의 데이터 타입으로 설정 할 수 없다.
- 표현식으로 컬럼 표현
표현식은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미한다.
컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행 계획을 갖아서
expr("someCol - 5"), expr("someCol") - 5 등이 모두 같은 트랜스포메이션 과정을 거치게 된다.
DataFrame 의 트랜스포메이션
일반적으로 특정 컬럼값을 변경하고 그 결과를 반환 할 수 있다.
- DataFrame 생성하기
createOrReplaceTempView 로 임시 뷰로 등록할 수 있고,
Row 객체를 가진 Seq 타입을 직접 변환해 DataFrame 을 생성 할 수 도 있다.
-> Seq() 데이터를 parallelize 후 나온 RDD 를 createDataFrame 으로 생성 할 수 있다.
이제 유용하게 사용할 수 있는 메서드를 알아보자
- select 와 selectExpr
select 와 selectExpr 를 사용하면 테이블에 SQL 을 실행하는 것처럼 dataframe 에서도 사용 할 수 있다.
df.select("컬럼 명").show()
expr 는 유연한 참조 방법이다. 단순 컬럼 참조나 문자열을 이용해 컬럼을 참조 할 수 있다.
select(expr("컬럼 명 as alias 컬럼명")).alias("변경할 컬럼 명")
처럼 유연하게 참조를 할 수 있다.
selectExpr 는 복잡한 표현식을 간단하게 만들 수 있다.
위 예제를 들어보면,
selectExpr("컬럼 명 as alias 컬럼 명", "변경 할 컬럼 명")
이 뿐만 아니라 selectExpr 를 이용해 컬럼 추가 등 여러 기능을 추가 할 수 있다.
- 컬럼 추가하기
withColumn 을 사용해 추가할수있다.
df.withColumn("추가할 컬럼 명", "값")
-컬럼 변경하기
withColumnRenamed 메서드로 컬럼명을 변경 할 수 있다.
-컬럼 제거하기
df.drop("컬럼 명")
-로우 필터링 하기
df.filter(col("컬럼 명") < 2)
- 고유한 로우 얻기
df.select("컬럼명").distinct()
- 임의 분할 하기
머신러닝에서 사용할 학습셋, 검증셋, 테스트셋을 만들때 주로 사용한다.
df.randomSplit(Array(0.25, 0.75), seed)
임의성을 가지도록 설계되었으므로 시드값을 설정해야 한다.
임의의 값을 seed에 넣어야한다.
총합 1이 되도록 비율을 지정해준다.
- 로우 정렬하기
sort와 orderby 를 이용해 정렬 할 수 있다.
완전히 같은 방식으로 동작한다.
df.sort("컬럼명")
df.orderBy(expr("컬럼명 desc"))
트랜스포메이션을 처리하기 전에 성능 최적화를 위해
파티션별 정렬을 수행하기도 한다.
파티션별 정렬은 sortWithinPartitions 메소드를 사용한다.
spark.read.format("json").load("json 파일 위치")
.sortWithinPartitions("컬럼명")
- repartition과 coalesce
최적화 기법으로 자주 필터링 하는 컬럼을 기준으로 데이터를 분할 하는 것
파티셔닝 스키마와 파티션 수를 포함해 클러스터의 물리적 데이터 구성을 제어 할 수 있다.
repartition을 호출하면 전체 데이터를 셔플한다.
- 향 후에 사용할 파티션 수가 현재 파티션 수 보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 한다.
df.repartition("파티션 수")
df.reapartition("파티션 수", "기준 컬럼 명")
coalesce 는 셔플 하지 않고 파티션을 병합할 때 사용된다.
셔플을 수행해 5개의 파티션으로 나누고 셔플 없이 병합하는 예제
df.repartition(5, "컬럼 명").coalesce(2)
'데이터처리 > spark' 카테고리의 다른 글
데이터 소스 (0) | 2021.05.14 |
---|---|
구조적 API (0) | 2021.05.12 |
[spark definitive guide] 스파크 기능 둘러보기 (0) | 2021.02.03 |
[spark definitive guide] - 스파크 간단히 살펴보기 (0) | 2021.02.02 |
[Spark definitive guide] - 아파치 스파크란 (0) | 2021.02.02 |