본문 바로가기

내가 당면한 문제와 해결방안

clone and expedia_airport_more table conf

clone.conf

# Inputs for clone.conf: two PostgreSQL queries polled on a cron schedule, plus stdin.
# NOTE(review): the URL query options (useTimezone, serverTimezone, useLegacyDatetimeCode, ...)
# look like MySQL Connector/J settings - confirm the PostgreSQL driver ignores them.
input {
    # Airport rows enriched with a JSON description of the matching region.
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
        jdbc_user => "atlas"
        jdbc_password => "atlas"
        jdbc_validate_connection => true
        jdbc_driver_library => "/lib/postgres-42-test.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        # Cron syntax: run the statement every minute.
        schedule => "* * * * *"
        # The JSON object is cast to text, not jsonb: json/jsonb values come back from the
        # driver as PGobject, which the jdbc input cannot turn into an event field.
        # NOTE(review): "select *" still returns every column of expedia_airport_more;
        # confirm none of them has a driver-specific type, or list the columns explicitly.
        statement => "select *, (select json_build_object('region_id', region_id, 'region_name', region_name, 'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union  where region_id = c.region_id ) as region_json from expedia_airport_more c"
        # The tag lets the filter section tell the two jdbc sources apart.
        tags => ["expedia_airport_more"]
    }

    # First 100 regions ordered by region_id; continent and country are shipped as text.
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
        jdbc_user => "atlas"
        jdbc_password => "atlas"
        jdbc_validate_connection => true
        jdbc_driver_library => "/lib/postgres-42-test.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        schedule => "* * * * *"
        statement => "SELECT region_id, region_name, region_name_full, region_name_kr, region_name_full_kr, continent::text, country::text from expedia_region_union order by region_id asc limit 100"
        tags => ["expedia_region_union"]
    }

    # Lines typed on stdin become events too (they carry neither tag).
    stdin {
        codec => plain { charset => "UTF-8"}
    }

}


# Routes every event to one of three targets by writing [@metadata][type]:
#   expedia_region_union events -> cloned, then "test-a" / "test-b"
#   everything else             -> "test-c"
filter {
    if "expedia_region_union" in [tags] {
        # continent is selected as JSON text; decode it into a structured field.
        ruby {
            code => "
                require 'json'
                raw_continent = event.get('continent')
                # Skip NULL or blank values; tag unparsable ones rather than raising.
                unless raw_continent.nil? || raw_continent.to_s.strip.empty?
                    begin
                        event.set('continent', JSON.parse(raw_continent.to_s))
                    rescue JSON::ParserError
                        event.tag('_continent_json_parse_failure')
                    end
                end
            "
        }

        # Emits one copy per listed name; the conditional below relies on each copy
        # carrying its name in the type field.
        # NOTE(review): with ECS compatibility enabled the clone filter is believed to
        # add the name to tags instead of type - confirm for the Logstash version in use.
        clone {
            clones => ['test-a', 'test-b']
        }

        if [type] == 'test-a' {
            # whitelist_names entries are regular expressions, so they are anchored to
            # keep exactly these fields and nothing that merely contains the name.
            prune {
                whitelist_names => ["^region_id$", "^region_name_kr$"]
            }
            mutate {
                add_field => {"[@metadata][type]" => "test-a"}
            }
        } else {
            # Reached by the 'test-b' copy and by the original event, which has no
            # matching type; both end up marked as test-b.
            prune {
                whitelist_names => ["^region_id$", "^region_name_full_kr$"]
            }
            mutate {
                add_field => {"[@metadata][type]" => "test-b"}
            }
        }
    } else {
        # Airport rows and stdin events: keep only the region id and plain region name.
        prune {
            whitelist_names => ["^region_id$", "^region_name$"]
        }
        mutate {
            add_field => {"[@metadata][type]" => "test-c"}
        }
    }
}

# Prints every event, then upserts it into the index selected by [@metadata][type].
output {
    stdout { codec => rubydebug }

    # Each branch updates the document whose id is the event's region_id, creating it
    # when it does not exist yet (doc_as_upsert).
    if [@metadata][type] == 'test-a' {
        elasticsearch {
            hosts         => ["localhost:9200"]
            index         => "test-a"
            document_id   => "%{region_id}"
            action        => "update"
            doc_as_upsert => true
        }
    } else if [@metadata][type] == 'test-c' {
        elasticsearch {
            hosts         => ["localhost:9200"]
            index         => "test-c"
            document_id   => "%{region_id}"
            action        => "update"
            doc_as_upsert => true
        }
    } else {
        # Anything that is neither test-a nor test-c goes to test-b.
        elasticsearch {
            hosts         => ["localhost:9200"]
            index         => "test-b"
            document_id   => "%{region_id}"
            action        => "update"
            doc_as_upsert => true
        }
    }
}

# Inputs for the airport pipeline: one PostgreSQL query polled on a cron schedule, plus stdin.
input {
    jdbc {
        # Driver and connection settings.
        jdbc_driver_library => "/lib/postgres-42-test.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
        jdbc_user => "atlas"
        jdbc_password => "atlas"
        jdbc_validate_connection => true

        # Cron syntax: run the statement every minute.
        schedule => "* * * * *"
        tags => ["expedia_airport_more"]

        # Explicit airport columns. The geo point is exported as GeoJSON text
        # (center_geo_point_text) and the matching region as JSON text (haha_test);
        # the filter section decodes both.
        statement => "select id, iata_airport_code, name, name_full, country_code, center_longitude, region_name_full, region_name, region_type, region_id, name_kr, name_full_kr, st_asgeojson(center_geo_point)::text as center_geo_point_text, la, lo, public_flag, international_flag, source_from, source_time, iata_airport_metro_code,

                            (select json_build_object('region_id', region_id, 'region_name', region_name, 'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union  where region_id = c.region_id) as haha_test from expedia_airport_more c"
    }

    # Lines typed on stdin become events as well.
    stdin {
        codec => plain { charset => "UTF-8"}
    }

}

# Decodes the two JSON text columns produced by the airport query into structured fields.
# Runs for every event; a missing column is treated as the empty JSON object '{}'.
filter {
    ruby {
        code => "
            require 'json'

            # GeoJSON text becomes center_geo_point. The raw text column is removed only
            # after a successful parse, so a bad value stays on the event for inspection.
            begin
                geo_point = JSON.parse(event.get('center_geo_point_text') || '{}')
                event.set('center_geo_point', geo_point)
                event.remove('center_geo_point_text')
            rescue StandardError
                # Previously swallowed silently; tag the event so failures are visible.
                event.tag('_center_geo_point_parse_failure')
            end

            # Region JSON text (haha_test) becomes region_json; haha_test itself is kept.
            begin
                region = JSON.parse(event.get('haha_test') || '{}')
                event.set('region_json', region)
            rescue StandardError
                event.tag('you have to check region_json sql.')
            end
        "
    }
}

# Upserts every event into the test-d index, then prints it for debugging.
output {
    # Update the document whose id is the event's id field, creating it when absent.
    elasticsearch {
        hosts         => ["localhost:9200"]
        index         => "test-d"
        document_id   => "%{id}"
        action        => "update"
        doc_as_upsert => true
    }

    stdout { codec => rubydebug }
}

~
~

'내가 당면한 문제와 해결방안' 카테고리의 다른 글

clone and postgis  (0) 2020.06.23
clone2  (0) 2020.06.23
multiple jdbc input and clone  (0) 2020.06.19
logstash clone 설정  (0) 2020.06.17
elasticsearch max_result_window 설정  (0) 2020.06.17