Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ddl/migrations/0225_track_trending_scores_index_cleanup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Keep the current score read paths covered while reducing write amplification
-- for TrendingJob's bulk track_trending_scores refresh. This file is
-- intentionally not wrapped in BEGIN/COMMIT because CREATE/DROP INDEX
-- CONCURRENTLY cannot run inside a transaction block.

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tts_tracks_pnagd_genre_time_score_desc
ON track_trending_scores (genre, time_range, score DESC, track_id DESC)
WHERE type = 'TRACKS'
AND version = 'pnagD';

COMMENT ON INDEX idx_tts_tracks_pnagd_genre_time_score_desc IS
'Covers current TRACKS/pnagD genre-filtered trending reads with score desc, track_id desc ordering.';

-- track_trending_scores_pkey starts with track_id and still covers point
-- lookups, so this standalone index is redundant.
DROP INDEX CONCURRENTLY IF EXISTS public.ix_track_trending_scores_track_id;

-- The new partial genre index covers the active TRACKS/pnagD slice with the
-- correct descending tie-break, so the older broad genre indexes only add
-- refresh-time write cost for stale score types/versions.
DROP INDEX CONCURRENTLY IF EXISTS public.ix_track_trending_scores_genre;
DROP INDEX CONCURRENTLY IF EXISTS public.idx_tts_genre_time_score;
84 changes: 72 additions & 12 deletions jobs/index_trending.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,29 @@ type TrendingJob struct {

mutex sync.Mutex
isRunning bool
now func() time.Time
}

func NewTrendingJob(cfg config.Config, pool database.DbPool) *TrendingJob {
return &TrendingJob{
pool: pool,
logger: logging.NewZapLogger(cfg).Named("TrendingJob"),
now: time.Now,
}
}

// ScheduleEvery runs the job every `interval` until the context is cancelled.
// The interval is measured after each completed run, so a slow run does not
// leave a pending ticker event that immediately starts another DB-heavy pass.
func (j *TrendingJob) ScheduleEvery(ctx context.Context, interval time.Duration) *TrendingJob {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
timer := time.NewTimer(interval)
defer timer.Stop()
for {
select {
case <-ticker.C:
case <-timer.C:
j.Run(ctx)
timer.Reset(interval)
case <-ctx.Done():
j.logger.Info("Job shutting down")
return
Expand Down Expand Up @@ -103,6 +108,10 @@ func (j *TrendingJob) run(ctx context.Context) error {
return fmt.Errorf("trending playlists pnagD: %w", err)
}

if err := j.computeTrendingTrackAllTimeIfDue(ctx, "TRACKS", "pnagD", trackParamsPnagD); err != nil {
return fmt.Errorf("trending tracks pnagD allTime: %w", err)
}

j.logger.Info("Trending scores recomputed", zap.Duration("duration", time.Since(start)))
return nil
}
Expand Down Expand Up @@ -148,9 +157,12 @@ const (
trendingY = 3
trendingWk = 7
trendingMo = 30

trendingAllTimeCheckpoint = "track_trending_scores_all_time"
trendingAllTimeInterval = 24 * time.Hour
)

// computeTrendingTracks runs the three-time-range score insert for one
// computeTrendingTracks runs the week/month score inserts for one
// (trending_type, version) combination. Mirrors update_track_score_query
// from apps' track strategies.
func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, version string, p trackScoreParams) error {
Expand All @@ -160,20 +172,21 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v
}
defer tx.Rollback(ctx)

// Mirror apps' update_track_score_query: clear this (type, version)'s rows
// and repopulate with plain bulk INSERTs, all inside the one transaction
// opened above so readers never observe an empty table (the DELETE isn't
// visible until COMMIT, by which point the INSERTs have committed too).
// Mirror apps' update_track_score_query shape: clear this (type, version)'s
// rolling rows and repopulate with plain bulk INSERTs, all inside the one
// transaction opened above so readers never observe missing rolling rows.
//
// An earlier temp-table + ON CONFLICT DO UPDATE reconcile turned this into a
// ~4M-row-per-cycle upsert — one unique-index probe and IS DISTINCT FROM
// comparison per row — which took 40+ minutes and could never finish inside
// the refresh interval. The blunt DELETE + append matches the proven Python
// behavior. The score expressions below are intentionally kept aligned with
// the discovery templates, then filtered so we only persist tracks that can
// behavior. allTime is intentionally refreshed separately and less often.
// The score expressions below are intentionally kept aligned with the
// discovery templates, then filtered so we only persist tracks that can
// actually appear ahead of the zero-score tail.
if _, err := tx.Exec(ctx,
`DELETE FROM track_trending_scores WHERE type = $1 AND version = $2`,
`DELETE FROM track_trending_scores
WHERE type = $1 AND version = $2 AND time_range IN ('week', 'month')`,
trendingType, version,
); err != nil {
return fmt.Errorf("delete existing: %w", err)
Expand Down Expand Up @@ -252,7 +265,46 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v
}
}

