-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp_helper_functions.py
More file actions
514 lines (413 loc) · 17.7 KB
/
app_helper_functions.py
File metadata and controls
514 lines (413 loc) · 17.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
import glob
import os
import numpy as np
import pandas as pd
import sys
import io
import streamlit as st
import fnmatch
import pickle
@st.cache_data(ttl=60)  # TTL of 60 seconds refreshes the cached data every minute.
def load_data():
    """Load the precomputed cluster and person/organisation dataframes.

    Reads two pickle files produced by the processing notebooks and mirrors
    the file-version metadata into ``st.session_state`` so other pages can
    show which data snapshot is loaded.

    Returns:
        tuple: (cluster_dfs, data_dfs), or (None, None) when the pickle
        files have not been generated yet.
    """
    base_dir = st.session_state["cwd"]
    clusters_path = os.path.join(
        base_dir, "data/calculated/edges_clusters_dfs.pickle"
    )
    data_path = os.path.join(
        base_dir, "data/calculated/personen_organisationen_dfs_processed.pickle"
    )
    try:
        with open(clusters_path, "rb") as fh:
            cluster_dfs = pickle.load(fh)
        with open(data_path, "rb") as fh:
            data_dfs = pickle.load(fh)
    except FileNotFoundError:
        print("No data found. Please upload and process data.")
        return None, None
    # Store the version info in session state for later use.
    versions = data_dfs["file_versions"]
    st.session_state["file_versions"] = {
        "earliest_date": versions["earliest_date"],
        "latest_date": versions["latest_date"],
        "ordered_filenames": versions["ordered_filenames"],
    }
    return cluster_dfs, data_dfs
def generate_dataframe_html(df):
    """Render *df* as an HTML table inside a horizontally scrollable <div>.

    Fix: the wrapper <div> was previously left unclosed because an indented
    closing tag inside the f-string was rendered as literal text (markdown
    treats indented lines as code blocks). Emitting the closing tag at
    column 0 keeps the HTML valid without re-triggering that display bug.

    Args:
        df: pandas DataFrame to render.

    Returns:
        str: HTML snippet (e.g. for st.markdown with unsafe_allow_html=True).
    """
    table_html = df.to_html(header=True, classes=["no_style_div"], render_links=False)
    html_string = (
        '<div style="overflow-x: auto; border: 1px solid #e6e9ef; '
        'margin-bottom: 2em; padding: 1em;">\n'
        f"{table_html}\n"
        "</div>"
    )
    return html_string
