Skip to content

Commit 3981811

Browse files
skeptrunedev authored and cdxker committed
feature: add delete chunks flag to delete file route
1 parent fdda50f commit 3981811

5 files changed

Lines changed: 134 additions & 32 deletions

File tree

server/src/handlers/file_handler.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use super::auth_handler::{AdminOnly, LoggedUser};
1+
use super::{
2+
auth_handler::{AdminOnly, LoggedUser},
3+
group_handler::DeleteGroupData,
4+
};
25
use crate::{
36
data::models::{
47
DatasetAndOrgWithSubAndPlan, DatasetConfiguration, File, FileAndGroupId, FileWorkerMessage,
@@ -336,6 +339,7 @@ pub async fn get_dataset_files_handler(
336339
params(
337340
("TR-Dataset" = String, Header, description = "The dataset id or tracking_id to use for the request. We assume you intend to use an id if the value is a valid uuid."),
338341
("file_id" = uuid::Uuid, description = "The id of the file to delete"),
342+
("delete_chunks" = bool, Query, description = "Delete the chunks within the group"),
339343
),
340344
security(
341345
("ApiKey" = ["admin"]),
@@ -344,14 +348,17 @@ pub async fn get_dataset_files_handler(
344348
#[tracing::instrument(skip(pool))]
345349
pub async fn delete_file_handler(
346350
file_id: web::Path<uuid::Uuid>,
351+
query: web::Query<DeleteGroupData>,
347352
pool: web::Data<Pool>,
348353
_user: AdminOnly,
349354
dataset_org_plan_sub: DatasetAndOrgWithSubAndPlan,
350355
) -> Result<HttpResponse, actix_web::Error> {
351356
let dataset_config =
352357
DatasetConfiguration::from_json(dataset_org_plan_sub.dataset.server_configuration.clone());
358+
353359
delete_file_query(
354360
file_id.into_inner(),
361+
query.delete_chunks,
355362
dataset_org_plan_sub.dataset,
356363
pool,
357364
dataset_config,

server/src/handlers/group_handler.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -604,12 +604,10 @@ pub async fn delete_chunk_group(
604604
)
605605
.await?;
606606

607-
let deleted_at = chrono::Utc::now().naive_utc();
608-
609607
delete_group_by_id_query(
610608
group_id,
611609
dataset_org_plan_sub.dataset,
612-
deleted_at,
610+
chrono::Utc::now().naive_utc(),
613611
data.delete_chunks,
614612
delete_group_pool,
615613
dataset_config,

server/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ impl Modify for SecurityAddon {
141141
name = "BSL",
142142
url = "https://github.com/devflowinc/trieve/blob/main/LICENSE.txt",
143143
),
144-
version = "0.11.8",
144+
version = "0.11.9",
145145
),
146146
servers(
147147
(url = "https://api.trieve.ai",

server/src/operators/file_operator.rs

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::collections::HashMap;
2-
31
use super::chunk_operator::{create_chunk_metadata, get_row_count_for_organization_id_query};
42
use super::clickhouse_operator::{ClickHouseEvent, EventQueue};
53
use super::group_operator::{create_group_from_file_query, create_groups_query};
@@ -9,6 +7,7 @@ use crate::data::models::FileDTO;
97
use crate::data::models::{Dataset, DatasetAndOrgWithSubAndPlan, DatasetConfiguration, EventType};
108
use crate::handlers::chunk_handler::ChunkReqPayload;
119
use crate::handlers::file_handler::UploadFileReqPayload;
10+
use crate::operators::group_operator::delete_group_by_file_id_query;
1211
use crate::{data::models::WorkerEvent, get_env};
1312
use crate::{
1413
data::models::{File, Pool},
@@ -18,11 +17,11 @@ use actix_web::web;
1817
use diesel::dsl::sql;
1918
use diesel::prelude::*;
2019
use diesel::sql_types::BigInt;
21-
use diesel_async::scoped_futures::ScopedFutureExt;
22-
use diesel_async::{AsyncConnection, RunQueryDsl};
20+
use diesel_async::RunQueryDsl;
2321
use redis::aio::MultiplexedConnection;
2422
use regex::Regex;
2523
use s3::{creds::Credentials, Bucket, Region};
24+
use std::collections::HashMap;
2625

2726
#[tracing::instrument]
2827
pub fn get_aws_bucket() -> Result<Bucket, ServiceError> {
@@ -359,6 +358,7 @@ pub async fn get_dataset_file_query(
359358
#[tracing::instrument(skip(pool))]
360359
pub async fn delete_file_query(
361360
file_uuid: uuid::Uuid,
361+
delete_chunks: Option<bool>,
362362
dataset: Dataset,
363363
pool: web::Data<Pool>,
364364
dataset_config: DatasetConfiguration,
@@ -383,29 +383,28 @@ pub async fn delete_file_query(
383383
.await
384384
.map_err(|_| ServiceError::BadRequest("Could not delete file from S3".to_string()))?;
385385

386-
let transaction_result = conn
387-
.transaction::<_, diesel::result::Error, _>(|conn| {
388-
async {
389-
diesel::delete(
390-
files_columns::files
391-
.filter(files_columns::id.eq(file_uuid))
392-
.filter(files_columns::dataset_id.eq(dataset.clone().id)),
393-
)
394-
.execute(conn)
395-
.await?;
396-
397-
Ok(())
398-
}
399-
.scope_boxed()
400-
})
401-
.await;
402-
403-
match transaction_result {
404-
Ok(_) => (),
405-
Err(e) => {
406-
log::error!("Error deleting file with transaction {:?}", e);
407-
return Err(ServiceError::BadRequest("Could not delete file".to_string()).into());
408-
}
386+
diesel::delete(
387+
files_columns::files
388+
.filter(files_columns::id.eq(file_uuid))
389+
.filter(files_columns::dataset_id.eq(dataset.clone().id)),
390+
)
391+
.execute(&mut conn)
392+
.await
393+
.map_err(|e| {
394+
log::error!("Error deleting file {:?}", e);
395+
ServiceError::BadRequest("Could not delete file".to_string())
396+
})?;
397+
398+
if delete_chunks.is_some_and(|delete_chunks| delete_chunks) {
399+
delete_group_by_file_id_query(
400+
file_uuid,
401+
dataset,
402+
chrono::Utc::now().naive_utc(),
403+
Some(true),
404+
pool.clone(),
405+
dataset_config,
406+
)
407+
.await?;
409408
}
410409

411410
Ok(())

server/src/operators/group_operator.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,104 @@ pub async fn delete_group_by_id_query(
354354
}
355355
}
356356

357+
#[tracing::instrument(skip(pool))]
358+
pub async fn delete_group_by_file_id_query(
359+
file_id: uuid::Uuid,
360+
dataset: Dataset,
361+
deleted_at: chrono::NaiveDateTime,
362+
delete_chunks: Option<bool>,
363+
pool: web::Data<Pool>,
364+
dataset_config: DatasetConfiguration,
365+
) -> Result<(), ServiceError> {
366+
use crate::data::schema::chunk_group::dsl as chunk_group_columns;
367+
use crate::data::schema::chunk_group_bookmarks::dsl as chunk_group_bookmarks_columns;
368+
use crate::data::schema::chunk_metadata::dsl as chunk_metadata_columns;
369+
use crate::data::schema::groups_from_files::dsl as groups_from_files_columns;
370+
371+
let mut conn = pool.get().await.map_err(|_e| {
372+
ServiceError::InternalServerError("Failed to get postgres connection".to_string())
373+
})?;
374+
375+
let group_id: uuid::Uuid = groups_from_files_columns::groups_from_files
376+
.filter(groups_from_files_columns::file_id.eq(file_id))
377+
.select(groups_from_files_columns::group_id)
378+
.first::<uuid::Uuid>(&mut conn)
379+
.await
380+
.map_err(|_err| {
381+
ServiceError::BadRequest("Error getting group id for file_id".to_string())
382+
})?;
383+
384+
let delete_chunks = delete_chunks.unwrap_or(false);
385+
let chunks = chunk_group_bookmarks_columns::chunk_group_bookmarks
386+
.inner_join(chunk_metadata_columns::chunk_metadata)
387+
.filter(chunk_group_bookmarks_columns::group_id.eq(group_id))
388+
.select(ChunkMetadataTable::as_select())
389+
.load::<ChunkMetadataTable>(&mut conn)
390+
.await
391+
.map_err(|_err| ServiceError::BadRequest("Error getting chunks".to_string()))?;
392+
393+
let transaction_result = conn
394+
.transaction::<_, diesel::result::Error, _>(|conn| {
395+
async move {
396+
diesel::delete(
397+
groups_from_files_columns::groups_from_files
398+
.filter(groups_from_files_columns::group_id.eq(group_id))
399+
.filter(groups_from_files_columns::created_at.le(deleted_at)),
400+
)
401+
.execute(conn)
402+
.await?;
403+
404+
diesel::delete(
405+
chunk_group_bookmarks_columns::chunk_group_bookmarks
406+
.filter(chunk_group_bookmarks_columns::group_id.eq(group_id))
407+
.filter(chunk_group_bookmarks_columns::created_at.le(deleted_at)),
408+
)
409+
.execute(conn)
410+
.await?;
411+
412+
diesel::delete(
413+
chunk_group_columns::chunk_group
414+
.filter(chunk_group_columns::id.eq(group_id))
415+
.filter(chunk_group_columns::dataset_id.eq(dataset.id))
416+
.filter(chunk_group_columns::created_at.le(deleted_at)),
417+
)
418+
.execute(conn)
419+
.await?;
420+
421+
Ok(())
422+
}
423+
.scope_boxed()
424+
})
425+
.await;
426+
427+
if delete_chunks {
428+
let chunk_ids = chunks.iter().map(|chunk| chunk.id).collect();
429+
delete_chunk_metadata_query(
430+
chunk_ids,
431+
deleted_at,
432+
dataset.clone(),
433+
pool.clone(),
434+
dataset_config.clone(),
435+
)
436+
.await?;
437+
} else {
438+
let remove_chunks_from_groups_futures = chunks.iter().map(|chunk| {
439+
remove_bookmark_from_qdrant_query(
440+
chunk.qdrant_point_id,
441+
group_id,
442+
dataset_config.clone(),
443+
)
444+
});
445+
446+
futures::future::join_all(remove_chunks_from_groups_futures).await;
447+
}
448+
449+
match transaction_result {
450+
Ok(_) => Ok(()),
451+
Err(_) => Err(ServiceError::BadRequest("Error deleting group".to_string())),
452+
}
453+
}
454+
357455
#[tracing::instrument(skip(pool))]
358456
pub async fn update_chunk_group_query(
359457
group: ChunkGroup,

0 commit comments

Comments (0)