diff --git a/ddl/migrations/0225_track_trending_scores_index_cleanup.sql b/ddl/migrations/0225_track_trending_scores_index_cleanup.sql new file mode 100644 index 00000000..a9f2c944 --- /dev/null +++ b/ddl/migrations/0225_track_trending_scores_index_cleanup.sql @@ -0,0 +1,22 @@ +-- Keep the current score read paths covered while reducing write amplification +-- for TrendingJob's bulk track_trending_scores refresh. This file is +-- intentionally not wrapped in BEGIN/COMMIT because CREATE/DROP INDEX +-- CONCURRENTLY cannot run inside a transaction block. + +-- NOTE(review): a failed or cancelled CONCURRENTLY build leaves an INVALID +-- index behind, and IF NOT EXISTS then skips the rebuild on retry. Check for +-- (and drop) an invalid idx_tts_tracks_pnagd_genre_time_score_desc before +-- re-running, otherwise the DROP INDEX statements below can remove the only +-- usable genre index. +-- NOTE(review): partial-index predicates are matched at plan time; presumably +-- the readers pass type/version as literals rather than bind parameters; +-- confirm, or the planner may not pick this index. +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tts_tracks_pnagd_genre_time_score_desc + ON track_trending_scores (genre, time_range, score DESC, track_id DESC) + WHERE type = 'TRACKS' + AND version = 'pnagD'; + +COMMENT ON INDEX idx_tts_tracks_pnagd_genre_time_score_desc IS + 'Covers current TRACKS/pnagD genre-filtered trending reads with score desc, track_id desc ordering.'; + +-- track_trending_scores_pkey starts with track_id and still covers point +-- lookups, so this standalone index is redundant. +DROP INDEX CONCURRENTLY IF EXISTS public.ix_track_trending_scores_track_id; + +-- The new partial genre index covers the active TRACKS/pnagD slice with the +-- correct descending tie-break, so the older broad genre indexes only add +-- refresh-time write cost for stale score types/versions. 
+DROP INDEX CONCURRENTLY IF EXISTS public.ix_track_trending_scores_genre; +DROP INDEX CONCURRENTLY IF EXISTS public.idx_tts_genre_time_score; diff --git a/jobs/index_trending.go b/jobs/index_trending.go index 79d5e1d6..39999266 100644 --- a/jobs/index_trending.go +++ b/jobs/index_trending.go @@ -39,24 +39,29 @@ type TrendingJob struct { mutex sync.Mutex isRunning bool + // now is the clock read by the allTime due check; NewTrendingJob sets it to time.Now. + now func() time.Time } +// NewTrendingJob returns a TrendingJob that uses pool, a logger named "TrendingJob", +// and time.Now as its clock. func NewTrendingJob(cfg config.Config, pool database.DbPool) *TrendingJob { return &TrendingJob{ pool: pool, logger: logging.NewZapLogger(cfg).Named("TrendingJob"), + now: time.Now, } } // ScheduleEvery runs the job every `interval` until the context is cancelled. +// The interval is measured after each completed run, so a slow run does not +// leave a pending ticker event that immediately starts another DB-heavy pass. func (j *TrendingJob) ScheduleEvery(ctx context.Context, interval time.Duration) *TrendingJob { go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() + timer := time.NewTimer(interval) + defer timer.Stop() for { select { - case <-ticker.C: + case <-timer.C: j.Run(ctx) + // timer.C was just received from, so the timer is expired and drained + // and Reset can be called without a Stop/drain first. + timer.Reset(interval) case <-ctx.Done(): j.logger.Info("Job shutting down") return @@ -103,6 +108,10 @@ func (j *TrendingJob) run(ctx context.Context) error { return fmt.Errorf("trending playlists pnagD: %w", err) } + // allTime runs last and only when its checkpoint says it is due. + if err := j.computeTrendingTrackAllTimeIfDue(ctx, "TRACKS", "pnagD", trackParamsPnagD); err != nil { + return fmt.Errorf("trending tracks pnagD allTime: %w", err) + } + j.logger.Info("Trending scores recomputed", zap.Duration("duration", time.Since(start))) return nil } @@ -148,9 +157,12 @@ const ( trendingY = 3 trendingWk = 7 trendingMo = 30 + + // trendingAllTimeCheckpoint is the indexing_checkpoints tablename key that + // stores the Unix time of the last allTime refresh. + trendingAllTimeCheckpoint = "track_trending_scores_all_time" + // trendingAllTimeInterval is the minimum gap between allTime refreshes. + trendingAllTimeInterval = 24 * time.Hour ) -// computeTrendingTracks runs the three-time-range score insert for one +// computeTrendingTracks runs the week/month score inserts for one // (trending_type, version) combination. 
Mirrors update_track_score_query // from apps' track strategies. func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, version string, p trackScoreParams) error { @@ -160,20 +172,21 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v } defer tx.Rollback(ctx) - // Mirror apps' update_track_score_query: clear this (type, version)'s rows - // and repopulate with plain bulk INSERTs, all inside the one transaction - // opened above so readers never observe an empty table (the DELETE isn't - // visible until COMMIT, by which point the INSERTs have committed too). + // Mirror apps' update_track_score_query shape: clear this (type, version)'s + // rolling rows and repopulate with plain bulk INSERTs, all inside the one + // transaction opened above so readers never observe missing rolling rows. // // An earlier temp-table + ON CONFLICT DO UPDATE reconcile turned this into a // ~4M-row-per-cycle upsert — one unique-index probe and IS DISTINCT FROM // comparison per row — which took 40+ minutes and could never finish inside // the refresh interval. The blunt DELETE + append matches the proven Python - // behavior. The score expressions below are intentionally kept aligned with - // the discovery templates, then filtered so we only persist tracks that can + // behavior. allTime is intentionally refreshed separately and less often. + // The score expressions below are intentionally kept aligned with the + // discovery templates, then filtered so we only persist tracks that can // actually appear ahead of the zero-score tail. 
if _, err := tx.Exec(ctx, - `DELETE FROM track_trending_scores WHERE type = $1 AND version = $2`, + `DELETE FROM track_trending_scores + WHERE type = $1 AND version = $2 AND time_range IN ('week', 'month')`, trendingType, version, ); err != nil { return fmt.Errorf("delete existing: %w", err) @@ -252,7 +265,46 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v } } - // All-time uses aggregate_plays.count rather than aggregate_interval_plays. + return tx.Commit(ctx) +} + +// computeTrendingTrackAllTimeIfDue refreshes the allTime scores for one +// (trendingType, version) unless the stored checkpoint shows a refresh less +// than trendingAllTimeInterval ago, in which case it logs and returns nil. +// It returns an error if the checkpoint cannot be read or the refresh fails. +func (j *TrendingJob) computeTrendingTrackAllTimeIfDue(ctx context.Context, trendingType, version string, p trackScoreParams) error { + now := j.now().UTC() + lastRunUnix, err := getCheckpoint(ctx, j.pool, trendingAllTimeCheckpoint) + if err != nil { + return fmt.Errorf("read allTime checkpoint: %w", err) + } + // A non-positive checkpoint skips the interval check, so the refresh runs. + if lastRunUnix > 0 { + lastRun := time.Unix(lastRunUnix, 0).UTC() + if now.Sub(lastRun) < trendingAllTimeInterval { + j.logger.Debug("Skipping allTime trending refresh", + zap.Time("last_run", lastRun), + zap.Duration("next_run_in", trendingAllTimeInterval-now.Sub(lastRun))) + return nil + } + } + + // now is handed on as the checkpoint time, so the next interval is measured + // from the time captured above rather than from when the refresh finishes. + return j.computeTrendingTrackAllTime(ctx, trendingType, version, p, now) +} + +// computeTrendingTrackAllTime refreshes only the allTime range. It is kept out +// of the hourly week/month pass because it scans aggregate_plays and changes +// slowly compared with rolling windows. 
+func (j *TrendingJob) computeTrendingTrackAllTime(ctx context.Context, trendingType, version string, p trackScoreParams, checkpointTime time.Time) error { + tx, err := j.pool.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + if _, err := tx.Exec(ctx, + `DELETE FROM track_trending_scores + WHERE type = $1 AND version = $2 AND time_range = 'allTime'`, + trendingType, version, + ); err != nil { + return fmt.Errorf("delete existing allTime: %w", err) + } + q := fmt.Sprintf(` INSERT INTO track_trending_scores (track_id, genre, type, version, time_range, score, created_at) @@ -293,6 +345,14 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v return fmt.Errorf("allTime range: %w", err) } + // Upsert the checkpoint on the same tx as the score rows, so it only + // advances when the refresh itself commits. + if _, err := tx.Exec(ctx, ` + INSERT INTO indexing_checkpoints (tablename, last_checkpoint) + VALUES ($1, $2) + ON CONFLICT (tablename) DO UPDATE SET last_checkpoint = EXCLUDED.last_checkpoint + `, trendingAllTimeCheckpoint, checkpointTime.Unix()); err != nil { + return fmt.Errorf("save allTime checkpoint: %w", err) + } + return tx.Commit(ctx) } diff --git a/jobs/index_trending_test.go b/jobs/index_trending_test.go index 86f8f246..bcf7fe87 100644 --- a/jobs/index_trending_test.go +++ b/jobs/index_trending_test.go @@ -62,6 +62,88 @@ func TestTrendingJob_PopulatesScores(t *testing.T) { assert.Equal(t, nTracks, nAfter, "second run must not duplicate rows") } +// TestTrendingJob_AllTimeRunsDaily runs the job at three pinned clock values and +// checks that the allTime score and checkpoint are written on the first run, +// left untouched one hour later, and refreshed 25 hours after the first run. +func TestTrendingJob_AllTimeRunsDaily(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + ctx := context.Background() + + now := time.Now() + database.Seed(pool, database.FixtureMap{ + "users": { + {"user_id": 1, "wallet": "0x01", "name": "Alice"}, + { + "user_id": 2, "wallet": "0x02", "name": "Bob", + "cover_photo": "cover", "profile_picture": "profile", "bio": "bio", + }, + }, + "aggregate_user": { + {"user_id": 1, "follower_count": 10}, + {"user_id": 2, "follower_count": 100}, + }, + "tracks": {{ + "track_id": 300, "owner_id": 1, 
"title": "Daily AllTime", + "release_date": now.Add(-2 * 24 * time.Hour), + "created_at": now.Add(-2 * 24 * time.Hour), + }}, + "plays": { + {"id": 1, "user_id": 1, "play_item_id": 300, "created_at": now.Add(-1 * time.Hour)}, + }, + "reposts": { + {"user_id": 2, "repost_item_id": 300, "repost_type": "track", "created_at": now.Add(-1 * time.Hour)}, + }, + }) + + // First run: allTime is computed and the checkpoint must equal fixedNow. + fixedNow := time.Unix(1_700_000_000, 0) + job := NewTrendingJob(newTestConfig(), pool) + job.now = func() time.Time { return fixedNow } + require.NoError(t, job.run(ctx)) + + var firstScore float64 + require.NoError(t, pool.QueryRow(ctx, ` + SELECT score FROM track_trending_scores + WHERE track_id = 300 AND type = 'TRACKS' AND version = 'pnagD' AND time_range = 'allTime' + `).Scan(&firstScore)) + var firstCheckpoint int64 + require.NoError(t, pool.QueryRow(ctx, ` + SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1 + `, trendingAllTimeCheckpoint).Scan(&firstCheckpoint)) + require.Equal(t, fixedNow.Unix(), firstCheckpoint) + + // Raise the play count so that a recompute would change the allTime score. + _, err := pool.Exec(ctx, "UPDATE aggregate_plays SET count = count + 100 WHERE play_item_id = 300") + require.NoError(t, err) + + // One hour later: still inside the 24h interval, so score and checkpoint must not move. + job.now = func() time.Time { return fixedNow.Add(time.Hour) } + require.NoError(t, job.run(ctx)) + + var skippedScore float64 + require.NoError(t, pool.QueryRow(ctx, ` + SELECT score FROM track_trending_scores + WHERE track_id = 300 AND type = 'TRACKS' AND version = 'pnagD' AND time_range = 'allTime' + `).Scan(&skippedScore)) + assert.Equal(t, firstScore, skippedScore, "allTime should not be recomputed within 24 hours") + var skippedCheckpoint int64 + require.NoError(t, pool.QueryRow(ctx, ` + SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1 + `, trendingAllTimeCheckpoint).Scan(&skippedCheckpoint)) + assert.Equal(t, firstCheckpoint, skippedCheckpoint, "checkpoint should stay unchanged while allTime is skipped") + + // 25 hours after the first run: past the interval, so allTime must be refreshed. + dailyNow := fixedNow.Add(25 * time.Hour) + job.now = func() time.Time { return dailyNow } + 
require.NoError(t, job.run(ctx)) + + var refreshedScore float64 + require.NoError(t, pool.QueryRow(ctx, ` + SELECT score FROM track_trending_scores + WHERE track_id = 300 AND type = 'TRACKS' AND version = 'pnagD' AND time_range = 'allTime' + `).Scan(&refreshedScore)) + assert.Greater(t, refreshedScore, skippedScore, "allTime should recompute after 24 hours") + var refreshedCheckpoint int64 + require.NoError(t, pool.QueryRow(ctx, ` + SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1 + `, trendingAllTimeCheckpoint).Scan(&refreshedCheckpoint)) + assert.Equal(t, dailyNow.Unix(), refreshedCheckpoint) +} + func TestTrendingJob_SkipsZeroTrackScores(t *testing.T) { pool := database.CreateTestDatabase(t, "test_jobs") defer pool.Close() @@ -132,13 +214,16 @@ func TestTrendingJob_PrunesStaleRows(t *testing.T) { require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nBefore)) require.Greater(t, nBefore, 0, "expected score rows for the seeded track before deletion") - // Remove the track from the trending source set, then re-run. The job must - // clear its now-stale score rows. + // Remove the track from the trending source set, then re-run. The hourly job + // must clear stale rolling score rows; allTime is pruned on its daily pass. _, err := pool.Exec(ctx, "UPDATE tracks SET is_delete = true WHERE track_id = 200") require.NoError(t, err) require.NoError(t, job.run(ctx)) var nAfter int - require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nAfter)) - assert.Equal(t, 0, nAfter, "stale score rows must be pruned after the track leaves the source set") + require.NoError(t, pool.QueryRow(ctx, ` + SELECT COUNT(*) FROM track_trending_scores + WHERE track_id = 200 AND time_range IN ('week', 'month') + `).Scan(&nAfter)) + assert.Equal(t, 0, nAfter, "stale rolling score rows must be pruned after the track leaves the source set") }