From ebbe8d92d172a786ccd0ebfc27de3178ff4a75ba Mon Sep 17 00:00:00 2001 From: Patrick Britton Date: Sun, 8 Feb 2026 20:01:35 -0600 Subject: [PATCH] feat: article pipeline - two-phase discover/post flow with DB-backed article cache and status tracking --- .../20260209000001_article_pipeline.up.sql | 29 ++ go-backend/internal/handlers/admin_handler.go | 114 ++--- .../services/official_accounts_service.go | 471 +++++++++++++----- 3 files changed, 426 insertions(+), 188 deletions(-) create mode 100644 go-backend/internal/database/migrations/20260209000001_article_pipeline.up.sql diff --git a/go-backend/internal/database/migrations/20260209000001_article_pipeline.up.sql b/go-backend/internal/database/migrations/20260209000001_article_pipeline.up.sql new file mode 100644 index 0000000..bffdec6 --- /dev/null +++ b/go-backend/internal/database/migrations/20260209000001_article_pipeline.up.sql @@ -0,0 +1,29 @@ +-- Article pipeline: track articles from discovery through posting +CREATE TABLE IF NOT EXISTS official_account_articles ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + config_id UUID NOT NULL REFERENCES official_account_configs(id) ON DELETE CASCADE, + guid TEXT NOT NULL, -- unique article ID (Google News URL or RSS GUID) + title TEXT NOT NULL DEFAULT '', + link TEXT NOT NULL, -- resolved URL (what gets posted) + source_name TEXT NOT NULL DEFAULT '', + source_url TEXT NOT NULL DEFAULT '', + description TEXT NOT NULL DEFAULT '', + pub_date TIMESTAMPTZ, + status TEXT NOT NULL DEFAULT 'discovered', -- discovered | posted | failed | skipped + post_id UUID REFERENCES public.posts(id) ON DELETE SET NULL, + error_message TEXT, + discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + posted_at TIMESTAMPTZ, + + CONSTRAINT unique_article_guid_per_config UNIQUE (config_id, guid) +); + +CREATE INDEX IF NOT EXISTS idx_oaa_config_status ON official_account_articles(config_id, status); +CREATE INDEX IF NOT EXISTS idx_oaa_discovered ON official_account_articles(discovered_at DESC); +CREATE INDEX IF NOT EXISTS idx_oaa_guid ON official_account_articles(guid); + +-- Migrate existing posted articles into the new table +INSERT INTO official_account_articles (config_id, guid, title, link, source_name, status, post_id, discovered_at, posted_at) +SELECT config_id, article_url, article_title, article_url, source_name, 'posted', post_id, posted_at, posted_at +FROM official_account_posted_articles +ON CONFLICT (config_id, guid) DO NOTHING; diff --git a/go-backend/internal/handlers/admin_handler.go b/go-backend/internal/handlers/admin_handler.go index 847e448..e97fa01 100644 --- a/go-backend/internal/handlers/admin_handler.go +++ b/go-backend/internal/handlers/admin_handler.go @@ -2982,57 +2982,36 @@ func (h *AdminHandler) TriggerOfficialPost(c *gin.Context) { } switch cfg.AccountType { - case "news": - // Fetch new articles and post the first one with AI commentary - items, sourceNames, err := h.officialAccountsService.FetchNewArticles(ctx, id) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch news: " + err.Error()}) - return - } - if len(items) == 0 { - c.JSON(http.StatusOK, gin.H{"message": "No new articles found", "post_id": nil}) - return + case "news", "rss": + // Phase 1: Discover new articles + _, discErr := h.officialAccountsService.DiscoverArticles(ctx, id) + if discErr != nil { + // Log but continue — there may be previously discovered articles } - postID, body, err := h.officialAccountsService.GenerateAndPost(ctx, id, &items[0], sourceNames[0]) + // Phase 2: Post next article from the queue + article, postID, err := h.officialAccountsService.PostNextArticle(ctx, id) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } - c.JSON(http.StatusOK, gin.H{ - "message": "News post created", - "post_id": postID, - "body": body, - "source": sourceNames[0], - "title": items[0].Title, - "remaining": len(items) - 1, - }) - - case "rss": - // Post link directly — no AI - items, sourceNames, err := h.officialAccountsService.FetchNewArticles(ctx, id) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch RSS: " + err.Error()}) - return - } - if len(items) == 0 { - c.JSON(http.StatusOK, gin.H{"message": "No new articles found", "post_id": nil}) + if article == nil { + msg := "No new articles found" + if discErr != nil { + msg += " (discover error: " + discErr.Error() + ")" + } + c.JSON(http.StatusOK, gin.H{"message": msg, "post_id": nil}) return } - body := items[0].Link - postID, err := h.officialAccountsService.CreatePostForAccount(ctx, id, body, &items[0], sourceNames[0]) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } + stats, _ := h.officialAccountsService.GetArticleStats(ctx, id) c.JSON(http.StatusOK, gin.H{ - "message": "RSS post created", - "post_id": postID, - "body": body, - "source": sourceNames[0], - "title": items[0].Title, - "remaining": len(items) - 1, + "message": "Article posted", + "post_id": postID, + "body": article.Link, + "source": article.SourceName, + "title": article.Title, + "stats": stats, }) default: @@ -3061,47 +3040,29 @@ func (h *AdminHandler) PreviewOfficialPost(c *gin.Context) { } switch cfg.AccountType { - case "news": - items, sourceNames, err := h.officialAccountsService.FetchNewArticles(ctx, id) + case "news", "rss": + // Discover then show the next article that would be posted + _, _ = h.officialAccountsService.DiscoverArticles(ctx, id) + + pending, err := h.officialAccountsService.GetArticleQueue(ctx, id, "discovered", 10) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } - if len(items) == 0 { - c.JSON(http.StatusOK, gin.H{"message": "No new articles", "preview": nil}) + if len(pending) == 0 { + c.JSON(http.StatusOK, gin.H{"message": "No pending articles", "preview": nil}) return } - body, err := h.officialAccountsService.GeneratePost(ctx, id, &items[0], sourceNames[0]) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } + next := pending[0] + stats, _ := h.officialAccountsService.GetArticleStats(ctx, id) c.JSON(http.StatusOK, gin.H{ - "preview": body, - "source": sourceNames[0], - "article_title": items[0].Title, - "article_link": items[0].Link, - "pending_count": len(items), - }) - - case "rss": - // No AI — preview shows the link that would be posted - items, sourceNames, err := h.officialAccountsService.FetchNewArticles(ctx, id) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - if len(items) == 0 { - c.JSON(http.StatusOK, gin.H{"message": "No new articles", "preview": nil}) - return - } - c.JSON(http.StatusOK, gin.H{ - "preview": items[0].Link, - "source": sourceNames[0], - "article_title": items[0].Title, - "article_link": items[0].Link, - "pending_count": len(items), + "preview": next.Link, + "source": next.SourceName, + "article_title": next.Title, + "article_link": next.Link, + "pending_count": len(pending), + "stats": stats, }) default: @@ -3148,7 +3109,8 @@ func (h *AdminHandler) FetchNewsArticles(c *gin.Context) { if previews == nil { previews = []articlePreview{} } - c.JSON(http.StatusOK, gin.H{"articles": previews, "count": len(previews)}) + stats, _ := h.officialAccountsService.GetArticleStats(c.Request.Context(), id) + c.JSON(http.StatusOK, gin.H{"articles": previews, "count": len(previews), "stats": stats}) } // Get posted articles history for an account @@ -3167,7 +3129,7 @@ func (h *AdminHandler) GetPostedArticles(c *gin.Context) { return } if articles == nil { - articles = []services.PostedArticle{} + articles = []services.CachedArticle{} } c.JSON(http.StatusOK, gin.H{"articles": articles}) } diff --git a/go-backend/internal/services/official_accounts_service.go b/go-backend/internal/services/official_accounts_service.go index dcbaf8c..76a038e 100644 --- a/go-backend/internal/services/official_accounts_service.go +++ b/go-backend/internal/services/official_accounts_service.go @@ -91,15 +91,23 @@ type RSSSource struct { Name string `xml:",chardata" json:"name"` } -// PostedArticle represents a previously posted article -type PostedArticle struct { - ID string `json:"id"` - ConfigID string `json:"config_id"` - ArticleURL string `json:"article_url"` - ArticleTitle string `json:"article_title"` - SourceName string `json:"source_name"` - PostedAt time.Time `json:"posted_at"` - PostID *string `json:"post_id"` +// CachedArticle represents a row in official_account_articles (the article pipeline). +// Status flow: discovered → posted | failed | skipped +type CachedArticle struct { + ID string `json:"id"` + ConfigID string `json:"config_id"` + GUID string `json:"guid"` + Title string `json:"title"` + Link string `json:"link"` + SourceName string `json:"source_name"` + SourceURL string `json:"source_url"` + Description string `json:"description"` + PubDate *time.Time `json:"pub_date,omitempty"` + Status string `json:"status"` + PostID *string `json:"post_id,omitempty"` + ErrorMessage *string `json:"error_message,omitempty"` + DiscoveredAt time.Time `json:"discovered_at"` + PostedAt *time.Time `json:"posted_at,omitempty"` } // OfficialAccountsService manages official account automation @@ -383,22 +391,22 @@ func extractURLFromBytes(data []byte) string { return "" } -// FetchNewArticles fetches new articles from all enabled news sources for a config, -// filtering out already-posted articles. -func (s *OfficialAccountsService) FetchNewArticles(ctx context.Context, configID string) ([]RSSItem, []string, error) { +// ── Article Pipeline ───────────────────────────────── + +// DiscoverArticles fetches RSS feeds and caches all new articles in the DB as 'discovered'. +// Returns the number of newly discovered articles. +func (s *OfficialAccountsService) DiscoverArticles(ctx context.Context, configID string) (int, error) { cfg, err := s.GetConfig(ctx, configID) if err != nil { - return nil, nil, err + return 0, err } var sources []NewsSource if err := json.Unmarshal(cfg.NewsSources, &sources); err != nil { - return nil, nil, fmt.Errorf("failed to parse news sources: %w", err) + return 0, fmt.Errorf("failed to parse news sources: %w", err) } - var allItems []RSSItem - var sourceNames []string - + newCount := 0 for _, src := range sources { rssURL := src.EffectiveRSSURL() if !src.Enabled || rssURL == "" { @@ -409,39 +417,246 @@ func (s *OfficialAccountsService) FetchNewArticles(ctx context.Context, configID log.Warn().Err(err).Str("source", src.Name).Msg("Failed to fetch RSS feed") continue } - for _, item := range items { - allItems = append(allItems, item) - sourceNames = append(sourceNames, src.Name) - } - } - // Filter out already-posted articles - var newItems []RSSItem - var newSourceNames []string - for i, item := range allItems { - // Use GUID (original Google News URL) for dedup — Link may be a source homepage - link := item.GUID - if link == "" { - link = item.Link - } - if link == "" { - continue - } - var exists bool - _ = s.pool.QueryRow(ctx, - `SELECT EXISTS(SELECT 1 FROM official_account_posted_articles WHERE config_id = $1 AND article_url = $2)`, - configID, link, - ).Scan(&exists) - if !exists { - newItems = append(newItems, item) - newSourceNames = append(newSourceNames, sourceNames[i]) + for _, item := range items { + guid := item.GUID + if guid == "" { + guid = item.Link + } + if guid == "" { + continue + } + + // Parse pub date + var pubDate *time.Time + if item.PubDate != "" { + for _, layout := range []string{ + time.RFC1123Z, time.RFC1123, time.RFC822Z, time.RFC822, + "Mon, 2 Jan 2006 15:04:05 -0700", + "2006-01-02T15:04:05Z", + } { + if t, err := time.Parse(layout, item.PubDate); err == nil { + pubDate = &t + break + } + } + } + + // Strip HTML from description + desc := stripHTMLTags(item.Description) + if len(desc) > 1000 { + desc = desc[:1000] + } + + // Insert into pipeline — ON CONFLICT means we already know about this article + tag, err := s.pool.Exec(ctx, ` + INSERT INTO official_account_articles + (config_id, guid, title, link, source_name, source_url, description, pub_date, status) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'discovered') + ON CONFLICT (config_id, guid) DO NOTHING + `, configID, guid, item.Title, item.Link, src.Name, item.Source.URL, desc, pubDate) + if err != nil { + log.Warn().Err(err).Str("guid", guid).Msg("Failed to cache article") + continue + } + if tag.RowsAffected() > 0 { + newCount++ + } } } // Update last_fetched_at _, _ = s.pool.Exec(ctx, `UPDATE official_account_configs SET last_fetched_at = NOW() WHERE id = $1`, configID) - return newItems, newSourceNames, nil + if newCount > 0 { + log.Info().Int("new", newCount).Str("config", configID).Msg("[OfficialAccounts] Discovered new articles") + } + return newCount, nil +} + +// PostNextArticle picks the oldest 'discovered' article and posts it. +// For RSS accounts: posts the link directly. +// For news accounts: generates AI commentary then posts. +// Returns the CachedArticle and post ID, or nil if nothing to post. +func (s *OfficialAccountsService) PostNextArticle(ctx context.Context, configID string) (*CachedArticle, string, error) { + cfg, err := s.GetConfig(ctx, configID) + if err != nil { + return nil, "", err + } + + // Pick the oldest discovered article + var art CachedArticle + err = s.pool.QueryRow(ctx, ` + SELECT id, config_id, guid, title, link, source_name, source_url, description, pub_date, + status, post_id, error_message, discovered_at, posted_at + FROM official_account_articles + WHERE config_id = $1 AND status = 'discovered' + ORDER BY discovered_at ASC + LIMIT 1 + `, configID).Scan( + &art.ID, &art.ConfigID, &art.GUID, &art.Title, &art.Link, &art.SourceName, &art.SourceURL, + &art.Description, &art.PubDate, &art.Status, &art.PostID, &art.ErrorMessage, + &art.DiscoveredAt, &art.PostedAt, + ) + if err == pgx.ErrNoRows { + return nil, "", nil // nothing to post + } + if err != nil { + return nil, "", fmt.Errorf("failed to query next article: %w", err) + } + + // Build the post body + var body string + switch cfg.AccountType { + case "rss": + // Post the link directly + body = art.Link + case "news": + // Generate AI commentary + rssItem := &RSSItem{ + Title: art.Title, + Link: art.Link, + Description: art.Description, + } + generated, err := s.GeneratePost(ctx, configID, rssItem, art.SourceName) + if err != nil { + // Mark as failed + _, _ = s.pool.Exec(ctx, + `UPDATE official_account_articles SET status = 'failed', error_message = $2 WHERE id = $1`, + art.ID, err.Error()) + return &art, "", fmt.Errorf("AI generation failed: %w", err) + } + body = generated + default: + body = art.Link + } + + // Create the post + postID, err := s.CreatePostForArticle(ctx, configID, body, &art) + if err != nil { + // Mark as failed + _, _ = s.pool.Exec(ctx, + `UPDATE official_account_articles SET status = 'failed', error_message = $2 WHERE id = $1`, + art.ID, err.Error()) + return &art, "", err + } + + // Mark as posted + _, _ = s.pool.Exec(ctx, + `UPDATE official_account_articles SET status = 'posted', post_id = $2, posted_at = NOW() WHERE id = $1`, + art.ID, postID) + + return &art, postID, nil +} + +// GetArticleQueue returns articles for a config filtered by status. +func (s *OfficialAccountsService) GetArticleQueue(ctx context.Context, configID string, status string, limit int) ([]CachedArticle, error) { + if limit <= 0 { + limit = 50 + } + orderDir := "DESC" + if status == "discovered" { + orderDir = "ASC" // oldest first (next to be posted) + } + + query := fmt.Sprintf(` + SELECT id, config_id, guid, title, link, source_name, source_url, description, pub_date, + status, post_id, error_message, discovered_at, posted_at + FROM official_account_articles + WHERE config_id = $1 AND status = $2 + ORDER BY discovered_at %s + LIMIT $3 + `, orderDir) + + rows, err := s.pool.Query(ctx, query, configID, status, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var articles []CachedArticle + for rows.Next() { + var a CachedArticle + if err := rows.Scan( + &a.ID, &a.ConfigID, &a.GUID, &a.Title, &a.Link, &a.SourceName, &a.SourceURL, + &a.Description, &a.PubDate, &a.Status, &a.PostID, &a.ErrorMessage, + &a.DiscoveredAt, &a.PostedAt, + ); err != nil { + continue + } + articles = append(articles, a) + } + return articles, nil +} + +// ArticleStats holds counts by status for the admin UI. +type ArticleStats struct { + Discovered int `json:"discovered"` + Posted int `json:"posted"` + Failed int `json:"failed"` + Skipped int `json:"skipped"` + Total int `json:"total"` +} + +// GetArticleStats returns article counts by status for a config. +func (s *OfficialAccountsService) GetArticleStats(ctx context.Context, configID string) (*ArticleStats, error) { + rows, err := s.pool.Query(ctx, ` + SELECT status, COUNT(*) FROM official_account_articles + WHERE config_id = $1 + GROUP BY status + `, configID) + if err != nil { + return nil, err + } + defer rows.Close() + + stats := &ArticleStats{} + for rows.Next() { + var status string + var count int + if err := rows.Scan(&status, &count); err != nil { + continue + } + switch status { + case "discovered": + stats.Discovered = count + case "posted": + stats.Posted = count + case "failed": + stats.Failed = count + case "skipped": + stats.Skipped = count + } + stats.Total += count + } + return stats, nil +} + +// FetchNewArticles is a backward-compatible wrapper that discovers articles +// and returns the pending ones. Used by admin handlers. +func (s *OfficialAccountsService) FetchNewArticles(ctx context.Context, configID string) ([]RSSItem, []string, error) { + // Discover first + _, _ = s.DiscoverArticles(ctx, configID) + + // Return pending articles as RSSItems + articles, err := s.GetArticleQueue(ctx, configID, "discovered", 50) + if err != nil { + return nil, nil, err + } + + var items []RSSItem + var sourceNames []string + for _, a := range articles { + items = append(items, RSSItem{ + Title: a.Title, + Link: a.Link, + Description: a.Description, + GUID: a.GUID, + Source: RSSSource{URL: a.SourceURL, Name: a.SourceName}, + }) + sourceNames = append(sourceNames, a.SourceName) + } + return items, sourceNames, nil } // ── AI Post Generation ─────────────────────────────── @@ -572,6 +787,79 @@ func (s *OfficialAccountsService) CreatePostForAccount(ctx context.Context, conf return postID.String(), nil } +// CreatePostForArticle creates a post in the database from a pipeline CachedArticle. +// This is the new pipeline version — article status is updated by the caller (PostNextArticle). +func (s *OfficialAccountsService) CreatePostForArticle(ctx context.Context, configID string, body string, article *CachedArticle) (string, error) { + cfg, err := s.GetConfig(ctx, configID) + if err != nil { + return "", err + } + + // Check daily limit + if cfg.PostsToday >= cfg.MaxPostsPerDay { + if time.Since(cfg.PostsTodayResetAt) > 24*time.Hour { + _, _ = s.pool.Exec(ctx, `UPDATE official_account_configs SET posts_today = 0, posts_today_reset_at = NOW() WHERE id = $1`, configID) + } else { + return "", fmt.Errorf("daily post limit reached (%d/%d)", cfg.PostsToday, cfg.MaxPostsPerDay) + } + } + + authorUUID, _ := uuid.Parse(cfg.ProfileID) + postID := uuid.New() + + tx, err := s.pool.Begin(ctx) + if err != nil { + return "", err + } + defer tx.Rollback(ctx) + + _, err = tx.Exec(ctx, ` + INSERT INTO public.posts (id, author_id, body, status, body_format, is_beacon, allow_chain, visibility, is_nsfw, confidence_score, created_at) + VALUES ($1, $2, $3, 'active', 'plain', false, true, 'public', false, 1.0, $4) + `, postID, authorUUID, body, time.Now()) + if err != nil { + return "", fmt.Errorf("failed to insert post: %w", err) + } + + _, err = tx.Exec(ctx, ` + INSERT INTO public.post_metrics (post_id, like_count, save_count, view_count, comment_count, updated_at) + VALUES ($1, 0, 0, 0, 0, $2) ON CONFLICT DO NOTHING + `, postID, time.Now()) + if err != nil { + return "", fmt.Errorf("failed to insert post_metrics: %w", err) + } + + // Update counters + _, _ = tx.Exec(ctx, ` + UPDATE official_account_configs + SET posts_today = posts_today + 1, last_posted_at = NOW(), updated_at = NOW() + WHERE id = $1 + `, configID) + + if err := tx.Commit(ctx); err != nil { + return "", err + } + + // Fetch and store link preview in background + go func() { + bgCtx := context.Background() + linkURL := ExtractFirstURL(body) + if linkURL == "" && article != nil { + linkURL = article.Link + } + if linkURL != "" { + lps := NewLinkPreviewService(s.pool) + lp, lpErr := lps.FetchPreview(bgCtx, linkURL, true) + if lpErr == nil && lp != nil { + _ = lps.SaveLinkPreview(bgCtx, postID.String(), lp) + log.Debug().Str("post_id", postID.String()).Str("url", linkURL).Msg("Saved link preview for official account post") + } + } + }() + + return postID.String(), nil +} + // GenerateAndPost generates an AI post and creates it in the database func (s *OfficialAccountsService) GenerateAndPost(ctx context.Context, configID string, article *RSSItem, sourceName string) (string, string, error) { body, err := s.GeneratePost(ctx, configID, article, sourceName) @@ -669,66 +957,45 @@ func (s *OfficialAccountsService) runScheduledPosts() { // Time to post! switch c.AccountType { - case "news": - s.scheduleNewsPost(ctx, c.ID) - case "rss": - s.scheduleRSSPost(ctx, c.ID) + case "news", "rss": + s.scheduleArticlePost(ctx, c.ID) default: s.scheduleGeneralPost(ctx, c.ID) } } } -func (s *OfficialAccountsService) scheduleNewsPost(ctx context.Context, configID string) { - items, sourceNames, err := s.FetchNewArticles(ctx, configID) +// scheduleArticlePost handles the two-phase pipeline for news/rss accounts: +// Phase 1: Discover new articles from RSS feeds → cache in DB +// Phase 2: Post the next pending article from the queue +func (s *OfficialAccountsService) scheduleArticlePost(ctx context.Context, configID string) { + // Phase 1: Discover + newCount, err := s.DiscoverArticles(ctx, configID) if err != nil { - log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to fetch news") - return + log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to discover articles") + // Continue to Phase 2 — there may be previously discovered articles to post } - if len(items) == 0 { - log.Debug().Str("config", configID).Msg("[OfficialAccounts] No new articles to post") - return - } - - // Post the first new article - article := items[0] - sourceName := sourceNames[0] - - postID, body, err := s.GenerateAndPost(ctx, configID, &article, sourceName) + // Phase 2: Post next pending article + article, postID, err := s.PostNextArticle(ctx, configID) if err != nil { - log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to generate news post") + log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to post article") + return + } + if article == nil { + if newCount == 0 { + log.Debug().Str("config", configID).Msg("[OfficialAccounts] No pending articles to post") + } return } - log.Info().Str("config", configID).Str("post_id", postID).Str("source", sourceName).Str("title", article.Title).Msg("[OfficialAccounts] News post created") - _ = body // logged implicitly via post -} - -func (s *OfficialAccountsService) scheduleRSSPost(ctx context.Context, configID string) { - items, sourceNames, err := s.FetchNewArticles(ctx, configID) - if err != nil { - log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to fetch RSS articles") - return - } - - if len(items) == 0 { - log.Debug().Str("config", configID).Msg("[OfficialAccounts] No new RSS articles to post") - return - } - - // Post the first new article — body is just the link - article := items[0] - sourceName := sourceNames[0] - body := article.Link - - postID, err := s.CreatePostForAccount(ctx, configID, body, &article, sourceName) - if err != nil { - log.Error().Err(err).Str("config", configID).Msg("[OfficialAccounts] Failed to create RSS post") - return - } - - log.Info().Str("config", configID).Str("post_id", postID).Str("source", sourceName).Str("title", article.Title).Str("link", body).Msg("[OfficialAccounts] RSS post created") + log.Info(). + Str("config", configID). + Str("post_id", postID). + Str("source", article.SourceName). + Str("title", article.Title). + Str("link", article.Link). + Msg("[OfficialAccounts] Article posted") } func (s *OfficialAccountsService) scheduleGeneralPost(ctx context.Context, configID string) { @@ -744,31 +1011,11 @@ func (s *OfficialAccountsService) scheduleGeneralPost(ctx context.Context, confi // ── Recent Articles ────────────────────────────────── -func (s *OfficialAccountsService) GetRecentArticles(ctx context.Context, configID string, limit int) ([]PostedArticle, error) { +func (s *OfficialAccountsService) GetRecentArticles(ctx context.Context, configID string, limit int) ([]CachedArticle, error) { if limit <= 0 { limit = 20 } - rows, err := s.pool.Query(ctx, ` - SELECT id, config_id, article_url, article_title, source_name, posted_at, post_id - FROM official_account_posted_articles - WHERE config_id = $1 - ORDER BY posted_at DESC - LIMIT $2 - `, configID, limit) - if err != nil { - return nil, err - } - defer rows.Close() - - var articles []PostedArticle - for rows.Next() { - var a PostedArticle - if err := rows.Scan(&a.ID, &a.ConfigID, &a.ArticleURL, &a.ArticleTitle, &a.SourceName, &a.PostedAt, &a.PostID); err != nil { - continue - } - articles = append(articles, a) - } - return articles, nil + return s.GetArticleQueue(ctx, configID, "posted", limit) } // ── Helpers ──────────────────────────────────────────