package services

import (
	"context"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/rs/zerolog/log"
)

// maxRSSFeedBytes caps how much of an RSS response body FetchRSS will buffer,
// so a misbehaving or hostile feed cannot exhaust memory.
const maxRSSFeedBytes = 10 << 20 // 10 MiB

// officialAccountConfigSelect is the SELECT/JOIN shared by ListConfigs and
// GetConfig. Its column order must match scanOfficialAccountConfig.
const officialAccountConfigSelect = `
	SELECT c.id, c.profile_id, c.account_type, c.enabled,
	       c.model_id, c.system_prompt, c.temperature, c.max_tokens,
	       c.post_interval_minutes, c.max_posts_per_day, c.posts_today, c.posts_today_reset_at,
	       c.last_posted_at, c.news_sources, c.last_fetched_at,
	       c.created_at, c.updated_at,
	       p.handle, p.display_name, COALESCE(p.avatar_url, '')
	FROM official_account_configs c
	JOIN public.profiles p ON p.id = c.profile_id
`

// OfficialAccountConfig represents a row in official_account_configs
type OfficialAccountConfig struct {
	ID                  string          `json:"id"`
	ProfileID           string          `json:"profile_id"`
	AccountType         string          `json:"account_type"`
	Enabled             bool            `json:"enabled"`
	ModelID             string          `json:"model_id"`
	SystemPrompt        string          `json:"system_prompt"`
	Temperature         float64         `json:"temperature"`
	MaxTokens           int             `json:"max_tokens"`
	PostIntervalMinutes int             `json:"post_interval_minutes"`
	MaxPostsPerDay      int             `json:"max_posts_per_day"`
	PostsToday          int             `json:"posts_today"`
	PostsTodayResetAt   time.Time       `json:"posts_today_reset_at"`
	LastPostedAt        *time.Time      `json:"last_posted_at"`
	NewsSources         json.RawMessage `json:"news_sources"`
	LastFetchedAt       *time.Time      `json:"last_fetched_at"`
	CreatedAt           time.Time       `json:"created_at"`
	UpdatedAt           time.Time       `json:"updated_at"`

	// Joined fields
	Handle      string `json:"handle,omitempty"`
	DisplayName string `json:"display_name,omitempty"`
	AvatarURL   string `json:"avatar_url,omitempty"`
}

// NewsSource represents a single RSS feed configuration
type NewsSource struct {
	Name    string `json:"name"`
	RSSURL  string `json:"rss_url"`
	Enabled bool   `json:"enabled"`
}

// RSSFeed represents a parsed RSS feed
type RSSFeed struct {
	Channel struct {
		Title string    `xml:"title"`
		Items []RSSItem `xml:"item"`
	} `xml:"channel"`
}

// RSSItem represents a single RSS item
type RSSItem struct {
	Title       string `xml:"title"`
	Link        string `xml:"link"`
	Description string `xml:"description"`
	PubDate     string `xml:"pubDate"`
	GUID        string `xml:"guid"`
}

// PostedArticle represents a previously posted article
type PostedArticle struct {
	ID           string    `json:"id"`
	ConfigID     string    `json:"config_id"`
	ArticleURL   string    `json:"article_url"`
	ArticleTitle string    `json:"article_title"`
	SourceName   string    `json:"source_name"`
	PostedAt     time.Time `json:"posted_at"`
	PostID       *string   `json:"post_id"`
}

// OfficialAccountsService manages official account automation
type OfficialAccountsService struct {
	pool              *pgxpool.Pool
	openRouterService *OpenRouterService
	httpClient        *http.Client
	stopCh            chan struct{} // closed to ask the scheduler goroutine to exit
	wg                sync.WaitGroup
}

// NewOfficialAccountsService wires the service to its database pool and text
// generation backend. The HTTP client used for RSS fetches has a 30 second
// timeout.
func NewOfficialAccountsService(pool *pgxpool.Pool, openRouterService *OpenRouterService) *OfficialAccountsService {
	return &OfficialAccountsService{
		pool:              pool,
		openRouterService: openRouterService,
		httpClient: &http.Client{
			Timeout: 30 * time.Second,
		},
		stopCh: make(chan struct{}),
	}
}

// ── CRUD ─────────────────────────────────────────────