// All-time uses aggregate_plays.count rather than aggregate_interval_plays.
return tx.Commit(ctx)
}

func (j *TrendingJob) computeTrendingTrackAllTimeIfDue(ctx context.Context, trendingType, version string, p trackScoreParams) error {
now := j.now().UTC()
lastRunUnix, err := getCheckpoint(ctx, j.pool, trendingAllTimeCheckpoint)
if err != nil {
return fmt.Errorf("read allTime checkpoint: %w", err)
}
if lastRunUnix > 0 {
lastRun := time.Unix(lastRunUnix, 0).UTC()
if now.Sub(lastRun) < trendingAllTimeInterval {
j.logger.Debug("Skipping allTime trending refresh",
zap.Time("last_run", lastRun),
zap.Duration("next_run_in", trendingAllTimeInterval-now.Sub(lastRun)))
return nil
}
}

return j.computeTrendingTrackAllTime(ctx, trendingType, version, p, now)
}

// computeTrendingTrackAllTime refreshes only the allTime range. It is kept out
// of the hourly week/month pass because it scans aggregate_plays and changes
// slowly compared with rolling windows.
func (j *TrendingJob) computeTrendingTrackAllTime(ctx context.Context, trendingType, version string, p trackScoreParams, checkpointTime time.Time) error {
tx, err := j.pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)

if _, err := tx.Exec(ctx,
`DELETE FROM track_trending_scores
WHERE type = $1 AND version = $2 AND time_range = 'allTime'`,
trendingType, version,
); err != nil {
return fmt.Errorf("delete existing allTime: %w", err)
}

