Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
haje01 committed Dec 1, 2016
1 parent b6e93f0 commit a1520d7
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 71 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ wdfwd/tests/cfg*.yml
*.swo
wdfwd.sublime-*
tmp*
*.csv
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,21 @@ DB(= SQLServer)에 남고 있는 로그 테이블을 로컬 CSV 파일로 덤프
* `key_col` - 테이블의 키 컬럼명. 로그성 테이블의 경우 일반적으로 `datetime` 타입의 컬럼이 지정된다.
* `tag` - 목적지 스트리밍 서비스에서 사용될 분류 태그

MS SQLServer에서 SP(Stored Procedure)를 써야만 한다면, 아래와 같이 설정할 수 있다.

- table:
name: Log2
start_key_sp: uspGetStartDatetime
latest_rows_sp: uspGetLatestRows
key_idx: 0
tag: wdfwd.dbtail2

* `name` - 대상 테이블 이름
* `start_key_sp` - 테일링을 시작할 키를 요청하는 SP. int형 인자를 받는데, 이 값으로 `lines_on_start`가 건네진다.
* `latest_rows_sp` - 지정 시간 이후의 최신 로그 행을 요청하는 SP. datetime형 인자를 받는데, 최근 보내진 행의 키가 건네진다.
* `key_idx` - SP가 반환한 행에서 키로 사용될 컬럼의 인덱스
* `tag` - 목적지 스트리밍 서비스에서 사용될 분류 태그

#### to

테일링의 목적지를 설정. Fluentd나 AWS Kinesis 같은 데이터 스트리밍 서비스 정보를 지정한다.
Expand Down
6 changes: 5 additions & 1 deletion wdfwd/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ def start_tailing():
encoding=ti.encoding,
lines_on_start=ti.lines_on_start,
max_between_data=ti.max_between_data,
millisec_ndigit=ti.millisec_ndigit)
millisec_ndigit=ti.millisec_ndigit,
dup_qsize=ti.dup_qsize,
key_idx=ti.key_idx,
start_key_sp=ti.start_key_sp,
latest_rows_sp=ti.latest_rows_sp)
elif isinstance(ti, FileTailInfo):
ldebug("start file tail - {}".format(ti))

Expand Down
2 changes: 1 addition & 1 deletion wdfwd/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class Target:
def __init__(self, **kw):
self.__dict__.update(kw)
self.version = "0.1.2"
self.version = "0.1.3"
self.company_name = "Webzen"
self.copyright = "Copyright (C) 2016 Webzen"
self.name = "WzDat Forwarder"
Expand Down
109 changes: 74 additions & 35 deletions wdfwd/tail.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,24 @@ def read_sent_pos(self, target, con):
tname = escape_path(target)
ppath = os.path.join(self.pdir, tname + '.pos')
pos = None

if os.path.isfile(ppath):
with open(ppath, 'r') as f:
pos = f.readline()
self.linfo(1, "found pos file - {}: {}".format(ppath, pos))
else:
parsed_pos = self.parse_sent_pos(pos)
if parsed_pos is None:
self.lerror("Invalid pos file: '{}'".format(pos))
pos = None
else:
pos = parsed_pos