// scanOfficialAccountConfig reads one row produced by
// officialAccountConfigSelect. It accepts pgx.Row so it works both for
// QueryRow results and for pgx.Rows during iteration.
func scanOfficialAccountConfig(row pgx.Row) (OfficialAccountConfig, error) {
	var c OfficialAccountConfig
	err := row.Scan(
		&c.ID, &c.ProfileID, &c.AccountType, &c.Enabled,
		&c.ModelID, &c.SystemPrompt, &c.Temperature, &c.MaxTokens,
		&c.PostIntervalMinutes, &c.MaxPostsPerDay, &c.PostsToday, &c.PostsTodayResetAt,
		&c.LastPostedAt, &c.NewsSources, &c.LastFetchedAt,
		&c.CreatedAt, &c.UpdatedAt,
		&c.Handle, &c.DisplayName, &c.AvatarURL,
	)
	return c, err
}

// ListConfigs returns every official account config joined with its profile,
// newest first. The returned slice is nil when there are no rows.
func (s *OfficialAccountsService) ListConfigs(ctx context.Context) ([]OfficialAccountConfig, error) {
	rows, err := s.pool.Query(ctx, officialAccountConfigSelect+"ORDER BY c.created_at DESC")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var configs []OfficialAccountConfig
	for rows.Next() {
		c, err := scanOfficialAccountConfig(rows)
		if err != nil {
			return nil, err
		}
		configs = append(configs, c)
	}
	// Next returning false can mean "done" or "failed"; without this check a
	// mid-stream error would be reported as a short but successful result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return configs, nil
}

// GetConfig loads a single config (with joined profile fields) by its ID.
// The error from the underlying row scan is returned unchanged.
func (s *OfficialAccountsService) GetConfig(ctx context.Context, id string) (*OfficialAccountConfig, error) {
	c, err := scanOfficialAccountConfig(
		s.pool.QueryRow(ctx, officialAccountConfigSelect+"WHERE c.id = $1", id),
	)
	if err != nil {
		return nil, err
	}
	return &c, nil
}

// UpsertConfig inserts a config for cfg.ProfileID, or updates the existing one
// when a row for that profile already exists, and returns the stored row as
// read back by GetConfig.
//
// An empty cfg.NewsSources is stored as an empty JSON array. If
// cfg.NewsSources is not valid JSON it is also stored as an empty array and a
// warning is logged.
func (s *OfficialAccountsService) UpsertConfig(ctx context.Context, cfg OfficialAccountConfig) (*OfficialAccountConfig, error) {
	newsJSON := []byte("[]")
	if len(cfg.NewsSources) > 0 {
		encoded, err := json.Marshal(cfg.NewsSources)
		if err != nil {
			log.Warn().Err(err).Str("profile_id", cfg.ProfileID).
				Msg("[OfficialAccounts] Invalid news_sources JSON; storing empty list")
		} else {
			newsJSON = encoded
		}
	}

	var id string
	err := s.pool.QueryRow(ctx, `
		INSERT INTO official_account_configs
			(profile_id, account_type, enabled, model_id, system_prompt, temperature, max_tokens,
			 post_interval_minutes, max_posts_per_day, news_sources, updated_at)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
		ON CONFLICT (profile_id) DO UPDATE SET
			account_type = EXCLUDED.account_type,
			enabled = EXCLUDED.enabled,
			model_id = EXCLUDED.model_id,
			system_prompt = EXCLUDED.system_prompt,
			temperature = EXCLUDED.temperature,
			max_tokens = EXCLUDED.max_tokens,
			post_interval_minutes = EXCLUDED.post_interval_minutes,
			max_posts_per_day = EXCLUDED.max_posts_per_day,
			news_sources = EXCLUDED.news_sources,
			updated_at = NOW()
		RETURNING id
	`,
		cfg.ProfileID, cfg.AccountType, cfg.Enabled, cfg.ModelID, cfg.SystemPrompt,
		cfg.Temperature, cfg.MaxTokens, cfg.PostIntervalMinutes, cfg.MaxPostsPerDay, newsJSON,
	).Scan(&id)
	if err != nil {
		return nil, err
	}
	return s.GetConfig(ctx, id)
}

