-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanalyzer.py
More file actions
132 lines (113 loc) · 3.35 KB
/
analyzer.py
File metadata and controls
132 lines (113 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import sqlite3
import argparse
import readline
import datetime
import csv
# Connect to a sqlite3 database file, keep reading SQL statements from the console, and print the result of each statement to the console.
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the console.

    Returns:
        The parsed namespace. Its ``db_file`` attribute holds the single
        positional argument: the sqlite3 database file.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("db_file", help="sqlite3 database file")
    namespace = arg_parser.parse_args()
    return namespace
def handle_select(
    conn: sqlite3.Connection,
    cursor: sqlite3.Cursor,
    sql: str,
    export_file_name: str = None,
) -> None:
    """Run a query and either print its result set or export it as CSV.

    Args:
        conn: Open database connection. Not used by this function.
        cursor: Cursor on which ``sql`` is executed.
        sql: Statement to execute; it is expected to produce a result set.
        export_file_name: When truthy, path of the CSV file to (over)write
            with a header row followed by the data rows. When omitted or
            empty, the result is printed to stdout instead.
    """
    cursor.execute(sql)
    # Column names of the result set, used as the header in both modes.
    column_names = [col_desc[0] for col_desc in cursor.description]

    if export_file_name:
        # newline="" is required by the csv module: without it the writer's
        # "\r\n" terminators are translated again on platforms whose native
        # line ending is "\r\n", producing blank lines between rows.
        with open(export_file_name, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(column_names)
            # Stream rows straight from the cursor instead of materializing
            # the whole result set first.
            writer.writerows(cursor)
    else:
        print(", ".join(column_names))
        print("-" * 20)
        for row in cursor:
            print(", ".join(str(value) for value in row))
def handle_execute(conn: sqlite3.Connection, cursor: sqlite3.Cursor, sql: str) -> None:
    """Execute a statement, commit it, and report the cursor's row count.

    Args:
        conn: Connection on which the change is committed.
        cursor: Cursor on which ``sql`` is executed.
        sql: Statement to execute.
    """
    cursor.execute(sql)
    conn.commit()
    summary = " ".join(["Affect:", str(cursor.rowcount), "rows"])
    print(summary)
# Words offered by tab completion, in the order they are proposed.
_COMPLETION_WORDS = (
    "select",
    "from",
    "insert",
    "update",
    "delete",
    "where",
    "exit",
    "order",
    "by",
    "group",
    "set",
    "values",
    "distinct",
    "join",
    "date",
    "time",
    "datetime",
    "and",
    "or",
    "like",
    "limit",
    "having",
    "export",
)


def completer(text: str, state: int):
    """Return the ``state``-th completion candidate for ``text``.

    Written for use as a ``readline`` completer: it is called with
    ``state`` = 0, 1, 2, ... and answers ``None`` once the candidates are
    exhausted.

    Args:
        text: The prefix typed so far. Matching ignores case.
        state: Zero-based index of the candidate to return.

    Returns:
        The matching word, upper-cased when ``text`` is entirely upper case
        and lower-cased otherwise, or ``None`` when ``state`` is past the
        last match.
    """
    prefix = text.lower()
    matches = [word for word in _COMPLETION_WORDS if word.startswith(prefix)]
    if state >= len(matches):
        return None
    candidate = matches[state]
    # Answer in the case the user is typing in, so "SEL" completes to
    # "SELECT" rather than offering nothing.
    return candidate.upper() if text.isupper() else candidate
def run_console(conn: sqlite3.Connection, cursor: sqlite3.Cursor) -> None:
    """Read commands from the console until ``exit`` or end-of-input.

    Recognized commands:
      * ``exit`` - leave the loop.
      * ``export <csv_file>`` - re-run the most recent successful SELECT and
        write its result to ``<csv_file>``.
      * anything else - executed as SQL; statements whose first word is
        SELECT (any case) are printed, all others are executed and committed.

    Args:
        conn: Open database connection passed on to the handlers.
        cursor: Cursor passed on to the handlers.
    """
    # Text of the last SELECT that ran successfully. Keeping the SQL itself,
    # rather than a readline history index, stays correct once the history
    # reaches its length limit and older entries are dropped.
    last_select_sql = None
    while True:
        try:
            cmd = input("CMD> ").strip()
        except EOFError:
            # Ctrl-D or closed stdin: finish the prompt line and stop.
            print()
            break
        if not cmd:
            # A blank line is not a command; just prompt again.
            continue
        if cmd == "exit":
            break
        try:
            words = cmd.split()
            if words[0] == "export":
                if last_select_sql is None:
                    print("No select history to export")
                    continue
                if len(words) < 2:
                    print("Usage: export <csv_file>")
                    continue
                target_path = words[1]
                print(
                    "export sql: [",
                    last_select_sql,
                    "] to csv file",
                    target_path,
                )
                handle_select(conn, cursor, last_select_sql, target_path)
            elif words[0].upper() == "SELECT":
                handle_select(conn, cursor, cmd)
                # Only remembered when the query ran without raising.
                last_select_sql = cmd
            else:
                handle_execute(conn, cursor, cmd)
        except Exception as e:
            # Top-level boundary of the console: report and keep prompting.
            print("Error!", e)


if __name__ == "__main__":
    # Set the command-line history length.
    readline.set_history_length(1000)
    # Install the tab-completion function.
    readline.set_completer(completer)
    readline.parse_and_bind("tab: complete")
    args = parse_args()
    conn = sqlite3.connect(args.db_file)
    cursor = conn.cursor()
    try:
        run_console(conn, cursor)
    finally:
        # Release the database handles even if the loop is interrupted.
        cursor.close()
        conn.close()
    print("Bye")