Spark 완벽 가이드 10장 Spark SQL
이전 스터디에서도 DataFrme API로 표현된 코드와 함께 SQL로 변환된 코드들도 함께 봤었지만 10장에서는 Spark SQL에서 꼭 알아둬야하는 핵심내용들과 함께 몇 가지 예시를 추가적으로 제공한다.
Spark SQL
- Spark SQL은 DB에서 생성된 View나 Table에 SQL 질의문을 실행할 수 있으며 시스템 함수나 사용자 정의 함수를 사용할 수도 있다.
Spark SQL은 DataFrame과 Datset API에 통합되어 있다.
- 데이터 변환 시 SQL과 DataFrame의 기능 모두를 사용할 수 있다.
- SQL과 DataFrame 방식 모두 동일한 코드로 컴파일된다.
Spark와 Hive
Hive
- Spark SQL 이전에 사용되던 빅데이터 계의 SQL 접근 계층
- 사실상 표준처럼 사용되었다.
- Facebook에서 개발됐다.
Spark SQL
Hive와의 호환성
- ANSI-SQL과 HiveQL을 모두 지원하는 자체 개발된 SQL Parser가 포함되어 있다.
DataFrame과의 뛰어난 호환성
-
- 기존에 사용하고 분석 시스템에서 Hive를 Spark SQL로 대체
- Spark pipeline 사용 시 Hive pipeline 대비 엄청난 성능 개선이 있었다.
^cputime사진출처
주의할 점
- Spark SQL은 온라인 트랜잭션 처리(Online transaction processing, OLTP)[^oltp]를 위한 데이터베이스가 아니라 온라인 분석 처리(Online Analytical Processing, OLAP)[^olap]를 위한 데이터베이스로 작동한다.
- 그렇기 때문에 낮은 지연시간을 요구하는 쿼리를 수행하기에는 적합하지 않다.
Spark SQL과 Hive와의 관계
- Spark SQL은 Hive Metastore[^hivemetastore]를 사용하기 때문에 Hive와 용이하다.
- Spark SQL에서는 조회활 파일 수를 최소화하기 위해 Hive Metastore에 접속한 후 Metadata를 참조한다.
- Hive와 연동하기 위해서는 공식문서를 참고해서 몇 가지 설정을 완료해야한다.
Spark SQL 접근법
Spark SQL CLI
여타 다른 DBMS에 쿼리를 날리듯이 SQL 문을 작성하면 된다.spark-sql프로그래밍 SQL 인터페이스 Programming SQL interface 실습 노트북
spark.sql("SELECT 1+1").show()
Spark SQL Thrift JDBC/ODBC 서버
- Spark는 자바 데이터베이스 연결[^jdbc] 인터페이스를 제공한다.
- Thrift JDBC/ODBC[^odbc] 서버는 HiveServer2 기반으로 만들어졌다.
사용자는 Thrift JDBC/ODBC 서버를 경유해 SQL문을 실행할 수 있다.
Thrift 서버 실행
$SPARK_HOME/sbin/start-thriftserver.sh # 환경변수를 통해 Thrift 서버의 주소를 변경할 수 있다. # spark-submit에서 지원하는 모든 명령행 옵션을 사용할 수 있다. export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> $SPARK_HOME/sbin/start-thriftserver.sh \ --master <master-uri> \ ... $SPARK_HOME/sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-poart> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> \ ...Beeline으로 접속
보안 모드로 접속했을 때 비밀번호가 필요하지만, 그렇지 않을 경우 비밀번호는 입력하지 않아도 된다.beeline beeline> !connect jdbc:hive2://localhost:10000
카탈로그(Catalog)
- Spark SQL에서 가장 높은 단계의 추상화
- Table에 저장된 데이터에 대한 Metadata 뿐만 아니라 Database, Table, 함수, View에 대한 정보를 추상화한다.
org.apache.spark.sql.catalog.Catalog패키지로 사용가능하다.- Table, Database, 함수 등을 조회하는 유용한 함수를 제공한다.
테이블(Table)
- 명령을 실행할 데이터의 구조 (DataFrame과 논리적으로 동일하다.)
- 9장에서 배운 조인, 필터링, 집계 등의 여러 데이터 변환 작업을 수행할 수 있다.
- DataFrame과의 차이점
- DataFrame은 Programming 언어에서 정의
- Table은 Database에서 정의한다.
- Table을 생성하면 default Database에 등록된다.
- 주의사항
- Spark 2.x 버전에서는 Table은 비어있지 않다.
- 비어있는 View만 존재한다.
- 그러므로 Table을 지울 시 모든 데이터가 제거된다.
관리형 Table과 외부 Table
Table은 두 가지 중요한 정보를 저장한다.
- Table data
- Table metadata
외부 테이블
- 디스크에 저장된 파일을 이용해서 정의한 Table
관리형 테이블
DataFrame의
saveAsTable메소드를 실행해서 만든 스파크가 관련된 모든 정보를 추적할 수 있는 TablesaveAsTable메소드는spark.sql.warehouse.dir 속성에 정의된 디렉토리 경로에 테이블 데이터를 스파크 포맷으로 변경한 후 저장한다.- 기본 저장경로는
/user/hive/warehouse이다.
테이블 생성하기
주의사항
USING JSON구문을 사용하지 않을 시 Hive의 SerDe 설정을 사용하게 되는데 Spark 자체 직렬화보다 훨씬 느리다.
아래와 같이 데이터를 읽어올 수 있다.
CREATE TABLE flights ( DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG ) USING JSON OPTIONS (path './2015-summary.json')comment를 추가할 수도 있다.
CREATE TABLE flights ( DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING COMMENT "여기가 코멘트", count LONG ) USING CSV OPTIONS (path './2015-summary.csv')SELECT 결과를 테이블로 생성할 수도 있다.
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights테이블이 없을 시에만 생성할 수도 있다.
CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights파티셔닝된 데이터셋을 저장해 데이터 레이아웃을 제어할 수도 있다.
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME) AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5
외부 테이블 생성하기
- Spark는 외부 테이블의 Metadata를 관리한다.
- 테이블 데이터는 관리하지 않는다.
CREATE EXTERNAL TABLE hive_flights (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING,
count LONG
)
ROW FORMAT DELIMITED FIELDS TERMINAED BY ','
LOCATION './data/flight-data-hive'
테이블 데이터 삽입하기
- 테이블의 데이터 삽입은 표준 SQL문법을 따른다.
- 특정 파티션에만 저장하고 싶은 경우 파티션 명세를 추가할 수 있다.
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME=" UNITED STATES ")
SELECT count, ORIGIN_COUNTRY_NAME
FROM flights
WHERE DEST_COUNTRY_NAME=" UNITED STATES "
LIMIT 12
메타데이터 확인하기
테이블 생성시 추가한 코멘트를 확인하기
DESCRIBE TABLE flights_csv파티셔닝 스키마 정보 확인하기
SHOW PARTITIONS partitioned_flights
메타데이터 갱신하기
- 테이블의 메타데이터를 유지해야 최신의 데이터셋을 읽고 있다는 것을 보장할 수 있다.
- 테이블과 관련된 모든 캐싱된 항목을 갱신
REFRESH table partitioned_flights - 카탈로그에서 관리하는 테이블의 파티션 정보를 갱신
MSCK REPAIR TABLE partitioned_flights
테이블 제거하기
- 외부 테이블은 제거할 수 있지만 데이터는 삭제되지 않는다.
-- 테이블 삭제
DROP TABLE flights_csv;
-- 테이블이 존재할 시 삭제
DROP TABLE IF EXISTS flights_csv;
테이블 캐싱하기
테이블 캐싱
CACHE TABLE flights캐쉬된 테이블 제거
UNCACHE TABLE flights
뷰(View)
- View는 사용자에게 Table처럼 보인다.
- 모든 데이터를 새로운 경로에 다시 저장하는 것이 아닌 쿼리 시점에 트랜스포메이션을 수행한다.
쿼리 실행계획을 살펴보면 둘 다 같은 코드로 변환되는 것을 확인할 수 있다.
flights = spark.read.format("json")\ .load("./2015-summary.json") just_usa_df = flights.where("dest_country_name = 'United States'") print(just_usa_df.selectExpr("*").explain)EXPLAIN SELECT * FROM just_usa_view
뷰 생성
기본 뷰 생성
CREATE VIEW just_usa_view AS SELECT * FROM flights WHERE dest_coutnry_name = 'United States'현재 세션에서만 사용가능한 임시 뷰도 만들 수 있다.
CREATE TEMP VIEW just_usa_view_temp AS SELECT * FROM flights WHERE dest_coutnry_name = 'United States'전역적 임시뷰도 생성할 수 있다.(DB상관없이 사용 가능)
CREATE GLOBAL TEMP VIEW just_usa_view_temp AS SELECT * FROM flights WHERE dest_coutnry_name = 'United States'생성된 뷰를 덮어쓸 수도 있다.
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS SELECT * FROM flights WHERE dest_coutnry_name = 'United States'
뷰 제거하기
TABLE대신VIEW키워드를 사용하면된다.
DROP VIEW IF EXISTS just_usa_view;
데이터베이스(Database)
전체 데이터베이스 목록 확인
SHOW DATABASE데이터베이스 생성하기
CREATE DATABASE test_db데이터베이스 사용하기
use test_db데이터베이스 삭제하가ㅣ
DROP DATABASE IF EXISTS test_db;
복합 데이터 타입
- 표준 SQL과는 거리가 먼 기능
- 구조체, 리스트, 맵 타입이 존재한다.
구조체
- 구조체는 맵에 더 까가운 복합 데이터 타입이다.
여러 컬럼이나 표현식을 괄호로 묶으면 생성된다.
구조체 생성
CREATE VIEW IF NOT EXISTS nested_data AS SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights구조체 조회
-- 구조체 조회 SELECT * FROM nested_data-- 구조체의 개별 칼럼 조회 SELECT country.DEST_COUNTRY_NAME, count FROM nested_data-- 구조체의 모든 개별 칼럼 조회 SELECT country.*, count FROM nested_data
리스트
collect_list, collect_set 함수를 통해 생성할 수 있다.
리스트 조희
SELECT DEST_COUNTRY_NAME AS new_name, collect_list(count) AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set FROM flights GROUP BY DEST_COUNTRY_NAME-- 리스트의 특정 위치의 아이템 조회 SELECT DEST_COUNTRY_NAME AS new_name, collect_list(count)[0] AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set FROM flights GROUP BY DEST_COUNTRY_NAMEexplod 함수를 이용한 배열을 로우로 만들기
CREATE OR REPLACE TEMP VIEW flights_agg AS SELECT DEST_COUNTRY_NAME, collect_list(count) AS collected_counts FROM flights GROUP BY DEST_COUNTRY_NAME -- collect 함수와 정확히 반대로 동작한다. -- collect 수행 이전의 DataFrame과 동일한 결과를 반환한다. SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg
함수
Spark SQL 사용가능한 함수 조회
SHOW FUNCTIONS SHOW SYSTEM FUNCTIONS SHOW USER FUNCTIONS사용자 정의 함수
def power3(number:Double): Double = number * number * number spark.udf.register("power3", power3(_:Double):Double) SELECT count, power3(count) FROM flights
서브쿼리
- 쿼리 안에 쿼리를 지정할 수 있도록 지원하는 기능
상호연관 서브쿼리
- 서브쿼리가 사용하는 정보가 쿼리 외부 범위에 있는 경우
비상호연관 서브쿼리
- 서브쿼리가 사용하는 정보가 서브쿼리 내부에만 있는 경우
[^oltp]: wikipedia - 온라인 트랜잭션 처리 [^olap]: wikipedia - 온라인 분석 처리 [^hivemetastore]: Hive Metastore는 여러 세션에서 사용할 테이블 정보를 보관하고 있다. [^jdbc]: Java Database Connectivity [^odbc]: Open Database Connectivity