// DeleteConfig removes the config with the given ID. Deleting an ID that does
// not exist is not reported as an error.
func (s *OfficialAccountsService) DeleteConfig(ctx context.Context, id string) error {
	_, err := s.pool.Exec(ctx, `DELETE FROM official_account_configs WHERE id = $1`, id)
	return err
}

// ToggleEnabled sets the enabled flag on a config and bumps updated_at.
// Updating an ID that does not exist is not reported as an error.
func (s *OfficialAccountsService) ToggleEnabled(ctx context.Context, id string, enabled bool) error {
	_, err := s.pool.Exec(ctx, `UPDATE official_account_configs SET enabled = $2, updated_at = NOW() WHERE id = $1`, id, enabled)
	return err
}

// ── RSS News Fetching ────────────────────────────────

// FetchRSS downloads rssURL and returns the items of its RSS channel.
//
// It fails on transport errors, on any status other than 200, on bodies larger
// than maxRSSFeedBytes, and on XML that cannot be decoded into RSSFeed.
func (s *OfficialAccountsService) FetchRSS(ctx context.Context, rssURL string) ([]RSSItem, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", rssURL, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", "Sojorn/1.0 (News Aggregator)")

	resp, err := s.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch RSS %s: %w", rssURL, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("RSS feed %s returned status %d", rssURL, resp.StatusCode)
	}

	// Read one byte past the cap so an oversized body can be told apart from
	// one that is exactly at the limit.
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxRSSFeedBytes+1))
	if err != nil {
		return nil, fmt.Errorf("failed to read RSS body from %s: %w", rssURL, err)
	}
	if len(body) > maxRSSFeedBytes {
		return nil, fmt.Errorf("RSS feed %s exceeds %d bytes", rssURL, maxRSSFeedBytes)
	}

	var feed RSSFeed
	if err := xml.Unmarshal(body, &feed); err != nil {
		return nil, fmt.Errorf("failed to parse RSS from %s: %w", rssURL, err)
	}
	return feed.Channel.Items, nil
}

