[Spark_5] Kafka with twitter stream(1)

Kafka

  • 분산처리가 가능한 고성능 메세지 큐(streaming data를 저장/읽기/분석을 가능하게 함)
  • kafka는 Broker의 역할
  • Topic안에 partition 기능 존재 (link)
    • partition안에서는 순서 바뀌고, b/w partition은 순서 보장(유지)
  • 특징
    • file copy 불필요
    • delay되지 않도록, 저장되기 전 consumer에 뿌려줘야 함.
    • 메세지를 디스크에 저장. 유실없음.
    • 메세지 중복/유실 문제
      • trade-off
      • 유실 안하려면 중복 vs 중복 안하면 유실 위험
      • default: At least once(유실 회피)
  • 구조
    • cluster형식으로 실행
    • Kafka cluster는 topic의 records stream을 저장
    • record는 key, value, timestamp로 구성됨
  • 핵심 API: Producer/Consumer/Streams/Connector API
    • Producer API: App이 steram of records를 Kafka Topic으로 전송하도록 함
    • Consumer API: App이 topic을 subscribe하고 stream of records를 처리하도록 함
    • Streams API: App이 stream processor 역할을 하도록 함. stream processor는 topic의 input stream을 받아 transform하여 output stream 생성
    • Connector API: Kafka topic을 현재 app 또는 data system에 연결하는 producers/consumers를 실행하도록 함. eg. RDB에 연결된 Connector는 table의 모든 변경사항을 capture

Tutorial: twitter stream을 kafka로 받아오기

kafka 실행

  • install

    1
    2
    3
    $ curl "http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz" -o ~/Downloads/kafka.tgz
    $ mkdir ~/kafka && cd ~/kafka
    $ tar -xvzf ~/Downloads/kafka.tgz --strip 1
  • zookeeper & server 실행: zookeeper가 서버 매니징

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # start server
    ## terminal1
    $ bin/zookeeper-server-start.sh config/zookeeper.properties

    ## terminal2
    $ bin/kafka-server-start.sh config/server.properties

    # create a topic
    ## terminal3
    $ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    # create producer
    ## terminal4
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >> This is a message
    >> This is another message

    # create consumer
    ## terminal5
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    >> This is a message
    >> This is another message
  • 정리: producer(console) - kafka(zookeeper) - consumer(console)

    • 각 단계에서 test topic create - subscribe

twitter 설정

  • install twitter connector
  • https://github.com/Eneco/kafka-connect-twitter

    • install mvn mvn clean package
    • export CLASSPATH=pwd/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar
    • CLASSPATH = /home/henry/kafka_twitter/kafka-connect-twitter/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar
      1
      2
      export KAFKA=카프카_다운로드_받은_경로 `pwd`
      $KAFKA/bin/connect-standalone.sh connect-simple-source-standalone.properties twitter-source.properties
  • 설정파일 만들기

    1
    $ cp twitter-source.properties.example twitter-source.properties
  • key 토큰 받기

  • 중요: outdated된 code 최신화 필요(https://github.com/Eneco/kafka-connect-twitter/pull/56/files)

    • 수정 후 다시 build mvn clean package

실행

  • kafka: connect-standalone.sh
  • kafka-connect-twitter: connect-simple-source-standalone.properties & twitter-source.properties
    1
    2
    $ cd kafka-connect-twitter
    $ $KAFKA/bin/connect-standalone.sh connect-simple-source-standalone.properties twitter-source.properties

추가

  • 실행파일 jar - zip파일 압축파일임
  • build: 소스코드를 실행 가능한 상태로 만들기
    • 소스코드를 컴파일하면 실행파일이 나옴
  • maven: 빌드해야할 코드가 너무 많고 다른 pjt코드도 가져와야 할 때 사용
< !-- add by yurixu 替换Google的jquery并且添加判断逻辑 -->