diff --git a/.cursor/rules/rust-rule.mdc b/.cursor/rules/rust-rule.mdc new file mode 100644 index 0000000..992595c --- /dev/null +++ b/.cursor/rules/rust-rule.mdc @@ -0,0 +1,64 @@ +--- +alwaysApply: true +--- + +# Rust Expert Developer Guidelines + +You are an expert Rust developer, highly skilled in async programming, concurrent systems, and modern Rust patterns. Your goal is to produce code that is professional, maintainable, readable, idiomatic, and performant. + +## 1. Core Principles + +- **Idiomatic Rust**: Adhere strictly to Rust idioms. Use `snake_case` for variables/functions and `PascalCase` for types. +- **Safety First**: Leverage the type system to enforce correctness. Avoid `unsafe` unless absolutely necessary. +- **Explicit Safety**: Every `unsafe` block _must_ be accompanied by a `// SAFETY:` comment explaining why the operation is sound. +- **Expressive & Clear**: Use descriptive variable names (`is_ready`, `has_data`) and avoid obscure abbreviations. +- **Feature-Driven Modularity**: Organize code by **feature**, not by file type. Keep related structs, enums, and `impl` blocks together in the same module. + - _Bad_: `types.rs`, `impls.rs` + - _Good_: `user.rs` containing `struct User` and `impl User`. + +## 2. Code Structure & Patterns + +### 2.1 Structs & Types + +- **Small & Cohesive**: Break complex data into smaller, composable structs. +- **Newtype Pattern**: Use tuple structs (e.g., `pub struct UserId(u64);`) to enforce type safety and prevent argument swapping. +- **Builder Pattern**: Use for complex initialization logic or structs with many optional fields. Avoid for simple constructors. +- **Generic Bounds**: Place generic bounds on `impl` blocks or functions, not on the struct definition, unless intrinsic to the type. + +### 2.2 Async Programming (Tokio) + +- **Runtime**: Use `tokio` as the default async runtime. +- **Structured Concurrency**: Use `tokio::spawn` for tasks and `tokio::select!` for managing multiple futures. +- **Synchronization**: + - Use `tokio::sync::mpsc` for asynchronous message passing. + - Use `tokio::sync::broadcast` for one-to-many communication. + - Use `tokio::sync::oneshot` for single responses. + - Prefer **bounded channels** to manage backpressure. + - Use `tokio::sync::Mutex` / `RwLock` for shared state in async contexts. +- **Performance**: Avoid blocking operations in async contexts. Offload CPU-bound work to `tokio::task::spawn_blocking`. + +### 2.3 Error Handling + +- **Result & Option**: Use `Result` for recoverable errors. Use `?` for propagation. +- **Crates**: Use `thiserror` for library errors and `anyhow` for application-level error handling. +- **Panic**: Reserve `panic!` for unrecoverable bugs (logic errors). + +## 3. Performance & Optimization + +- **Collections**: Default to `Vec` and `HashMap`. +- **Pre-allocation**: Always use `Vec::with_capacity` or `HashMap::with_capacity` when the approximate size is known. +- **Cloning**: Be mindful of `.clone()`. Use references `&T` where ownership transfer isn't needed. + +## 4. Testing + +- **Unit Tests**: Place isolated tests in a `tests` module within the same file using `#[cfg(test)]`. +- **Async Tests**: Use `#[tokio::test]`. +- **Documentation Tests**: Use `rustdoc` examples in comments (`///`) to ensure documentation matches code behavior. +- **Mocking**: Use traits to define interfaces, allowing for easier mocking in tests. + +## 5. Ecosystem & Config + +- **Configuration**: Use environment variables (e.g., `dotenv`) for config management. +- **Crates**: Prefer standard ecosystem crates: `serde` (serialization), `reqwest` (HTTP), `sqlx` (DB), `tracing` (logging). + +When asked to write code, always apply these rules to ensure the highest quality output. diff --git a/config/default.toml b/config/default.toml index 6e33593..674981e 100644 --- a/config/default.toml +++ b/config/default.toml @@ -58,6 +58,9 @@ client_secret = "lfXc45dZLqYTzP62Ms32EhXinGQzxcIP9TvjJml2B-h0T1nIJK" api_key = "some-key" interval_in_hours = 24 keywords = "quantum" +monthly_limit = 15000 +alert_threshold = 13500 +reset_day = 22 [tg_bot] base_url = "https://api.telegram.org" @@ -69,3 +72,6 @@ token = "token" [raid_leaderboard] sync_interval_in_hours = 24 tweets_req_interval_in_secs = 60 + +[alert] +webhook_url = "https://www.webhook_url.com" \ No newline at end of file diff --git a/config/example.toml b/config/example.toml index 916a4c8..9aa437c 100644 --- a/config/example.toml +++ b/config/example.toml @@ -68,6 +68,9 @@ client_secret = "example-secret" api_key = "some-key" interval_in_hours = 24 keywords = "example" +monthly_limit = 15000 +alert_threshold = 13500 +reset_day = 22 [tg_bot] base_url = "https://api.telegram.org" @@ -80,6 +83,9 @@ token = "token" sync_interval_in_hours = 24 tweets_req_interval_in_secs = 60 +[alert] +webhook_url = "https://www.webhook_url.com" + # Example environment variable overrides: # TASKMASTER_BLOCKCHAIN__NODE_URL="ws://remote-node:9944" # TASKMASTER_BLOCKCHAIN__WALLET_PASSWORD="super_secure_password" diff --git a/config/test.toml b/config/test.toml index 3a040cd..f190ac8 100644 --- a/config/test.toml +++ b/config/test.toml @@ -58,6 +58,9 @@ client_secret = "test-secret" api_key = "some-key" interval_in_hours = 24 keywords = "test" +monthly_limit = 15000 +alert_threshold = 13500 +reset_day = 22 [tg_bot] base_url = "https://api.telegram.org" @@ -69,3 +72,6 @@ token = "token" [raid_leaderboard] sync_interval_in_hours = 24 tweets_req_interval_in_secs = 1 + +[alert] +webhook_url = "https://www.webhook_url.com" \ No newline at end of file diff --git a/migrations/009_tweet_pull_usage_table.sql b/migrations/009_tweet_pull_usage_table.sql new file mode 100644 index 0000000..6f17e79 --- /dev/null +++ b/migrations/009_tweet_pull_usage_table.sql @@ -0,0 +1,14 @@ +-- Table to track Twitter API usage for the monthly cap +CREATE TABLE IF NOT EXISTS tweet_pull_usage ( + period VARCHAR(7) PRIMARY KEY, -- Format: YYYY-MM + tweet_count INTEGER DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Trigger for updated_at +DROP TRIGGER IF EXISTS set_timestamp ON tweet_pull_usage; +CREATE TRIGGER set_timestamp +BEFORE UPDATE ON tweet_pull_usage +FOR EACH ROW +EXECUTE PROCEDURE trigger_set_timestamp(); + diff --git a/scripts/seed_test_tweet_authors.sh b/scripts/seed_test_tweet_authors.sh new file mode 100755 index 0000000..df7eaef --- /dev/null +++ b/scripts/seed_test_tweet_authors.sh @@ -0,0 +1,43 @@ +#!/bin/bash +set -e + +# CONFIG ------------------------------------------ +CONTAINER_NAME="task_master_test_db" # Change if your container is named differently +DB_USER="postgres" +DB_NAME="task_master" +SQL_FILE="seed_authors.sql" +# -------------------------------------------------- + +echo "🔧 Generating seed SQL..." + +cat << 'EOF' > $SQL_FILE +INSERT INTO tweet_authors ( + id, name, username, followers_count, following_count, + tweet_count, listed_count, like_count, media_count, fetched_at +) +VALUES +('1862779229277954048', 'Yuvi Lightman', 'YuviLightman', 0, 0, 0, 0, 0, 0, NOW()) +ON CONFLICT (id) DO UPDATE SET + name = EXCLUDED.name, + username = EXCLUDED.username, + followers_count = EXCLUDED.followers_count, + following_count = EXCLUDED.following_count, + tweet_count = EXCLUDED.tweet_count, + listed_count = EXCLUDED.listed_count, + like_count = EXCLUDED.like_count, + media_count = EXCLUDED.media_count, + fetched_at = NOW(); +EOF + +echo "📦 Copying SQL file into container ($CONTAINER_NAME)..." +podman cp "$SQL_FILE" "$CONTAINER_NAME":/"$SQL_FILE" + +echo "🚀 Running seed script inside Postgres..." +podman exec -it "$CONTAINER_NAME" psql -U "$DB_USER" -d "$DB_NAME" -f "/$SQL_FILE" + +echo "🔍 Verifying result..." +podman exec -it "$CONTAINER_NAME" psql -U "$DB_USER" -d "$DB_NAME" -c "SELECT * FROM tweet_authors WHERE id = '1862779229277954048';" + +rm -rf "$SQL_FILE" + +echo "✅ Seeding complete!" diff --git a/src/config.rs b/src/config.rs index 4539884..5bbad1a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,6 +16,7 @@ pub struct Config { pub tweet_sync: TweetSyncConfig, pub tg_bot: TelegramBotConfig, pub raid_leaderboard: RaidLeaderboardConfig, + pub alert: AlertConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -74,6 +75,9 @@ pub struct TweetSyncConfig { pub interval_in_hours: u64, pub keywords: String, pub api_key: String, + pub monthly_limit: u32, + pub alert_threshold: u32, + pub reset_day: u32, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -90,6 +94,11 @@ pub struct RaidLeaderboardConfig { pub tweets_req_interval_in_secs: u64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AlertConfig { + pub webhook_url: String, +} + impl Config { pub fn load(config_path: &str) -> Result { let settings = config::Config::builder() @@ -209,6 +218,9 @@ impl Default for Config { interval_in_hours: 24, keywords: "hello".to_string(), api_key: "key".to_string(), + monthly_limit: 15000, + alert_threshold: 13000, + reset_day: 1, }, tg_bot: TelegramBotConfig { base_url: "https://api.telegram.org".to_string(), @@ -220,6 +232,9 @@ impl Default for Config { sync_interval_in_hours: 24, tweets_req_interval_in_secs: 60, }, + alert: AlertConfig { + webhook_url: "https://your-webhook-url.com".to_string(), + }, } } } diff --git a/src/db_persistence.rs b/src/db_persistence.rs index 252f411..77dc591 100644 --- a/src/db_persistence.rs +++ b/src/db_persistence.rs @@ -7,6 +7,7 @@ use crate::repositories::raid_quest::RaidQuestRepository; use crate::repositories::raid_submission::RaidSubmissionRepository; use crate::repositories::relevant_tweet::RelevantTweetRepository; use crate::repositories::tweet_author::TweetAuthorRepository; +use crate::repositories::tweet_pull_usage::TweetPullUsageRepository; use crate::repositories::x_association::XAssociationRepository; use crate::repositories::DbResult; use crate::repositories::{ @@ -45,6 +46,7 @@ pub struct DbPersistence { pub raid_quests: RaidQuestRepository, pub raid_submissions: RaidSubmissionRepository, pub raid_leaderboards: RaidLeaderboardRepository, + pub tweet_pull_usage: TweetPullUsageRepository, pub pool: PgPool, } @@ -67,6 +69,7 @@ impl DbPersistence { let raid_quests = RaidQuestRepository::new(&pool); let raid_submissions = RaidSubmissionRepository::new(&pool); let raid_leaderboards = RaidLeaderboardRepository::new(&pool); + let tweet_pull_usage = TweetPullUsageRepository::new(pool.clone()); Ok(Self { pool, @@ -82,6 +85,7 @@ impl DbPersistence { raid_quests, raid_submissions, raid_leaderboards, + tweet_pull_usage, }) } @@ -101,6 +105,7 @@ impl DbPersistence { let raid_quests = RaidQuestRepository::new(&pool); let raid_submissions = RaidSubmissionRepository::new(&pool); let raid_leaderboards = RaidLeaderboardRepository::new(&pool); + let tweet_pull_usage = TweetPullUsageRepository::new(pool.clone()); Ok(Self { pool, @@ -116,6 +121,7 @@ impl DbPersistence { raid_quests, raid_submissions, raid_leaderboards, + tweet_pull_usage, }) } } diff --git a/src/http_server.rs b/src/http_server.rs index 43a077e..af4bc6d 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -14,6 +14,7 @@ use crate::{ metrics::{metrics_handler, track_metrics, Metrics}, models::task::TaskStatus, routes::api_routes, + services::alert_service::AlertService, Config, GraphqlClient, }; use chrono::{DateTime, Utc}; @@ -29,6 +30,7 @@ pub struct AppState { pub oauth_sessions: Arc>>, pub twitter_oauth_tokens: Arc>>, pub twitter_gateway: Arc, + pub alert_client: Arc, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -115,6 +117,7 @@ pub async fn start_server( db: Arc, graphql_client: Arc, twitter_gateway: Arc, + alert_client: Arc, bind_address: &str, config: Arc, ) -> Result<(), Box> { @@ -122,6 +125,7 @@ pub async fn start_server( db, metrics: Arc::new(Metrics::new()), graphql_client, + alert_client: alert_client, config, twitter_gateway, challenges: Arc::new(RwLock::new(HashMap::new())), diff --git a/src/main.rs b/src/main.rs index 2a7266c..37a5fcb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use crate::{ errors::{AppError, AppResult}, models::task::{Task, TaskInput}, services::{ - graphql_client::GraphqlClient, raid_leaderboard_service::RaidLeaderboardService, + alert_service::AlertService, graphql_client::GraphqlClient, raid_leaderboard_service::RaidLeaderboardService, reverser::start_reverser_service, task_generator::TaskGenerator, telegram_service::TelegramService, transaction_manager::TransactionManager, tweet_synchronizer_service::TweetSynchronizerService, }, @@ -271,16 +271,19 @@ async fn main() -> AppResult<()> { Some(config.tweet_sync.api_key.clone()), )?); let telegram_service = Arc::new(TelegramService::new(config.tg_bot.clone())); + let alert_service = Arc::new(AlertService::new(config.clone(), db.tweet_pull_usage.clone())); let server_db = db.clone(); let graphql_client = Arc::new(graphql_client.clone()); let server_addr_clone = server_address.clone(); let server_config = Arc::new(config.clone()); let server_twitter_gateway = twitter_gateway.clone(); + let server_alert_service = alert_service.clone(); let server_task = tokio::spawn(async move { http_server::start_server( server_db, graphql_client, server_twitter_gateway, + server_alert_service, &server_addr_clone, server_config, ) @@ -305,11 +308,13 @@ async fn main() -> AppResult<()> { db.clone(), twitter_gateway.clone(), telegram_service, + alert_service.clone(), Arc::new(config.clone()), ); // Initialize raid leaderboard service - let raid_leaderboard_service = RaidLeaderboardService::new(db.clone(), twitter_gateway, Arc::new(config.clone())); + let raid_leaderboard_service = + RaidLeaderboardService::new(db.clone(), twitter_gateway, alert_service, Arc::new(config.clone())); // Wait for any task to complete (they should run forever unless there's an error) tokio::select! { diff --git a/src/models/mod.rs b/src/models/mod.rs index 215e868..a5b3e32 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -20,4 +20,5 @@ pub mod referrals; pub mod relevant_tweet; pub mod task; pub mod tweet_author; +pub mod tweet_pull_usage; pub mod x_association; diff --git a/src/models/tweet_pull_usage.rs b/src/models/tweet_pull_usage.rs new file mode 100644 index 0000000..c02dbeb --- /dev/null +++ b/src/models/tweet_pull_usage.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct TweetPullUsage { + pub period: String, + pub tweet_count: i32, + pub updated_at: DateTime, +} + diff --git a/src/repositories/mod.rs b/src/repositories/mod.rs index 2c73031..2d893df 100644 --- a/src/repositories/mod.rs +++ b/src/repositories/mod.rs @@ -15,6 +15,7 @@ pub mod referral; pub mod relevant_tweet; pub mod task; pub mod tweet_author; +pub mod tweet_pull_usage; pub mod x_association; pub trait QueryBuilderExt { diff --git a/src/repositories/tweet_pull_usage.rs b/src/repositories/tweet_pull_usage.rs new file mode 100644 index 0000000..881e523 --- /dev/null +++ b/src/repositories/tweet_pull_usage.rs @@ -0,0 +1,228 @@ +use crate::db_persistence::DbError; +use crate::models::tweet_pull_usage::TweetPullUsage; +use chrono::{Datelike, NaiveDate, Utc}; +use sqlx::PgPool; + +#[derive(Debug, Clone)] +pub struct TweetPullUsageRepository { + pool: PgPool, +} + +impl TweetPullUsageRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn get_current_period(reset_day: u32) -> String { + Self::calculate_period_for_date(Utc::now(), reset_day) + } + + /// Calculates the billing period string (YYYY-MM) based on the given date and a reset day. + /// + /// The logic handles edge cases where the reset day (e.g., 31st) doesn't exist in the current or previous month. + /// In such cases, the reset day effectively becomes the last day of that month. + fn calculate_period_for_date(date: chrono::DateTime, reset_day: u32) -> String { + let current_year = date.year(); + let current_month = date.month(); + let current_day = date.day(); + + // 1. Determine the effective reset day for the CURRENT month. + // If `reset_day` exceeds the days in current month, cap it at the last day of the month. + let days_in_current_month = get_days_in_month(current_year, current_month); + let effective_reset_day_current_month = std::cmp::min(reset_day, days_in_current_month); + + // 2. Compare current day with the effective reset day. + if current_day >= effective_reset_day_current_month { + // We are in the cycle starting this month. + format!("{}-{:02}", current_year, current_month) + } else { + // We are in the cycle that started last month. + let (prev_year, prev_month) = if current_month == 1 { + (current_year - 1, 12) + } else { + (current_year, current_month - 1) + }; + format!("{}-{:02}", prev_year, prev_month) + } + } + + pub async fn get_current_usage(&self, reset_day: u32) -> Result { + let period = Self::get_current_period(reset_day); + self.get_usage_for_period(&period).await + } + + pub async fn increment_usage(&self, amount: i32, reset_day: u32) -> Result { + let period = Self::get_current_period(reset_day); + self.increment_usage_for_period(amount, &period).await + } + + /// Internal helper to get usage for a specific period string. + async fn get_usage_for_period(&self, period: &str) -> Result { + let usage = sqlx::query_as::<_, TweetPullUsage>( + "INSERT INTO tweet_pull_usage (period, tweet_count) + VALUES ($1, 0) + ON CONFLICT (period) DO UPDATE SET period = EXCLUDED.period + RETURNING *", + ) + .bind(period) + .fetch_one(&self.pool) + .await + .map_err(DbError::Database)?; + + Ok(usage) + } + + /// Internal helper to increment usage for a specific period string. + async fn increment_usage_for_period(&self, amount: i32, period: &str) -> Result { + let usage = sqlx::query_as::<_, TweetPullUsage>( + "INSERT INTO tweet_pull_usage (period, tweet_count) + VALUES ($1, $2) + ON CONFLICT (period) DO UPDATE + SET tweet_count = tweet_pull_usage.tweet_count + EXCLUDED.tweet_count + RETURNING *", + ) + .bind(period) + .bind(amount) + .fetch_one(&self.pool) + .await + .map_err(DbError::Database)?; + + Ok(usage) + } +} + +/// Helper to get the number of days in a given month/year. +fn get_days_in_month(year: i32, month: u32) -> u32 { + // If month is December (12), next month is Jan (1) of next year. + // Otherwise, just next month of same year. + let (next_year, next_month) = if month == 12 { (year + 1, 1) } else { (year, month + 1) }; + + // The '0th' day of the next month is the last day of the current month. + // We get the date of the 1st of the next month, subtract 1 day. + NaiveDate::from_ymd_opt(next_year, next_month, 1) + .unwrap() + .pred_opt() + .unwrap() + .day() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::test_app_state::create_test_app_state; + use crate::utils::test_db::reset_database; + use chrono::{TimeZone, Utc}; + + #[tokio::test] + async fn test_get_current_usage_integration() { + let state = create_test_app_state().await; + reset_database(&state.db.pool).await; + + let repo = &state.db.tweet_pull_usage; + let reset_day = 1; + + // 1. Initial call should create a record with 0 + let usage = repo.get_current_usage(reset_day).await.unwrap(); + assert_eq!(usage.tweet_count, 0); + + // 2. Subsequent call should return the same record + let usage2 = repo.get_current_usage(reset_day).await.unwrap(); + assert_eq!(usage2.tweet_count, 0); + assert_eq!(usage.period, usage2.period); + } + + #[tokio::test] + async fn test_increment_usage_integration() { + let state = create_test_app_state().await; + reset_database(&state.db.pool).await; + + let repo = &state.db.tweet_pull_usage; + let reset_day = 1; + + // 1. Increment from zero + let usage = repo.increment_usage(10, reset_day).await.unwrap(); + assert_eq!(usage.tweet_count, 10); + + // 2. Increment again + let usage2 = repo.increment_usage(5, reset_day).await.unwrap(); + assert_eq!(usage2.tweet_count, 15); + } + + #[tokio::test] + async fn test_transition_between_months_integration() { + let state = create_test_app_state().await; + reset_database(&state.db.pool).await; + + let repo = &state.db.tweet_pull_usage; + + // 1. Increment for Month A + let period_a = "2023-01"; + repo.increment_usage_for_period(100, period_a).await.unwrap(); + + // 2. Increment for Month B + let period_b = "2023-02"; + repo.increment_usage_for_period(50, period_b).await.unwrap(); + + // 3. Verify they are separate + let usage_a = repo.get_usage_for_period(period_a).await.unwrap(); + let usage_b = repo.get_usage_for_period(period_b).await.unwrap(); + + assert_eq!(usage_a.tweet_count, 100); + assert_eq!(usage_b.tweet_count, 50); + assert_ne!(usage_a.period, usage_b.period); + } + + #[test] + fn test_period_standard_reset() { + // Reset on 7th. Current is Jan 5th. Should be Dec cycle. + let date = Utc.with_ymd_and_hms(2023, 1, 5, 0, 0, 0).unwrap(); + assert_eq!(TweetPullUsageRepository::calculate_period_for_date(date, 7), "2022-12"); + + // Reset on 7th. Current is Jan 7th. Should be Jan cycle. + let date = Utc.with_ymd_and_hms(2023, 1, 7, 0, 0, 0).unwrap(); + assert_eq!(TweetPullUsageRepository::calculate_period_for_date(date, 7), "2023-01"); + } + + #[test] + fn test_period_end_of_month_reset_short_feb() { + // Reset on 30th. + // Feb 2023 has 28 days. + + // Date: Feb 27th. + // Effective reset for Feb is 28th (min(30, 28)). + // 27 < 28 -> Previous cycle (Jan). + let date = Utc.with_ymd_and_hms(2023, 2, 27, 0, 0, 0).unwrap(); + assert_eq!(TweetPullUsageRepository::calculate_period_for_date(date, 30), "2023-01"); + + // Date: Feb 28th. + // 28 >= 28 -> Current cycle (Feb). + let date = Utc.with_ymd_and_hms(2023, 2, 28, 0, 0, 0).unwrap(); + assert_eq!(TweetPullUsageRepository::calculate_period_for_date(date, 30), "2023-02"); + } + + #[test] + fn test_period_leap_year() { + // Reset on 30th. + // Feb 2024 has 29 days. + + // Date: Feb 28th. + // Effective reset for Feb is 29th (min(30, 29)). + // 28 < 29 -> Previous cycle (Jan). + let date = Utc.with_ymd_and_hms(2024, 2, 28, 0, 0, 0).unwrap(); + assert_eq!(TweetPullUsageRepository::calculate_period_for_date(date, 30), "2024-01"); + + // Date: Feb 29th. + // 29 >= 29 -> Current cycle (Feb). + let date = Utc.with_ymd_and_hms(2024, 2, 29, 0, 0, 0).unwrap(); + assert_eq!(TweetPullUsageRepository::calculate_period_for_date(date, 30), "2024-02"); + } + + #[test] + fn test_period_new_year() { + // Reset on 31st. Current is Jan 1st 2024. + // Effective reset for Jan is 31st. + // 1 < 31 -> Previous cycle (Dec 2023). + let date = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap(); + assert_eq!(TweetPullUsageRepository::calculate_period_for_date(date, 31), "2023-12"); + } +} diff --git a/src/services/alert_service.rs b/src/services/alert_service.rs new file mode 100644 index 0000000..7691027 --- /dev/null +++ b/src/services/alert_service.rs @@ -0,0 +1,91 @@ +use crate::repositories::tweet_pull_usage::TweetPullUsageRepository; +use crate::{AppResult, Config}; +use reqwest::Client; +use serde::Serialize; + +#[derive(Debug, Clone)] +pub struct AlertService { + client: Client, + webhook_url: String, + config: Config, + usage_repo: TweetPullUsageRepository, +} + +#[derive(Serialize)] +struct WebhookPayload<'a> { + text: &'a str, +} + +impl AlertService { + pub fn new(config: Config, usage_repo: TweetPullUsageRepository) -> Self { + Self { + client: Client::new(), + webhook_url: config.alert.webhook_url.clone(), + config, + usage_repo, + } + } + + /// Increments Twitter API usage and sends an alert if the threshold is reached. + pub async fn track_and_alert_usage(&self, tweets_pulled: i32) -> AppResult<()> { + if tweets_pulled <= 0 { + return Ok(()); + } + + match self + .usage_repo + .increment_usage(tweets_pulled, self.config.tweet_sync.reset_day) + .await + { + Ok(usage) => { + let current_total = usage.tweet_count as u32; + if current_total >= self.config.tweet_sync.alert_threshold { + if let Err(e) = self + .send_twitter_limit_alert(current_total, self.config.tweet_sync.monthly_limit) + .await + { + tracing::error!("Failed to send Twitter limit alert: {:?}", e); + } + } + } + Err(e) => { + tracing::error!("Failed to increment Twitter API usage: {:?}", e); + } + } + + Ok(()) + } + + /// Sends an alert when the Twitter API tweet pull limit is nearing its threshold. + pub async fn send_twitter_limit_alert(&self, current_count: u32, limit: u32) -> AppResult<()> { + let message = format!( + "Twitter API Limit Warning: Current usage {} tweets pulled, Plan limit {} tweets ({:.1}% used).", + current_count, + limit, + (current_count as f32 / limit as f32) * 100.0 + ); + + self.send_webhook_alert(&message).await + } + + pub async fn send_webhook_alert(&self, text: &str) -> AppResult<()> { + let payload = WebhookPayload { text }; + + let response = self.client.post(&self.webhook_url).json(&payload).send().await; + + match response { + Ok(res) if res.status().is_success() => Ok(()), + Ok(res) => { + let status = res.status().as_u16(); + let body = res.text().await.unwrap_or_default(); + // Reusing Telegram error variant for now or we could add a new one. + // Given the instructions, let's keep it simple or add a Generic error. + Err(crate::AppError::Server(format!( + "Webhook alert failed with status {}: {}", + status, body + ))) + } + Err(e) => Err(crate::AppError::Server(format!("Webhook alert request failed: {}", e))), + } + } +} diff --git a/src/services/mod.rs b/src/services/mod.rs index b02b73d..3c674ec 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,3 +1,4 @@ +pub mod alert_service; pub mod ethereum_service; pub mod graphql_client; pub mod raid_leaderboard_service; diff --git a/src/services/raid_leaderboard_service.rs b/src/services/raid_leaderboard_service.rs index 5311a63..6c19541 100644 --- a/src/services/raid_leaderboard_service.rs +++ b/src/services/raid_leaderboard_service.rs @@ -11,6 +11,7 @@ use rusx::{ use crate::{ db_persistence::DbPersistence, models::raid_submission::{RaidSubmission, UpdateRaidSubmissionStats}, + services::alert_service::AlertService, AppError, AppResult, Config, }; @@ -18,6 +19,7 @@ use crate::{ pub struct RaidLeaderboardService { db: Arc, twitter_gateway: Arc, + alert_service: Arc, config: Arc, } @@ -32,10 +34,16 @@ impl RaidLeaderboardService { .collect() } - pub fn new(db: Arc, twitter_gateway: Arc, config: Arc) -> Self { + pub fn new( + db: Arc, + twitter_gateway: Arc, + alert_service: Arc, + config: Arc, + ) -> Self { Self { db, twitter_gateway, + alert_service, config, } } @@ -102,11 +110,16 @@ impl RaidLeaderboardService { .tweets() .get_many(query, Some(params.clone())) .await?; + let Some(tweets) = &response.data else { tracing::info!("No tweets found!."); continue; }; + // Track Twitter API usage + let tweets_pulled = tweets.len() as i32; + self.alert_service.track_and_alert_usage(tweets_pulled).await?; + // EXTRACT: Collect all referenced IDs from the fetched tweets // We use a HashSet immediately to remove duplicates before sending to DB let referenced_ids: Vec = tweets @@ -184,12 +197,14 @@ mod tests { // Setup & Helpers // ------------------------------------------------------------------------- - async fn setup_deps() -> (Arc, Arc) { + async fn setup_deps() -> (Arc, Arc, Arc) { let config = Config::load_test_env().expect("Failed to load test config"); let pool = PgPool::connect(config.get_database_url()).await.unwrap(); reset_database(&pool).await; let db = Arc::new(DbPersistence::new(config.get_database_url()).await.unwrap()); - (db, Arc::new(config)) + let app_config = Arc::new(config.clone()); + let alert_service = Arc::new(AlertService::new(config, db.tweet_pull_usage.clone())); + (db, alert_service, app_config) } fn create_mock_tweet(id: &str, target_id: String, author_id: String, impressions: u32, likes: u32) -> Tweet { @@ -264,13 +279,13 @@ mod tests { #[tokio::test] async fn test_sync_no_active_raid_does_nothing() { - let (db, config) = setup_deps().await; + let (db, alert_service, config) = setup_deps().await; // Setup Gateway: Expect NO calls because there is no active raid let mut mock_gateway = MockTwitterGateway::new(); mock_gateway.expect_tweets().times(0); - let service = RaidLeaderboardService::new(db, Arc::new(mock_gateway), config); + let service = RaidLeaderboardService::new(db, Arc::new(mock_gateway), alert_service, config); let result = service.sync_raid_leaderboard().await; assert!(result.is_ok()); @@ -278,7 +293,7 @@ mod tests { #[tokio::test] async fn test_sync_active_raid_but_no_submissions_does_nothing() { - let (db, config) = setup_deps().await; + let (db, alert_service, config) = setup_deps().await; // 1. Create Active Raid db.raid_quests @@ -292,7 +307,7 @@ mod tests { let mut mock_gateway = MockTwitterGateway::new(); mock_gateway.expect_tweets().times(0); - let service = RaidLeaderboardService::new(db, Arc::new(mock_gateway), config); + let service = RaidLeaderboardService::new(db, Arc::new(mock_gateway), alert_service, config); let result = service.sync_raid_leaderboard().await; assert!(result.is_ok()); @@ -300,7 +315,7 @@ mod tests { #[tokio::test] async fn test_sync_updates_stats_successfully() { - let (db, config) = setup_deps().await; + let (db, alert_service, config) = setup_deps().await; // 1. Create Active Raid let raid_id = db @@ -345,7 +360,7 @@ mod tests { .expect_tweets() .return_const(Arc::new(mock_tweet_api) as Arc); - let service = RaidLeaderboardService::new(db.clone(), Arc::new(mock_gateway), config); + let service = RaidLeaderboardService::new(db.clone(), Arc::new(mock_gateway), alert_service, config); // 4. Run Sync service.sync_raid_leaderboard().await.unwrap(); @@ -361,7 +376,7 @@ mod tests { #[tokio::test] async fn test_sync_flag_invalid() { - let (db, config) = setup_deps().await; + let (db, alert_service, config) = setup_deps().await; // 1. Create Active Raid let raid_id = db @@ -406,7 +421,7 @@ mod tests { .expect_tweets() .return_const(Arc::new(mock_tweet_api) as Arc); - let service = RaidLeaderboardService::new(db.clone(), Arc::new(mock_gateway), config); + let service = RaidLeaderboardService::new(db.clone(), Arc::new(mock_gateway), alert_service, config); // 4. Run Sync service.sync_raid_leaderboard().await.unwrap(); @@ -424,7 +439,7 @@ mod tests { async fn test_sync_batching_logic() { // This test verifies that if we have > 100 submissions, // the service makes multiple calls to Twitter. - let (db, config) = setup_deps().await; + let (db, alert_service, config) = setup_deps().await; let raid_id = db .raid_quests @@ -469,7 +484,7 @@ mod tests { .times(2) .return_const(Arc::new(mock_tweet_api) as Arc); - let service = RaidLeaderboardService::new(db, Arc::new(mock_gateway), config); + let service = RaidLeaderboardService::new(db, Arc::new(mock_gateway), alert_service, config); // 3. Run Sync diff --git a/src/services/tweet_synchronizer_service.rs b/src/services/tweet_synchronizer_service.rs index 3f4007a..cacc30a 100644 --- a/src/services/tweet_synchronizer_service.rs +++ b/src/services/tweet_synchronizer_service.rs @@ -12,7 +12,7 @@ use rusx::{ use crate::{ db_persistence::DbPersistence, models::{relevant_tweet::NewTweetPayload, tweet_author::NewAuthorPayload}, - services::telegram_service::TelegramService, + services::{alert_service::AlertService, telegram_service::TelegramService}, utils::x_url::build_x_status_url, AppError, AppResult, Config, }; @@ -22,6 +22,7 @@ pub struct TweetSynchronizerService { db: Arc, twitter_gateway: Arc, telegram_service: Arc, + alert_service: Arc, config: Arc, } @@ -140,12 +141,14 @@ impl TweetSynchronizerService { db: Arc, twitter_gateway: Arc, telegram_service: Arc, + alert_service: Arc, config: Arc, ) -> Self { Self { db, twitter_gateway, telegram_service, + alert_service, config, } } @@ -206,6 +209,10 @@ impl TweetSynchronizerService { let tweet_authors = self.process_tweet_authors(&response).await?; let relevant_tweets = self.process_relevant_tweets(&response).await?; + // Track Twitter API usage + let tweets_pulled = relevant_tweets.len() as i32; + self.alert_service.track_and_alert_usage(tweets_pulled).await?; + self.process_sending_raid_targets(&tweet_authors, &relevant_tweets) .await?; } @@ -239,13 +246,35 @@ mod tests { // Test Helpers // ------------------------------------------------------------------------- - async fn setup_deps() -> (Arc, MockServer, Arc, Arc) { + async fn setup_deps() -> ( + Arc, + MockServer, + Arc, + Arc, + Arc, + ) { // A. Setup DB let config = Config::load_test_env().expect("Failed to load test config"); let pool = PgPool::connect(config.get_database_url()).await.unwrap(); reset_database(&pool).await; let db = Arc::new(DbPersistence::new(config.get_database_url()).await.unwrap()); + // Seed an author for the whitelist so sync loop runs + db.tweet_authors + .upsert_many(&vec![crate::models::tweet_author::NewAuthorPayload { + id: "u1".to_string(), + name: "User One".to_string(), + username: "user_one".to_string(), + followers_count: 1000, + following_count: 100, + tweet_count: 50, + listed_count: 5, + like_count: 0, + media_count: 0, + }]) + .await + .unwrap(); + // B. Setup Telegram Mock Server let mock_server = MockServer::start().await; let telegram_config = TelegramBotConfig { @@ -257,9 +286,12 @@ mod tests { let telegram_service = Arc::new(TelegramService::new(telegram_config)); // C. Config - let app_config = Arc::new(config); + let app_config = Arc::new(config.clone()); + + // D. Alert Service + let alert_service = Arc::new(AlertService::new(config.clone(), db.tweet_pull_usage.clone())); - (db, mock_server, telegram_service, app_config) + (db, mock_server, telegram_service, alert_service, app_config) } fn create_mock_tweet(id: &str, author_id: &str) -> Tweet { @@ -302,7 +334,7 @@ mod tests { #[tokio::test] async fn test_sync_saves_data_no_raid_notification() { - let (db, _mock_tg, telegram_service, config) = setup_deps().await; + let (db, _mock_tg, telegram_service, alert_service, config) = setup_deps().await; // --- Setup Mocks --- let mut mock_gateway = MockTwitterGateway::new(); @@ -328,7 +360,13 @@ mod tests { mock_gateway.expect_search().times(1).return_const(search_api_arc); // --- Run Service --- - let service = TweetSynchronizerService::new(db.clone(), Arc::new(mock_gateway), telegram_service, config); + let service = TweetSynchronizerService::new( + db.clone(), + Arc::new(mock_gateway), + telegram_service, + alert_service, + config, + ); let result = service.sync_relevant_tweets().await; @@ -348,7 +386,7 @@ mod tests { #[tokio::test] async fn test_sync_sends_telegram_when_raid_active() { - let (db, mock_tg, telegram_service, config) = setup_deps().await; + let (db, mock_tg, telegram_service, alert_service, config) = setup_deps().await; // --- Setup Active Raid --- db.raid_quests @@ -387,7 +425,13 @@ mod tests { mock_gateway.expect_search().times(1).return_const(search_api_arc); // --- Run Service --- - let service = TweetSynchronizerService::new(db.clone(), Arc::new(mock_gateway), telegram_service, config); + let service = TweetSynchronizerService::new( + db.clone(), + Arc::new(mock_gateway), + telegram_service, + alert_service, + config, + ); // This will spawn the background task for telegram, so we need to wait slightly // or ensure the service function awaits the spawn (it doesn't in your code). @@ -401,7 +445,7 @@ mod tests { #[tokio::test] async fn test_pagination_logic_uses_last_id() { - let (db, _mock_tg, telegram_service, config) = setup_deps().await; + let (db, _mock_tg, telegram_service, alert_service, config) = setup_deps().await; // --- Setup DB with existing tweet --- // We insert a tweet directly to simulate "last state" @@ -458,7 +502,8 @@ mod tests { mock_gateway.expect_search().times(1).return_const(search_api_arc); - let service = TweetSynchronizerService::new(db, Arc::new(mock_gateway), telegram_service, config); + let service = + TweetSynchronizerService::new(db, Arc::new(mock_gateway), telegram_service, alert_service, config); service.sync_relevant_tweets().await.unwrap(); } diff --git a/src/utils/test_app_state.rs b/src/utils/test_app_state.rs index 61581c0..8b94b84 100644 --- a/src/utils/test_app_state.rs +++ b/src/utils/test_app_state.rs @@ -1,6 +1,6 @@ use crate::{ - db_persistence::DbPersistence, http_server::AppState, metrics::Metrics, models::auth::TokenClaims, Config, - GraphqlClient, + db_persistence::DbPersistence, http_server::AppState, metrics::Metrics, models::auth::TokenClaims, + services::alert_service::AlertService, Config, GraphqlClient, }; use jsonwebtoken::{encode, EncodingKey, Header}; use rusx::RusxGateway; @@ -12,10 +12,14 @@ pub async fn create_test_app_state() -> AppState { let twitter_gateway = RusxGateway::new(config.x_oauth.clone(), None).unwrap(); let graphql_client = GraphqlClient::new(db.clone(), config.candidates.graphql_url.clone()); + let db = Arc::new(db); + let alert_client = Arc::new(AlertService::new(config.clone(), db.tweet_pull_usage.clone())); + return AppState { - db: Arc::new(db), + db, metrics: Arc::new(Metrics::new()), graphql_client: Arc::new(graphql_client), + alert_client, config: Arc::new(config), twitter_gateway: Arc::new(twitter_gateway), oauth_sessions: Arc::new(Mutex::new(std::collections::HashMap::new())), diff --git a/src/utils/test_db.rs b/src/utils/test_db.rs index 20d2b84..8ea2673 100644 --- a/src/utils/test_db.rs +++ b/src/utils/test_db.rs @@ -15,7 +15,7 @@ use crate::{ }; pub async fn reset_database(pool: &PgPool) { - sqlx::query("TRUNCATE tasks, referrals, opt_ins, addresses, admins, eth_associations, x_associations, relevant_tweets, tweet_authors, raid_quests, raid_submissions RESTART IDENTITY CASCADE") + sqlx::query("TRUNCATE tasks, referrals, opt_ins, addresses, admins, eth_associations, x_associations, relevant_tweets, tweet_authors, raid_quests, raid_submissions, tweet_pull_usage RESTART IDENTITY CASCADE") .execute(pool) .await .expect("Failed to truncate tables for tests");