if pos is None:
pos = self.get_initial_pos(con)
self.linfo(1, "can't find pos file for {}, save as "
self.linfo(1, "can't find valid pos for {}, save as "
"initial value {}".format(target, pos))
self._save_sent_pos(target, pos)
pos = self.parse_sent_pos(pos)
pos = self.parse_sent_pos(pos)
return pos

def _save_sent_pos(self, target, pos):
Expand Down Expand Up @@ -402,7 +410,8 @@ class TableTailer(BaseTailer):
def __init__(self, dbcfg, table, tag, pdir, stream_cfg, datefmt, key_col,
send_term=DB_SEND_TERM, max_send_fail=None, echo=False,
encoding=None, lines_on_start=None, max_between_data=None,
millisec_ndigit=None, dup_qsize=None, delim='\t'):
millisec_ndigit=None, dup_qsize=None, delim='\t',
start_key_sp=None, latest_rows_sp=None, key_idx=None):
"""init TableTailer"""
super(TableTailer, self).__init__(tag, pdir, stream_cfg,
send_term,
Expand All @@ -414,17 +423,28 @@ def __init__(self, dbcfg, table, tag, pdir, stream_cfg, datefmt, key_col,
self.datefmt = datefmt
self.millisec_ndigit = millisec_ndigit
self.key_col = key_col
self.key_idx = None
self.dup_qsize = DB_DUP_QSIZE if dup_qsize is None else dup_qsize
self.delim = delim
# If SP is used, both start_key_sp & latest_rows_sp should exist.
assert (start_key_sp and latest_rows_sp) or (not start_key_sp and
not latest_rows_sp)
self.start_key_sp = start_key_sp
self.latest_rows_sp = latest_rows_sp
self.use_sp = start_key_sp is not None
self.linfo(" start_key_sp: {}, latest_rows_sp: {}".
format(start_key_sp, latest_rows_sp))
if self.use_sp:
assert key_idx is not None
self.key_idx = key_idx
else:
self.key_idx = None

def _store_key_idx_once(self, con):
"""Store key column index once.
Args:
con(DBConnector): DB connection
"""
assert self.key_col is not None
if self.key_idx is None:
self.key_idx = db_get_column_idx(con, self.table, self.key_col)

Expand All @@ -440,6 +460,7 @@ def is_table_exist(self, con):
Returns:
True if table exists, False otherwise.
"""
assert not self.use_sp
tbl = self.table
if not con.sys_schema:
tbl = tbl.split('.')[-1]
Expand All @@ -448,15 +469,13 @@ def is_table_exist(self, con):
db_execute(con, cmd)
rv = con.cursor.fetchall()
exist = len(rv) > 0
self.ldebug("is_table_exist for {} - {}".format(tbl, exist))
self.ldebug("is_table_exist for {} - {}".format(tbl, rv))
return exist

def tmain(self):
self.ldebug("tmain")
cur = super(TableTailer, self).tmain()

sent_line, _ = self.may_send_newlines(cur)
self.ldebug("tmain done")
return sent_line

def get_sent_pos(self, con):
Expand Down Expand Up @@ -494,10 +513,14 @@ def select_lines_to_send(self, con, pos):
"""
self.ldebug("select_lines_to_send: pos {}".format(pos))
self.raise_if_notarget()
cmd = """
SELECT * FROM {0} WHERE {1} >= '{2}'
""".format(self.table, self.key_col, pos)
db_execute(con, cmd)
if self.use_sp:
cmd = 'EXEC {} ?'.format(self.latest_rows_sp)
db_execute(con, cmd, pos)
else:
cmd = """
SELECT * FROM {0} WHERE {1} >= '{2}'
""".format(self.table, self.key_col, pos)
db_execute(con, cmd)
return con.cursor

def may_send_newlines(self, cur, econ=None):
Expand All @@ -520,9 +543,10 @@ def may_send_newlines(self, cur, econ=None):
self.last_send_try = cur

def _body(self, con):
self._store_key_idx_once(con)
if not self.use_sp:
self._store_key_idx_once(con)

if not self.is_table_exist(con):
if not self.use_sp and not self.is_table_exist(con):
self.lwarning("Target table '{}' not exists".format(
self.table))
return None, None
Expand Down Expand Up @@ -586,11 +610,13 @@ def send_new_lines(self, con, cursor, sent_hashq):
for cols in cursor:
kv = self.conv_datetime(cols[self.key_idx])
msg = self.delim.join([str(c) for c in cols])
# convert encoding
if self.encoding:
msg = msg.decode(self.encoding).encode('utf8')

msgh = hash(msg)
if sent_hashq is not None and msgh in sent_hashq:
self.ldebug("message '{}' is duplicated. skip.".
format(msg))
self.ldebug("dup msg: '{}'..".format(msg[:50]))
continue

self._send_newline(msg, msgs)
Expand Down Expand Up @@ -639,25 +665,39 @@ def get_initial_pos(self, con):
str: Initial position ends with '\t' separator (which separates
duplicate hashes).
"""
self.ldebug("get_initial_pos: {}".format(self.lines_on_start))
assert self.lines_on_start >= 0

if self.lines_on_start == 0:
cmd = 'SELECT TOP(1) dtime FROM {0} ORDER BY {1} DESC'.\
format(self.table, self.key_col)
lines_on_start = 1 if self.lines_on_start == 0 else self.lines_on_start

if self.use_sp:
cmd = 'EXEC {} ?'.format(self.start_key_sp)
db_execute(con, cmd, lines_on_start)
else:
cmd = 'SELECT TOP(1) dtime FROM (SELECT TOP({0}) dtime FROM {1} '\
'ORDER BY {2} DESC) a ORDER BY a.dtime'.\
format(self.lines_on_start, self.table, self.key_col)

db_execute(con, cmd)
format(lines_on_start, self.table, self.key_col)
db_execute(con, cmd)
self.ldebug(cmd)
dtime = con.cursor.fetchone()[0]
dtime = self.conv_datetime(dtime)
self.ldebug("get_initial_pos: {}".format(dtime))
return "{}\t".format(dtime)

def parse_sent_pos(self, pos):
"""Parsing sent position (datetime)"""
pos, hashes = pos.split('\t')
"""Parsing sent position (datetime)
Args:
pos(str): Sent position
Returns:
(position type): Last sent position
(hashes): Sent message hash list
"""
try:
pos, hashes = pos.split('\t')
except ValueError:
return None
if len(hashes) == 0:
hashes = []
else:
Expand Down Expand Up @@ -826,11 +866,11 @@ def handle_file_recreate(self, cur=None):

if ret > 0:
self.linfo(1, "reset target to delegate update_target")
self.save_sent_pos(self.get_initial_pos())
self.save_sent_pos(self.get_initial_pos(None))
self.set_target(None)
return ret

def get_initial_pos(self):
def get_initial_pos(self, con):
return 0

def handle_elatest_rotation(self, epath=None, cur=None):
Expand Down Expand Up @@ -867,7 +907,7 @@ def handle_elatest_rotation(self, epath=None, cur=None):
# even elatest file not exist, there might be pos file
self._save_sent_pos(pre_elatest, self.get_sent_pos(epath))
# reset elatest sent_pos
self._save_sent_pos(epath, self.get_initial_pos())
self._save_sent_pos(epath, self.get_initial_pos(None))
self.last_update = cur
# pre-elatest is target for now
self.set_target(pre_elatest)
Expand Down Expand Up @@ -938,7 +978,7 @@ def update_target(self, start=False):
if start:
self.start_sent_pos(self.target_path)
else:
self.read_sent_pos(self.target_path)
self.read_sent_pos(self.target_path, None)
else:
self.linfo(1, "cur target {}".format(self.target_path))

Expand Down Expand Up @@ -1141,7 +1181,7 @@ def get_sent_pos(self, epath=None):
if tpath in self.cache_sent_pos:
return self.cache_sent_pos[tpath]

pos = self.read_sent_pos(tpath)
pos = self.read_sent_pos(tpath, None)
self.cache_sent_pos[tpath] = pos
return pos

Expand Down Expand Up @@ -1195,11 +1235,11 @@ def save_sent_pos(self, pos):
self._save_sent_pos(self.target_path, pos)


def db_execute(con, cmd):
def db_execute(con, cmd, *args):
try:
con.cursor.execute(cmd.strip())
except pyodbc.ProgrammingError as e:
sys.stderr.write(str(e[1]) + '\n')
con.cursor.execute(cmd.strip(), *args)
except Exception as e:
sys.stderr.write(str(e[1]).decode(sys.stdout.encoding) + '\n')
return False
return True

Expand Down Expand Up @@ -1242,7 +1282,6 @@ def __init__(self, dcfg, ldebug=print, lerror=print):
else True
self.uid = dbcc['uid']
self.passwd = dbcc['passwd']
self.fetchsize = dbc['fetchsize']
self.sys_schema = dbc['sys_schema']
self.ldebug = ldebug
self.lerror = lerror
Expand Down
Loading

0 comments on commit a1520d7

Please sign in to comment.