상황
1 postgres table 2개를 input
2 그 중 1개를 clone 하여
3 3개의 인덱스에 각각 output
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
jdbc_user => "atlas"
jdbc_password => "atlas"
jdbc_validate_connection => true
jdbc_driver_library => "/lib/postgres-42-test.jar"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "select id, iata_airport_code, name, name_full, country_code, center_longitude, region_name_full, region_name, region_type, region_id, name_kr, name_full_kr, st_asgeojson(center_geo_point)::text as center_geo_point_text, la, lo, public_flag, international_flag, source_from, source_time, iata_airport_metro_code,(select json_build_object('region_id', region_id, 'region_name', region_name, 'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union where region_id = c.region_id) as haha_test from expedia_airport_more c"
tags => ["expedia_airport_more"]
}
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
jdbc_user => "atlas"
jdbc_password => "atlas"
jdbc_validate_connection => true
jdbc_driver_library => "/lib/postgres-42-test.jar"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "SELECT region_id, region_name, region_name_full, region_name_kr, region_name_full_kr, continent::text, country::text from expedia_region_union order by region_id asc limit 100"
tags => ["expedia_region_union"]
}
stdin {
codec => plain { charset => "UTF-8"}
}
}
filter {
if "expedia_region_union" in [tags] {
ruby {
code => "
require 'json'
continent_json = JSON.parse(event.get('continent'.to_s))
event.set('continent', continent_json)
"
}
clone {
clones => ['test-e', 'test-f']
}
if [type] == 'test-e' {
prune {
whitelist_names => ["region_id", "region_name_kr"]
}
mutate {
add_field => {"[@metadata][type]" => "test-e"}
}
} else {
prune {
whitelist_names => ["region_id", "region_name_full_kr"]
}
mutate {
add_field => {"[@metadata][type]" => "test-f"}
}
}
} else {
ruby {
code => "require 'json'
begin
center_geo_point_json = JSON.parse(event.get('center_geo_point_text') || '{}')
event.set('center_geo_point', center_geo_point_json)
event.remove('center_geo_point_text')
rescue Exception => e
end
begin
region_json_json = JSON.parse(event.get('haha_test') || '{}')
event.set('region_json', region_json_json)
rescue Exception => e
event.tag('you have to check region_json sql.')
end
"
}
prune {
whitelist_names => ["id", "region_id", "region_json"]
}
mutate {
add_field => {"[@metadata][type]" => "test-g"}
}
}
}
output {
stdout { codec => rubydebug }
if [@metadata][type] == 'test-e' {
elasticsearch {
hosts => ["localhost:9200"]
index => "test-e"
doc_as_upsert => true
action => "update"
document_id => "%{region_id}"
}
}
else if [@metadata][type] == 'test-f' {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "test-f"
doc_as_upsert => true
action => "update"
document_id => "%{region_id}"
}
} else {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "test-g"
doc_as_upsert => true
action => "update"
document_id => "%{id}"
}
}
}
'내가 당면한 문제와 해결방안' 카테고리의 다른 글
logstash 변경분만 가져오기 (0) | 2020.07.03 |
---|---|
clone conf 적용 파일 (0) | 2020.06.30 |
clone2 (0) | 2020.06.23 |
clone and expedia_airport_more table conf (0) | 2020.06.23 |
multiple jdbc input and clone (0) | 2020.06.19 |