생활데이타/ElasticSearch

[logstash] JDBC input plugin

개발의 여름 2017. 3. 31. 10:50
 logstash JDBC input plugin

상세한 내용은 도큐먼트 참고 : https://www.elastic.co/guide/en/logstash/5.2/plugins-inputs-jdbc.html
설명 : DB -> logstash 를 연동해주는 플러그인.
특이한 점 : 
- JDBC 드라이버 및 클래스 위치를 직접 지정해 주어야 한다.
- sql_last_value 값을 사용하여 추가적으로 생성되는 값을 업데이트 할 수 있다.  마지막 레코드의 특정 칼럼 값이나 마지막 작업 시간이 파일에 저장되어 logstash 수행시에 sql_last_value 변수 저장되어 statement에 사용될 수 있다. 
아쉬운점 : 
- 스케줄 지정 시 crontab 구문을 사용하는데, crontab 문법이 분/시/일/월/요일 방식으로 지정되기 때문에 초 단위 수행을 지정할 방법이 없다.(아직 찾지 못한 걸까?)
주의사항 : 페이징 작업 또는 레코드 업데이트시 stastement에 sql_last_value 조건을 주어야 한다. 처음에 알아서 변경분을 처리해 줄것을 기대하고 .... 삽질한 뒤 제대로 사용하게 되었다. 역시 도큐먼트를 읽고 또 읽는 작업은 중요하다.

주소 파라메터 및 설정 

input {
   jdbc {
        # JDBC driver 위치
        jdbc_driver_library => "/usr/share/java/mysql-connector-java-5.1.12.jar”

        # JDBC dricer class
        jdbc_driver_class => “com.mysql.jdbc.Driver”

        # db host & database
        jdbc_connection_string => "jdbc:mysql://{db_host}/{data_base}”

 # 한글 데이터 사용시

        # jdbc_connection_string => "jdbc:mysql://{db_host}/{data_base}?useUnicode=true&characterEncoding=utf8


        jdbc_pool_timeout => 3000
        jdbc_paging_enabled => true
        dbc_page_size => 100000
        jdbc_user => “{user}"
        jdbc_password => “{pass_word}”

        # 스케쥴 
        # linux crontab 문법을 따름
        # 매분 마다 실행 
        schedule => "* * * * *”
        # 데이터 트레킹을 위해 마지막 데이터 기록시 사용
        tracking_column_type => “numeric"

        # false 타임스탬프 사용, true 숫자 사용
        use_column_value => true

        # 트레킹에 사용할 컬럼
        tracking_column => id
        
        # 마지막 컬럼값 기록 여부, true시 0 또는 1970년 1월 1일 부터 시작된다.
        # clean_run => false
        
        # 인코딩
        charset => "UTF-8"
     
        # 쿼리
        # sql_last_value는 마지막 레코드 중 설정된 필드값(기본값 시간)
        # sql_last_value 사용해서 마지막 값만 가져올수 있도록 SQL 명령어를 짜야한다.
        statement => " SELECT id, name, c_date FROM elk_jdbc_test WHERE id > :sql_last_value "

 # sql_last_value 값 저장 경로. 기본값은 "~/.logstash_jdbc_last_run"
 last_run_metadata_path => “{last_run_path}"
    }
}

filter {

}

output {
     amazon_es {
          hosts => [“{host}"]
          region => "ap-northeast-1"
          index => "elk_jdbc_test"
     }
}