Spark 완벽 가이드 10장 Spark SQL

이전 스터디에서도 DataFrme API로 표현된 코드와 함께 SQL로 변환된 코드들도 함께 봤었지만 10장에서는 Spark SQL에서 꼭 알아둬야하는 핵심내용들과 함께 몇 가지 예시를 추가적으로 제공한다.

Spark SQL

  • Spark SQL은 DB에서 생성된 View나 Table에 SQL 질의문을 실행할 수 있으며 시스템 함수나 사용자 정의 함수를 사용할 수도 있다.
  • Spark SQL은 DataFrame과 Datset API에 통합되어 있다.

    • 데이터 변환 시 SQL과 DataFrame의 기능 모두를 사용할 수 있다.
    • SQL과 DataFrame 방식 모두 동일한 코드로 컴파일된다.

Spark와 Hive

Hive

  • Spark SQL 이전에 사용되던 빅데이터 계의 SQL 접근 계층
  • 사실상 표준처럼 사용되었다.
  • Facebook에서 개발됐다.

Spark SQL

  • Hive와의 호환성

    • ANSI-SQL과 HiveQL을 모두 지원하는 자체 개발된 SQL Parser가 포함되어 있다.
  • DataFrame과의 뛰어난 호환성

  • 페이스북 사용사례

    • 기존에 사용하고 분석 시스템에서 Hive를 Spark SQL로 대체
    • Spark pipeline 사용 시 Hive pipeline 대비 엄청난 성능 개선이 있었다. CPU Time 분석^cputime사진출처
  • 주의할 점

    • Spark SQL은 온라인 트랜잭션 처리(Online transaction processing, OLTP)[^oltp]를 위한 데이터베이스가 아니라 온라인 분석 처리(Online Analytical Processing, OLAP)[^olap]를 위한 데이터베이스로 작동한다.
    • 그렇기 때문에 낮은 지연시간을 요구하는 쿼리를 수행하기에는 적합하지 않다.

Spark SQL과 Hive와의 관계

  • Spark SQL은 Hive Metastore[^hivemetastore]를 사용하기 때문에 Hive와 용이하다.
  • Spark SQL에서는 조회활 파일 수를 최소화하기 위해 Hive Metastore에 접속한 후 Metadata를 참조한다.
  • Hive와 연동하기 위해서는 공식문서를 참고해서 몇 가지 설정을 완료해야한다.

Spark SQL 접근법

  1. Spark SQL CLI
    여타 다른 DBMS에 쿼리를 날리듯이 SQL 문을 작성하면 된다.

    spark-sql
    
  2. 프로그래밍 SQL 인터페이스 Programming SQL interface 실습 노트북

    spark.sql("SELECT 1+1").show()
    

Spark SQL Thrift JDBC/ODBC 서버

  • Spark는 자바 데이터베이스 연결[^jdbc] 인터페이스를 제공한다.
  • Thrift JDBC/ODBC[^odbc] 서버는 HiveServer2 기반으로 만들어졌다.
  • 사용자는 Thrift JDBC/ODBC 서버를 경유해 SQL문을 실행할 수 있다.

    • Thrift 서버 실행

      $SPARK_HOME/sbin/start-thriftserver.sh
      
      # 환경변수를 통해 Thrift 서버의 주소를 변경할 수 있다.
      # spark-submit에서 지원하는 모든 명령행 옵션을 사용할 수 있다.
      export HIVE_SERVER2_THRIFT_PORT=<listening-port>
      export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
      $SPARK_HOME/sbin/start-thriftserver.sh \
        --master <master-uri> \
        ...
      
      $SPARK_HOME/sbin/start-thriftserver.sh \
        --hiveconf hive.server2.thrift.port=<listening-poart> \
        --hiveconf hive.server2.thrift.bind.host=<listening-host> \
        --master <master-uri> \
        ...
      
    • Beeline으로 접속
      보안 모드로 접속했을 때 비밀번호가 필요하지만, 그렇지 않을 경우 비밀번호는 입력하지 않아도 된다.

      beeline
      
      beeline> !connect jdbc:hive2://localhost:10000
      

카탈로그(Catalog)

  • Spark SQL에서 가장 높은 단계의 추상화
  • Table에 저장된 데이터에 대한 Metadata 뿐만 아니라 Database, Table, 함수, View에 대한 정보를 추상화한다.
  • org.apache.spark.sql.catalog.Catalog패키지로 사용가능하다.
  • Table, Database, 함수 등을 조회하는 유용한 함수를 제공한다.

