input {
  # Periodically read every row of expedia_region_union from PostgreSQL.
  jdbc {
    # Driver and connection.
    jdbc_driver_library => "/lib/postgresql-42.2.12.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://IP:25432/atlasdb"
    jdbc_user => "USER"
    jdbc_password => "PW"
    jdbc_validate_connection => true

    # Run the statement every 10 minutes (cron syntax).
    schedule => "*/10 * * * *"

    # The geometry is serialized as GeoJSON text and jsonn is cast to text, so
    # both columns reach the pipeline as plain strings.
    statement => "SELECT region_id, st_asgeojson(center_geo_point)::text as center_geo_point_text, center_longitude, center_latitude, jsonn::text from expedia_region_union order by region_id asc"

    # Fetch the result set in pages of 10000 rows; the ORDER BY in the
    # statement gives the pages a stable order.
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10000"

    # NOTE(review): region_name_kr is not one of the columns selected by the
    # statement above - confirm whether this charset override is still needed.
    columns_charset => { "region_name_kr" => "UTF-8" }
  }

  # Also accept events typed on standard input, decoded as UTF-8 plain text.
  stdin {
    codec => plain { charset => "UTF-8" }
  }
}
filter {
  # Reshape each event into the document that is sent to the outputs:
  #   center_geo_point_text (GeoJSON text)  -> point
  #   center_latitude / center_longitude    -> location ({lat, lon})
  #   jsonn (JSON object text)              -> one top-level field per key
  ruby {
    # Load the JSON library once at pipeline start instead of on every event.
    init => "require 'json'"

    # NOTE: the script lives inside a double-quoted config string, so it must
    # only use single-quoted Ruby strings (including inside its comments).
    code => "
      # Each step is isolated so that one malformed column does not prevent
      # the other steps from running. Only StandardError is rescued: rescuing
      # Exception would also swallow interrupts and out-of-memory errors.
      # Failures are logged instead of being silently discarded.

      # GeoJSON text -> parsed object stored under point.
      # A NULL geometry is parsed from the default and becomes an empty object.
      begin
        event.set('point', JSON.parse(event.get('center_geo_point_text') || '{}'))
        event.remove('center_geo_point_text')
      rescue StandardError => e
        logger.warn('could not parse center_geo_point_text', :region_id => event.get('region_id'), :error => e.message)
      end

      # Latitude / longitude -> location hash with string keys.
      # nil.to_f is 0.0, so a row with a NULL coordinate would otherwise be
      # indexed at (0, 0); location is skipped for such rows.
      begin
        lat = event.get('center_latitude')
        lon = event.get('center_longitude')
        event.set('location', { 'lat' => lat.to_f, 'lon' => lon.to_f }) unless lat.nil? || lon.nil?
      rescue StandardError => e
        logger.warn('could not build location', :region_id => event.get('region_id'), :error => e.message)
      end

      # jsonn text -> every top-level key copied onto the event, then the raw
      # column is dropped. The default is applied before to_s so that a NULL
      # jsonn parses as an empty object instead of raising on an empty string.
      # Keys from jsonn are applied last and overwrite same-named fields.
      begin
        JSON.parse((event.get('jsonn') || '{}').to_s).each_pair { |key, value| event.set(key, value) }
        event.remove('jsonn')
      rescue StandardError => e
        logger.warn('could not expand jsonn', :region_id => event.get('region_id'), :error => e.message)
      end
    "
  }

  # Drop the Logstash bookkeeping fields so they are not part of the document.
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  # Write each event to Elasticsearch, keyed by its region_id.
  elasticsearch {
    hosts => ["https://atlas-search.회사.com:443"]
    index => "expedia_region_union_backup_047"

    # One document per region: the id is taken from the event's region_id.
    document_id => "%{region_id}"

    # Update the existing document, or create it from the event when no
    # document with that id exists yet.
    action => "update"
    doc_as_upsert => true
  }

  # Also print every event to standard output in rubydebug format.
  stdout { codec => rubydebug }
}