q := fmt.Sprintf(`
INSERT INTO track_trending_scores
(track_id, genre, type, version, time_range, score, created_at)
Expand Down Expand Up @@ -293,6 +345,14 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v
return fmt.Errorf("allTime range: %w", err)
}

if _, err := tx.Exec(ctx, `
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
VALUES ($1, $2)
ON CONFLICT (tablename) DO UPDATE SET last_checkpoint = EXCLUDED.last_checkpoint
`, trendingAllTimeCheckpoint, checkpointTime.Unix()); err != nil {
return fmt.Errorf("save allTime checkpoint: %w", err)
}

return tx.Commit(ctx)
}

Expand Down
93 changes: 89 additions & 4 deletions jobs/index_trending_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,88 @@ func TestTrendingJob_PopulatesScores(t *testing.T) {
assert.Equal(t, nTracks, nAfter, "second run must not duplicate rows")
}

func TestTrendingJob_AllTimeRunsDaily(t *testing.T) {
pool := database.CreateTestDatabase(t, "test_jobs")
defer pool.Close()
ctx := context.Background()

now := time.Now()
database.Seed(pool, database.FixtureMap{
"users": {
{"user_id": 1, "wallet": "0x01", "name": "Alice"},
{
"user_id": 2, "wallet": "0x02", "name": "Bob",
"cover_photo": "cover", "profile_picture": "profile", "bio": "bio",
},
},
"aggregate_user": {
{"user_id": 1, "follower_count": 10},
{"user_id": 2, "follower_count": 100},
},
"tracks": {{
"track_id": 300, "owner_id": 1, "title": "Daily AllTime",
"release_date": now.Add(-2 * 24 * time.Hour),
"created_at": now.Add(-2 * 24 * time.Hour),
}},
"plays": {
{"id": 1, "user_id": 1, "play_item_id": 300, "created_at": now.Add(-1 * time.Hour)},
},
"reposts": {
{"user_id": 2, "repost_item_id": 300, "repost_type": "track", "created_at": now.Add(-1 * time.Hour)},
},
})

fixedNow := time.Unix(1_700_000_000, 0)
job := NewTrendingJob(newTestConfig(), pool)
job.now = func() time.Time { return fixedNow }
require.NoError(t, job.run(ctx))

var firstScore float64
require.NoError(t, pool.QueryRow(ctx, `
SELECT score FROM track_trending_scores
WHERE track_id = 300 AND type = 'TRACKS' AND version = 'pnagD' AND time_range = 'allTime'
`).Scan(&firstScore))
var firstCheckpoint int64
require.NoError(t, pool.QueryRow(ctx, `
SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1
`, trendingAllTimeCheckpoint).Scan(&firstCheckpoint))
require.Equal(t, fixedNow.Unix(), firstCheckpoint)

_, err := pool.Exec(ctx, "UPDATE aggregate_plays SET count = count + 100 WHERE play_item_id = 300")
require.NoError(t, err)

job.now = func() time.Time { return fixedNow.Add(time.Hour) }
require.NoError(t, job.run(ctx))

var skippedScore float64
require.NoError(t, pool.QueryRow(ctx, `
SELECT score FROM track_trending_scores
WHERE track_id = 300 AND type = 'TRACKS' AND version = 'pnagD' AND time_range = 'allTime'
`).Scan(&skippedScore))
assert.Equal(t, firstScore, skippedScore, "allTime should not be recomputed within 24 hours")
var skippedCheckpoint int64
require.NoError(t, pool.QueryRow(ctx, `
SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1
`, trendingAllTimeCheckpoint).Scan(&skippedCheckpoint))
assert.Equal(t, firstCheckpoint, skippedCheckpoint, "checkpoint should stay unchanged while allTime is skipped")

dailyNow := fixedNow.Add(25 * time.Hour)
job.now = func() time.Time { return dailyNow }
require.NoError(t, job.run(ctx))

var refreshedScore float64
require.NoError(t, pool.QueryRow(ctx, `
SELECT score FROM track_trending_scores
WHERE track_id = 300 AND type = 'TRACKS' AND version = 'pnagD' AND time_range = 'allTime'
`).Scan(&refreshedScore))
assert.Greater(t, refreshedScore, skippedScore, "allTime should recompute after 24 hours")
var refreshedCheckpoint int64
require.NoError(t, pool.QueryRow(ctx, `
SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1
`, trendingAllTimeCheckpoint).Scan(&refreshedCheckpoint))
assert.Equal(t, dailyNow.Unix(), refreshedCheckpoint)
}

func TestTrendingJob_SkipsZeroTrackScores(t *testing.T) {
pool := database.CreateTestDatabase(t, "test_jobs")
defer pool.Close()
Expand Down Expand Up @@ -132,13 +214,16 @@ func TestTrendingJob_PrunesStaleRows(t *testing.T) {
require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nBefore))
require.Greater(t, nBefore, 0, "expected score rows for the seeded track before deletion")

// Remove the track from the trending source set, then re-run. The job must
// clear its now-stale score rows.
// Remove the track from the trending source set, then re-run. The hourly job
// must clear stale rolling score rows; allTime is pruned on its daily pass.
_, err := pool.Exec(ctx, "UPDATE tracks SET is_delete = true WHERE track_id = 200")
require.NoError(t, err)
require.NoError(t, job.run(ctx))

var nAfter int
require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nAfter))
assert.Equal(t, 0, nAfter, "stale score rows must be pruned after the track leaves the source set")
require.NoError(t, pool.QueryRow(ctx, `
SELECT COUNT(*) FROM track_trending_scores
WHERE track_id = 200 AND time_range IN ('week', 'month')
`).Scan(&nAfter))
assert.Equal(t, 0, nAfter, "stale rolling score rows must be pruned after the track leaves the source set")
}
Loading