본문 바로가기

내가 당면한 문제와 해결방안

Logstash: jdbc input and clone filter (one query, two indices)

input {
    # Interactive input: lines typed on stdin become events (decoded as UTF-8).
    stdin {
        codec => plain { charset => "UTF-8" }
    }

    # Polls the expedia_region_union table once per minute (cron "* * * * *")
    # and emits one event per returned row.
    jdbc {
        # Driver
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_driver_library => "/lib/postgres-42-test.jar"

        # Connection
        # NOTE(review): the query parameters below (useTimezone, serverTimezone,
        # useSSL, ...) look like MySQL Connector/J options -- confirm the
        # PostgreSQL driver accepts or ignores them.
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
        jdbc_user => "atlas"
        jdbc_password => "atlas"
        jdbc_validate_connection => true

        # Query: continent and country are cast to text here; the filter stage
        # parses them as JSON.
        statement => "SELECT region_id, region_name, region_name_full, region_name_kr, continent::text, country::text from expedia_region_union order by region_id asc limit 100"
        schedule => "* * * * *"
    }
}
# Filter stage: decode the JSON text columns, then fan each event out into two
# clones, each reduced to its own subset of fields and tagged through
# [@metadata][type] so the output stage can route it.
filter {
        # continent / country arrive as text (cast in the SQL statement) and are
        # replaced by their parsed JSON value. Events that do not carry the field
        # (e.g. stdin events) or carry an empty value are skipped, because
        # JSON.parse('') raises and would tag the event with _rubyexception.
        ruby {
                code => "
                        require 'json'
                        ['continent', 'country'].each do |field|
                                raw = event.get(field).to_s
                                next if raw.empty?
                                event.set(field, JSON.parse(raw))
                        end
                        "
                }
        # NOTE(review): the SELECT in the jdbc input does not return a 'time'
        # column, so this presumably only applies to events from other inputs --
        # confirm it is still needed.
        date {
                match => ['time', 'UNIX']
        }
        # Produces two copies of every event; the branches below rely on each
        # copy having its `type` field set to the clone name. The original event
        # continues through the pipeline unchanged, without a matching type.
        clone {
                clones => ['2020-05-21-thu', '2020-06-17-wed']
        }
        # whitelist_names entries are regular expressions, so they are anchored
        # with ^...$: an unanchored "region_name" also matches region_name_full
        # and region_name_kr and would keep those fields too.
        if [type] == '2020-06-17-wed' {
                prune {
                        whitelist_names => ["^region_id$", "^region_name_full$"]
                }
                # prune removes `type` as well, so the routing key is kept in
                # @metadata, which is not sent to the outputs as document content.
                mutate {
                        add_field => {"[@metadata][type]" => "only_region_name_full"}
                }
        }
        else if [type] == '2020-05-21-thu' {
                prune {
                        whitelist_names => ["^region_id$", "^region_name$"]
                }
                mutate {
                        add_field => {"[@metadata][type]" => "only_region_name"}
                }
        }
}

# Output stage: print every event for debugging, and upsert each pruned clone
# into its own Elasticsearch index, keyed by region_id.
output {
        stdout { codec => rubydebug }

        if [@metadata][type] == 'only_region_name_full' {
                elasticsearch {
                        hosts => ["localhost:9200"]
                        index => "2020-06-17-wed"
                        doc_as_upsert => true
                        action => "update"
                        document_id => "%{region_id}"
                }
        }
        # Explicit match instead of a bare `else`: a catch-all branch would also
        # index the original (un-pruned) event under the same document_id,
        # merging every column back into this index, and would index stdin
        # events under the literal id "%{region_id}".
        else if [@metadata][type] == 'only_region_name' {
                elasticsearch {
                        hosts => [ "localhost:9200" ]
                        index => "2020-05-21-thu"
                        doc_as_upsert => true
                        action => "update"
                        document_id => "%{region_id}"
                }
        }

}

'내가 당면한 문제와 해결방안' 카테고리의 다른 글

clone2  (0) 2020.06.23
clone and expedia_airport_more table conf  (0) 2020.06.23
logstash clone 설정  (0) 2020.06.17
elasticsearch max_result_window 설정  (0) 2020.06.17
logstash 설정  (0) 2020.06.16