본문 바로가기

Bigdata

(13)
spark cluster 환경 정리 스파크 애플리케이션은 마스터 역할을 담당하는 드라이버(driver) 프로그램과 노드의 자원을 관리하는 클러스터 매니저 그리고 실제 데이터를 처리하는 익스큐터(executor)로 구성됩니다. Driver : Driver는 애플리케이션의 main()함수가 실행되는 프로세스이며 Spark Context를 생성하고 관리하며 transformation과 action 연산을 수행하게 됩니다. Executor : Executor는 CPU와 메모리 등의 자원을 할당받은 프로세스로서 Spark Job의 Task들을 수행하게 되며 처리된 데이터를 나중에 빠르게 재사용할 수 있또록 메모리에 저장해 두는 역할을 하게 됩니다. Cluster Manager : 클러스터 매니저는 스파크의 클러스터 모드를 구성하는 컴포넌트 중 하나..
[spark] json 형태의 문자열을 dataframe으로 변환하는 방법 spark에서 json 파일을 읽어 dataframe 형태로 만들면 별도의 파싱 과정을 직접할 필요 없이 구조화된 데이터로 만들어 주기 때문에 굉장히 편리하게 개발을 할 수 있다. val df = sqlContext.read.json("file.json") 때로는 json이 파일 형태로 존재하지 않고 데이터 처리과정에서 json 형태의 문자열을 dataframe으로 변환해야 할 경우가 있는데 sc.read.json 메소드는 문자열을 인자로 받지 않는다. 그래서 특정 string 변수에 json 형태의 문자열이 존재할 경우 dataframe으로 변환하는 방법을 알아보겠다. 방법은 단순하다. 문자열을 Seq 형태로 만들고 parallelize 메소드를 이용해서 rdd 형태로 변환하면 dataframe으로 변..
Apache Kafka 정리 Apache Kafka 1. 소개 kafka는 링크드인에서 개발한 발행-구독 방식의 고성능 분산 메시징 시스템입니다.kafka는 크게 topic과 메시지를 관리하는 broker, 메시지를 전송하는 producer, 그리고 메시지를 소비하는 consumer로 이루어져 있습니다. 일반 메시징 시스템의 경우 메시지를 메모리에 저장하는 경우가 많은 반면 kafka의 경우 메시지를 파일에 저장하여 메시지의 영속성이 높으며 그러면서도 OS의 페이지 캐시와 zero copy 기법 그리고 async non blocking io를 이용하여 기존 메시징 시스템보다도 고성능을 구현하였습니다.또한 아키텍쳐 자체가 분산 시스템을 목표로 개발되었기 때문에 확장 또한 매우 손쉽게 할 수 있습니다. 예) activemq, kafka..
spark streaming에서 중요한 옵션 정리 최근 스파크 스트리밍을 이용해서 실시간 로그 전처리 작업을 진행중이다.작업을 하면서 문제가 발생했던 부분을 옵션을 수정하면서 해결하였다. 1. spark.streaming.kafka.maxRatePerPartition-> 초당 파티션의 처리량 제한메시지가 한번에 대량으로 들어오거나 어떠한 이유로 스트리밍 어플리케이션이 다운되어서 카프카에 대량의 메시지가 쌓여있을 경우 어플리케이션을 재시작하면 대량의 메시지가 한번에 받아들여져서 설정된 executor-memory 이상의 데이터가 몰리면 OOM에러가 발생하게 된다. 만약에 상황에 대비해서 자원할당을 무작정 크게 할 수는 없기 때문에 옵션을 찾아보았는데 maxRatePerPartition 옵션을 발견하였다. 이 옵션은 초당 파티션에 들어오는 레코드의 양을 제..
[logstash] timestamp timezone 변경 logstash에서 @timestamp의 디폴트 타임존은 UTC 0시이다. Elasticsearch에 넣고 Kibana를 이용해서 분석을 한다면 타임존을 변경해서 분석이 가능하지만 kafka 혹시 다른곳으로 데이터를 출력할 경우 원하는 시간대로 타임존을 변경해야만 하는 경우가 있다. 기본적으로 logstash에서는 timestamp의 타임존을 변경할 수 없다. 하지만 아래와 같이 ruby 플러그인을 이용하면 타임존을 변경한 날짜를 변수에 저장할 수 있다. 아래는 로컬타임(한국시간)으로 타임존을 변경하고 index_day라는 변수에 할당하고 있다. filter { ruby { code => "event.set('index_day', event.timestamp.time.localtime.strftime('..