diff --git a/go-backend/cmd/api/main.go b/go-backend/cmd/api/main.go index d46f9ac..cf494bf 100644 --- a/go-backend/cmd/api/main.go +++ b/go-backend/cmd/api/main.go @@ -21,6 +21,7 @@ import ( "gitlab.com/patrickbritton3/sojorn/go-backend/internal/config" "gitlab.com/patrickbritton3/sojorn/go-backend/internal/handlers" "gitlab.com/patrickbritton3/sojorn/go-backend/internal/middleware" + "gitlab.com/patrickbritton3/sojorn/go-backend/internal/monitoring" "gitlab.com/patrickbritton3/sojorn/go-backend/internal/realtime" "gitlab.com/patrickbritton3/sojorn/go-backend/internal/repository" "gitlab.com/patrickbritton3/sojorn/go-backend/internal/services" @@ -169,7 +170,7 @@ func main() { linkPreviewService := services.NewLinkPreviewService(dbPool, s3Client, cfg.R2MediaBucket, cfg.R2ImgDomain) userHandler := handlers.NewUserHandler(userRepo, postRepo, notificationService, assetService) - postHandler := handlers.NewPostHandler(postRepo, userRepo, feedService, assetService, notificationService, moderationService, contentFilter, openRouterService, linkPreviewService, localAIService) + postHandler := handlers.NewPostHandler(postRepo, userRepo, feedService, assetService, notificationService, moderationService, contentFilter, openRouterService, linkPreviewService, localAIService, s3Client, cfg.R2VideoBucket, cfg.R2VidDomain) chatHandler := handlers.NewChatHandler(chatRepo, notificationService, hub) authHandler := handlers.NewAuthHandler(userRepo, cfg, emailService, sendPulseService) categoryHandler := handlers.NewCategoryHandler(categoryRepo) @@ -216,6 +217,9 @@ func main() { // Feed algorithm service (scores posts for ranked feed) feedAlgorithmService := services.NewFeedAlgorithmService(dbPool) + // Health check service + hcService := monitoring.NewHealthCheckService(dbPool) + // Repost & profile layout handlers repostHandler := handlers.NewRepostHandler(dbPool) profileLayoutHandler := handlers.NewProfileLayoutHandler(dbPool) @@ -228,6 +232,9 @@ func main() { r.HEAD("/health", func(c *gin.Context) { c.Status(200) }) + r.GET("/health/detailed", gin.WrapF(hcService.HealthCheckHandler)) + r.GET("/health/ready", gin.WrapF(hcService.ReadinessHandler)) + r.GET("/health/live", gin.WrapF(hcService.LivenessHandler)) // ALTCHA challenge endpoints (direct to main router for testing) r.GET("/api/v1/auth/altcha-challenge", authHandler.GetAltchaChallenge) @@ -308,6 +315,7 @@ func main() { users.GET("/blocked", userHandler.GetBlockedUsers) users.POST("/report", userHandler.ReportUser) users.POST("/block_by_handle", userHandler.BlockUserByHandle) + users.POST("/me/blocks/bulk", userHandler.BulkBlockUsers) // Social Graph: Followers & Following users.GET("/:id/followers", userHandler.GetFollowers) @@ -489,6 +497,13 @@ func main() { groups.GET("/:id/requests", groupsHandler.GetPendingRequests) // Get pending join requests (admin) groups.POST("/:id/requests/:requestId/approve", groupsHandler.ApproveJoinRequest) // Approve join request groups.POST("/:id/requests/:requestId/reject", groupsHandler.RejectJoinRequest) // Reject join request + groups.GET("/:id/feed", groupsHandler.GetGroupFeed) + groups.GET("/:id/key-status", groupsHandler.GetGroupKeyStatus) + groups.POST("/:id/keys", groupsHandler.DistributeGroupKeys) + groups.GET("/:id/members/public-keys", groupsHandler.GetGroupMemberPublicKeys) + groups.POST("/:id/invite-member", groupsHandler.InviteMember) + groups.DELETE("/:id/members/:userId", groupsHandler.RemoveMember) + groups.PATCH("/:id/settings", groupsHandler.UpdateGroupSettings) } // Capsule system (E2EE groups + clusters) @@ -699,6 +714,21 @@ func main() { admin.GET("/email-templates/:id", adminHandler.GetEmailTemplate) admin.PATCH("/email-templates/:id", adminHandler.UpdateEmailTemplate) admin.POST("/email-templates/test", adminHandler.SendTestEmail) + + // Groups admin + admin.GET("/groups", adminHandler.AdminListGroups) + admin.GET("/groups/:id", adminHandler.AdminGetGroup) + admin.DELETE("/groups/:id", adminHandler.AdminDeleteGroup) + admin.GET("/groups/:id/members", adminHandler.AdminListGroupMembers) + admin.DELETE("/groups/:id/members/:userId", adminHandler.AdminRemoveGroupMember) + + // Quip repair + admin.GET("/quips/broken", adminHandler.GetBrokenQuips) + admin.PATCH("/posts/:id/thumbnail", adminHandler.SetPostThumbnail) + admin.POST("/quips/:id/repair", adminHandler.RepairQuip) + + // Feed scores viewer + admin.GET("/feed-scores", adminHandler.AdminGetFeedScores) } // Public claim request endpoint (no auth) diff --git a/go-backend/go.mod b/go-backend/go.mod index b290e6e..8901fe4 100644 --- a/go-backend/go.mod +++ b/go-backend/go.mod @@ -41,6 +41,7 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect github.com/MicahParks/keyfunc v1.9.0 // indirect + github.com/altcha-org/altcha-lib-go v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect diff --git a/go-backend/go.sum b/go-backend/go.sum index 1c816f0..102bd0f 100644 --- a/go-backend/go.sum +++ b/go-backend/go.sum @@ -34,6 +34,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0/go.mod h1:cSgYe11MCNYunTnRXrKiR/tHc0eoKjICUuWpNZoVCOo= github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID3+o= github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw= +github.com/altcha-org/altcha-lib-go v1.0.0 h1:7oPti0aUS+YCep8nwt5b9g4jYfCU55ZruWESL8G9K5M= +github.com/altcha-org/altcha-lib-go v1.0.0/go.mod h1:I8ESLVWR9C58uvGufB/AJDPhaSU4+4Oh3DLpVtgwDAk= github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU= github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 h1:489krEF9xIGkOaaX3CE/Be2uWjiXrkCH6gUX+bZA/BU= diff --git a/go-backend/internal/handlers/admin_handler.go b/go-backend/internal/handlers/admin_handler.go index 0e08032..d6c19a7 100644 --- a/go-backend/internal/handlers/admin_handler.go +++ b/go-backend/internal/handlers/admin_handler.go @@ -4138,3 +4138,323 @@ func (h *AdminHandler) GetAltchaChallenge(c *gin.Context) { c.JSON(http.StatusOK, challenge) } + +// ────────────────────────────────────────────────────────────────────────────── +// Groups admin +// ────────────────────────────────────────────────────────────────────────────── + +// AdminListGroups GET /admin/groups?search=&limit=50&offset=0 +func (h *AdminHandler) AdminListGroups(c *gin.Context) { + search := c.Query("search") + limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50")) + offset, _ := strconv.Atoi(c.DefaultQuery("offset", "0")) + if limit <= 0 || limit > 200 { + limit = 50 + } + + query := ` + SELECT g.id, g.name, g.description, g.is_private, g.status, + g.created_at, g.key_version, g.key_rotation_needed, + COUNT(DISTINCT gm.user_id) AS member_count, + COUNT(DISTINCT gp.post_id) AS post_count + FROM groups g + LEFT JOIN group_members gm ON gm.group_id = g.id + LEFT JOIN group_posts gp ON gp.group_id = g.id + ` + args := []interface{}{} + if search != "" { + query += " WHERE g.name ILIKE $1 OR g.description ILIKE $1" + args = append(args, "%"+search+"%") + } + query += fmt.Sprintf(` + GROUP BY g.id + ORDER BY g.created_at DESC + LIMIT $%d OFFSET $%d`, len(args)+1, len(args)+2) + args = append(args, limit, offset) + + rows, err := h.pool.Query(c.Request.Context(), query, args...) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + defer rows.Close() + + type groupRow struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + IsPrivate bool `json:"is_private"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + KeyVersion int `json:"key_version"` + KeyRotationNeeded bool `json:"key_rotation_needed"` + MemberCount int `json:"member_count"` + PostCount int `json:"post_count"` + } + + var groups []groupRow + for rows.Next() { + var g groupRow + if err := rows.Scan(&g.ID, &g.Name, &g.Description, &g.IsPrivate, &g.Status, + &g.CreatedAt, &g.KeyVersion, &g.KeyRotationNeeded, &g.MemberCount, &g.PostCount); err != nil { + continue + } + groups = append(groups, g) + } + c.JSON(http.StatusOK, gin.H{"groups": groups, "limit": limit, "offset": offset}) +} + +// AdminGetGroup GET /admin/groups/:id +func (h *AdminHandler) AdminGetGroup(c *gin.Context) { + groupID := c.Param("id") + row := h.pool.QueryRow(c.Request.Context(), ` + SELECT g.id, g.name, g.description, g.is_private, g.status, g.created_at, + g.key_version, g.key_rotation_needed, + COUNT(DISTINCT gm.user_id) AS member_count, + COUNT(DISTINCT gp.post_id) AS post_count + FROM groups g + LEFT JOIN group_members gm ON gm.group_id = g.id + LEFT JOIN group_posts gp ON gp.group_id = g.id + WHERE g.id = $1 + GROUP BY g.id + `, groupID) + + var g struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + IsPrivate bool `json:"is_private"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + KeyVersion int `json:"key_version"` + KeyRotationNeeded bool `json:"key_rotation_needed"` + MemberCount int `json:"member_count"` + PostCount int `json:"post_count"` + } + if err := row.Scan(&g.ID, &g.Name, &g.Description, &g.IsPrivate, &g.Status, &g.CreatedAt, + &g.KeyVersion, &g.KeyRotationNeeded, &g.MemberCount, &g.PostCount); err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "group not found"}) + return + } + c.JSON(http.StatusOK, g) +} + +// AdminDeleteGroup DELETE /admin/groups/:id (soft delete) +func (h *AdminHandler) AdminDeleteGroup(c *gin.Context) { + groupID := c.Param("id") + _, err := h.pool.Exec(c.Request.Context(), + `UPDATE groups SET status = 'inactive' WHERE id = $1`, groupID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"message": "group deactivated"}) +} + +// AdminListGroupMembers GET /admin/groups/:id/members +func (h *AdminHandler) AdminListGroupMembers(c *gin.Context) { + groupID := c.Param("id") + rows, err := h.pool.Query(c.Request.Context(), ` + SELECT gm.user_id, u.username, u.display_name, gm.role, gm.joined_at + FROM group_members gm + JOIN users u ON u.id = gm.user_id + WHERE gm.group_id = $1 + ORDER BY gm.joined_at + `, groupID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + defer rows.Close() + + type member struct { + UserID string `json:"user_id"` + Username string `json:"username"` + DisplayName string `json:"display_name"` + Role string `json:"role"` + JoinedAt time.Time `json:"joined_at"` + } + var members []member + for rows.Next() { + var m member + if err := rows.Scan(&m.UserID, &m.Username, &m.DisplayName, &m.Role, &m.JoinedAt); err != nil { + continue + } + members = append(members, m) + } + c.JSON(http.StatusOK, gin.H{"members": members}) +} + +// AdminRemoveGroupMember DELETE /admin/groups/:id/members/:userId +func (h *AdminHandler) AdminRemoveGroupMember(c *gin.Context) { + groupID := c.Param("id") + userID := c.Param("userId") + _, err := h.pool.Exec(c.Request.Context(), + `DELETE FROM group_members WHERE group_id = $1 AND user_id = $2`, groupID, userID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + // Flag group for key rotation (client will auto-rotate on next open) + h.pool.Exec(c.Request.Context(), + `UPDATE groups SET key_rotation_needed = true WHERE id = $1`, groupID) + c.JSON(http.StatusOK, gin.H{"message": "member removed"}) +} + +// ────────────────────────────────────────────────────────────────────────────── +// Quip (video post) repair +// ────────────────────────────────────────────────────────────────────────────── + +// GetBrokenQuips GET /admin/quips/broken +// Returns posts that have a video_url but are missing a thumbnail. +func (h *AdminHandler) GetBrokenQuips(c *gin.Context) { + limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50")) + if limit <= 0 || limit > 200 { + limit = 50 + } + rows, err := h.pool.Query(c.Request.Context(), ` + SELECT id, user_id, video_url, created_at + FROM posts + WHERE video_url IS NOT NULL + AND (thumbnail_url IS NULL OR thumbnail_url = '') + AND status = 'active' + ORDER BY created_at DESC + LIMIT $1 + `, limit) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + defer rows.Close() + + type quip struct { + ID string `json:"id"` + UserID string `json:"user_id"` + VideoURL string `json:"video_url"` + CreatedAt time.Time `json:"created_at"` + } + var quips []quip + for rows.Next() { + var q quip + if err := rows.Scan(&q.ID, &q.UserID, &q.VideoURL, &q.CreatedAt); err != nil { + continue + } + quips = append(quips, q) + } + c.JSON(http.StatusOK, gin.H{"quips": quips}) +} + +// SetPostThumbnail PATCH /admin/posts/:id/thumbnail +// Body: {"thumbnail_url": "..."} +func (h *AdminHandler) SetPostThumbnail(c *gin.Context) { + postID := c.Param("id") + var req struct { + ThumbnailURL string `json:"thumbnail_url" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + _, err := h.pool.Exec(c.Request.Context(), + `UPDATE posts SET thumbnail_url = $1 WHERE id = $2`, req.ThumbnailURL, postID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"message": "thumbnail updated"}) +} + +// RepairQuip POST /admin/quips/:id/repair +// Triggers FFmpeg frame extraction on the server and sets thumbnail_url. +func (h *AdminHandler) RepairQuip(c *gin.Context) { + postID := c.Param("id") + + // Fetch video_url + var videoURL string + err := h.pool.QueryRow(c.Request.Context(), + `SELECT video_url FROM posts WHERE id = $1`, postID).Scan(&videoURL) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "post not found"}) + return + } + if videoURL == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "post has no video_url"}) + return + } + + vp := services.NewVideoProcessor(h.s3Client, h.videoBucket, h.vidDomain) + frames, err := vp.ExtractFrames(c.Request.Context(), videoURL, 3) + if err != nil || len(frames) == 0 { + c.JSON(http.StatusInternalServerError, gin.H{"error": "frame extraction failed: " + func() string { + if err != nil { + return err.Error() + } + return "no frames" + }()}) + return + } + + thumbnail := frames[0] + _, err = h.pool.Exec(c.Request.Context(), + `UPDATE posts SET thumbnail_url = $1 WHERE id = $2`, thumbnail, postID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"thumbnail_url": thumbnail}) +} + +// ────────────────────────────────────────────────────────────────────────────── +// Feed scores viewer +// ────────────────────────────────────────────────────────────────────────────── + +// AdminGetFeedScores GET /admin/feed-scores?limit=50 +func (h *AdminHandler) AdminGetFeedScores(c *gin.Context) { + limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50")) + if limit <= 0 || limit > 200 { + limit = 50 + } + rows, err := h.pool.Query(c.Request.Context(), ` + SELECT pfs.post_id, + LEFT(p.content, 80) AS excerpt, + pfs.engagement_score, + pfs.quality_score, + pfs.recency_score, + pfs.network_score, + pfs.personalization, + pfs.score AS total_score, + pfs.updated_at + FROM post_feed_scores pfs + JOIN posts p ON p.id = pfs.post_id + WHERE p.status = 'active' + ORDER BY pfs.score DESC + LIMIT $1 + `, limit) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + defer rows.Close() + + type scoreRow struct { + PostID string `json:"post_id"` + Excerpt string `json:"excerpt"` + EngagementScore float64 `json:"engagement_score"` + QualityScore float64 `json:"quality_score"` + RecencyScore float64 `json:"recency_score"` + NetworkScore float64 `json:"network_score"` + Personalization float64 `json:"personalization"` + TotalScore float64 `json:"total_score"` + UpdatedAt time.Time `json:"updated_at"` + } + var scores []scoreRow + for rows.Next() { + var s scoreRow + if err := rows.Scan(&s.PostID, &s.Excerpt, &s.EngagementScore, &s.QualityScore, + &s.RecencyScore, &s.NetworkScore, &s.Personalization, &s.TotalScore, &s.UpdatedAt); err != nil { + continue + } + scores = append(scores, s) + } + c.JSON(http.StatusOK, gin.H{"scores": scores}) +} diff --git a/go-backend/internal/handlers/groups_handler.go b/go-backend/internal/handlers/groups_handler.go index 3659a30..4c0fae1 100644 --- a/go-backend/internal/handlers/groups_handler.go +++ b/go-backend/internal/handlers/groups_handler.go @@ -2,7 +2,9 @@ package handlers import ( "database/sql" + "fmt" "net/http" + "strconv" "strings" "time" @@ -367,6 +369,10 @@ func (h *GroupsHandler) LeaveGroup(c *gin.Context) { return } + // Flag key rotation so admin client silently rotates on next open + h.db.Exec(c.Request.Context(), + `UPDATE groups SET key_rotation_needed = true WHERE id = $1`, groupID) + c.JSON(http.StatusOK, gin.H{"message": "Left group successfully"}) } @@ -547,7 +553,7 @@ func (h *GroupsHandler) RejectJoinRequest(c *gin.Context) { } _, err = h.db.Exec(c.Request.Context(), ` - UPDATE group_join_requests + UPDATE group_join_requests SET status = 'rejected', reviewed_at = NOW(), reviewed_by = $1 WHERE id = $2 AND group_id = $3 AND status = 'pending' `, userID, requestID, groupID) @@ -559,3 +565,347 @@ func (h *GroupsHandler) RejectJoinRequest(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"message": "Request rejected"}) } + +// ────────────────────────────────────────────────────────────────────────────── +// Group feed +// ────────────────────────────────────────────────────────────────────────────── + +// GetGroupFeed GET /groups/:id/feed?limit=20&offset=0 +func (h *GroupsHandler) GetGroupFeed(c *gin.Context) { + groupID := c.Param("id") + limit, _ := strconv.Atoi(c.DefaultQuery("limit", "20")) + offset, _ := strconv.Atoi(c.DefaultQuery("offset", "0")) + if limit <= 0 || limit > 100 { + limit = 20 + } + + rows, err := h.db.Query(c.Request.Context(), ` + SELECT p.id, p.user_id, p.content, p.image_url, p.video_url, + p.thumbnail_url, p.created_at, p.status + FROM posts p + JOIN group_posts gp ON gp.post_id = p.id + WHERE gp.group_id = $1 AND p.status = 'active' + ORDER BY p.created_at DESC + LIMIT $2 OFFSET $3 + `, groupID, limit, offset) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch group feed"}) + return + } + defer rows.Close() + + type feedPost struct { + ID string `json:"id"` + UserID string `json:"user_id"` + Content string `json:"content"` + ImageURL *string `json:"image_url"` + VideoURL *string `json:"video_url"` + ThumbnailURL *string `json:"thumbnail_url"` + CreatedAt time.Time `json:"created_at"` + Status string `json:"status"` + } + + var posts []feedPost + for rows.Next() { + var p feedPost + if err := rows.Scan(&p.ID, &p.UserID, &p.Content, &p.ImageURL, &p.VideoURL, + &p.ThumbnailURL, &p.CreatedAt, &p.Status); err != nil { + continue + } + posts = append(posts, p) + } + c.JSON(http.StatusOK, gin.H{"posts": posts, "limit": limit, "offset": offset}) +} + +// ────────────────────────────────────────────────────────────────────────────── +// E2EE group key management +// ────────────────────────────────────────────────────────────────────────────── + +// GetGroupKeyStatus GET /groups/:id/key-status +// Returns the current key version, whether rotation is needed, and the caller's +// encrypted group key (if they have one). +func (h *GroupsHandler) GetGroupKeyStatus(c *gin.Context) { + groupID := c.Param("id") + userID, _ := c.Get("user_id") + + var keyVersion int + var keyRotationNeeded bool + err := h.db.QueryRow(c.Request.Context(), + `SELECT key_version, key_rotation_needed FROM groups WHERE id = $1`, groupID, + ).Scan(&keyVersion, &keyRotationNeeded) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "group not found"}) + return + } + + // Fetch this user's encrypted key for the current version + var encryptedKey *string + h.db.QueryRow(c.Request.Context(), + `SELECT encrypted_key FROM group_member_keys + WHERE group_id = $1 AND user_id = $2 AND key_version = $3`, + groupID, userID, keyVersion, + ).Scan(&encryptedKey) + + c.JSON(http.StatusOK, gin.H{ + "key_version": keyVersion, + "key_rotation_needed": keyRotationNeeded, + "my_encrypted_key": encryptedKey, + }) +} + +// DistributeGroupKeys POST /groups/:id/keys +// Called by an admin/owner client after local key rotation to push new +// encrypted copies to each member. +// Body: {"keys": [{"user_id": "...", "encrypted_key": "...", "key_version": N}]} +func (h *GroupsHandler) DistributeGroupKeys(c *gin.Context) { + groupID := c.Param("id") + callerID, _ := c.Get("user_id") + + // Only owner/admin may distribute keys + var role string + err := h.db.QueryRow(c.Request.Context(), + `SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`, + groupID, callerID, + ).Scan(&role) + if err != nil || (role != "owner" && role != "admin") { + c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may rotate keys"}) + return + } + + var req struct { + Keys []struct { + UserID string `json:"user_id" binding:"required"` + EncryptedKey string `json:"encrypted_key" binding:"required"` + KeyVersion int `json:"key_version" binding:"required"` + } `json:"keys" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Determine the new key version (max of submitted versions) + newVersion := 0 + for _, k := range req.Keys { + if k.KeyVersion > newVersion { + newVersion = k.KeyVersion + } + } + + for _, k := range req.Keys { + h.db.Exec(c.Request.Context(), ` + INSERT INTO group_member_keys (group_id, user_id, key_version, encrypted_key, updated_at) + VALUES ($1, $2, $3, $4, now()) + ON CONFLICT (group_id, user_id, key_version) + DO UPDATE SET encrypted_key = EXCLUDED.encrypted_key, updated_at = now() + `, groupID, k.UserID, k.KeyVersion, k.EncryptedKey) + } + + // Clear the rotation flag and bump key_version on the group + h.db.Exec(c.Request.Context(), + `UPDATE groups SET key_rotation_needed = false, key_version = $1 WHERE id = $2`, + newVersion, groupID) + + c.JSON(http.StatusOK, gin.H{"message": "keys distributed", "key_version": newVersion}) +} + +// GetGroupMemberPublicKeys GET /groups/:id/members/public-keys +// Returns RSA public keys for all members so a rotating client can encrypt for each. +func (h *GroupsHandler) GetGroupMemberPublicKeys(c *gin.Context) { + groupID := c.Param("id") + callerID, _ := c.Get("user_id") + + // Caller must be a member + var memberCount int + err := h.db.QueryRow(c.Request.Context(), + `SELECT COUNT(*) FROM group_members WHERE group_id = $1 AND user_id = $2`, + groupID, callerID, + ).Scan(&memberCount) + if err != nil || memberCount == 0 { + c.JSON(http.StatusForbidden, gin.H{"error": "not a group member"}) + return + } + + rows, err := h.db.Query(c.Request.Context(), ` + SELECT gm.user_id, u.public_key + FROM group_members gm + JOIN users u ON u.id = gm.user_id + WHERE gm.group_id = $1 AND u.public_key IS NOT NULL AND u.public_key != '' + `, groupID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch member keys"}) + return + } + defer rows.Close() + + type memberKey struct { + UserID string `json:"user_id"` + PublicKey string `json:"public_key"` + } + var keys []memberKey + for rows.Next() { + var mk memberKey + if rows.Scan(&mk.UserID, &mk.PublicKey) == nil { + keys = append(keys, mk) + } + } + c.JSON(http.StatusOK, gin.H{"keys": keys}) +} + +// ────────────────────────────────────────────────────────────────────────────── +// Member invite / remove / settings +// ────────────────────────────────────────────────────────────────────────────── + +// InviteMember POST /groups/:id/invite-member +// Body: {"user_id": "...", "encrypted_key": "..."} +func (h *GroupsHandler) InviteMember(c *gin.Context) { + groupID := c.Param("id") + callerID, _ := c.Get("user_id") + + var role string + err := h.db.QueryRow(c.Request.Context(), + `SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`, + groupID, callerID, + ).Scan(&role) + if err != nil || (role != "owner" && role != "admin") { + c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may invite members"}) + return + } + + var req struct { + UserID string `json:"user_id" binding:"required"` + EncryptedKey string `json:"encrypted_key"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Fetch current key version + var keyVersion int + h.db.QueryRow(c.Request.Context(), + `SELECT key_version FROM groups WHERE id = $1`, groupID, + ).Scan(&keyVersion) + + // Add member + _, err = h.db.Exec(c.Request.Context(), ` + INSERT INTO group_members (group_id, user_id, role, joined_at) + VALUES ($1, $2, 'member', now()) + ON CONFLICT (group_id, user_id) DO NOTHING + `, groupID, req.UserID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to add member"}) + return + } + + // Store their encrypted key if provided + if req.EncryptedKey != "" { + h.db.Exec(c.Request.Context(), ` + INSERT INTO group_member_keys (group_id, user_id, key_version, encrypted_key, updated_at) + VALUES ($1, $2, $3, $4, now()) + ON CONFLICT (group_id, user_id, key_version) + DO UPDATE SET encrypted_key = EXCLUDED.encrypted_key, updated_at = now() + `, groupID, req.UserID, keyVersion, req.EncryptedKey) + } + + c.JSON(http.StatusOK, gin.H{"message": "member invited"}) +} + +// RemoveMember DELETE /groups/:id/members/:userId +func (h *GroupsHandler) RemoveMember(c *gin.Context) { + groupID := c.Param("id") + targetUserID := c.Param("userId") + callerID, _ := c.Get("user_id") + + var role string + err := h.db.QueryRow(c.Request.Context(), + `SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`, + groupID, callerID, + ).Scan(&role) + if err != nil || (role != "owner" && role != "admin") { + c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may remove members"}) + return + } + + _, err = h.db.Exec(c.Request.Context(), + `DELETE FROM group_members WHERE group_id = $1 AND user_id = $2`, + groupID, targetUserID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to remove member"}) + return + } + + // Trigger automatic key rotation on next admin open + h.db.Exec(c.Request.Context(), + `UPDATE groups SET key_rotation_needed = true WHERE id = $1`, groupID) + + c.JSON(http.StatusOK, gin.H{"message": "member removed"}) +} + +// UpdateGroupSettings PATCH /groups/:id/settings +// Body: {"chat_enabled": true, "forum_enabled": false, "vault_enabled": true} +func (h *GroupsHandler) UpdateGroupSettings(c *gin.Context) { + groupID := c.Param("id") + callerID, _ := c.Get("user_id") + + var role string + err := h.db.QueryRow(c.Request.Context(), + `SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`, + groupID, callerID, + ).Scan(&role) + if err != nil || (role != "owner" && role != "admin") { + c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may change settings"}) + return + } + + var req struct { + ChatEnabled *bool `json:"chat_enabled"` + ForumEnabled *bool `json:"forum_enabled"` + VaultEnabled *bool `json:"vault_enabled"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Build dynamic UPDATE (only fields provided) + setClauses := []string{} + args := []interface{}{} + argIdx := 1 + + if req.ChatEnabled != nil { + setClauses = append(setClauses, fmt.Sprintf("chat_enabled = $%d", argIdx)) + args = append(args, *req.ChatEnabled) + argIdx++ + } + if req.ForumEnabled != nil { + setClauses = append(setClauses, fmt.Sprintf("forum_enabled = $%d", argIdx)) + args = append(args, *req.ForumEnabled) + argIdx++ + } + if req.VaultEnabled != nil { + setClauses = append(setClauses, fmt.Sprintf("vault_enabled = $%d", argIdx)) + args = append(args, *req.VaultEnabled) + argIdx++ + } + + if len(setClauses) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "no settings provided"}) + return + } + + query := fmt.Sprintf( + "UPDATE groups SET %s WHERE id = $%d", + strings.Join(setClauses, ", "), + argIdx, + ) + args = append(args, groupID) + + if _, err := h.db.Exec(c.Request.Context(), query, args...); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update settings: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "settings updated"}) +} + diff --git a/go-backend/internal/handlers/post_handler.go b/go-backend/internal/handlers/post_handler.go index 1096c78..9cedaee 100644 --- a/go-backend/internal/handlers/post_handler.go +++ b/go-backend/internal/handlers/post_handler.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/rs/zerolog/log" @@ -30,7 +31,7 @@ type PostHandler struct { videoProcessor *services.VideoProcessor } -func NewPostHandler(postRepo *repository.PostRepository, userRepo *repository.UserRepository, feedService *services.FeedService, assetService *services.AssetService, notificationService *services.NotificationService, moderationService *services.ModerationService, contentFilter *services.ContentFilter, openRouterService *services.OpenRouterService, linkPreviewService *services.LinkPreviewService, localAIService *services.LocalAIService) *PostHandler { +func NewPostHandler(postRepo *repository.PostRepository, userRepo *repository.UserRepository, feedService *services.FeedService, assetService *services.AssetService, notificationService *services.NotificationService, moderationService *services.ModerationService, contentFilter *services.ContentFilter, openRouterService *services.OpenRouterService, linkPreviewService *services.LinkPreviewService, localAIService *services.LocalAIService, s3Client *s3.Client, videoBucket, vidDomain string) *PostHandler { return &PostHandler{ postRepo: postRepo, userRepo: userRepo, @@ -42,7 +43,7 @@ func NewPostHandler(postRepo *repository.PostRepository, userRepo *repository.Us openRouterService: openRouterService, linkPreviewService: linkPreviewService, localAIService: localAIService, - videoProcessor: services.NewVideoProcessor(), + videoProcessor: services.NewVideoProcessor(s3Client, videoBucket, vidDomain), } } diff --git a/go-backend/internal/handlers/user_handler.go b/go-backend/internal/handlers/user_handler.go index 21c2edf..57be4cc 100644 --- a/go-backend/internal/handlers/user_handler.go +++ b/go-backend/internal/handlers/user_handler.go @@ -609,6 +609,60 @@ func (h *UserHandler) GetCircleMembers(c *gin.Context) { // Data Export (Portability) // ======================================================================== +// ======================================================================== +// Block list bulk import +// ======================================================================== + +// BulkBlockUsers POST /users/me/blocks/bulk +// Body: {"handles": ["alice", "bob", ...]} +// Blocks each handle, auto-unfollows both ways. +func (h *UserHandler) BulkBlockUsers(c *gin.Context) { + actorID, _ := c.Get("user_id") + actorIP := c.ClientIP() + + var req struct { + Handles []string `json:"handles" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if len(req.Handles) > 500 { + c.JSON(http.StatusBadRequest, gin.H{"error": "maximum 500 handles per request"}) + return + } + + var blocked int + var notFound []string + var alreadyBlocked []string + + for _, handle := range req.Handles { + handle = strings.TrimSpace(strings.TrimPrefix(handle, "@")) + if handle == "" { + continue + } + err := h.repo.BlockUserByHandle(c.Request.Context(), actorID.(string), handle, actorIP) + if err != nil { + msg := err.Error() + if strings.Contains(msg, "not found") { + notFound = append(notFound, handle) + } else if strings.Contains(msg, "conflict") || strings.Contains(msg, "duplicate") { + alreadyBlocked = append(alreadyBlocked, handle) + } else { + log.Warn().Err(err).Str("handle", handle).Msg("bulk block: unexpected error") + } + continue + } + blocked++ + } + + c.JSON(http.StatusOK, gin.H{ + "blocked": blocked, + "not_found": notFound, + "already_blocked": alreadyBlocked, + }) +} + // ExportData streams user data as JSON for portability/GDPR compliance func (h *UserHandler) ExportData(c *gin.Context) { userID, _ := c.Get("user_id") diff --git a/go-backend/internal/monitoring/health_check_service.go b/go-backend/internal/monitoring/health_check_service.go index 9ab54f1..49ff801 100644 --- a/go-backend/internal/monitoring/health_check_service.go +++ b/go-backend/internal/monitoring/health_check_service.go @@ -11,6 +11,7 @@ import ( "time" "github.com/jackc/pgx/v5/pgxpool" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -314,7 +315,7 @@ func (s *HealthCheckService) checkMemoryUsage() HealthCheck { // Check memory usage (threshold: 80% of available memory) memoryUsageMB := m.Alloc / 1024 / 1024 - thresholdMB := 1024 // 1GB threshold + thresholdMB := uint64(1024) // 1GB threshold check.Status = "healthy" check.Message = "Memory usage is normal" diff --git a/go-backend/internal/services/feed_algorithm_service.go b/go-backend/internal/services/feed_algorithm_service.go index d5600e9..6f8fad2 100644 --- a/go-backend/internal/services/feed_algorithm_service.go +++ b/go-backend/internal/services/feed_algorithm_service.go @@ -443,54 +443,218 @@ func (s *FeedAlgorithmService) updatePostScore(ctx context.Context, score FeedSc return err } -// Get feed with algorithmic ranking +// GetAlgorithmicFeed returns a ranked, deduplicated, diversity-injected feed for viewerID. +// +// Scoring pipeline: +// 1. Pull scored posts from post_feed_scores; apply cooling-period multiplier based on +// when the viewer last saw each post (user_feed_impressions). +// 2. Partition the deduplicated result into 60 / 20 / 20: +// 60 % – top personal scores +// 20 % – random posts from categories the viewer doesn't usually see +// 20 % – posts from authors the viewer doesn't follow (discovery) +// 3. Record impressions so future calls apply the cooling penalty. func (s *FeedAlgorithmService) GetAlgorithmicFeed(ctx context.Context, viewerID string, limit int, offset int, category string) ([]string, error) { - // Update scores for recent posts first - err := s.UpdateFeedScores(ctx, []string{}, viewerID) // This would normally get recent posts - if err != nil { - log.Error().Err(err).Msg("failed to update feed scores") - } - - // Build query with algorithmic ordering - query := ` - SELECT post_id + // ── 1. Pull top personal posts (2× requested to have headroom for diversity swap) ── + personalQuery := ` + SELECT pfs.post_id, pfs.score, + COALESCE(ufi.shown_at, NULL) AS last_shown, + p.category, + p.user_id AS author_id FROM post_feed_scores pfs JOIN posts p ON p.id = pfs.post_id + LEFT JOIN user_feed_impressions ufi + ON ufi.post_id = pfs.post_id AND ufi.user_id = $1 WHERE p.status = 'active' ` - - args := []interface{}{} - argIndex := 1 + personalArgs := []interface{}{viewerID} + argIdx := 2 if category != "" { - query += fmt.Sprintf(" AND p.category = $%d", argIndex) - args = append(args, category) - argIndex++ + personalQuery += fmt.Sprintf(" AND p.category = $%d", argIdx) + personalArgs = append(personalArgs, category) + argIdx++ } - query += fmt.Sprintf(` + personalQuery += fmt.Sprintf(` ORDER BY pfs.score DESC, p.created_at DESC LIMIT $%d OFFSET $%d - `, argIndex, argIndex+1) + `, argIdx, argIdx+1) + personalArgs = append(personalArgs, limit*2, offset) - args = append(args, limit, offset) + type feedRow struct { + postID string + score float64 + lastShown *string // nil = never shown + category string + authorID string + } - rows, err := s.db.Query(ctx, query, args...) + rows, err := s.db.Query(ctx, personalQuery, personalArgs...) if err != nil { return nil, fmt.Errorf("failed to get algorithmic feed: %w", err) } defer rows.Close() - var postIDs []string + var personal []feedRow + seenCategories := map[string]int{} for rows.Next() { - var postID string - if err := rows.Scan(&postID); err != nil { - return nil, fmt.Errorf("failed to scan post ID: %w", err) + var r feedRow + if err := rows.Scan(&r.postID, &r.score, &r.lastShown, &r.category, &r.authorID); err != nil { + continue } - postIDs = append(postIDs, postID) + // Cooling multiplier + if r.lastShown != nil { + // any non-nil means it was shown before; apply decay + r.score *= 0.2 // shown within cooling window → heavy penalty + } + seenCategories[r.category]++ + personal = append(personal, r) + } + rows.Close() + + // ── 2. Viewer's top 3 categories (for diversity contrast) ── + topCats := topN(seenCategories, 3) + topCatSet := map[string]bool{} + for _, c := range topCats { + topCatSet[c] = true } - return postIDs, nil + // ── 3. Split quotas ── + totalSlots := limit + if offset > 0 { + // On paginated pages skip diversity injection (too complex, just serve personal) + var ids []string + for i, r := range personal { + if i >= totalSlots { + break + } + ids = append(ids, r.postID) + } + s.recordImpressions(ctx, viewerID, ids) + return ids, nil + } + + personalSlots := (totalSlots * 60) / 100 + crossCatSlots := (totalSlots * 20) / 100 + discoverySlots := totalSlots - personalSlots - crossCatSlots + + var result []string + seen := map[string]bool{} + + for _, r := range personal { + if len(result) >= personalSlots { + break + } + if !seen[r.postID] { + result = append(result, r.postID) + seen[r.postID] = true + } + } + + // ── 4. Cross-category posts (20 %) ── + if crossCatSlots > 0 && len(topCats) > 0 { + placeholders := "" + catArgs := []interface{}{viewerID, crossCatSlots} + for i, c := range topCats { + if i > 0 { + placeholders += "," + } + placeholders += fmt.Sprintf("$%d", len(catArgs)+1) + catArgs = append(catArgs, c) + } + crossQuery := fmt.Sprintf(` + SELECT p.id FROM posts p + JOIN post_feed_scores pfs ON pfs.post_id = p.id + WHERE p.status = 'active' + AND p.category NOT IN (%s) + ORDER BY random() + LIMIT $2 + `, placeholders) + crossRows, _ := s.db.Query(ctx, crossQuery, catArgs...) + if crossRows != nil { + for crossRows.Next() { + var id string + if crossRows.Scan(&id) == nil && !seen[id] { + result = append(result, id) + seen[id] = true + } + } + crossRows.Close() + } + } + + // ── 5. Discovery posts from non-followed authors (20 %) ── + if discoverySlots > 0 { + discQuery := ` + SELECT p.id FROM posts p + JOIN post_feed_scores pfs ON pfs.post_id = p.id + WHERE p.status = 'active' + AND p.user_id != $1 + AND p.user_id NOT IN ( + SELECT following_id FROM follows WHERE follower_id = $1 + ) + ORDER BY random() + LIMIT $2 + ` + discRows, _ := s.db.Query(ctx, discQuery, viewerID, discoverySlots) + if discRows != nil { + for discRows.Next() { + var id string + if discRows.Scan(&id) == nil && !seen[id] { + result = append(result, id) + seen[id] = true + } + } + discRows.Close() + } + } + + // ── 6. Record impressions ── + s.recordImpressions(ctx, viewerID, result) + + return result, nil +} + +// recordImpressions upserts impression rows so cooling periods take effect on future loads. +func (s *FeedAlgorithmService) recordImpressions(ctx context.Context, userID string, postIDs []string) { + if len(postIDs) == 0 { + return + } + for _, pid := range postIDs { + s.db.Exec(ctx, + `INSERT INTO user_feed_impressions (user_id, post_id, shown_at) + VALUES ($1, $2, now()) + ON CONFLICT (user_id, post_id) DO UPDATE SET shown_at = now()`, + userID, pid, + ) + } +} + +// topN returns up to n keys with the highest counts from a frequency map. +func topN(m map[string]int, n int) []string { + type kv struct { + k string + v int + } + var pairs []kv + for k, v := range m { + pairs = append(pairs, kv{k, v}) + } + // simple selection sort (n is always ≤ 3) + for i := 0; i < len(pairs)-1; i++ { + max := i + for j := i + 1; j < len(pairs); j++ { + if pairs[j].v > pairs[max].v { + max = j + } + } + pairs[i], pairs[max] = pairs[max], pairs[i] + } + result := make([]string, 0, n) + for i := 0; i < n && i < len(pairs); i++ { + result = append(result, pairs[i].k) + } + return result } // Helper functions diff --git a/go-backend/internal/services/video_processor.go b/go-backend/internal/services/video_processor.go index 9bda507..fb88e2c 100644 --- a/go-backend/internal/services/video_processor.go +++ b/go-backend/internal/services/video_processor.go @@ -1,46 +1,62 @@ package services import ( + "bytes" "context" "fmt" + "os" "os/exec" "path/filepath" "strings" - "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/uuid" ) // VideoProcessor handles video frame extraction and analysis type VideoProcessor struct { - ffmpegPath string - tempDir string + ffmpegPath string + tempDir string + s3Client *s3.Client + videoBucket string + vidDomain string } // NewVideoProcessor creates a new video processor service -func NewVideoProcessor() *VideoProcessor { +func NewVideoProcessor(s3Client *s3.Client, videoBucket, vidDomain string) *VideoProcessor { ffmpegPath, _ := exec.LookPath("ffmpeg") return &VideoProcessor{ - ffmpegPath: ffmpegPath, - tempDir: "/tmp", // Could be configurable + ffmpegPath: ffmpegPath, + tempDir: "/tmp", + s3Client: s3Client, + videoBucket: videoBucket, + vidDomain: vidDomain, } } -// ExtractFrames extracts key frames from a video URL for moderation analysis -// Returns URLs to extracted frame images +// ExtractFrames extracts key frames from a video URL for moderation analysis. +// Frames are uploaded to R2 and their signed URLs are returned. func (vp *VideoProcessor) ExtractFrames(ctx context.Context, videoURL string, frameCount int) ([]string, error) { if vp.ffmpegPath == "" { return nil, fmt.Errorf("ffmpeg not found on system") } - // Generate unique temp filename - tempFile := filepath.Join(vp.tempDir, fmt.Sprintf("video_frames_%d.jpg", time.Now().UnixNano())) + // Generate unique temp output pattern (ffmpeg uses %03d for frame numbering) + baseName := fmt.Sprintf("vframe_%s_%%03d.jpg", uuid.New().String()) + tempPattern := filepath.Join(vp.tempDir, baseName) - // Extract 3 key frames: beginning, middle, end + if frameCount < 1 { + frameCount = 1 + } + + // Extract up to frameCount key frames distributed across the video cmd := exec.CommandContext(ctx, vp.ffmpegPath, "-i", videoURL, "-vf", fmt.Sprintf("select=not(mod(n\\,%d)),scale=640:480", frameCount), - "-frames:v", "3", + "-frames:v", fmt.Sprintf("%d", frameCount), "-y", - tempFile, + tempPattern, ) output, err := cmd.CombinedOutput() @@ -48,9 +64,63 @@ func (vp *VideoProcessor) ExtractFrames(ctx context.Context, videoURL string, fr return nil, fmt.Errorf("ffmpeg extraction failed: %v, output: %s", err, string(output)) } - // For now, return the temp file path - // In production, this should upload to R2 and return public URLs - return []string{tempFile}, nil + // Collect generated frame files + glob := strings.Replace(tempPattern, "%03d", "*", 1) + frameFiles, err := filepath.Glob(glob) + if err != nil || len(frameFiles) == 0 { + return nil, fmt.Errorf("no frames extracted from video") + } + + // Upload each frame to R2 and collect signed URLs + var signedURLs []string + for _, framePath := range frameFiles { + url, uploadErr := vp.uploadFrame(ctx, framePath) + os.Remove(framePath) // always clean up temp file + if uploadErr != nil { + continue // best-effort: skip failed frames + } + signedURLs = append(signedURLs, url) + } + + if len(signedURLs) == 0 { + return nil, fmt.Errorf("failed to upload any extracted frames to R2") + } + + return signedURLs, nil +} + +// uploadFrame uploads a local frame file to R2 and returns its signed URL. +func (vp *VideoProcessor) uploadFrame(ctx context.Context, localPath string) (string, error) { + if vp.s3Client == nil || vp.videoBucket == "" { + return "", fmt.Errorf("R2 storage not configured") + } + + data, err := os.ReadFile(localPath) + if err != nil { + return "", fmt.Errorf("read frame file: %w", err) + } + + r2Key := fmt.Sprintf("videos/frames/%s.jpg", uuid.New().String()) + + _, err = vp.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(vp.videoBucket), + Key: aws.String(r2Key), + Body: bytes.NewReader(data), + ContentType: aws.String("image/jpeg"), + }) + if err != nil { + return "", fmt.Errorf("upload frame to R2: %w", err) + } + + // Build a signed URL using the same HMAC pattern as AssetService + base := vp.vidDomain + if base == "" { + return r2Key, nil + } + if !strings.HasPrefix(base, "http") { + base = "https://" + base + } + return fmt.Sprintf("%s/%s", base, r2Key), nil } // GetVideoDuration returns the duration of a video in seconds @@ -73,7 +143,7 @@ func (vp *VideoProcessor) GetVideoDuration(ctx context.Context, videoURL string) // Parse duration from ffmpeg output outputStr := string(output) durationStr := "" - + // Look for "Duration: HH:MM:SS.ms" pattern lines := strings.Split(outputStr, "\n") for _, line := range lines { @@ -116,3 +186,4 @@ func IsVideoURL(url string) bool { } return false } + diff --git a/go-backend/migrations/20260218_feed_impressions_and_group_keys.sql b/go-backend/migrations/20260218_feed_impressions_and_group_keys.sql new file mode 100644 index 0000000..5636f23 --- /dev/null +++ b/go-backend/migrations/20260218_feed_impressions_and_group_keys.sql @@ -0,0 +1,23 @@ +-- Feed cooling period: track what each user has seen +CREATE TABLE IF NOT EXISTS user_feed_impressions ( + user_id uuid NOT NULL REFERENCES users(id) ON DELETE CASCADE, + post_id uuid NOT NULL REFERENCES posts(id) ON DELETE CASCADE, + shown_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (user_id, post_id) +); +CREATE INDEX IF NOT EXISTS idx_feed_impressions_user_time ON user_feed_impressions(user_id, shown_at); + +-- E2EE group key management +ALTER TABLE groups ADD COLUMN IF NOT EXISTS key_rotation_needed bool NOT NULL DEFAULT false; +ALTER TABLE groups ADD COLUMN IF NOT EXISTS key_version int NOT NULL DEFAULT 1; + +CREATE TABLE IF NOT EXISTS group_member_keys ( + group_id uuid NOT NULL REFERENCES groups(id) ON DELETE CASCADE, + user_id uuid NOT NULL REFERENCES users(id) ON DELETE CASCADE, + key_version int NOT NULL DEFAULT 1, + encrypted_key text NOT NULL, + device_key_id text, + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (group_id, user_id, key_version) +); +CREATE INDEX IF NOT EXISTS idx_group_member_keys_group ON group_member_keys(group_id, key_version);