// FetchNewArticles fetches new articles from all enabled news sources for a config,
// filtering out already-posted articles.
func (s *OfficialAccountsService) FetchNewArticles(ctx context.Context, configID string) ([]RSSItem, []string, error) { cfg, err := s.GetConfig(ctx, configID) if err != nil { return nil, nil, err } var sources []NewsSource if err := json.Unmarshal(cfg.NewsSources, &sources); err != nil { return nil, nil, fmt.Errorf("failed to parse news sources: %w", err) } var allItems []RSSItem var sourceNames []string for _, src := range sources { if !src.Enabled || src.RSSURL == "" { continue } items, err := s.FetchRSS(ctx, src.RSSURL) if err != nil { log.Warn().Err(err).Str("source", src.Name).Msg("Failed to fetch RSS feed") continue } for _, item := range items { allItems = append(allItems, item) sourceNames = append(sourceNames, src.Name) } } // Filter out already-posted articles var newItems []RSSItem var newSourceNames []string for i, item := range allItems { link := item.Link if link == "" { link = item.GUID } if link == "" { continue } var exists bool _ = s.pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM official_account_posted_articles WHERE config_id = $1 AND article_url = $2)`, configID, link, ).Scan(&exists) if !exists { newItems = append(newItems, item) newSourceNames = append(newSourceNames, sourceNames[i]) } } // Update last_fetched_at _, _ = s.pool.Exec(ctx, `UPDATE official_account_configs SET last_fetched_at = NOW() WHERE id = $1`, configID) return newItems, newSourceNames, nil } // ── AI Post Generation ─────────────────────────────── // GeneratePost creates a post using AI for a given official account config. // For news accounts, it takes an article and generates a commentary/summary. // For general accounts, it generates an original post. 
func (s *OfficialAccountsService) GeneratePost(ctx context.Context, configID string, article *RSSItem, sourceName string) (string, error) { cfg, err := s.GetConfig(ctx, configID) if err != nil { return "", err } var userPrompt string if article != nil { // News mode: generate a post about this article desc := article.Description // Strip HTML tags from description desc = stripHTMLTags(desc) if len(desc) > 500 { desc = desc[:500] + "..." } userPrompt = fmt.Sprintf( "Write a social media post about this news article. Include the link.\n\nSource: %s\nTitle: %s\nDescription: %s\nLink: %s", sourceName, article.Title, desc, article.Link, ) } else { // General mode: generate an original post userPrompt = "Generate a new social media post. Be creative and engaging." } generated, err := s.openRouterService.GenerateText( ctx, cfg.ModelID, cfg.SystemPrompt, userPrompt, cfg.Temperature, cfg.MaxTokens, ) if err != nil { return "", fmt.Errorf("AI generation failed: %w", err) } return generated, nil } // CreatePostForAccount creates a post in the database for the official account func (s *OfficialAccountsService) CreatePostForAccount(ctx context.Context, configID string, body string, article *RSSItem, sourceName string) (string, error) { cfg, err := s.GetConfig(ctx, configID) if err != nil { return "", err } // Check daily limit if cfg.PostsToday >= cfg.MaxPostsPerDay { // Reset if it's a new day if time.Since(cfg.PostsTodayResetAt) > 24*time.Hour { _, _ = s.pool.Exec(ctx, `UPDATE official_account_configs SET posts_today = 0, posts_today_reset_at = NOW() WHERE id = $1`, configID) } else { return "", fmt.Errorf("daily post limit reached (%d/%d)", cfg.PostsToday, cfg.MaxPostsPerDay) } } // Get user_id from profile_id var authorID string err = s.pool.QueryRow(ctx, `SELECT user_id FROM public.profiles WHERE id = $1`, cfg.ProfileID).Scan(&authorID) if err != nil { return "", fmt.Errorf("failed to get user_id for profile: %w", err) } authorUUID, _ := uuid.Parse(authorID) postID := 
uuid.New() tx, err := s.pool.Begin(ctx) if err != nil { return "", err } defer tx.Rollback(ctx) _, err = tx.Exec(ctx, ` INSERT INTO public.posts (id, author_id, body, status, body_format, is_beacon, allow_chain, visibility, is_nsfw, confidence_score, created_at) VALUES ($1, $2, $3, 'active', 'plain', false, true, 'public', false, 1.0, $4) `, postID, authorUUID, body, time.Now()) if err != nil { return "", fmt.Errorf("failed to insert post: %w", err) } _, err = tx.Exec(ctx, ` INSERT INTO public.post_metrics (post_id, like_count, save_count, view_count, comment_count, updated_at) VALUES ($1, 0, 0, 0, 0, $2) ON CONFLICT DO NOTHING `, postID, time.Now()) if err != nil { return "", fmt.Errorf("failed to insert post_metrics: %w", err) } // Track article if this was a news post if article != nil { link := article.Link if link == "" { link = article.GUID } postIDStr := postID.String() _, _ = tx.Exec(ctx, ` INSERT INTO official_account_posted_articles (config_id, article_url, article_title, source_name, post_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (config_id, article_url) DO NOTHING `, configID, link, article.Title, sourceName, postIDStr) } // Update counters _, _ = tx.Exec(ctx, ` UPDATE official_account_configs SET posts_today = posts_today + 1, last_posted_at = NOW(), updated_at = NOW() WHERE id = $1 `, configID) if err := tx.Commit(ctx); err != nil { return "", err } return postID.String(), nil } // GenerateAndPost generates an AI post and creates it in the database func (s *OfficialAccountsService) GenerateAndPost(ctx context.Context, configID string, article *RSSItem, sourceName string) (string, string, error) { body, err := s.GeneratePost(ctx, configID, article, sourceName) if err != nil { return "", "", err } postID, err := s.CreatePostForAccount(ctx, configID, body, article, sourceName) if err != nil { return "", "", err } return postID, body, nil } // ── Scheduled Auto-Posting ─────────────────────────── func (s *OfficialAccountsService) StartScheduler() { 
s.wg.Add(1) go func() { defer s.wg.Done() ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() log.Info().Msg("[OfficialAccounts] Scheduler started (5-min tick)") for { select { case <-s.stopCh: log.Info().Msg("[OfficialAccounts] Scheduler stopped") return case <-ticker.C: s.runScheduledPosts() } } }() } func (s *OfficialAccountsService) StopScheduler() { close(s.stopCh) s.wg.Wait() } func (s *OfficialAccountsService) runScheduledPosts() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() // Find enabled configs that are due for a post rows, err := s.pool.Query(ctx, ` SELECT id, account_type, post_interval_minutes, last_posted_at, posts_today, max_posts_per_day, posts_today_reset_at FROM official_account_configs WHERE enabled = true `) if err != nil { log.Error().Err(err).Msg("[OfficialAccounts] Failed to query configs") return } defer rows.Close() type candidate struct { ID string AccountType string PostIntervalMinutes int LastPostedAt *time.Time PostsToday int MaxPostsPerDay int PostsTodayResetAt time.Time } var candidates []candidate for rows.Next() { var c candidate if err := rows.Scan(&c.ID, &c.AccountType, &c.PostIntervalMinutes, &c.LastPostedAt, &c.PostsToday, &c.MaxPostsPerDay, &c.PostsTodayResetAt); err != nil { continue } candidates = append(candidates, c) } for _, c := range candidates { // Reset daily counter if needed if time.Since(c.PostsTodayResetAt) > 24*time.Hour { _, _ = s.pool.Exec(ctx, `UPDATE official_account_configs SET posts_today = 0, posts_today_reset_at = NOW() WHERE id = $1`, c.ID) c.PostsToday = 0 } // Check daily limit if c.PostsToday >= c.MaxPostsPerDay { continue } // Check interval if c.LastPostedAt != nil && time.Since(*c.LastPostedAt) < time.Duration(c.PostIntervalMinutes)*time.Minute { continue } // Time to post! 
if c.AccountType == "news" { s.scheduleNewsPost(ctx, c.ID) } else { s.scheduleGeneralPost(ctx, c.ID) } } } func (s *OfficialAccountsService) scheduleNewsPost(ctx context.Context, configID string) { items, sourceNames, err := s.FetchNewArticles(ctx, configID) if err != nil { log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to fetch news") return } if len(items) == 0 { log.Debug().Str("config", configID).Msg("[OfficialAccounts] No new articles to post") return } // Post the first new article article := items[0] sourceName := sourceNames[0] postID, body, err := s.GenerateAndPost(ctx, configID, &article, sourceName) if err != nil { log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to generate news post") return } log.Info().Str("config", configID).Str("post_id", postID).Str("source", sourceName).Str("title", article.Title).Msg("[OfficialAccounts] News post created") _ = body // logged implicitly via post } func (s *OfficialAccountsService) scheduleGeneralPost(ctx context.Context, configID string) { postID, body, err := s.GenerateAndPost(ctx, configID, nil, "") if err != nil { log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to generate post") return } log.Info().Str("config", configID).Str("post_id", postID).Msg("[OfficialAccounts] General post created") _ = body } // ── Recent Articles ────────────────────────────────── func (s *OfficialAccountsService) GetRecentArticles(ctx context.Context, configID string, limit int) ([]PostedArticle, error) { if limit <= 0 { limit = 20 } rows, err := s.pool.Query(ctx, ` SELECT id, config_id, article_url, article_title, source_name, posted_at, post_id FROM official_account_posted_articles WHERE config_id = $1 ORDER BY posted_at DESC LIMIT $2 `, configID, limit) if err != nil { return nil, err } defer rows.Close() var articles []PostedArticle for rows.Next() { var a PostedArticle if err := rows.Scan(&a.ID, &a.ConfigID, &a.ArticleURL, &a.ArticleTitle, 
&a.SourceName, &a.PostedAt, &a.PostID); err != nil { continue } articles = append(articles, a) } return articles, nil } // ── Helpers ────────────────────────────────────────── // StripHTMLTagsPublic is the exported version for use by handlers func StripHTMLTagsPublic(s string) string { return stripHTMLTags(s) } func stripHTMLTags(s string) string { var result strings.Builder inTag := false for _, r := range s { if r == '<' { inTag = true continue } if r == '>' { inTag = false continue } if !inTag { result.WriteRune(r) } } return strings.TrimSpace(result.String()) } // LookupProfileID finds a profile ID by handle func (s *OfficialAccountsService) LookupProfileID(ctx context.Context, handle string) (string, error) { var id string err := s.pool.QueryRow(ctx, `SELECT id FROM public.profiles WHERE handle = $1`, strings.ToLower(handle)).Scan(&id) if err != nil { if err == pgx.ErrNoRows { return "", fmt.Errorf("profile not found: @%s", handle) } return "", err } return id, nil }