From 78c84d1e17ece1cebec73a19b5fc5a84bb8cd38f Mon Sep 17 00:00:00 2001 From: horhik Date: Thu, 29 Apr 2021 08:54:01 +0300 Subject: [PATCH] fix regexp in serv_conn.rs --- src/chat/serv_conn.rs | 45 +++++++++++++++++++++++++++++++++--------- src/chat/stay_awake.rs | 2 +- src/main.rs | 3 ++- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/src/chat/serv_conn.rs b/src/chat/serv_conn.rs index d14615c..c856e6a 100644 --- a/src/chat/serv_conn.rs +++ b/src/chat/serv_conn.rs @@ -1,8 +1,11 @@ use super::serv_handler::to_server_sender; use crate::api::types::Message as FrontMessage; use crate::chat::types::{PackedMessage, RP, SP}; +use crate::db::users::increase_messages_count; use async_std::task; use fcpv2::types::traits::FcpParser; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; use regex::Regex; use serde_derive::Deserialize; use std::env; @@ -12,11 +15,19 @@ use tokio::{ }; #[tokio::main] -pub async fn listen_server(client_sender: SP, server_receiver: RP) -> io::Result<()> { - task::block_on(connect_to_server(client_sender, server_receiver)) +pub async fn listen_server( + client_sender: SP, + server_receiver: RP, + conn: Pool, +) -> io::Result<()> { + task::block_on(connect_to_server(client_sender, server_receiver, conn)) } -async fn connect_to_server(client_sender: SP, server_receiver: RP) -> io::Result<()> { +async fn connect_to_server( + client_sender: SP, + server_receiver: RP, + conn: Pool, +) -> io::Result<()> { let addr = env::args() .nth(1) .unwrap_or_else(|| "127.0.0.1:9481".to_string()); @@ -27,14 +38,18 @@ async fn connect_to_server(client_sender: SP, server_receiver: RP) -> io::Result .expect("Unable to connect to FCP"); let (receiver, sender) = stream.into_split(); log::info!("Connected to FCP"); - let t = task::spawn(server_responce_getter(receiver, client_sender)); + let t = task::spawn(server_responce_getter(receiver, client_sender, conn)); to_server_sender(sender, server_receiver, sr).await?; match t.await { Ok(_) => Ok(()), Err(e) => Err(e), } } -async fn server_responce_getter(mut receiver: OwnedReadHalf, client_sender: SP) -> io::Result<()> { +async fn server_responce_getter( + mut receiver: OwnedReadHalf, + client_sender: SP, + conn: Pool, +) -> io::Result<()> { // let mut prev = [0; 1024]; loop { // each freenet responce have an identifier and program will define what to do with request by this identifier @@ -93,12 +108,16 @@ async fn server_responce_getter(mut receiver: OwnedReadHalf, client_sender: SP) } } "DataFound" => { + let db = conn.get().unwrap(); log::debug!("Receive a new message!!! {:?}", &received.trim()); let rec = received.clone(); let splitted: Vec<&str> = rec.split_inclusive("AllDatan").collect(); log::debug!("\n\n\n\n\n AAAAAAAAA \n\n\n\n"); - let reg = Regex::new("AllData\nIdentifier=(.*)\nCompletionTime=(.*)\nStartupTime=(.*)\nDataLength=(.*)\nGlobal=(.*)\nMetadata.ContentType=(.*)\nData\n((.|\n)*)").unwrap(); - let captured = reg.captures(&received[..]).unwrap(); + match Regex::new("AllData\nIdentifier=(.*)\nCompletionTime=(.*)\nStartupTime=(.*)\nDataLength=(.*)\nGlobal=(.*)\nMetadata.ContentType=(.*)\nData\n((.|\n)*)"){ + Ok(reg) => { + match reg.captures(&received[..]){ + + Some(captured) => { log::debug!("\n\n\n\n\n AAAAAAAAA {:?} \n\n\n\n", captured); let data_length: usize = usize::from_str_radix(&captured[4], 10).unwrap(); let message = &captured[7][0..data_length].to_string(); @@ -119,15 +138,23 @@ async fn server_responce_getter(mut receiver: OwnedReadHalf, client_sender: SP) )) .unwrap(); log::debug!("Send gotted message to frontend..."); + log::debug!("Increasing messages count..."); + increase_messages_count(crate::db::types::Id(json.id), &db).unwrap(); } Err(_) => { log::error!("Failed to parse gotted message"); } } + }, + None => {log::error!("Failed to parse regexp")}, + } + }, + Err(e) => {log::error!("Failed to unwrap regexp")} + } } - "AllData" => { - log::debug!("Receive a new message!!! {:?}", &received); + "Alldata" => { + log::debug!("Receive a new AllData!!! {:?}", &received); let message = fcpv2::node::fcp_response::AllData::parse(&received[..]).unwrap(); log::debug!("Parse new message!!!! {:?}", &message); diff --git a/src/chat/stay_awake.rs b/src/chat/stay_awake.rs index 644dbc6..5678dc8 100644 --- a/src/chat/stay_awake.rs +++ b/src/chat/stay_awake.rs @@ -22,7 +22,7 @@ pub async fn request_repeater(ss: SP, conn: Pool) -> io loop { let users: Vec = crate::db::users::load_all_users(&db).unwrap(); - let time = std::time::Duration::from_millis(1300); + let time = std::time::Duration::from_millis(10000); std::thread::sleep(time); log::debug!("enough sleep"); for user in users { diff --git a/src/main.rs b/src/main.rs index 5864c5c..88ea42c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,6 +48,7 @@ fn main() -> io::Result<()> { //TODO Add connection pool for ruqlite let db = db::start_db().unwrap(); + let d2 = db.clone(); //let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234"); let (to_server_sender, server_receiver): (Sender, Receiver) = @@ -60,7 +61,7 @@ fn main() -> io::Result<()> { let cs = client_sender; let sr = server_receiver; - let t = thread::spawn(move || listen_server(cs, sr)); + let t = thread::spawn(move || listen_server(cs, sr, d2)); t.join().expect("failed server thread").unwrap(); });