def deep_get(dictionary, keys, default=None):
    """Follow a dotted key path (e.g. "a.b.c") through nested dicts.

    Each step falls back to *default* when the key is missing or the
    current node is not a dict; the fold continues over the remaining
    path segments either way.
    """
    node = dictionary
    for segment in keys.split("."):
        node = node.get(segment, default) if isinstance(node, dict) else default
    return node
@app.task
@provide_redis
def each_k(redis_db, k):
    """Celery task: attach the Redis-cached OSM polygon to one Couchbase row.

    *k* is one row of the N1QL scan (meta id plus the document under the
    bucket-name key); the polygon is looked up in the Redis hash
    "osm:polygons" by the document's OSM code, and the document is
    re-upserted only when a polygon exists.
    """
    document_id = k.get("id")
    print("document_id : " + document_id)
    osm_code = deep_get(k, "ATLAS-DEV.codes.osm")
    boundary = redis_db.hget(name="osm:polygons", key=osm_code)
    if boundary is None:
        return
    body = k['ATLAS-DEV']
    body.update({'boundaries': boundary})
    bucket_test.upsert(document_id, body)
flask
celery
1 redis에 있는 key를 가져다가 couchbase 에서 해당하는 key를 osm이라는 필드에 있는지 확인. 없거나 null 인지 확인.
2 있다면 couchbase에 upsert 한다.
이런 스크립트입니다.
필요한 라이브러리.
from couchbase.cluster import Cluster
from couchbase.cluster import PasswordAuthenticator
from couchbase.n1ql import N1QLQuery
from couchbase.n1ql import N1QLRequest
from elasticsearch import Elasticsearch
from time import sleep
from urllib import request
from wikidata.client import Client
from openpyxl import Workbook
import base64
import json
import osmapi
import re
import redis
import urllib
import hashlib
import time
import requests
import csv
from functools import reduce
# --- Shared service clients, created once at import time ---
wikid_client = Client()
# Elasticsearch node (host:port only; scheme defaulted by the client).
es_client = Elasticsearch("192.168.28.46:9210")
cluster = Cluster('couchbase://192.168.28.46')
authenticator = PasswordAuthenticator('ATLAS-DEV','ATLAS-DEV')
cluster.authenticate(authenticator)
bucket_test = cluster.open_bucket('ATLAS-DEV')
# decode_responses=True -> hget returns str instead of bytes.
redis_db = redis.StrictRedis(host='192.168.28.47', port=6379, db=10, decode_responses=True)
map_api = osmapi.OsmApi()
# Raise the N1QL query timeout to one hour for the full-bucket scan below.
N1QLQuery.timeout = 3600
일단 아래와 같이 동기로 짜본다.
def deep_get(dictionary, keys, default=None):
    """Resolve a dot-separated path inside nested dicts, yielding *default* on any miss."""
    def step(node, part):
        # Non-dict nodes (including a previously substituted default) yield default.
        if isinstance(node, dict):
            return node.get(part, default)
        return default
    return reduce(step, keys.split("."), dictionary)
# NOTE: the real run drops the LIMIT 20 kept here for testing.
# TODO: persist polygons into Couchbase.
# Select REGION documents that actually carry an OSM code.
# Fix: the original predicate used OR, which also matched rows with
# codes.osm = '' (the first disjunct is true for them) — the stated intent
# is "present AND non-empty", so the connective must be AND.
query = N1QLQuery("SELECT meta().id, * FROM `ATLAS-DEV` WHERE class_type = 'REGION' AND (codes.osm IS NOT NULL AND codes.osm != '') LIMIT 20")
for k in bucket_test.n1ql_query(query):
    document_id = k.get("id")
    osm_id = deep_get(k, "ATLAS-DEV.codes.osm")
    # Polygons are cached in the Redis hash "osm:polygons", keyed by OSM id.
    polygon = redis_db.hget(name="osm:polygons", key=osm_id)
    if polygon is not None:
        k['ATLAS-DEV'].update({'boundaries': polygon})
        bucket_test.upsert(document_id, k['ATLAS-DEV'])
음 돌아가네
마침 celery 설정이 되어있으니 비동기로도 해보자.
# tasks.py
from app import config
from app.modules.provide_redis import provide_redis
from celery import Celery
import app.external.osm as osm
import app.external.geonames as geonames
import requests
import json
import time
import app.jobs.celeryconfig as celeryconfig
# celery -A app.jobs.task worker --loglevel=info --concurrency=1 --pool solo
# Celery app wired to the local Redis broker (db 0); worker started with
# the command shown above.
app = Celery('tasks', broker='redis://localhost:6379/0')
app.config_from_object(celeryconfig)
# Base URL of the Atlas API, taken from application config.
api_host = config['ATLAS_API_HOST']
def deep_get(dictionary, keys, default=None):
    """Safely fetch a value from nested dicts via a dotted key path.

    Fix: the original body called functools.reduce, but tasks.py never
    imports `reduce` (see the import block above), so every call raised
    NameError. This loop form is equivalent and needs no import.
    """
    node = dictionary
    for key in keys.split("."):
        node = node.get(key, default) if isinstance(node, dict) else default
    return node
@app.task
@provide_redis
def each_k(redis_db, k):
    """Async variant of the sync loop body: upsert one document's polygon.

    Looks up the document's OSM code in the Redis hash "osm:polygons";
    when a cached polygon exists, it is merged into the document body and
    the document is upserted back into Couchbase.
    """
    doc_id = k.get("id")
    print("document_id : " + doc_id)
    polygon = redis_db.hget(name="osm:polygons", key=deep_get(k, "ATLAS-DEV.codes.osm"))
    if polygon is not None:
        record = k['ATLAS-DEV']
        record['boundaries'] = polygon
        bucket_test.upsert(doc_id, record)
app.task 데코레이터를 달아서 해줌.
provide_redis 이거는 데코레이터임.
celery -A app.jobs.task worker --loglevel=info --concurrency=1 --pool solo
celery로 실행을 해준다.
역시 잘됨.
그럼 멀티스레드로 하면은? 얼마나 빠를까?
사실 얼마나 빠른지는 모르겠으며
# Document ids already handled by this worker process; deliberately a list.
status_id_flag = []


@app.task
@provide_redis
def each_k(redis_db, k):
    """Concurrent variant: process each document id at most once.

    NOTE(review): the source paste lost its indentation, so the exact
    extent of the dedupe guard is ambiguous; reconstructed so that the
    whole polygon lookup/upsert is skipped for already-seen ids, which
    matches the "flag" intent. The module-global list is only shared
    within one worker process — confirm that is sufficient.
    """
    global status_id_flag
    doc_id = k.get("id")
    if doc_id not in status_id_flag:
        status_id_flag.append(doc_id)
        print("document_id : " + doc_id)
        osm_code = deep_get(k, "ATLAS-DEV.codes.osm")
        polygon = redis_db.hget(name="osm:polygons", key=osm_code)
        if polygon is not None:
            k['ATLAS-DEV'].update({'boundaries': polygon})
            bucket_test.upsert(doc_id, k['ATLAS-DEV'])
    print("배열길이 : ")
    print(len(status_id_flag))
celery command는
celery -A app.jobs.task worker --loglevel=DEBUG --concurrency=4 -P eventlet
이렇게 실행.
'내가 당면한 문제와 해결방안' 카테고리의 다른 글
this (0) | 2019.06.29 |
---|---|
javascript this (0) | 2019.06.29 |
springboot test using cucumber (0) | 2019.05.15 |
can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?) (0) | 2019.05.14 |
for...in / for... of loop diff - javascript (0) | 2019.04.20 |