From 35d2eafdb85ab86f16801274615d039404465587 Mon Sep 17 00:00:00 2001 From: Horhik Date: Fri, 23 Apr 2021 21:54:48 +0300 Subject: [PATCH] Supporting Sqlite DB Pool and add json Message type --- Cargo.lock | 1 + Cargo.toml | 1 + src/api/handlers.rs | 6 +- src/api/mod.rs | 1 + src/api/request.rs | 4 +- src/api/selector.rs | 8 ++- src/api/types.rs | 12 +++- src/chat/front_conn.rs | 47 ++++++++++++---- src/chat/stay_awake.rs | 6 +- src/db/mod.rs | 123 +++++++++++++++++++++++++---------------- src/main.rs | 3 +- 11 files changed, 145 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6197be1..b84dcf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -639,6 +639,7 @@ dependencies = [ "futures-util", "log", "ntru", + "r2d2", "r2d2_sqlite", "regex", "rusqlite", diff --git a/Cargo.toml b/Cargo.toml index 6f347d0..812a381 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ chrono = {version = "0.4.19", features = ["serde"]} uuid = {version = "0.8.2", features = ["serde" , "v4"]} regex = "1.4.5" r2d2_sqlite = "0.18.0" +r2d2 = "0.8.9" [dependencies.fcpv2] path = "../FCPv2" diff --git a/src/api/handlers.rs b/src/api/handlers.rs index 4851672..494caf6 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -148,12 +148,12 @@ pub fn add_user( name: String, insert_key: String, sign_key: String, + id: uuid::Uuid, conn: &Connection, server_sender: SP, ) -> Result<()> { - let new_id = Uuid::new_v4(); let user = db::types::User { - id: db::types::Id(new_id.clone()), + id: Id(id.clone()), name: name.clone(), sign_key: sign_key.clone(), insert_key: SSK::parse(&insert_key[..]).unwrap(), @@ -161,7 +161,7 @@ pub fn add_user( my_messages_count: 0, }; let user_jsoned = crate::api::response::User{ - id: new_id.clone().to_string(), + id: id.clone().to_string(), name: name.clone(), sign_key: sign_key.clone(), insert_key: insert_key, diff --git a/src/api/mod.rs b/src/api/mod.rs index 99c62af..8375137 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,3 +3,4 @@ pub mod response; pub mod handlers; pub mod selector; pub mod identifier; +pub mod types; diff --git a/src/api/request.rs b/src/api/request.rs index 2927789..74c4209 100644 --- a/src/api/request.rs +++ b/src/api/request.rs @@ -28,6 +28,7 @@ pub enum Request { name: String, sign_key: String, insert_key: String, + id: uuid::Uuid, }, // CreateInstance TODO v0.3 } @@ -76,12 +77,13 @@ fn request_LoadMessages_are_correct() { } #[test] fn request_AddUser_are_correct() { - let json = "{\"type\":\"addUser\",\"name\":\"john\",\"signKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\",\"insertKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\"}"; + let json = "{\"type\":\"addUser\",\"name\":\"john\",\"signKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\",\"insertKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\",\"id\":\"2d12c305-eb79-489c-b643-f27f1e78a7c0\"\"}"; let parsed: Request = serde_json::from_str(json).unwrap(); assert_eq!(parsed, Request::AddUser{ name: "john".to_string(), sign_key: "USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE".to_string(), insert_key: "USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE".to_string(), + id: uuid::Uuid::from_str("2d12c305-eb79-489c-b643-f27f1e78a7c0"), }) } diff --git a/src/api/selector.rs b/src/api/selector.rs index 3dbb513..4e2211b 100644 --- a/src/api/selector.rs +++ b/src/api/selector.rs @@ -9,11 +9,14 @@ use rusqlite; use rusqlite::Connection; use serde_json::from_str; use serde_json::json; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; // server_sender sending data to server thread; -pub fn request_selector(json: &str, server_sender: SP, conn: &Connection) -> Result<()> { +pub fn request_selector(json: &str, server_sender: SP, conn: Pool,) -> Result<()> { // if let Ok(res) = from_str::(&json) { //TODO v0.3 Add Instances return Ok(()); // } + let conn = conn.get().unwrap(); log::info!("matching request..."); let parsed: Request = serde_json::from_str(json).unwrap(); match parsed { @@ -62,7 +65,8 @@ pub fn request_selector(json: &str, server_sender: SP, conn: &Connection) -> Res name, sign_key, insert_key, - } => match handlers::add_user(name, insert_key, sign_key, &conn, server_sender.clone()) { + id + } => match handlers::add_user(name, insert_key, sign_key, id, &conn, server_sender.clone()) { Ok(_) => {} Err(_) => {} }, diff --git a/src/api/types.rs b/src/api/types.rs index e197f19..8a557cd 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -1 +1,11 @@ -pub use super::request_types; +use crate::db::types::{Time, Id}; +use serde::{Serialize, Deserialize}; + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Message { + pub message: String, + pub date: Time, + pub id: Id, + pub from_me: bool, +} diff --git a/src/chat/front_conn.rs b/src/chat/front_conn.rs index 8a1def4..2c04132 100644 --- a/src/chat/front_conn.rs +++ b/src/chat/front_conn.rs @@ -1,5 +1,4 @@ use crate::chat::types::PackedMessage; -use fcpv2::{types::traits::FcpParser, node::fcp_response::AllData}; use crate::db; use async_std::{ io, @@ -7,6 +6,7 @@ use async_std::{ task, }; use async_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; +use fcpv2::{node::fcp_response::AllData, types::traits::FcpParser}; use futures::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, @@ -21,6 +21,8 @@ use super::stay_awake::request_repeater; use super::types::{RP, SP}; use crate::api::selector::request_selector; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; #[derive(Deserialize, Debug)] struct FrontMsg { @@ -33,7 +35,7 @@ struct FrontMsg { pub fn listen_client( server_sender: SP, client_receiver: RP, - conn: rusqlite::Connection, + conn: Pool, ) -> io::Result<()> { task::block_on(connect_to_client(server_sender, client_receiver, conn)) } @@ -41,7 +43,7 @@ pub fn listen_client( async fn connect_to_client( server_sender: SP, client_receiver: RP, - conn: rusqlite::Connection, + conn: Pool, ) -> io::Result<()> { let addr = env::args() .nth(1) @@ -69,9 +71,14 @@ async fn connect_to_client( sender, client_receiver, client_repeater, + conn.clone(), )); - let t2 = task::spawn(connection_for_sending(receiver, server_sender, conn)); - let t3 = task::spawn(request_repeater(ss)); + let t2 = task::spawn(connection_for_sending( + receiver, + server_sender, + conn.clone(), + )); + let t3 = task::spawn(request_repeater(ss, conn.clone())); t1.await?; t3.await?; t2.await?; @@ -84,9 +91,11 @@ async fn connection_for_receiving( mut sender: SplitSink, Message>, client_receiver: RP, server_sender: SP, + conn: Pool, ) -> io::Result<()> { log::info!("Connection for receiving launched"); // let mut prev: PackedMessage = PackedMessage::FromFreenet("nothing".to_string()); + let db = conn.get().unwrap(); while let Ok(res) = client_receiver.recv() { //TODO call client get after receiving NodeHello // log::debug!("RES {:?}", &res); @@ -107,11 +116,29 @@ async fn connection_for_receiving( match res_type { Some("AllData") => { let data = AllData::parse(&r).unwrap(); - log::debug!("GOT mESSAGE {}\n FROM FREENET: {}",data.identifier, data.data ); - server_sender.send(PackedMessage::ToClient(data.data)); + log::debug!( + "GOT mESSAGE {}\n FROM FREENET: {}", + &data.identifier, + &data.data + ); + server_sender.send(PackedMessage::ToClient(data.data.clone())); //TOOD parse identifier - let (uuid, id) = crate::api::identifier::parse_message_identifier(&data.identifier); + let (uuid, id) = + crate::api::identifier::parse_message_identifier(&data.identifier); log::debug!("parsed identifier: {:?} {:?}", uuid, id); + let jsoned: crate::api::types::Message = + serde_json::from_str(&data.data[..]).unwrap(); + crate::db::messages::add_message( + crate::db::types::Message { + id: id, + date: jsoned.date, + user_id: jsoned.id, + message: jsoned.message, + from_me: jsoned.from_me, + }, + &db, + ) + .unwrap() /*async_std::task::block_on( sender // TODO freenet_response_handler @@ -135,7 +162,7 @@ async fn connection_for_receiving( async fn connection_for_sending( mut receiver: SplitStream>, server_sender: SP, - conn: rusqlite::Connection, + conn: Pool, ) -> io::Result<()> { let ss = server_sender.clone(); log::info!("Connection for sending launched"); @@ -144,7 +171,7 @@ async fn connection_for_sending( if let Some(msg) = new_msg.await { let jsoned = msg.expect("Falied to unwrap gotted message"); log::info!("new request"); - match request_selector(&jsoned.to_string()[..], server_sender.clone(), &conn) { + match request_selector(&jsoned.to_string()[..], server_sender.clone(), conn.clone()) { Ok(_) => {} Err(e) => { log::error!("{}", e); diff --git a/src/chat/stay_awake.rs b/src/chat/stay_awake.rs index 4280245..28f613d 100644 --- a/src/chat/stay_awake.rs +++ b/src/chat/stay_awake.rs @@ -6,11 +6,13 @@ use std::sync::mpsc::Sender; use std::path::Path; use std::fs::File; +use r2d2_sqlite::SqliteConnectionManager; +use r2d2::Pool; type SP = Sender; -pub async fn request_repeater(ss: SP) -> io::Result<()> { +pub async fn request_repeater(ss: SP, conn: Pool) -> io::Result<()> { - let db = crate::db::start_db().unwrap(); + let db = conn.get().unwrap(); // loop { //TODO create a field with tracked users log::debug!("Request Repeater Started!"); diff --git a/src/db/mod.rs b/src/db/mod.rs index 7266d30..cdb85b6 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,7 +1,16 @@ use rusqlite::{params, Connection, Result}; +pub mod messages; pub mod types; pub mod users; -pub mod messages; + +extern crate r2d2; +extern crate r2d2_sqlite; +extern crate rusqlite; + +use core::marker::Send; +use r2d2_sqlite::SqliteConnectionManager; + +pub use r2d2::{ManageConnection, Pool}; use types::DB_PATH; @@ -49,13 +58,33 @@ fn create_db(conn: &Connection) -> Result<()> { Ok(()) } -pub fn start_db() -> Result { +pub fn start_db() -> std::result::Result, r2d2::Error> where +{ + let manager = r2d2_sqlite::SqliteConnectionManager::file(DB_PATH); + + if !std::path::Path::new(DB_PATH).exists() { + let conn = Connection::open(DB_PATH).expect("failed to connect to db"); + println!("{}", conn.is_autocommit()); + create_db(&conn).expect("failed to create db"); + Ok(r2d2::Pool::new(manager).unwrap()) + } else { + Ok(r2d2::Pool::new(manager).unwrap()) + } +} + +pub fn old_start_db() -> Result { if !std::path::Path::new(DB_PATH).exists() { let conn = Connection::open(DB_PATH)?; println!("{}", conn.is_autocommit()); match create_db(&conn) { - Ok(_) => {log::info!("Successfully created DB!"); Ok(conn)}, - Err(e) => {log::error!("Failed to create DB: {:?}",e ); Err(e)}, + Ok(_) => { + log::info!("Successfully created DB!"); + Ok(conn) + } + Err(e) => { + log::error!("Failed to create DB: {:?}", e); + Err(e) + } } } else { Connection::open(DB_PATH) @@ -63,47 +92,47 @@ pub fn start_db() -> Result { } // ### A little stupid test ### // - //let conn = db::start_db().unwrap(); - /* - users::add_user(db::types::User{ - id: 9349, - name: "Nick".to_string(), - sign_key: "string".to_string(), - insert_key: fcpv2::types::SSK::parse("SSK@Rgt0qM8D24DltliV2-JE9tYLcrgGAKeDwkz41I3JBPs,p~c8c7FXcJjhcf2vA-Xm0Mjyw1o~xn7L2-T8zlBA1IU").unwrap(), - messages_count: 1, - }, &conn); - let time: chrono::DateTime = - chrono::DateTime::parse_from_rfc3339("2021-03-18T04:22:42.501Z").unwrap(); - db::messages::add_message( - db::types::Message { - user_id: 9349, - id: 4, - date: time.naive_utc(), - message: "HI?".to_string(), - }, - &conn, - ) - .unwrap(); - db::messages::add_message( - db::types::Message { - user_id: 9349, - id: 5, - date: time.naive_utc(), - message: "I AM NICK!".to_string(), - }, - &conn, - ) - .unwrap(); - db::messages::add_message( - db::types::Message { - user_id: 9349, - id: 6, - date: time.naive_utc(), - message: "I'LL FIND that".to_string(), - }, - &conn, - ) - .unwrap(); +//let conn = db::start_db().unwrap(); +/* + users::add_user(db::types::User{ + id: 9349, + name: "Nick".to_string(), + sign_key: "string".to_string(), + insert_key: fcpv2::types::SSK::parse("SSK@Rgt0qM8D24DltliV2-JE9tYLcrgGAKeDwkz41I3JBPs,p~c8c7FXcJjhcf2vA-Xm0Mjyw1o~xn7L2-T8zlBA1IU").unwrap(), + messages_count: 1, + }, &conn); + let time: chrono::DateTime = + chrono::DateTime::parse_from_rfc3339("2021-03-18T04:22:42.501Z").unwrap(); + db::messages::add_message( + db::types::Message { + user_id: 9349, + id: 4, + date: time.naive_utc(), + message: "HI?".to_string(), + }, + &conn, + ) + .unwrap(); + db::messages::add_message( + db::types::Message { + user_id: 9349, + id: 5, + date: time.naive_utc(), + message: "I AM NICK!".to_string(), + }, + &conn, + ) + .unwrap(); + db::messages::add_message( + db::types::Message { + user_id: 9349, + id: 6, + date: time.naive_utc(), + message: "I'LL FIND that".to_string(), + }, + &conn, + ) + .unwrap(); - let messages = db::messages::select_message_by_id(9349, 3, &conn).unwrap(); - */ + let messages = db::messages::select_message_by_id(9349, 3, &conn).unwrap(); +*/ diff --git a/src/main.rs b/src/main.rs index 80b82ab..b9a5e5f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,8 +46,9 @@ use std::{ fn main() -> io::Result<()> { - SimpleLogger::new().init().unwrap(); + SimpleLogger::new().with_level(log::LevelFilter::Debug).init().unwrap(); + //TODO Add connection pool for ruqlite let db = db::start_db().unwrap(); //let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234");