테이블(Table)

  • 명령을 실행할 데이터의 구조 (DataFrame과 논리적으로 동일하다.)
  • 9장에서 배운 조인, 필터링, 집계 등의 여러 데이터 변환 작업을 수행할 수 있다.
  • DataFrame과의 차이점
    • DataFrame은 Programming 언어에서 정의
    • Table은 Database에서 정의한다.
  • Table을 생성하면 default Database에 등록된다.
  • 주의사항
    • Spark 2.x 버전에서는 Table은 비어있지 않다.
    • 비어있는 View만 존재한다.
    • 그러므로 Table을 지울 시 모든 데이터가 제거된다.

관리형 Table과 외부 Table

  • Table은 두 가지 중요한 정보를 저장한다.

    1. Table data
    2. Table metadata
  • 외부 테이블

    • 디스크에 저장된 파일을 이용해서 정의한 Table
  • 관리형 테이블

    • DataFrame의 saveAsTable 메소드를 실행해서 만든 스파크가 관련된 모든 정보를 추적할 수 있는 Table

    • saveAsTable 메소드는spark.sql.warehouse.dir 속성에 정의된 디렉토리 경로에 테이블 데이터를 스파크 포맷으로 변경한 후 저장한다.

    • 기본 저장경로는 /user/hive/warehouse이다.

테이블 생성하기

  • 주의사항

    • USING JSON 구문을 사용하지 않을 시 Hive의 SerDe 설정을 사용하게 되는데 Spark 자체 직렬화보다 훨씬 느리다.
  • 아래와 같이 데이터를 읽어올 수 있다.

    CREATE TABLE flights (
    DEST_COUNTRY_NAME STRING,
    ORIGIN_COUNTRY_NAME STRING,
    count LONG
    )
    USING JSON
    OPTIONS (path './2015-summary.json')
    
  • comment를 추가할 수도 있다.

    CREATE TABLE flights (
    DEST_COUNTRY_NAME STRING,
    ORIGIN_COUNTRY_NAME STRING COMMENT "여기가 코멘트",
    count LONG
    )
    USING CSV
    OPTIONS (path './2015-summary.csv')
    
  • SELECT 결과를 테이블로 생성할 수도 있다.

    CREATE TABLE flights_from_select
    USING parquet AS SELECT * FROM flights
    
  • 테이블이 없을 시에만 생성할 수도 있다.

    CREATE TABLE IF NOT EXISTS flights_from_select
    AS SELECT * FROM flights
    
  • 파티셔닝된 데이터셋을 저장해 데이터 레이아웃을 제어할 수도 있다.

    CREATE TABLE partitioned_flights
    USING parquet
    PARTITIONED BY (DEST_COUNTRY_NAME)
    AS
    SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count
    FROM flights LIMIT 5
    

외부 테이블 생성하기

  • Spark는 외부 테이블의 Metadata를 관리한다.
  • 테이블 데이터는 관리하지 않는다.
CREATE EXTERNAL TABLE hive_flights (
  DEST_COUNTRY_NAME STRING,
  ORIGIN_COUNTRY_NAME STRING,
  count LONG
)
ROW FORMAT DELIMITED FIELDS TERMINAED BY ','
LOCATION './data/flight-data-hive'

테이블 데이터 삽입하기

  • 테이블의 데이터 삽입은 표준 SQL문법을 따른다.
  • 특정 파티션에만 저장하고 싶은 경우 파티션 명세를 추가할 수 있다.
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME=" UNITED STATES ")
SELECT count, ORIGIN_COUNTRY_NAME
FROM flights
WHERE DEST_COUNTRY_NAME=" UNITED STATES "
LIMIT 12

메타데이터 확인하기

  • 테이블 생성시 추가한 코멘트를 확인하기

    DESCRIBE TABLE flights_csv
    
  • 파티셔닝 스키마 정보 확인하기

    SHOW PARTITIONS partitioned_flights
    

메타데이터 갱신하기

  • 테이블의 메타데이터를 유지해야 최신의 데이터셋을 읽고 있다는 것을 보장할 수 있다.
  • 테이블과 관련된 모든 캐싱된 항목을 갱신
    REFRESH table partitioned_flights
    
  • 카탈로그에서 관리하는 테이블의 파티션 정보를 갱신
    MSCK REPAIR TABLE partitioned_flights
    

테이블 제거하기

  • 외부 테이블은 제거할 수 있지만 데이터는 삭제되지 않는다.
-- 테이블 삭제
DROP TABLE flights_csv;

-- 테이블이 존재할 시 삭제
DROP TABLE IF EXISTS flights_csv;

테이블 캐싱하기

  • 테이블 캐싱

    CACHE TABLE flights
    
  • 캐쉬된 테이블 제거

    UNCACHE TABLE flights
    