def identify_groups_and_master(df):
    """Assign duplicate groups and pick a master row per group.

    Input: df where all users have the same name. Each entry gets a
    "duplicate_group" number, found by union-find over shared address,
    email, phone and VerknuepftesObjektID values. Every row is then scored
    and the highest-scoring row of each group is flagged as master (the
    "master" column holds the group id for masters, -1 for everyone else).

    Returns:
        tuple: (df with duplicate_group/score/master columns added,
        captured merge-log string explaining every union).
    """
    df.reset_index(inplace=True, drop=True)

    # All merge explanations below are print()ed; capture them in a buffer
    # so the app can display the log. The try/finally guarantees stdout is
    # restored even if anything raises (previously an exception left stdout
    # redirected for the rest of the process).
    output_buffer = io.StringIO()
    original_stdout = sys.stdout
    sys.stdout = output_buffer
    try:
        # Union-find helpers
        def find(x, parent):
            # Find the root of x, compressing the path along the way.
            if parent[x] != x:
                parent[x] = find(parent[x], parent)
            return parent[x]

        def union(x, y, parent):
            # Merge the sets containing x and y.
            rootX = find(x, parent)
            rootY = find(y, parent)
            if rootX != rootY:
                parent[rootY] = rootX

        # Treat the "NotRegisteredCHID" placeholder as missing everywhere.
        df.replace("NotRegisteredCHID", pd.NA, inplace=True)

        # Start with every row in its own singleton group.
        n = len(df)
        parent = list(range(n))

        # Union rows that share a value in ANY of these columns.
        for column in ["address_gmaps", "EMailAdresse", "Telefonnummer"]:
            unique_vals = df[column].dropna().unique()  # skip NaN values
            for val in unique_vals:
                indices = df[df[column] == val].index.tolist()
                for i in range(1, len(indices)):
                    # Explain the merge in the captured log.
                    print(
                        f"Rows {indices[0]} and {indices[i]} are being merged due to shared {column} value: {val}\n"
                    )
                    union(indices[0], indices[i], parent)

        # 'VerknuepftesObjektID' may hold a single id or a list of ids:
        # normalize to lists, then union rows sharing any id.
        df["VerknuepftesObjektID"] = df["VerknuepftesObjektID"].apply(
            lambda x: [x] if isinstance(x, str) else x
        )
        all_object_ids = {
            item
            for sublist in df["VerknuepftesObjektID"].dropna()
            if isinstance(sublist, list)
            for item in sublist
        }
        for object_id in all_object_ids:
            indices = df[
                df["VerknuepftesObjektID"].apply(
                    lambda x: object_id in x if isinstance(x, list) else False
                )
            ].index.tolist()
            for i in range(1, len(indices)):
                # Explain the merge in the captured log.
                print(
                    f"Rows {indices[0]} and {indices[i]} are being merged due to shared objectID value: {object_id}\n"
                )
                union(indices[0], indices[i], parent)

        # Assign a compact group id per union-find root.
        group_map = {}
        for i in range(n):
            root = find(i, parent)
            if root not in group_map:
                group_map[root] = len(group_map)
            df.at[i, "duplicate_group"] = group_map[root]

        # Score each row; the best-scoring row of a group becomes master.
        df["Aktiv"] = (
            df["Aktiv"].fillna(0).astype(int)
        )  # TODO: should already be done in notebook
        df["AnzahlGeschaeftsobjekte"] = df["AnzahlGeschaeftsobjekte"].fillna(0)
        df["Verknuepfungsart"] = df["Verknuepfungsart"].fillna(0)
        df["Versandart"] = df["Versandart"].fillna(0)
        df["AnzahlObjektZeiger"] = df["AnzahlObjektZeiger"].fillna(0)
        # NOTE(review): assumes non-missing UID_CHID values are numeric-like
        # (astype(int) below would fail on arbitrary strings) — confirm.
        df["UID_CHID_check"] = df["UID_CHID"].apply(
            lambda x: x if not pd.isna(x) else 0
        )
        df["score"] = (
            df["Aktiv"].astype(int) * 1000
            + df["AnzahlGeschaeftsobjekte"].astype(int) * 100
            + df["UID_CHID_check"].astype(int) * 50
            + df["Verknuepfungsart"].isin(["Administrator", "Mitarbeiter"]).astype(int)
            * 50
            + df["Versandart"].isin(["Portal"]).astype(int) * 100
            + df["AnzahlObjektZeiger"].astype(int) * 10
        )
        master_indices = df.groupby("duplicate_group")["score"].idxmax()
        df["master"] = -1
        df.loc[master_indices, "master"] = df.loc[master_indices, "duplicate_group"]
    finally:
        # Always restore stdout, even on error.
        sys.stdout = original_stdout

    captured_output = output_buffer.getvalue()
    return df, captured_output
