feat: Feed cooling, group key rotation, admin groups/quip repair, bulk block, health endpoints
- video_processor: upload extracted frames to R2, return signed URLs - feed_algorithm: cooling period (0.2x multiplier) + 60/20/20 diversity injection + record impressions - groups_handler: group feed, E2EE key-status/distribute/public-keys, invite/remove member, settings - admin_handler: groups CRUD, quip repair (FFmpeg to R2), feed scores viewer - user_handler: BulkBlockUsers POST /users/me/blocks/bulk - main.go: wire health check (/health/detailed /ready /live) + all new routes - monitoring: fix pre-existing zerolog import + uint64 type errors - migration: user_feed_impressions, group_member_keys, groups key columns Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
e0056789ac
commit
1da62185f9
|
|
@ -21,6 +21,7 @@ import (
|
|||
"gitlab.com/patrickbritton3/sojorn/go-backend/internal/config"
|
||||
"gitlab.com/patrickbritton3/sojorn/go-backend/internal/handlers"
|
||||
"gitlab.com/patrickbritton3/sojorn/go-backend/internal/middleware"
|
||||
"gitlab.com/patrickbritton3/sojorn/go-backend/internal/monitoring"
|
||||
"gitlab.com/patrickbritton3/sojorn/go-backend/internal/realtime"
|
||||
"gitlab.com/patrickbritton3/sojorn/go-backend/internal/repository"
|
||||
"gitlab.com/patrickbritton3/sojorn/go-backend/internal/services"
|
||||
|
|
@ -169,7 +170,7 @@ func main() {
|
|||
linkPreviewService := services.NewLinkPreviewService(dbPool, s3Client, cfg.R2MediaBucket, cfg.R2ImgDomain)
|
||||
|
||||
userHandler := handlers.NewUserHandler(userRepo, postRepo, notificationService, assetService)
|
||||
postHandler := handlers.NewPostHandler(postRepo, userRepo, feedService, assetService, notificationService, moderationService, contentFilter, openRouterService, linkPreviewService, localAIService)
|
||||
postHandler := handlers.NewPostHandler(postRepo, userRepo, feedService, assetService, notificationService, moderationService, contentFilter, openRouterService, linkPreviewService, localAIService, s3Client, cfg.R2VideoBucket, cfg.R2VidDomain)
|
||||
chatHandler := handlers.NewChatHandler(chatRepo, notificationService, hub)
|
||||
authHandler := handlers.NewAuthHandler(userRepo, cfg, emailService, sendPulseService)
|
||||
categoryHandler := handlers.NewCategoryHandler(categoryRepo)
|
||||
|
|
@ -216,6 +217,9 @@ func main() {
|
|||
// Feed algorithm service (scores posts for ranked feed)
|
||||
feedAlgorithmService := services.NewFeedAlgorithmService(dbPool)
|
||||
|
||||
// Health check service
|
||||
hcService := monitoring.NewHealthCheckService(dbPool)
|
||||
|
||||
// Repost & profile layout handlers
|
||||
repostHandler := handlers.NewRepostHandler(dbPool)
|
||||
profileLayoutHandler := handlers.NewProfileLayoutHandler(dbPool)
|
||||
|
|
@ -228,6 +232,9 @@ func main() {
|
|||
r.HEAD("/health", func(c *gin.Context) {
|
||||
c.Status(200)
|
||||
})
|
||||
r.GET("/health/detailed", gin.WrapF(hcService.HealthCheckHandler))
|
||||
r.GET("/health/ready", gin.WrapF(hcService.ReadinessHandler))
|
||||
r.GET("/health/live", gin.WrapF(hcService.LivenessHandler))
|
||||
|
||||
// ALTCHA challenge endpoints (direct to main router for testing)
|
||||
r.GET("/api/v1/auth/altcha-challenge", authHandler.GetAltchaChallenge)
|
||||
|
|
@ -308,6 +315,7 @@ func main() {
|
|||
users.GET("/blocked", userHandler.GetBlockedUsers)
|
||||
users.POST("/report", userHandler.ReportUser)
|
||||
users.POST("/block_by_handle", userHandler.BlockUserByHandle)
|
||||
users.POST("/me/blocks/bulk", userHandler.BulkBlockUsers)
|
||||
|
||||
// Social Graph: Followers & Following
|
||||
users.GET("/:id/followers", userHandler.GetFollowers)
|
||||
|
|
@ -489,6 +497,13 @@ func main() {
|
|||
groups.GET("/:id/requests", groupsHandler.GetPendingRequests) // Get pending join requests (admin)
|
||||
groups.POST("/:id/requests/:requestId/approve", groupsHandler.ApproveJoinRequest) // Approve join request
|
||||
groups.POST("/:id/requests/:requestId/reject", groupsHandler.RejectJoinRequest) // Reject join request
|
||||
groups.GET("/:id/feed", groupsHandler.GetGroupFeed)
|
||||
groups.GET("/:id/key-status", groupsHandler.GetGroupKeyStatus)
|
||||
groups.POST("/:id/keys", groupsHandler.DistributeGroupKeys)
|
||||
groups.GET("/:id/members/public-keys", groupsHandler.GetGroupMemberPublicKeys)
|
||||
groups.POST("/:id/invite-member", groupsHandler.InviteMember)
|
||||
groups.DELETE("/:id/members/:userId", groupsHandler.RemoveMember)
|
||||
groups.PATCH("/:id/settings", groupsHandler.UpdateGroupSettings)
|
||||
}
|
||||
|
||||
// Capsule system (E2EE groups + clusters)
|
||||
|
|
@ -699,6 +714,21 @@ func main() {
|
|||
admin.GET("/email-templates/:id", adminHandler.GetEmailTemplate)
|
||||
admin.PATCH("/email-templates/:id", adminHandler.UpdateEmailTemplate)
|
||||
admin.POST("/email-templates/test", adminHandler.SendTestEmail)
|
||||
|
||||
// Groups admin
|
||||
admin.GET("/groups", adminHandler.AdminListGroups)
|
||||
admin.GET("/groups/:id", adminHandler.AdminGetGroup)
|
||||
admin.DELETE("/groups/:id", adminHandler.AdminDeleteGroup)
|
||||
admin.GET("/groups/:id/members", adminHandler.AdminListGroupMembers)
|
||||
admin.DELETE("/groups/:id/members/:userId", adminHandler.AdminRemoveGroupMember)
|
||||
|
||||
// Quip repair
|
||||
admin.GET("/quips/broken", adminHandler.GetBrokenQuips)
|
||||
admin.PATCH("/posts/:id/thumbnail", adminHandler.SetPostThumbnail)
|
||||
admin.POST("/quips/:id/repair", adminHandler.RepairQuip)
|
||||
|
||||
// Feed scores viewer
|
||||
admin.GET("/feed-scores", adminHandler.AdminGetFeedScores)
|
||||
}
|
||||
|
||||
// Public claim request endpoint (no auth)
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ require (
|
|||
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 // indirect
|
||||
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect
|
||||
github.com/MicahParks/keyfunc v1.9.0 // indirect
|
||||
github.com/altcha-org/altcha-lib-go v1.0.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp
|
|||
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0/go.mod h1:cSgYe11MCNYunTnRXrKiR/tHc0eoKjICUuWpNZoVCOo=
|
||||
github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID3+o=
|
||||
github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw=
|
||||
github.com/altcha-org/altcha-lib-go v1.0.0 h1:7oPti0aUS+YCep8nwt5b9g4jYfCU55ZruWESL8G9K5M=
|
||||
github.com/altcha-org/altcha-lib-go v1.0.0/go.mod h1:I8ESLVWR9C58uvGufB/AJDPhaSU4+4Oh3DLpVtgwDAk=
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU=
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 h1:489krEF9xIGkOaaX3CE/Be2uWjiXrkCH6gUX+bZA/BU=
|
||||
|
|
|
|||
|
|
@ -4138,3 +4138,323 @@ func (h *AdminHandler) GetAltchaChallenge(c *gin.Context) {
|
|||
|
||||
c.JSON(http.StatusOK, challenge)
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// Groups admin
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// AdminListGroups GET /admin/groups?search=&limit=50&offset=0
|
||||
func (h *AdminHandler) AdminListGroups(c *gin.Context) {
|
||||
search := c.Query("search")
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50"))
|
||||
offset, _ := strconv.Atoi(c.DefaultQuery("offset", "0"))
|
||||
if limit <= 0 || limit > 200 {
|
||||
limit = 50
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT g.id, g.name, g.description, g.is_private, g.status,
|
||||
g.created_at, g.key_version, g.key_rotation_needed,
|
||||
COUNT(DISTINCT gm.user_id) AS member_count,
|
||||
COUNT(DISTINCT gp.post_id) AS post_count
|
||||
FROM groups g
|
||||
LEFT JOIN group_members gm ON gm.group_id = g.id
|
||||
LEFT JOIN group_posts gp ON gp.group_id = g.id
|
||||
`
|
||||
args := []interface{}{}
|
||||
if search != "" {
|
||||
query += " WHERE g.name ILIKE $1 OR g.description ILIKE $1"
|
||||
args = append(args, "%"+search+"%")
|
||||
}
|
||||
query += fmt.Sprintf(`
|
||||
GROUP BY g.id
|
||||
ORDER BY g.created_at DESC
|
||||
LIMIT $%d OFFSET $%d`, len(args)+1, len(args)+2)
|
||||
args = append(args, limit, offset)
|
||||
|
||||
rows, err := h.pool.Query(c.Request.Context(), query, args...)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type groupRow struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
IsPrivate bool `json:"is_private"`
|
||||
Status string `json:"status"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
KeyVersion int `json:"key_version"`
|
||||
KeyRotationNeeded bool `json:"key_rotation_needed"`
|
||||
MemberCount int `json:"member_count"`
|
||||
PostCount int `json:"post_count"`
|
||||
}
|
||||
|
||||
var groups []groupRow
|
||||
for rows.Next() {
|
||||
var g groupRow
|
||||
if err := rows.Scan(&g.ID, &g.Name, &g.Description, &g.IsPrivate, &g.Status,
|
||||
&g.CreatedAt, &g.KeyVersion, &g.KeyRotationNeeded, &g.MemberCount, &g.PostCount); err != nil {
|
||||
continue
|
||||
}
|
||||
groups = append(groups, g)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"groups": groups, "limit": limit, "offset": offset})
|
||||
}
|
||||
|
||||
// AdminGetGroup GET /admin/groups/:id
|
||||
func (h *AdminHandler) AdminGetGroup(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
row := h.pool.QueryRow(c.Request.Context(), `
|
||||
SELECT g.id, g.name, g.description, g.is_private, g.status, g.created_at,
|
||||
g.key_version, g.key_rotation_needed,
|
||||
COUNT(DISTINCT gm.user_id) AS member_count,
|
||||
COUNT(DISTINCT gp.post_id) AS post_count
|
||||
FROM groups g
|
||||
LEFT JOIN group_members gm ON gm.group_id = g.id
|
||||
LEFT JOIN group_posts gp ON gp.group_id = g.id
|
||||
WHERE g.id = $1
|
||||
GROUP BY g.id
|
||||
`, groupID)
|
||||
|
||||
var g struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
IsPrivate bool `json:"is_private"`
|
||||
Status string `json:"status"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
KeyVersion int `json:"key_version"`
|
||||
KeyRotationNeeded bool `json:"key_rotation_needed"`
|
||||
MemberCount int `json:"member_count"`
|
||||
PostCount int `json:"post_count"`
|
||||
}
|
||||
if err := row.Scan(&g.ID, &g.Name, &g.Description, &g.IsPrivate, &g.Status, &g.CreatedAt,
|
||||
&g.KeyVersion, &g.KeyRotationNeeded, &g.MemberCount, &g.PostCount); err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "group not found"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, g)
|
||||
}
|
||||
|
||||
// AdminDeleteGroup DELETE /admin/groups/:id (soft delete)
|
||||
func (h *AdminHandler) AdminDeleteGroup(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
_, err := h.pool.Exec(c.Request.Context(),
|
||||
`UPDATE groups SET status = 'inactive' WHERE id = $1`, groupID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"message": "group deactivated"})
|
||||
}
|
||||
|
||||
// AdminListGroupMembers GET /admin/groups/:id/members
|
||||
func (h *AdminHandler) AdminListGroupMembers(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
rows, err := h.pool.Query(c.Request.Context(), `
|
||||
SELECT gm.user_id, u.username, u.display_name, gm.role, gm.joined_at
|
||||
FROM group_members gm
|
||||
JOIN users u ON u.id = gm.user_id
|
||||
WHERE gm.group_id = $1
|
||||
ORDER BY gm.joined_at
|
||||
`, groupID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type member struct {
|
||||
UserID string `json:"user_id"`
|
||||
Username string `json:"username"`
|
||||
DisplayName string `json:"display_name"`
|
||||
Role string `json:"role"`
|
||||
JoinedAt time.Time `json:"joined_at"`
|
||||
}
|
||||
var members []member
|
||||
for rows.Next() {
|
||||
var m member
|
||||
if err := rows.Scan(&m.UserID, &m.Username, &m.DisplayName, &m.Role, &m.JoinedAt); err != nil {
|
||||
continue
|
||||
}
|
||||
members = append(members, m)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"members": members})
|
||||
}
|
||||
|
||||
// AdminRemoveGroupMember DELETE /admin/groups/:id/members/:userId
|
||||
func (h *AdminHandler) AdminRemoveGroupMember(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
userID := c.Param("userId")
|
||||
_, err := h.pool.Exec(c.Request.Context(),
|
||||
`DELETE FROM group_members WHERE group_id = $1 AND user_id = $2`, groupID, userID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
// Flag group for key rotation (client will auto-rotate on next open)
|
||||
h.pool.Exec(c.Request.Context(),
|
||||
`UPDATE groups SET key_rotation_needed = true WHERE id = $1`, groupID)
|
||||
c.JSON(http.StatusOK, gin.H{"message": "member removed"})
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// Quip (video post) repair
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// GetBrokenQuips GET /admin/quips/broken
|
||||
// Returns posts that have a video_url but are missing a thumbnail.
|
||||
func (h *AdminHandler) GetBrokenQuips(c *gin.Context) {
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50"))
|
||||
if limit <= 0 || limit > 200 {
|
||||
limit = 50
|
||||
}
|
||||
rows, err := h.pool.Query(c.Request.Context(), `
|
||||
SELECT id, user_id, video_url, created_at
|
||||
FROM posts
|
||||
WHERE video_url IS NOT NULL
|
||||
AND (thumbnail_url IS NULL OR thumbnail_url = '')
|
||||
AND status = 'active'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $1
|
||||
`, limit)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type quip struct {
|
||||
ID string `json:"id"`
|
||||
UserID string `json:"user_id"`
|
||||
VideoURL string `json:"video_url"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
var quips []quip
|
||||
for rows.Next() {
|
||||
var q quip
|
||||
if err := rows.Scan(&q.ID, &q.UserID, &q.VideoURL, &q.CreatedAt); err != nil {
|
||||
continue
|
||||
}
|
||||
quips = append(quips, q)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"quips": quips})
|
||||
}
|
||||
|
||||
// SetPostThumbnail PATCH /admin/posts/:id/thumbnail
|
||||
// Body: {"thumbnail_url": "..."}
|
||||
func (h *AdminHandler) SetPostThumbnail(c *gin.Context) {
|
||||
postID := c.Param("id")
|
||||
var req struct {
|
||||
ThumbnailURL string `json:"thumbnail_url" binding:"required"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
_, err := h.pool.Exec(c.Request.Context(),
|
||||
`UPDATE posts SET thumbnail_url = $1 WHERE id = $2`, req.ThumbnailURL, postID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"message": "thumbnail updated"})
|
||||
}
|
||||
|
||||
// RepairQuip POST /admin/quips/:id/repair
|
||||
// Triggers FFmpeg frame extraction on the server and sets thumbnail_url.
|
||||
func (h *AdminHandler) RepairQuip(c *gin.Context) {
|
||||
postID := c.Param("id")
|
||||
|
||||
// Fetch video_url
|
||||
var videoURL string
|
||||
err := h.pool.QueryRow(c.Request.Context(),
|
||||
`SELECT video_url FROM posts WHERE id = $1`, postID).Scan(&videoURL)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "post not found"})
|
||||
return
|
||||
}
|
||||
if videoURL == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "post has no video_url"})
|
||||
return
|
||||
}
|
||||
|
||||
vp := services.NewVideoProcessor(h.s3Client, h.videoBucket, h.vidDomain)
|
||||
frames, err := vp.ExtractFrames(c.Request.Context(), videoURL, 3)
|
||||
if err != nil || len(frames) == 0 {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "frame extraction failed: " + func() string {
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
return "no frames"
|
||||
}()})
|
||||
return
|
||||
}
|
||||
|
||||
thumbnail := frames[0]
|
||||
_, err = h.pool.Exec(c.Request.Context(),
|
||||
`UPDATE posts SET thumbnail_url = $1 WHERE id = $2`, thumbnail, postID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"thumbnail_url": thumbnail})
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// Feed scores viewer
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// AdminGetFeedScores GET /admin/feed-scores?limit=50
|
||||
func (h *AdminHandler) AdminGetFeedScores(c *gin.Context) {
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50"))
|
||||
if limit <= 0 || limit > 200 {
|
||||
limit = 50
|
||||
}
|
||||
rows, err := h.pool.Query(c.Request.Context(), `
|
||||
SELECT pfs.post_id,
|
||||
LEFT(p.content, 80) AS excerpt,
|
||||
pfs.engagement_score,
|
||||
pfs.quality_score,
|
||||
pfs.recency_score,
|
||||
pfs.network_score,
|
||||
pfs.personalization,
|
||||
pfs.score AS total_score,
|
||||
pfs.updated_at
|
||||
FROM post_feed_scores pfs
|
||||
JOIN posts p ON p.id = pfs.post_id
|
||||
WHERE p.status = 'active'
|
||||
ORDER BY pfs.score DESC
|
||||
LIMIT $1
|
||||
`, limit)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type scoreRow struct {
|
||||
PostID string `json:"post_id"`
|
||||
Excerpt string `json:"excerpt"`
|
||||
EngagementScore float64 `json:"engagement_score"`
|
||||
QualityScore float64 `json:"quality_score"`
|
||||
RecencyScore float64 `json:"recency_score"`
|
||||
NetworkScore float64 `json:"network_score"`
|
||||
Personalization float64 `json:"personalization"`
|
||||
TotalScore float64 `json:"total_score"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
var scores []scoreRow
|
||||
for rows.Next() {
|
||||
var s scoreRow
|
||||
if err := rows.Scan(&s.PostID, &s.Excerpt, &s.EngagementScore, &s.QualityScore,
|
||||
&s.RecencyScore, &s.NetworkScore, &s.Personalization, &s.TotalScore, &s.UpdatedAt); err != nil {
|
||||
continue
|
||||
}
|
||||
scores = append(scores, s)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"scores": scores})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,9 @@ package handlers
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
|
@ -367,6 +369,10 @@ func (h *GroupsHandler) LeaveGroup(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// Flag key rotation so admin client silently rotates on next open
|
||||
h.db.Exec(c.Request.Context(),
|
||||
`UPDATE groups SET key_rotation_needed = true WHERE id = $1`, groupID)
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "Left group successfully"})
|
||||
}
|
||||
|
||||
|
|
@ -547,7 +553,7 @@ func (h *GroupsHandler) RejectJoinRequest(c *gin.Context) {
|
|||
}
|
||||
|
||||
_, err = h.db.Exec(c.Request.Context(), `
|
||||
UPDATE group_join_requests
|
||||
UPDATE group_join_requests
|
||||
SET status = 'rejected', reviewed_at = NOW(), reviewed_by = $1
|
||||
WHERE id = $2 AND group_id = $3 AND status = 'pending'
|
||||
`, userID, requestID, groupID)
|
||||
|
|
@ -559,3 +565,347 @@ func (h *GroupsHandler) RejectJoinRequest(c *gin.Context) {
|
|||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "Request rejected"})
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// Group feed
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// GetGroupFeed GET /groups/:id/feed?limit=20&offset=0
|
||||
func (h *GroupsHandler) GetGroupFeed(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "20"))
|
||||
offset, _ := strconv.Atoi(c.DefaultQuery("offset", "0"))
|
||||
if limit <= 0 || limit > 100 {
|
||||
limit = 20
|
||||
}
|
||||
|
||||
rows, err := h.db.Query(c.Request.Context(), `
|
||||
SELECT p.id, p.user_id, p.content, p.image_url, p.video_url,
|
||||
p.thumbnail_url, p.created_at, p.status
|
||||
FROM posts p
|
||||
JOIN group_posts gp ON gp.post_id = p.id
|
||||
WHERE gp.group_id = $1 AND p.status = 'active'
|
||||
ORDER BY p.created_at DESC
|
||||
LIMIT $2 OFFSET $3
|
||||
`, groupID, limit, offset)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch group feed"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type feedPost struct {
|
||||
ID string `json:"id"`
|
||||
UserID string `json:"user_id"`
|
||||
Content string `json:"content"`
|
||||
ImageURL *string `json:"image_url"`
|
||||
VideoURL *string `json:"video_url"`
|
||||
ThumbnailURL *string `json:"thumbnail_url"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
var posts []feedPost
|
||||
for rows.Next() {
|
||||
var p feedPost
|
||||
if err := rows.Scan(&p.ID, &p.UserID, &p.Content, &p.ImageURL, &p.VideoURL,
|
||||
&p.ThumbnailURL, &p.CreatedAt, &p.Status); err != nil {
|
||||
continue
|
||||
}
|
||||
posts = append(posts, p)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"posts": posts, "limit": limit, "offset": offset})
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// E2EE group key management
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// GetGroupKeyStatus GET /groups/:id/key-status
|
||||
// Returns the current key version, whether rotation is needed, and the caller's
|
||||
// encrypted group key (if they have one).
|
||||
func (h *GroupsHandler) GetGroupKeyStatus(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
userID, _ := c.Get("user_id")
|
||||
|
||||
var keyVersion int
|
||||
var keyRotationNeeded bool
|
||||
err := h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT key_version, key_rotation_needed FROM groups WHERE id = $1`, groupID,
|
||||
).Scan(&keyVersion, &keyRotationNeeded)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "group not found"})
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch this user's encrypted key for the current version
|
||||
var encryptedKey *string
|
||||
h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT encrypted_key FROM group_member_keys
|
||||
WHERE group_id = $1 AND user_id = $2 AND key_version = $3`,
|
||||
groupID, userID, keyVersion,
|
||||
).Scan(&encryptedKey)
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"key_version": keyVersion,
|
||||
"key_rotation_needed": keyRotationNeeded,
|
||||
"my_encrypted_key": encryptedKey,
|
||||
})
|
||||
}
|
||||
|
||||
// DistributeGroupKeys POST /groups/:id/keys
|
||||
// Called by an admin/owner client after local key rotation to push new
|
||||
// encrypted copies to each member.
|
||||
// Body: {"keys": [{"user_id": "...", "encrypted_key": "...", "key_version": N}]}
|
||||
func (h *GroupsHandler) DistributeGroupKeys(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
callerID, _ := c.Get("user_id")
|
||||
|
||||
// Only owner/admin may distribute keys
|
||||
var role string
|
||||
err := h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`,
|
||||
groupID, callerID,
|
||||
).Scan(&role)
|
||||
if err != nil || (role != "owner" && role != "admin") {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may rotate keys"})
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Keys []struct {
|
||||
UserID string `json:"user_id" binding:"required"`
|
||||
EncryptedKey string `json:"encrypted_key" binding:"required"`
|
||||
KeyVersion int `json:"key_version" binding:"required"`
|
||||
} `json:"keys" binding:"required"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Determine the new key version (max of submitted versions)
|
||||
newVersion := 0
|
||||
for _, k := range req.Keys {
|
||||
if k.KeyVersion > newVersion {
|
||||
newVersion = k.KeyVersion
|
||||
}
|
||||
}
|
||||
|
||||
for _, k := range req.Keys {
|
||||
h.db.Exec(c.Request.Context(), `
|
||||
INSERT INTO group_member_keys (group_id, user_id, key_version, encrypted_key, updated_at)
|
||||
VALUES ($1, $2, $3, $4, now())
|
||||
ON CONFLICT (group_id, user_id, key_version)
|
||||
DO UPDATE SET encrypted_key = EXCLUDED.encrypted_key, updated_at = now()
|
||||
`, groupID, k.UserID, k.KeyVersion, k.EncryptedKey)
|
||||
}
|
||||
|
||||
// Clear the rotation flag and bump key_version on the group
|
||||
h.db.Exec(c.Request.Context(),
|
||||
`UPDATE groups SET key_rotation_needed = false, key_version = $1 WHERE id = $2`,
|
||||
newVersion, groupID)
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "keys distributed", "key_version": newVersion})
|
||||
}
|
||||
|
||||
// GetGroupMemberPublicKeys GET /groups/:id/members/public-keys
|
||||
// Returns RSA public keys for all members so a rotating client can encrypt for each.
|
||||
func (h *GroupsHandler) GetGroupMemberPublicKeys(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
callerID, _ := c.Get("user_id")
|
||||
|
||||
// Caller must be a member
|
||||
var memberCount int
|
||||
err := h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT COUNT(*) FROM group_members WHERE group_id = $1 AND user_id = $2`,
|
||||
groupID, callerID,
|
||||
).Scan(&memberCount)
|
||||
if err != nil || memberCount == 0 {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "not a group member"})
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := h.db.Query(c.Request.Context(), `
|
||||
SELECT gm.user_id, u.public_key
|
||||
FROM group_members gm
|
||||
JOIN users u ON u.id = gm.user_id
|
||||
WHERE gm.group_id = $1 AND u.public_key IS NOT NULL AND u.public_key != ''
|
||||
`, groupID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch member keys"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type memberKey struct {
|
||||
UserID string `json:"user_id"`
|
||||
PublicKey string `json:"public_key"`
|
||||
}
|
||||
var keys []memberKey
|
||||
for rows.Next() {
|
||||
var mk memberKey
|
||||
if rows.Scan(&mk.UserID, &mk.PublicKey) == nil {
|
||||
keys = append(keys, mk)
|
||||
}
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"keys": keys})
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// Member invite / remove / settings
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// InviteMember POST /groups/:id/invite-member
|
||||
// Body: {"user_id": "...", "encrypted_key": "..."}
|
||||
func (h *GroupsHandler) InviteMember(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
callerID, _ := c.Get("user_id")
|
||||
|
||||
var role string
|
||||
err := h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`,
|
||||
groupID, callerID,
|
||||
).Scan(&role)
|
||||
if err != nil || (role != "owner" && role != "admin") {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may invite members"})
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
UserID string `json:"user_id" binding:"required"`
|
||||
EncryptedKey string `json:"encrypted_key"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch current key version
|
||||
var keyVersion int
|
||||
h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT key_version FROM groups WHERE id = $1`, groupID,
|
||||
).Scan(&keyVersion)
|
||||
|
||||
// Add member
|
||||
_, err = h.db.Exec(c.Request.Context(), `
|
||||
INSERT INTO group_members (group_id, user_id, role, joined_at)
|
||||
VALUES ($1, $2, 'member', now())
|
||||
ON CONFLICT (group_id, user_id) DO NOTHING
|
||||
`, groupID, req.UserID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to add member"})
|
||||
return
|
||||
}
|
||||
|
||||
// Store their encrypted key if provided
|
||||
if req.EncryptedKey != "" {
|
||||
h.db.Exec(c.Request.Context(), `
|
||||
INSERT INTO group_member_keys (group_id, user_id, key_version, encrypted_key, updated_at)
|
||||
VALUES ($1, $2, $3, $4, now())
|
||||
ON CONFLICT (group_id, user_id, key_version)
|
||||
DO UPDATE SET encrypted_key = EXCLUDED.encrypted_key, updated_at = now()
|
||||
`, groupID, req.UserID, keyVersion, req.EncryptedKey)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "member invited"})
|
||||
}
|
||||
|
||||
// RemoveMember DELETE /groups/:id/members/:userId
|
||||
func (h *GroupsHandler) RemoveMember(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
targetUserID := c.Param("userId")
|
||||
callerID, _ := c.Get("user_id")
|
||||
|
||||
var role string
|
||||
err := h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`,
|
||||
groupID, callerID,
|
||||
).Scan(&role)
|
||||
if err != nil || (role != "owner" && role != "admin") {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may remove members"})
|
||||
return
|
||||
}
|
||||
|
||||
_, err = h.db.Exec(c.Request.Context(),
|
||||
`DELETE FROM group_members WHERE group_id = $1 AND user_id = $2`,
|
||||
groupID, targetUserID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to remove member"})
|
||||
return
|
||||
}
|
||||
|
||||
// Trigger automatic key rotation on next admin open
|
||||
h.db.Exec(c.Request.Context(),
|
||||
`UPDATE groups SET key_rotation_needed = true WHERE id = $1`, groupID)
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "member removed"})
|
||||
}
|
||||
|
||||
// UpdateGroupSettings PATCH /groups/:id/settings
|
||||
// Body: {"chat_enabled": true, "forum_enabled": false, "vault_enabled": true}
|
||||
func (h *GroupsHandler) UpdateGroupSettings(c *gin.Context) {
|
||||
groupID := c.Param("id")
|
||||
callerID, _ := c.Get("user_id")
|
||||
|
||||
var role string
|
||||
err := h.db.QueryRow(c.Request.Context(),
|
||||
`SELECT role FROM group_members WHERE group_id = $1 AND user_id = $2`,
|
||||
groupID, callerID,
|
||||
).Scan(&role)
|
||||
if err != nil || (role != "owner" && role != "admin") {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "only group owners or admins may change settings"})
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
ChatEnabled *bool `json:"chat_enabled"`
|
||||
ForumEnabled *bool `json:"forum_enabled"`
|
||||
VaultEnabled *bool `json:"vault_enabled"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Build dynamic UPDATE (only fields provided)
|
||||
setClauses := []string{}
|
||||
args := []interface{}{}
|
||||
argIdx := 1
|
||||
|
||||
if req.ChatEnabled != nil {
|
||||
setClauses = append(setClauses, fmt.Sprintf("chat_enabled = $%d", argIdx))
|
||||
args = append(args, *req.ChatEnabled)
|
||||
argIdx++
|
||||
}
|
||||
if req.ForumEnabled != nil {
|
||||
setClauses = append(setClauses, fmt.Sprintf("forum_enabled = $%d", argIdx))
|
||||
args = append(args, *req.ForumEnabled)
|
||||
argIdx++
|
||||
}
|
||||
if req.VaultEnabled != nil {
|
||||
setClauses = append(setClauses, fmt.Sprintf("vault_enabled = $%d", argIdx))
|
||||
args = append(args, *req.VaultEnabled)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
if len(setClauses) == 0 {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "no settings provided"})
|
||||
return
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(
|
||||
"UPDATE groups SET %s WHERE id = $%d",
|
||||
strings.Join(setClauses, ", "),
|
||||
argIdx,
|
||||
)
|
||||
args = append(args, groupID)
|
||||
|
||||
if _, err := h.db.Exec(c.Request.Context(), query, args...); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update settings: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "settings updated"})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
|
@ -30,7 +31,7 @@ type PostHandler struct {
|
|||
videoProcessor *services.VideoProcessor
|
||||
}
|
||||
|
||||
func NewPostHandler(postRepo *repository.PostRepository, userRepo *repository.UserRepository, feedService *services.FeedService, assetService *services.AssetService, notificationService *services.NotificationService, moderationService *services.ModerationService, contentFilter *services.ContentFilter, openRouterService *services.OpenRouterService, linkPreviewService *services.LinkPreviewService, localAIService *services.LocalAIService) *PostHandler {
|
||||
func NewPostHandler(postRepo *repository.PostRepository, userRepo *repository.UserRepository, feedService *services.FeedService, assetService *services.AssetService, notificationService *services.NotificationService, moderationService *services.ModerationService, contentFilter *services.ContentFilter, openRouterService *services.OpenRouterService, linkPreviewService *services.LinkPreviewService, localAIService *services.LocalAIService, s3Client *s3.Client, videoBucket, vidDomain string) *PostHandler {
|
||||
return &PostHandler{
|
||||
postRepo: postRepo,
|
||||
userRepo: userRepo,
|
||||
|
|
@ -42,7 +43,7 @@ func NewPostHandler(postRepo *repository.PostRepository, userRepo *repository.Us
|
|||
openRouterService: openRouterService,
|
||||
linkPreviewService: linkPreviewService,
|
||||
localAIService: localAIService,
|
||||
videoProcessor: services.NewVideoProcessor(),
|
||||
videoProcessor: services.NewVideoProcessor(s3Client, videoBucket, vidDomain),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -609,6 +609,60 @@ func (h *UserHandler) GetCircleMembers(c *gin.Context) {
|
|||
// Data Export (Portability)
|
||||
// ========================================================================
|
||||
|
||||
// ========================================================================
|
||||
// Block list bulk import
|
||||
// ========================================================================
|
||||
|
||||
// BulkBlockUsers POST /users/me/blocks/bulk
|
||||
// Body: {"handles": ["alice", "bob", ...]}
|
||||
// Blocks each handle, auto-unfollows both ways.
|
||||
func (h *UserHandler) BulkBlockUsers(c *gin.Context) {
|
||||
actorID, _ := c.Get("user_id")
|
||||
actorIP := c.ClientIP()
|
||||
|
||||
var req struct {
|
||||
Handles []string `json:"handles" binding:"required"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if len(req.Handles) > 500 {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "maximum 500 handles per request"})
|
||||
return
|
||||
}
|
||||
|
||||
var blocked int
|
||||
var notFound []string
|
||||
var alreadyBlocked []string
|
||||
|
||||
for _, handle := range req.Handles {
|
||||
handle = strings.TrimSpace(strings.TrimPrefix(handle, "@"))
|
||||
if handle == "" {
|
||||
continue
|
||||
}
|
||||
err := h.repo.BlockUserByHandle(c.Request.Context(), actorID.(string), handle, actorIP)
|
||||
if err != nil {
|
||||
msg := err.Error()
|
||||
if strings.Contains(msg, "not found") {
|
||||
notFound = append(notFound, handle)
|
||||
} else if strings.Contains(msg, "conflict") || strings.Contains(msg, "duplicate") {
|
||||
alreadyBlocked = append(alreadyBlocked, handle)
|
||||
} else {
|
||||
log.Warn().Err(err).Str("handle", handle).Msg("bulk block: unexpected error")
|
||||
}
|
||||
continue
|
||||
}
|
||||
blocked++
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"blocked": blocked,
|
||||
"not_found": notFound,
|
||||
"already_blocked": alreadyBlocked,
|
||||
})
|
||||
}
|
||||
|
||||
// ExportData streams user data as JSON for portability/GDPR compliance
|
||||
func (h *UserHandler) ExportData(c *gin.Context) {
|
||||
userID, _ := c.Get("user_id")
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
|
@ -314,7 +315,7 @@ func (s *HealthCheckService) checkMemoryUsage() HealthCheck {
|
|||
|
||||
// Check memory usage (threshold: 80% of available memory)
|
||||
memoryUsageMB := m.Alloc / 1024 / 1024
|
||||
thresholdMB := 1024 // 1GB threshold
|
||||
thresholdMB := uint64(1024) // 1GB threshold
|
||||
|
||||
check.Status = "healthy"
|
||||
check.Message = "Memory usage is normal"
|
||||
|
|
|
|||
|
|
@ -443,54 +443,218 @@ func (s *FeedAlgorithmService) updatePostScore(ctx context.Context, score FeedSc
|
|||
return err
|
||||
}
|
||||
|
||||
// Get feed with algorithmic ranking
|
||||
// GetAlgorithmicFeed returns a ranked, deduplicated, diversity-injected feed for viewerID.
|
||||
//
|
||||
// Scoring pipeline:
|
||||
// 1. Pull scored posts from post_feed_scores; apply cooling-period multiplier based on
|
||||
// when the viewer last saw each post (user_feed_impressions).
|
||||
// 2. Partition the deduplicated result into 60 / 20 / 20:
|
||||
// 60 % – top personal scores
|
||||
// 20 % – random posts from categories the viewer doesn't usually see
|
||||
// 20 % – posts from authors the viewer doesn't follow (discovery)
|
||||
// 3. Record impressions so future calls apply the cooling penalty.
|
||||
func (s *FeedAlgorithmService) GetAlgorithmicFeed(ctx context.Context, viewerID string, limit int, offset int, category string) ([]string, error) {
|
||||
// Update scores for recent posts first
|
||||
err := s.UpdateFeedScores(ctx, []string{}, viewerID) // This would normally get recent posts
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to update feed scores")
|
||||
}
|
||||
|
||||
// Build query with algorithmic ordering
|
||||
query := `
|
||||
SELECT post_id
|
||||
// ── 1. Pull top personal posts (2× requested to have headroom for diversity swap) ──
|
||||
personalQuery := `
|
||||
SELECT pfs.post_id, pfs.score,
|
||||
COALESCE(ufi.shown_at, NULL) AS last_shown,
|
||||
p.category,
|
||||
p.user_id AS author_id
|
||||
FROM post_feed_scores pfs
|
||||
JOIN posts p ON p.id = pfs.post_id
|
||||
LEFT JOIN user_feed_impressions ufi
|
||||
ON ufi.post_id = pfs.post_id AND ufi.user_id = $1
|
||||
WHERE p.status = 'active'
|
||||
`
|
||||
|
||||
args := []interface{}{}
|
||||
argIndex := 1
|
||||
personalArgs := []interface{}{viewerID}
|
||||
argIdx := 2
|
||||
|
||||
if category != "" {
|
||||
query += fmt.Sprintf(" AND p.category = $%d", argIndex)
|
||||
args = append(args, category)
|
||||
argIndex++
|
||||
personalQuery += fmt.Sprintf(" AND p.category = $%d", argIdx)
|
||||
personalArgs = append(personalArgs, category)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
query += fmt.Sprintf(`
|
||||
personalQuery += fmt.Sprintf(`
|
||||
ORDER BY pfs.score DESC, p.created_at DESC
|
||||
LIMIT $%d OFFSET $%d
|
||||
`, argIndex, argIndex+1)
|
||||
`, argIdx, argIdx+1)
|
||||
personalArgs = append(personalArgs, limit*2, offset)
|
||||
|
||||
args = append(args, limit, offset)
|
||||
type feedRow struct {
|
||||
postID string
|
||||
score float64
|
||||
lastShown *string // nil = never shown
|
||||
category string
|
||||
authorID string
|
||||
}
|
||||
|
||||
rows, err := s.db.Query(ctx, query, args...)
|
||||
rows, err := s.db.Query(ctx, personalQuery, personalArgs...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get algorithmic feed: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var postIDs []string
|
||||
var personal []feedRow
|
||||
seenCategories := map[string]int{}
|
||||
for rows.Next() {
|
||||
var postID string
|
||||
if err := rows.Scan(&postID); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan post ID: %w", err)
|
||||
var r feedRow
|
||||
if err := rows.Scan(&r.postID, &r.score, &r.lastShown, &r.category, &r.authorID); err != nil {
|
||||
continue
|
||||
}
|
||||
postIDs = append(postIDs, postID)
|
||||
// Cooling multiplier
|
||||
if r.lastShown != nil {
|
||||
// any non-nil means it was shown before; apply decay
|
||||
r.score *= 0.2 // shown within cooling window → heavy penalty
|
||||
}
|
||||
seenCategories[r.category]++
|
||||
personal = append(personal, r)
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
// ── 2. Viewer's top 3 categories (for diversity contrast) ──
|
||||
topCats := topN(seenCategories, 3)
|
||||
topCatSet := map[string]bool{}
|
||||
for _, c := range topCats {
|
||||
topCatSet[c] = true
|
||||
}
|
||||
|
||||
return postIDs, nil
|
||||
// ── 3. Split quotas ──
|
||||
totalSlots := limit
|
||||
if offset > 0 {
|
||||
// On paginated pages skip diversity injection (too complex, just serve personal)
|
||||
var ids []string
|
||||
for i, r := range personal {
|
||||
if i >= totalSlots {
|
||||
break
|
||||
}
|
||||
ids = append(ids, r.postID)
|
||||
}
|
||||
s.recordImpressions(ctx, viewerID, ids)
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
personalSlots := (totalSlots * 60) / 100
|
||||
crossCatSlots := (totalSlots * 20) / 100
|
||||
discoverySlots := totalSlots - personalSlots - crossCatSlots
|
||||
|
||||
var result []string
|
||||
seen := map[string]bool{}
|
||||
|
||||
for _, r := range personal {
|
||||
if len(result) >= personalSlots {
|
||||
break
|
||||
}
|
||||
if !seen[r.postID] {
|
||||
result = append(result, r.postID)
|
||||
seen[r.postID] = true
|
||||
}
|
||||
}
|
||||
|
||||
// ── 4. Cross-category posts (20 %) ──
|
||||
if crossCatSlots > 0 && len(topCats) > 0 {
|
||||
placeholders := ""
|
||||
catArgs := []interface{}{viewerID, crossCatSlots}
|
||||
for i, c := range topCats {
|
||||
if i > 0 {
|
||||
placeholders += ","
|
||||
}
|
||||
placeholders += fmt.Sprintf("$%d", len(catArgs)+1)
|
||||
catArgs = append(catArgs, c)
|
||||
}
|
||||
crossQuery := fmt.Sprintf(`
|
||||
SELECT p.id FROM posts p
|
||||
JOIN post_feed_scores pfs ON pfs.post_id = p.id
|
||||
WHERE p.status = 'active'
|
||||
AND p.category NOT IN (%s)
|
||||
ORDER BY random()
|
||||
LIMIT $2
|
||||
`, placeholders)
|
||||
crossRows, _ := s.db.Query(ctx, crossQuery, catArgs...)
|
||||
if crossRows != nil {
|
||||
for crossRows.Next() {
|
||||
var id string
|
||||
if crossRows.Scan(&id) == nil && !seen[id] {
|
||||
result = append(result, id)
|
||||
seen[id] = true
|
||||
}
|
||||
}
|
||||
crossRows.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// ── 5. Discovery posts from non-followed authors (20 %) ──
|
||||
if discoverySlots > 0 {
|
||||
discQuery := `
|
||||
SELECT p.id FROM posts p
|
||||
JOIN post_feed_scores pfs ON pfs.post_id = p.id
|
||||
WHERE p.status = 'active'
|
||||
AND p.user_id != $1
|
||||
AND p.user_id NOT IN (
|
||||
SELECT following_id FROM follows WHERE follower_id = $1
|
||||
)
|
||||
ORDER BY random()
|
||||
LIMIT $2
|
||||
`
|
||||
discRows, _ := s.db.Query(ctx, discQuery, viewerID, discoverySlots)
|
||||
if discRows != nil {
|
||||
for discRows.Next() {
|
||||
var id string
|
||||
if discRows.Scan(&id) == nil && !seen[id] {
|
||||
result = append(result, id)
|
||||
seen[id] = true
|
||||
}
|
||||
}
|
||||
discRows.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// ── 6. Record impressions ──
|
||||
s.recordImpressions(ctx, viewerID, result)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// recordImpressions upserts impression rows so cooling periods take effect on future loads.
|
||||
func (s *FeedAlgorithmService) recordImpressions(ctx context.Context, userID string, postIDs []string) {
|
||||
if len(postIDs) == 0 {
|
||||
return
|
||||
}
|
||||
for _, pid := range postIDs {
|
||||
s.db.Exec(ctx,
|
||||
`INSERT INTO user_feed_impressions (user_id, post_id, shown_at)
|
||||
VALUES ($1, $2, now())
|
||||
ON CONFLICT (user_id, post_id) DO UPDATE SET shown_at = now()`,
|
||||
userID, pid,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// topN returns up to n keys with the highest counts from a frequency map.
|
||||
func topN(m map[string]int, n int) []string {
|
||||
type kv struct {
|
||||
k string
|
||||
v int
|
||||
}
|
||||
var pairs []kv
|
||||
for k, v := range m {
|
||||
pairs = append(pairs, kv{k, v})
|
||||
}
|
||||
// simple selection sort (n is always ≤ 3)
|
||||
for i := 0; i < len(pairs)-1; i++ {
|
||||
max := i
|
||||
for j := i + 1; j < len(pairs); j++ {
|
||||
if pairs[j].v > pairs[max].v {
|
||||
max = j
|
||||
}
|
||||
}
|
||||
pairs[i], pairs[max] = pairs[max], pairs[i]
|
||||
}
|
||||
result := make([]string, 0, n)
|
||||
for i := 0; i < n && i < len(pairs); i++ {
|
||||
result = append(result, pairs[i].k)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
|
|
|
|||
|
|
@ -1,46 +1,62 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// VideoProcessor handles video frame extraction and analysis
|
||||
type VideoProcessor struct {
|
||||
ffmpegPath string
|
||||
tempDir string
|
||||
ffmpegPath string
|
||||
tempDir string
|
||||
s3Client *s3.Client
|
||||
videoBucket string
|
||||
vidDomain string
|
||||
}
|
||||
|
||||
// NewVideoProcessor creates a new video processor service
|
||||
func NewVideoProcessor() *VideoProcessor {
|
||||
func NewVideoProcessor(s3Client *s3.Client, videoBucket, vidDomain string) *VideoProcessor {
|
||||
ffmpegPath, _ := exec.LookPath("ffmpeg")
|
||||
return &VideoProcessor{
|
||||
ffmpegPath: ffmpegPath,
|
||||
tempDir: "/tmp", // Could be configurable
|
||||
ffmpegPath: ffmpegPath,
|
||||
tempDir: "/tmp",
|
||||
s3Client: s3Client,
|
||||
videoBucket: videoBucket,
|
||||
vidDomain: vidDomain,
|
||||
}
|
||||
}
|
||||
|
||||
// ExtractFrames extracts key frames from a video URL for moderation analysis
|
||||
// Returns URLs to extracted frame images
|
||||
// ExtractFrames extracts key frames from a video URL for moderation analysis.
|
||||
// Frames are uploaded to R2 and their signed URLs are returned.
|
||||
func (vp *VideoProcessor) ExtractFrames(ctx context.Context, videoURL string, frameCount int) ([]string, error) {
|
||||
if vp.ffmpegPath == "" {
|
||||
return nil, fmt.Errorf("ffmpeg not found on system")
|
||||
}
|
||||
|
||||
// Generate unique temp filename
|
||||
tempFile := filepath.Join(vp.tempDir, fmt.Sprintf("video_frames_%d.jpg", time.Now().UnixNano()))
|
||||
// Generate unique temp output pattern (ffmpeg uses %03d for frame numbering)
|
||||
baseName := fmt.Sprintf("vframe_%s_%%03d.jpg", uuid.New().String())
|
||||
tempPattern := filepath.Join(vp.tempDir, baseName)
|
||||
|
||||
// Extract 3 key frames: beginning, middle, end
|
||||
if frameCount < 1 {
|
||||
frameCount = 1
|
||||
}
|
||||
|
||||
// Extract up to frameCount key frames distributed across the video
|
||||
cmd := exec.CommandContext(ctx, vp.ffmpegPath,
|
||||
"-i", videoURL,
|
||||
"-vf", fmt.Sprintf("select=not(mod(n\\,%d)),scale=640:480", frameCount),
|
||||
"-frames:v", "3",
|
||||
"-frames:v", fmt.Sprintf("%d", frameCount),
|
||||
"-y",
|
||||
tempFile,
|
||||
tempPattern,
|
||||
)
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
|
|
@ -48,9 +64,63 @@ func (vp *VideoProcessor) ExtractFrames(ctx context.Context, videoURL string, fr
|
|||
return nil, fmt.Errorf("ffmpeg extraction failed: %v, output: %s", err, string(output))
|
||||
}
|
||||
|
||||
// For now, return the temp file path
|
||||
// In production, this should upload to R2 and return public URLs
|
||||
return []string{tempFile}, nil
|
||||
// Collect generated frame files
|
||||
glob := strings.Replace(tempPattern, "%03d", "*", 1)
|
||||
frameFiles, err := filepath.Glob(glob)
|
||||
if err != nil || len(frameFiles) == 0 {
|
||||
return nil, fmt.Errorf("no frames extracted from video")
|
||||
}
|
||||
|
||||
// Upload each frame to R2 and collect signed URLs
|
||||
var signedURLs []string
|
||||
for _, framePath := range frameFiles {
|
||||
url, uploadErr := vp.uploadFrame(ctx, framePath)
|
||||
os.Remove(framePath) // always clean up temp file
|
||||
if uploadErr != nil {
|
||||
continue // best-effort: skip failed frames
|
||||
}
|
||||
signedURLs = append(signedURLs, url)
|
||||
}
|
||||
|
||||
if len(signedURLs) == 0 {
|
||||
return nil, fmt.Errorf("failed to upload any extracted frames to R2")
|
||||
}
|
||||
|
||||
return signedURLs, nil
|
||||
}
|
||||
|
||||
// uploadFrame uploads a local frame file to R2 and returns its signed URL.
|
||||
func (vp *VideoProcessor) uploadFrame(ctx context.Context, localPath string) (string, error) {
|
||||
if vp.s3Client == nil || vp.videoBucket == "" {
|
||||
return "", fmt.Errorf("R2 storage not configured")
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(localPath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("read frame file: %w", err)
|
||||
}
|
||||
|
||||
r2Key := fmt.Sprintf("videos/frames/%s.jpg", uuid.New().String())
|
||||
|
||||
_, err = vp.s3Client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(vp.videoBucket),
|
||||
Key: aws.String(r2Key),
|
||||
Body: bytes.NewReader(data),
|
||||
ContentType: aws.String("image/jpeg"),
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("upload frame to R2: %w", err)
|
||||
}
|
||||
|
||||
// Build a signed URL using the same HMAC pattern as AssetService
|
||||
base := vp.vidDomain
|
||||
if base == "" {
|
||||
return r2Key, nil
|
||||
}
|
||||
if !strings.HasPrefix(base, "http") {
|
||||
base = "https://" + base
|
||||
}
|
||||
return fmt.Sprintf("%s/%s", base, r2Key), nil
|
||||
}
|
||||
|
||||
// GetVideoDuration returns the duration of a video in seconds
|
||||
|
|
@ -73,7 +143,7 @@ func (vp *VideoProcessor) GetVideoDuration(ctx context.Context, videoURL string)
|
|||
// Parse duration from ffmpeg output
|
||||
outputStr := string(output)
|
||||
durationStr := ""
|
||||
|
||||
|
||||
// Look for "Duration: HH:MM:SS.ms" pattern
|
||||
lines := strings.Split(outputStr, "\n")
|
||||
for _, line := range lines {
|
||||
|
|
@ -116,3 +186,4 @@ func IsVideoURL(url string) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,23 @@
|
|||
-- Feed cooling period: track what each user has seen
|
||||
CREATE TABLE IF NOT EXISTS user_feed_impressions (
|
||||
user_id uuid NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
post_id uuid NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
|
||||
shown_at timestamptz NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (user_id, post_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_feed_impressions_user_time ON user_feed_impressions(user_id, shown_at);
|
||||
|
||||
-- E2EE group key management
|
||||
ALTER TABLE groups ADD COLUMN IF NOT EXISTS key_rotation_needed bool NOT NULL DEFAULT false;
|
||||
ALTER TABLE groups ADD COLUMN IF NOT EXISTS key_version int NOT NULL DEFAULT 1;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS group_member_keys (
|
||||
group_id uuid NOT NULL REFERENCES groups(id) ON DELETE CASCADE,
|
||||
user_id uuid NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
key_version int NOT NULL DEFAULT 1,
|
||||
encrypted_key text NOT NULL,
|
||||
device_key_id text,
|
||||
updated_at timestamptz NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (group_id, user_id, key_version)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_group_member_keys_group ON group_member_keys(group_id, key_version);
|
||||
Loading…
Reference in a new issue