뷰(View)

  • View는 사용자에게 Table처럼 보인다.
  • 모든 데이터를 새로운 경로에 다시 저장하는 것이 아닌 쿼리 시점에 트랜스포메이션을 수행한다.
  • 쿼리 실행계획을 살펴보면 둘 다 같은 코드로 변환되는 것을 확인할 수 있다.

     flights = spark.read.format("json")\
        .load("./2015-summary.json")
    
     just_usa_df = flights.where("dest_country_name = 'United States'")
     print(just_usa_df.selectExpr("*").explain)
    
    EXPLAIN SELECT * FROM just_usa_view
    

뷰 생성

  • 기본 뷰 생성

    CREATE VIEW just_usa_view
    AS
    SELECT * FROM flights
    WHERE dest_coutnry_name = 'United States'
    
  • 현재 세션에서만 사용가능한 임시 뷰도 만들 수 있다.

    CREATE TEMP VIEW just_usa_view_temp
    AS
    SELECT * FROM flights
    WHERE dest_coutnry_name = 'United States'
    
  • 전역적 임시뷰도 생성할 수 있다.(DB상관없이 사용 가능)

    CREATE GLOBAL TEMP VIEW just_usa_view_temp
    AS
    SELECT * FROM flights
    WHERE dest_coutnry_name = 'United States'
    
  • 생성된 뷰를 덮어쓸 수도 있다.

    CREATE OR REPLACE TEMP VIEW just_usa_view_temp
    AS
    SELECT * FROM flights
    WHERE dest_coutnry_name = 'United States'
    

뷰 제거하기

  • TABLE대신 VIEW 키워드를 사용하면된다.
DROP VIEW IF EXISTS just_usa_view;

데이터베이스(Database)

  • 전체 데이터베이스 목록 확인

    SHOW DATABASE
    
  • 데이터베이스 생성하기

    CREATE DATABASE test_db
    
  • 데이터베이스 사용하기

    use test_db
    
  • 데이터베이스 삭제하가ㅣ

    DROP DATABASE IF EXISTS test_db;
    

복합 데이터 타입

  • 표준 SQL과는 거리가 먼 기능
  • 구조체, 리스트, 맵 타입이 존재한다.

구조체

  • 구조체는 맵에 더 까가운 복합 데이터 타입이다.
  • 여러 컬럼이나 표현식을 괄호로 묶으면 생성된다.

  • 구조체 생성

    CREATE VIEW IF NOT EXISTS nested_data
    AS
    SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count
    FROM flights
    
  • 구조체 조회

    -- 구조체 조회
    SELECT * FROM nested_data
    
    -- 구조체의 개별 칼럼 조회
    SELECT country.DEST_COUNTRY_NAME, count FROM nested_data
    
    -- 구조체의 모든 개별 칼럼 조회
    SELECT country.*, count FROM nested_data
    

리스트

  • collect_list, collect_set 함수를 통해 생성할 수 있다.

  • 리스트 조희

    SELECT DEST_COUNTRY_NAME AS new_name, collect_list(count)
    AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set
    FROM flights
    GROUP BY DEST_COUNTRY_NAME
    
    -- 리스트의 특정 위치의 아이템 조회
    SELECT DEST_COUNTRY_NAME AS new_name,
    collect_list(count)[0] AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set
    FROM flights
    GROUP BY DEST_COUNTRY_NAME
    
  • explod 함수를 이용한 배열을 로우로 만들기

    CREATE OR REPLACE TEMP VIEW flights_agg AS
    SELECT DEST_COUNTRY_NAME, collect_list(count) AS collected_counts
    FROM flights
    GROUP BY DEST_COUNTRY_NAME
    
    -- collect 함수와 정확히 반대로 동작한다.
    -- collect 수행 이전의 DataFrame과 동일한 결과를 반환한다.
    SELECT explode(collected_counts), DEST_COUNTRY_NAME
    FROM flights_agg
    

함수

  • Spark SQL 사용가능한 함수 조회

    SHOW FUNCTIONS
    
    SHOW SYSTEM FUNCTIONS
    
    SHOW USER FUNCTIONS
    
  • 사용자 정의 함수

    def power3(number:Double): Double = number * number * number
    spark.udf.register("power3", power3(_:Double):Double)
    
    SELECT count, power3(count) FROM flights
    

서브쿼리

  • 쿼리 안에 쿼리를 지정할 수 있도록 지원하는 기능

상호연관 서브쿼리

  • 서브쿼리가 사용하는 정보가 쿼리 외부 범위에 있는 경우

비상호연관 서브쿼리

  • 서브쿼리가 사용하는 정보가 서브쿼리 내부에만 있는 경우

[^oltp]: wikipedia - 온라인 트랜잭션 처리 [^olap]: wikipedia - 온라인 분석 처리 [^hivemetastore]: Hive Metastore는 여러 세션에서 사용할 테이블 정보를 보관하고 있다. [^jdbc]: Java Database Connectivity [^odbc]: Open Database Connectivity