def display_subset_of_df(df, columns_at_start=None, columns_at_end=None):
    """Return a cleaned-up view of *df* restricted to the standard columns.

    Fixes: mutable list defaults (a shared default list is a classic Python
    pitfall) and assignment into a slice view of the caller's dataframe
    (now copies first).

    Args:
        df: source dataframe; must contain the standard columns below plus
            any extra columns named in columns_at_start / columns_at_end.
        columns_at_start: optional column names to prepend.
        columns_at_end: optional column names to append.

    Returns:
        DataFrame indexed by the last 3 digits of ReferenceID, with
        duplicate ReferenceIDs dropped and all-NaN / all-empty-list
        columns removed.
    """
    columns_at_start = list(columns_at_start) if columns_at_start else []
    columns_at_end = list(columns_at_end) if columns_at_end else []
    desired_order = [
        "Name",
        "score",
        "Aktiv",
        "CreatedAt",
        "Versandart",
        "UID_CHID",
        "ReferenceID",
        "address_full",
        "address_gmaps",
        "EMailAdresse",
        "Telefonnummer",
        "AnzahlObjektZeiger",
        "AnzahlVerknuepfungen",
        "VerknuepftesObjekt",
        "VerknuepftesObjektID",
        "Verknuepfungsart",
    ]
    desired_order = columns_at_start + desired_order + columns_at_end
    # .copy() so the assignments below don't warn about (or write through
    # to) a view of the caller's dataframe.
    output_df = df[desired_order].copy()
    # Use last 3 digits of ReferenceID as index
    output_df["index_column"] = df["ReferenceID"].str[-3:]
    output_df.set_index("index_column", inplace=True)
    # Remove duplicates based on ReferenceID
    output_df = output_df.drop_duplicates(subset="ReferenceID", keep="first")
    # Remove columns with all NaN values
    output_df = output_df.dropna(axis=1, how="all")
    # Remove columns consisting solely of empty lists (dropna can't see them)
    columns_to_remove = [
        col
        for col in output_df.columns
        if all(isinstance(item, list) and len(item) == 0 for item in output_df[col])
    ]
    return output_df.drop(columns=columns_to_remove)
def calculate_scores_personen(df, physisch=False):
    """Compute a quality score (and its breakdown) for every person row.

    For Doubletten physisch, UID is not really important; it is still
    considered but divided by 10.

    Args:
        df: person dataframe with the columns referenced below.
        physisch: when True, the UID component is weighted 1/10.

    Returns:
        df with added columns "UID_CHID_check", "score", "score_details".
    """
    # Fill missing values for non-list columns
    df.fillna(
        {
            "AnzahlGeschaeftsobjekte": 0,
            "Versandart": 0,
            "AnzahlObjektZeiger": 0,
            "AnzahlVerknuepfungen": 0,
            "Servicerole_string": "",
        },
        inplace=True,
    )
    # 0 = missing, 1 = "NotRegisteredCHID" placeholder, 2 = real UID
    df["UID_CHID_check"] = df["UID_CHID"].apply(
        lambda x: 0
        if pd.isna(x) or x == ""
        else 1
        if str(x).lower() == "notregisteredchid"
        else 2
    )

    # Compute the total score and a human-readable breakdown for one row.
    def score_and_details(row):
        score_components = {
            "Geschaeftsobjekte": row["AnzahlGeschaeftsobjekte"] * 30,
            "UID": int(row["UID_CHID_check"] * 50 / (10 if physisch else 1)),
            "Verknuepfungsart": sum(
                100 if val == "Administrator" else 50 if val == "Mitarbeiter" else 0
                for val in (row["Verknuepfungsart_list"] or [])
            ),
            "Versandart": 100 if row["Versandart"] == "Portal" else 0,
            "ObjektZeiger": np.minimum(row["AnzahlObjektZeiger"] * 10, 100),
            "Geschaeftspartner": sum(
                100 for _ in (row["Geschaeftspartner_list"] or [])
            ),
            "Servicerole_string": 100 if "Ausweis" in row["Servicerole_string"] else 0,
            "Produktrolle": len(row["Produkt_rolle"]) * 100
            if row["Produkt_rolle"]
            else 0,
        }
        if row["EMailAdresse"] and not pd.isna(row["EMailAdresse"]):
            score_components["Email"] = 20
        if row["Telefonnummer"] and not pd.isna(row["Telefonnummer"]):
            # BUG FIX: this previously wrote to the "Email" key, overwriting
            # the 20-point email score with 10 (and mislabeling the phone
            # bonus) whenever both an email and a phone number were present.
            score_components["Telefonnummer"] = 10
        score_details = ", ".join(
            f"{name} {score}" for name, score in score_components.items() if score > 0
        )
        total_score = sum(score_components.values())
        return total_score, score_details

    # Apply the scoring to each row.
    df[["score", "score_details"]] = df.apply(
        lambda row: score_and_details(row), axis=1, result_type="expand"
    )
    return df
