@@ -50,7 +50,7 @@ use tracing::{debug, error, info, warn};
5050
5151use crate :: {
5252 users:: { AuthSession , Backend } ,
53- web:: { auth, protected, CameraMessage } ,
53+ web:: { auth, protected, CameraListChange , CameraMessage } ,
5454 ApiChannelMessage , Camera , CameraPermissionView , CameraSetting , CameraSettingNoMeta , Model ,
5555 User , Video ,
5656} ;
@@ -126,7 +126,8 @@ impl App {
126126 debug ! ( "Video path: {:?}" , video_path) ;
127127
128128 // Private IPv4 e.g. 192.168.x.x with port for passing to camera
129- let oko_private_socket_addr = SocketAddr :: from ( ( local_ip_address:: local_ip ( ) ?, listener. local_addr ( ) ?. port ( ) ) ) ;
129+ let oko_private_socket_addr =
130+ SocketAddr :: from ( ( local_ip_address:: local_ip ( ) ?, listener. local_addr ( ) ?. port ( ) ) ) ;
130131
131132 Ok ( Self {
132133 db,
@@ -319,7 +320,7 @@ async fn ws_handler(
319320 info ! ( "{addr} connected to ws_handler." ) ;
320321 // finalize the upgrade process by returning upgrade callback.
321322 // we can customize the callback by sending additional info such as address.
322- ws. on_upgrade ( move |socket| handle_socket ( socket, addr, state, auth_session) )
323+ ws. on_upgrade ( move |socket| handle_socket ( socket, addr, state, Arc :: new ( auth_session) ) )
323324}
324325
325326// ! Camera restart does not guarantee new recording, frames will keep going to the same video unless socket times out?
@@ -331,7 +332,7 @@ async fn handle_socket(
331332 mut socket : WebSocket ,
332333 who : SocketAddr ,
333334 state : State < Arc < AppState > > ,
334- auth_session : AuthSession ,
335+ auth_session : Arc < AuthSession > ,
335336) {
336337 info ! ( "{who} connected to handle_socket." ) ;
337338
@@ -382,7 +383,8 @@ async fn handle_socket(
382383 }
383384
384385 let mut initial_camera_settings = None ;
385- let mut cameras: Vec < CameraPermissionView > = Vec :: new ( ) ;
386+ let cameras: Arc < Mutex < Vec < CameraPermissionView > > > = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
387+ let mut user_id = None ;
386388
387389 if is_camera {
388390 // TODO: Maybe find a better way to handle this
@@ -418,19 +420,21 @@ async fn handle_socket(
418420 initial_camera_settings = Some ( camera_settings) ;
419421 } else {
420422 // TODO: Return errors to user
421- let Some ( user) = auth_session. user else {
423+ let Some ( ref user) = auth_session. user else {
422424 error ! ( "User not found in auth session..." ) ;
423425 return ;
424426 } ;
425427
428+ user_id = Some ( user. user_id ) ;
429+
426430 let Ok ( i_cameras) =
427431 Camera :: list_accessible_to_user ( & auth_session. backend . db , user. user_id ) . await
428432 else {
429433 error ! ( "Error listing cameras for user..." ) ;
430434 return ;
431435 } ;
432436
433- cameras = i_cameras;
437+ * cameras. lock ( ) . await = i_cameras;
434438 }
435439
436440 let initial_camera_settings_clone = initial_camera_settings. clone ( ) ;
@@ -443,6 +447,7 @@ async fn handle_socket(
443447 // ! Camera restart does not guarantee new recording, frames will keep going to the same video unless socket times out?
444448 let mut recording_task: JoinHandle < Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > > =
445449 if is_camera {
450+ let auth_session_clone = auth_session. clone ( ) ;
446451 // TODO: Check if errors are returned properly here, had some issues with the ? operator being silent
447452 tracker. spawn ( async move {
448453 let now = Video :: DEFAULT . start_time ( ) ;
@@ -461,7 +466,9 @@ async fn handle_socket(
461466 } ;
462467
463468 // ? Maybe don't create video until first frame (or maybe doing this is actually a good approach)?
464- video. create_using_self ( & auth_session. backend . db ) . await ?;
469+ video
470+ . create_using_self ( & auth_session_clone. backend . db )
471+ . await ?;
465472
466473 let ( frame_width, frame_height, framerate) = match initial_camera_settings_clone {
467474 #[ allow( clippy:: match_same_arms) ] // readability
@@ -533,7 +540,9 @@ async fn handle_socket(
533540 video. end_time = Some ( OffsetDateTime :: now_utc ( ) ) ;
534541 video. file_size = Some ( total_bytes. try_into ( ) ?) ;
535542
536- video. update_using_self ( & auth_session. backend . db ) . await ?;
543+ video
544+ . update_using_self ( & auth_session_clone. backend . db )
545+ . await ?;
537546 info ! ( "Recording finished for {who}..." ) ;
538547
539548 Ok ( ( ) )
@@ -566,6 +575,7 @@ async fn handle_socket(
566575 #[ allow( clippy:: if_not_else) ]
567576 let mut send_task: JoinHandle < Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > > =
568577 if !is_camera {
578+ let cameras_clone = cameras. clone ( ) ;
569579 let mut images_rx = state. images_tx . subscribe ( ) ;
570580 let sender_mutex_clone = sender_mutex. clone ( ) ;
571581 // TODO: Proper error handling
@@ -579,20 +589,24 @@ async fn handle_socket(
579589 if first_received {
580590 debug ! ( "Sending message to {who}..." ) ;
581591
582- if !cameras. iter ( ) . any ( |c| c. camera_id == message. camera_id ) {
583- continue ;
584- }
585-
586- // TODO: look into bincode (fastest?) / rmp-serde (wide support) / flatbuffers (partial deserialization)
587- let message_json = serde_json:: to_string ( & message) ?;
588- let message_json_msg = Message :: Text ( message_json. clone ( ) ) ;
589-
590- // TODO: Handle error here
591- sender_mutex_clone
592+ // TODO: locking this mutex in such a hot path is a horrible idea, absolutely find a better way to do this
593+ if cameras_clone
592594 . lock ( )
593595 . await
594- . send ( message_json_msg)
595- . await ?;
596+ . iter ( )
597+ . any ( |c| c. camera_id == message. camera_id )
598+ {
599+ // TODO: look into bincode (fastest?) / rmp-serde (wide support) / flatbuffers (partial deserialization)
600+ let message_json = serde_json:: to_string ( & message) ?;
601+ let message_json_msg = Message :: Text ( message_json. clone ( ) ) ;
602+
603+ // TODO: Handle error here
604+ sender_mutex_clone
605+ . lock ( )
606+ . await
607+ . send ( message_json_msg)
608+ . await ?;
609+ }
596610 }
597611
598612 if images_rx. changed ( ) . await . is_err ( ) {
@@ -654,11 +668,12 @@ async fn handle_socket(
654668 }
655669 } ) ;
656670
671+ let api_channel = state. api_channel . clone ( ) ;
672+ let mut first_received = false ;
673+
657674 let mut api_listener_task: JoinHandle < Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > > =
658675 if is_camera {
659- let api_channel = state. api_channel . clone ( ) ;
660676 let sender_mutex_clone = sender_mutex. clone ( ) ;
661- let mut first_received = false ;
662677 tokio:: spawn ( async move {
663678 if let Some ( some_camera_settings) = initial_camera_settings {
664679 let some_initial_camera_settings = CameraSettingNoMeta {
@@ -687,8 +702,9 @@ async fn handle_socket(
687702 let api_msg = ( * api_channel_rx. borrow_and_update ( ) ) . clone ( ) ;
688703
689704 if first_received {
705+ #[ allow( clippy:: single_match) ] // will change in the future
690706 match api_msg {
691- ApiChannelMessage :: CameraRelated {
707+ ApiChannelMessage :: CameraAction {
692708 camera_id : message_camera_id,
693709 message,
694710 } => {
@@ -707,6 +723,17 @@ async fn handle_socket(
707723 }
708724 }
709725 }
726+ ApiChannelMessage :: CameraListChanged ( change) => match change {
727+ CameraListChange :: Removed {
728+ camera_id : camera_id_removed,
729+ } => {
730+ if camera_id_removed == camera_id {
731+ info ! ( "Closing WebSocket for {who} because camera was removed from DB" ) ;
732+ return Err ( "Camera removed from DB" . into ( ) ) ;
733+ }
734+ }
735+ _ => ( ) ,
736+ } ,
710737 ApiChannelMessage :: Initial => ( ) ,
711738 }
712739 }
@@ -722,10 +749,47 @@ async fn handle_socket(
722749 } )
723750 } else {
724751 tokio:: spawn ( async move {
725- let mut interval = tokio :: time :: interval ( EMPTY_TASK_SLEEP_DURATION ) ;
752+ let mut api_channel_rx = api_channel . subscribe ( ) ;
726753 loop {
727- interval. tick ( ) . await ;
754+ let api_msg = ( * api_channel_rx. borrow_and_update ( ) ) . clone ( ) ;
755+
756+ if first_received {
757+ #[ allow( clippy:: single_match) ] // will change in the future
758+ match api_msg {
759+ // TODO: every user task performing this is wasteful, global camera list mutex shared with api would be better
760+ ApiChannelMessage :: CameraListChanged ( _) => {
761+ info ! ( "API channel message received for camera list changed for {who}..." ) ;
762+
763+ let Some ( user_id_some) = user_id else {
764+ error ! ( "Camera list changed but user was not found in auth_session. How is this even possible?" ) ;
765+ break ;
766+ } ;
767+
768+ let Ok ( new_cameras) = Camera :: list_accessible_to_user (
769+ & auth_session. backend . db ,
770+ user_id_some,
771+ )
772+ . await
773+ else {
774+ // TODO: prevent potential endless loop here from the DB call always failing
775+ error ! ( "Error listing new cameras for {who}..." ) ;
776+ continue ;
777+ } ;
778+
779+ * cameras. lock ( ) . await = new_cameras;
780+ }
781+ _ => ( ) ,
782+ }
783+ }
784+
785+ if api_channel_rx. changed ( ) . await . is_err ( ) {
786+ break ;
787+ }
788+
789+ first_received = true ;
728790 }
791+
792+ Ok ( ( ) )
729793 } )
730794 } ;
731795
@@ -763,6 +827,10 @@ async fn handle_socket(
763827 Ok ( _) => info!( "api_listener_task finished for {who}" ) ,
764828 Err ( d) => error!( "Error listening to API channel {d:?}" )
765829 }
830+ send_task. abort( ) ;
831+ recv_task. abort( ) ;
832+ recording_token_clone. cancel( ) ;
833+ tracker. wait( ) . await ;
766834 // ? Maybe do something if api channel fails e.g. send a message to the client/DB
767835 }
768836 }
0 commit comments