def calculate_scores_organisationen(df):
    """Compute a quality score (and its breakdown) for every organisation row.

    Requires serviceroles and produkte to be integrated already.

    Returns:
        df with added columns "Debitornummer_check", "UID_CHID_check",
        "score" and "score_details".
    """
    # Fill missing values in the plain numeric/string columns used below.
    for col in ("Debitornummer", "Versandart", "AnzahlGeschaeftsobjekte",
                "AnzahlObjektZeiger"):
        df[col] = df[col].fillna(0)

    df["Debitornummer_check"] = df["Debitornummer"].apply(lambda v: 1 if v > 0 else 0)
    df["UID_CHID_check"] = df["UID_CHID"].apply(lambda v: 1 if isinstance(v, str) else 0)

    def _score_row(row):
        # Points for each Verknuepfungsart entry.
        verknuepfung_points = 0
        for art in row["Verknuepfungsart_list"]:
            if art == "Administrator":
                verknuepfung_points += 100
            elif art == "Mitarbeiter":
                verknuepfung_points += 50
        # Component labels appear verbatim in score_details.
        components = {
            "Debitornummer": 100 * row["Debitornummer_check"],
            "UID_CHID": 200 * row["UID_CHID_check"],
            "Versandart": 100 if row["Versandart"] == "Portal" else 0,
            "Geschaeftsobjekte": 30 * row["AnzahlGeschaeftsobjekte"],
            "ObjektZeiger": min(100, 10 * row["AnzahlObjektZeiger"]),
            "Verknuepfungsart": verknuepfung_points,
            "Geschaeftspartner": 100 * len(row["Geschaeftspartner_list"]),
            "Produkt_Inhaber": min(200, 80 * row["Produkt_Inhaber"]),
            "Produkt_Adressant": min(100, 30 * row["Produkt_Adressant"]),
            "Servicerole": 50 * row["Servicerole_count"],
            # '== True' kept deliberately: NaN must score 0 and bool(NaN)
            # is truthy, so a plain truthiness test would change behavior.
            "UID_MASTER": 1000 if row["UID_MASTER"] == True else 0,
        }
        details = ", ".join(f"{k}: {v}" for k, v in components.items() if v > 0)
        return sum(components.values()), details

    df["score"], df["score_details"] = zip(*df.apply(_score_row, axis=1))
    return df
def get_geschaeftspartner(input_df, folder_path):
    """
    Check if input df has matching ReferenceID with any of the partner dfs.

    input_df gets a new column "Geschaeftspartner" containing, per row, the
    list of partner names (derived from the xlsx file names in folder_path)
    whose files contain that row's ReferenceID.
    """
    # One empty partner list per row to start with.
    input_df["Geschaeftspartner"] = [[] for _ in range(len(input_df))]

    for xlsx_file in glob.glob(f"{folder_path}/*.xlsx"):
        # Partner name: last '-' token, then last '_' token, minus extension.
        stem = os.path.basename(xlsx_file)
        partner_name = stem.rsplit("-", 1)[-1].rsplit("_", 1)[-1].split(".")[0]
        # Load this partner's dataframe and match against every input row.
        partner_df = pd.read_excel(xlsx_file)
        known_ids = partner_df["ReferenceID"].values
        for idx, row in input_df.iterrows():
            if row["ReferenceID"] in known_ids:
                input_df.at[idx, "Geschaeftspartner"].append(partner_name)
    return input_df
# ----- Functions related to app file upload ----
# -------------------------------
def upload_files():
    """Streamlit widget: save uploaded .xlsx files into the data directory.

    Geschaeftspartner exports are routed into dedicated subfolders (for
    backwards compatibility); everything else lands in "data".
    """
    uploaded_files = st.file_uploader(
        "Upload File", accept_multiple_files=True, type=["xlsx"]
    )
    # Skip saving while a clear-data operation is pending.
    if uploaded_files is None or st.session_state["clear_data"]:
        return
    for uploaded_file in uploaded_files:
        name = uploaded_file.name
        if fnmatch.fnmatch(name, "*Geschaeftspartner*_Organisationen*.xlsx"):
            relative_dir = "data/mandanten/organisationen"
        elif fnmatch.fnmatch(name, "*Geschaeftspartner*_Personen*.xlsx"):
            relative_dir = "data/mandanten/personen"
        else:
            relative_dir = "data"
        # Resolve against the stored cwd and make sure the folder exists.
        target_dir = os.path.join(st.session_state["cwd"], relative_dir)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        # Persist the uploaded bytes.
        with open(os.path.join(target_dir, name), "wb") as f:
            f.write(uploaded_file.getbuffer())
    if len(uploaded_files) > 0:
        st.success(f"{len(uploaded_files)} files saved")
import shutil
def clear_data_directory(directory="data"):
    """Delete everything inside the given data directory (non-recursive root).

    Reports per-entry failures via st.error but keeps going; sets
    st.session_state["clear_data"] so uploads are skipped afterwards.
    """
    # Resolve the directory against the stored cwd.
    full_directory_path = os.path.join(st.session_state["cwd"], directory)
    if not os.path.exists(full_directory_path):
        st.warning("Data directory does not exist.")
        return
    for entry in os.listdir(full_directory_path):
        entry_path = os.path.join(full_directory_path, entry)
        try:
            if os.path.isfile(entry_path) or os.path.islink(entry_path):
                os.unlink(entry_path)
            elif os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
        except Exception as e:
            st.error(f"Failed to delete {entry_path}. Reason: {e}")
    st.success("Data directory cleared.")
    st.session_state["clear_data"] = True
# ----------------- other app only related function -----
import re
from datetime import datetime
def get_data_version():
    """
    Derive earliest/latest data dates and a date-ordered file list.

    find_all_data() must have been executed somewhere before this (it fills
    st.session_state["file_paths"]).

    Returns:
        tuple: (earliest_date, latest_date, ordered_filenames). The dates
        are the string "No data" when no file paths (or no filenames
        containing a YYYY-MM-DD date) are available.
    """
    # Check if 'file_paths' exists in the session state and is not empty
    if "file_paths" not in st.session_state or not st.session_state["file_paths"]:
        return "No data", "No data", []

    date_pattern = r"\d{4}-\d{2}-\d{2}"
    dates = set()
    filenames_with_dates = []
    for value in st.session_state["file_paths"].values():
        filename_without_extension = os.path.splitext(os.path.basename(value))[0]
        match = re.search(date_pattern, value)
        if match:
            date_obj = datetime.strptime(match.group(), "%Y-%m-%d")
            dates.add(date_obj)
            filenames_with_dates.append((date_obj, filename_without_extension))

    # FIX: previously min()/max() raised ValueError when file_paths was
    # non-empty but no filename contained a parseable date.
    if not dates:
        return "No data", "No data", []

    # Sort filenames chronologically by their embedded date.
    filenames_with_dates.sort(key=lambda pair: pair[0])
    ordered_filenames = [name for _, name in filenames_with_dates]
    earliest_date = min(dates).strftime("%Y-%m-%d")
    latest_date = max(dates).strftime("%Y-%m-%d")

    # Store it in session state for later use
    st.session_state["file_versions"] = {
        "earliest_date": earliest_date,
        "latest_date": latest_date,
        "ordered_filenames": ordered_filenames,
    }
    return earliest_date, latest_date, ordered_filenames
def upload_python_files():
    """Streamlit widget: upload .py files straight into the app directory.

    Allows replacing code directly in the app directory. Use with caution!
    """
    uploaded_files = st.file_uploader(
        "Upload Python Files", accept_multiple_files=True, type=["py"]
    )
    if not uploaded_files:
        return
    for uploaded_file in uploaded_files:
        # Save each file next to the running app code.
        destination = os.path.join(st.session_state["cwd"], uploaded_file.name)
        with open(destination, "wb") as f:
            f.write(uploaded_file.getbuffer())
    st.success(f"{len(uploaded_files)} files uploaded and replaced.")