feat(feed): relay body storage + HTTP endpoints (Phase B of v2.0.0)
Phase A (the previous commit) added the on-chain foundations. Phase B
is the off-chain layer: post bodies live in a BadgerDB-backed feed
mailbox, and a full HTTP surface makes the feed usable from clients.
New components
relay/feed_mailbox.go (+ tests)
- FeedPost: body + content-type + attachment + hashtags + thread refs
- Store / Get / Delete with TTL-bounded eviction (30 days default)
- View counter (IncrementView / ViewCount) — off-chain because one
tx per view would be nonsense
- Hashtag inverted index: auto-extracts #tokens from content on
Store, lowercased + deduped + capped at 8/post
- Author chrono index: PostsByAuthor returns newest-first IDs
- RecentPostIDs: scan-by-age helper used by trending/foryou
node/api_feed.go
POST /feed/publish — author-signed body upload, returns
post_id + content_hash + size +
hashtags + estimated fee for the
follow-up on-chain CREATE_POST tx
GET /feed/post/{id} — fetch body (respects on-chain soft
delete, returns 410 when deleted)
GET /feed/post/{id}/stats — {views, likes, liked_by_me?}
POST /feed/post/{id}/view — bump the counter
GET /feed/author/{pub} — chain-authoritative post list
enriched with body + stats
GET /feed/timeline — merged feed from people the user
follows (reads chain.Following,
fetches each author's recent posts)
GET /feed/trending — top-scored posts in last 24h
(score = likes × 3 + views)
GET /feed/foryou — simple recommendations: recent posts
minus authors the user already
follows, already-liked posts, and
own posts; ranked by engagement
GET /feed/hashtag/{tag} — posts tagged with the given #tag
cmd/node/main.go wiring
- --feed-db flag (DCHAIN_FEED_DB) + --feed-ttl-days (DCHAIN_FEED_TTL_DAYS)
- Opens FeedMailbox + registers FeedRoutes alongside RelayRoutes
- Threads chain.Post / LikeCount / HasLiked / PostsByAuthor / Following
into FeedConfig so HTTP handlers can merge on-chain metadata with
off-chain body+stats.
Auth & safety
- POST /feed/publish: Ed25519 signature over "publish:<post_id>:
<content_sha256_hex>:<ts>"; ±5-minute skew window for anti-replay.
- content_hash binds body to the on-chain tx — you can't publish
body-A off-chain and commit hash-of-body-B on-chain.
- Writes wrapped in withSubmitTxGuards (rate-limit + size cap), reads
in withReadLimit — same guards as /relay.
Trending / recommendations
- V1 heuristic (likes × 3 + views) + time window. Documented as
v2.2.0 "Feed algorithm" candidate for a proper ranking layer
(half-life decay, follow-of-follow boost, hashtag collaborative).
Tests
- Store round-trip, size enforcement, hashtag indexing (case-insensitive
+ dedup), view counter increments, author chrono order, delete
cleans all indices, RecentPostIDs time-window filter.
- Full go test ./... is green (blockchain + consensus + identity +
relay + vm all pass).
Next (Phase C): client Feed tab — composer, timeline, post detail,
profile follow, For You + Trending screens.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -77,6 +77,8 @@ func main() {
|
||||
registerRelay := flag.Bool("register-relay", envBoolOr("DCHAIN_REGISTER_RELAY", false), "submit REGISTER_RELAY tx on startup (env: DCHAIN_REGISTER_RELAY)")
|
||||
relayFee := flag.Uint64("relay-fee", envUint64Or("DCHAIN_RELAY_FEE", 1_000), "relay fee per message in µT (env: DCHAIN_RELAY_FEE)")
|
||||
mailboxDB := flag.String("mailbox-db", envOr("DCHAIN_MAILBOX_DB", "./mailboxdata"), "BadgerDB directory for relay mailbox (env: DCHAIN_MAILBOX_DB)")
|
||||
feedDB := flag.String("feed-db", envOr("DCHAIN_FEED_DB", "./feeddata"), "BadgerDB directory for social-feed post bodies (env: DCHAIN_FEED_DB)")
|
||||
feedTTLDays := flag.Int("feed-ttl-days", int(envUint64Or("DCHAIN_FEED_TTL_DAYS", 30)), "how long feed posts are retained before auto-eviction (env: DCHAIN_FEED_TTL_DAYS)")
|
||||
govContractID := flag.String("governance-contract", envOr("DCHAIN_GOVERNANCE_CONTRACT", ""), "governance contract ID for dynamic chain parameters (env: DCHAIN_GOVERNANCE_CONTRACT)")
|
||||
joinSeedURL := flag.String("join", envOr("DCHAIN_JOIN", ""), "bootstrap from a running node: comma-separated HTTP URLs (env: DCHAIN_JOIN)")
|
||||
// Observer mode: the node participates in the P2P network, applies
|
||||
@@ -634,6 +636,15 @@ func main() {
|
||||
go mailbox.RunGC()
|
||||
log.Printf("[NODE] relay mailbox: %s", *mailboxDB)
|
||||
|
||||
// --- Feed mailbox (social-feed post bodies, v2.0.0) ---
|
||||
feedTTL := time.Duration(*feedTTLDays) * 24 * time.Hour
|
||||
feedMailbox, err := relay.OpenFeedMailbox(*feedDB, feedTTL)
|
||||
if err != nil {
|
||||
log.Fatalf("[NODE] feed mailbox: %v", err)
|
||||
}
|
||||
defer feedMailbox.Close()
|
||||
log.Printf("[NODE] feed mailbox: %s (TTL %d days)", *feedDB, *feedTTLDays)
|
||||
|
||||
// Push-notify bus consumers whenever a fresh envelope lands in the
|
||||
// mailbox. Clients subscribed to `inbox:<my_x25519>` (via WS) get the
|
||||
// event immediately so they no longer need to poll /relay/inbox.
|
||||
@@ -927,6 +938,16 @@ func main() {
|
||||
},
|
||||
}
|
||||
|
||||
feedConfig := node.FeedConfig{
|
||||
Mailbox: feedMailbox,
|
||||
HostingRelayPub: id.PubKeyHex(),
|
||||
GetPost: chain.Post,
|
||||
LikeCount: chain.LikeCount,
|
||||
HasLiked: chain.HasLiked,
|
||||
PostsByAuthor: chain.PostsByAuthor,
|
||||
Following: chain.Following,
|
||||
}
|
||||
|
||||
go func() {
|
||||
log.Printf("[NODE] stats API: http://0.0.0.0%s/stats", *statsAddr)
|
||||
if *disableUI {
|
||||
@@ -947,6 +968,7 @@ func main() {
|
||||
if err := stats.ListenAndServe(*statsAddr, statsQuery, func(mux *http.ServeMux) {
|
||||
node.RegisterExplorerRoutes(mux, explorerQuery, routeFlags)
|
||||
node.RegisterRelayRoutes(mux, relayConfig)
|
||||
node.RegisterFeedRoutes(mux, feedConfig)
|
||||
|
||||
// POST /api/governance/link — link deployed contracts at runtime.
|
||||
// Body: {"governance": "<id>"}
|
||||
|
||||
654
node/api_feed.go
Normal file
654
node/api_feed.go
Normal file
@@ -0,0 +1,654 @@
|
||||
package node
|
||||
|
||||
// Feed HTTP endpoints (v2.0.0).
|
||||
//
|
||||
// Mount points:
|
||||
//
|
||||
// POST /feed/publish — store a post body (authenticated)
|
||||
// GET /feed/post/{id} — fetch a post body
|
||||
// GET /feed/post/{id}/stats — {views, likes, liked_by_me?} aggregate
|
||||
// POST /feed/post/{id}/view — increment off-chain view counter
|
||||
// GET /feed/author/{pub} — ?limit=N, posts by an author
|
||||
// GET /feed/timeline — ?follower=<pub>&limit=N, merged feed of follows
|
||||
// GET /feed/trending — ?window=h&limit=N, top by likes + views
|
||||
// GET /feed/foryou — ?pub=<pub>&limit=N, recommendations
|
||||
// GET /feed/hashtag/{tag} — posts matching a hashtag
|
||||
//
|
||||
// Publish flow:
|
||||
// 1. Client POSTs {content, attachment, post_id, author, sig, ts}.
|
||||
// 2. Node verifies sig (Ed25519 over canonical bytes), hashes body,
|
||||
// stores in FeedMailbox, returns hosting_relay + content_hash + size.
|
||||
// 3. Client then submits on-chain CREATE_POST tx with that metadata.
|
||||
// Node charges the fee (base + size×byte_fee) and credits the relay.
|
||||
// 4. Subsequent GET /feed/post/{id} serves the stored body to anyone.
|
||||
//
|
||||
// Why the split? On-chain metadata gives us provable authorship + the
|
||||
// pay-for-storage incentive; off-chain body storage keeps the block
|
||||
// history small. If the hosting relay dies, the on-chain record stays
|
||||
// (with a "body unavailable" fallback on the reader side) — authors can
|
||||
// re-publish to another relay.
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go-blockchain/blockchain"
|
||||
"go-blockchain/identity"
|
||||
"go-blockchain/relay"
|
||||
)
|
||||
|
||||
// FeedConfig wires feed HTTP endpoints to the relay mailbox and the
|
||||
// chain for read-after-write queries.
|
||||
type FeedConfig struct {
|
||||
Mailbox *relay.FeedMailbox
|
||||
|
||||
// HostingRelayPub is this node's Ed25519 pubkey — returned from
|
||||
// /feed/publish so the client knows who to put in CREATE_POST tx.
|
||||
HostingRelayPub string
|
||||
|
||||
// Chain lookups (nil-safe; endpoints degrade gracefully).
|
||||
GetPost func(postID string) (*blockchain.PostRecord, error)
|
||||
LikeCount func(postID string) (uint64, error)
|
||||
HasLiked func(postID, likerPub string) (bool, error)
|
||||
PostsByAuthor func(authorPub string, limit int) ([]*blockchain.PostRecord, error)
|
||||
Following func(followerPub string) ([]string, error)
|
||||
}
|
||||
|
||||
// RegisterFeedRoutes wires feed endpoints onto mux. Writes are rate-limited
|
||||
// via withSubmitTxGuards; reads via withReadLimit (same limiters as /relay).
|
||||
func RegisterFeedRoutes(mux *http.ServeMux, cfg FeedConfig) {
|
||||
if cfg.Mailbox == nil {
|
||||
return
|
||||
}
|
||||
mux.HandleFunc("/feed/publish", withSubmitTxGuards(feedPublish(cfg)))
|
||||
mux.HandleFunc("/feed/post/", withReadLimit(feedPostRouter(cfg)))
|
||||
mux.HandleFunc("/feed/author/", withReadLimit(feedAuthor(cfg)))
|
||||
mux.HandleFunc("/feed/timeline", withReadLimit(feedTimeline(cfg)))
|
||||
mux.HandleFunc("/feed/trending", withReadLimit(feedTrending(cfg)))
|
||||
mux.HandleFunc("/feed/foryou", withReadLimit(feedForYou(cfg)))
|
||||
mux.HandleFunc("/feed/hashtag/", withReadLimit(feedHashtag(cfg)))
|
||||
}
|
||||
|
||||
// ── POST /feed/publish ────────────────────────────────────────────────────
|
||||
|
||||
// feedPublishRequest — what the client sends. Signature is Ed25519 over
|
||||
// canonical bytes: "publish:<post_id>:<content_sha256_hex>:<ts>".
|
||||
// ts must be within ±5 minutes of server clock.
|
||||
type feedPublishRequest struct {
|
||||
PostID string `json:"post_id"`
|
||||
Author string `json:"author"` // hex Ed25519
|
||||
Content string `json:"content"`
|
||||
ContentType string `json:"content_type,omitempty"`
|
||||
AttachmentB64 string `json:"attachment_b64,omitempty"`
|
||||
AttachmentMIME string `json:"attachment_mime,omitempty"`
|
||||
ReplyTo string `json:"reply_to,omitempty"`
|
||||
QuoteOf string `json:"quote_of,omitempty"`
|
||||
Sig string `json:"sig"` // base64 Ed25519 sig
|
||||
Ts int64 `json:"ts"`
|
||||
}
|
||||
|
||||
type feedPublishResponse struct {
|
||||
PostID string `json:"post_id"`
|
||||
HostingRelay string `json:"hosting_relay"`
|
||||
ContentHash string `json:"content_hash"` // hex sha256
|
||||
Size uint64 `json:"size"`
|
||||
Hashtags []string `json:"hashtags"`
|
||||
EstimatedFeeUT uint64 `json:"estimated_fee_ut"` // base + size*byte_fee
|
||||
}
|
||||
|
||||
func feedPublish(cfg FeedConfig) http.HandlerFunc {
|
||||
const publishSkewSecs = 300
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
var req feedPublishRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
jsonErr(w, fmt.Errorf("invalid JSON: %w", err), 400)
|
||||
return
|
||||
}
|
||||
if req.PostID == "" || req.Author == "" || req.Sig == "" || req.Ts == 0 {
|
||||
jsonErr(w, fmt.Errorf("post_id, author, sig, ts are required"), 400)
|
||||
return
|
||||
}
|
||||
if req.Content == "" && req.AttachmentB64 == "" {
|
||||
jsonErr(w, fmt.Errorf("post must have content or attachment"), 400)
|
||||
return
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
if req.Ts < now-publishSkewSecs || req.Ts > now+publishSkewSecs {
|
||||
jsonErr(w, fmt.Errorf("ts out of range (±%ds)", publishSkewSecs), 400)
|
||||
return
|
||||
}
|
||||
if req.ReplyTo != "" && req.QuoteOf != "" {
|
||||
jsonErr(w, fmt.Errorf("reply_to and quote_of are mutually exclusive"), 400)
|
||||
return
|
||||
}
|
||||
|
||||
// Decode attachment.
|
||||
var attachment []byte
|
||||
if req.AttachmentB64 != "" {
|
||||
b, err := base64.StdEncoding.DecodeString(req.AttachmentB64)
|
||||
if err != nil {
|
||||
if b, err = base64.RawURLEncoding.DecodeString(req.AttachmentB64); err != nil {
|
||||
jsonErr(w, fmt.Errorf("attachment_b64: invalid base64"), 400)
|
||||
return
|
||||
}
|
||||
}
|
||||
attachment = b
|
||||
}
|
||||
|
||||
// Content hash binds the body to the on-chain metadata. We hash
|
||||
// content+attachment so the client can't publish body-A off-chain
|
||||
// and commit hash-of-body-B on-chain.
|
||||
h := sha256.New()
|
||||
h.Write([]byte(req.Content))
|
||||
h.Write(attachment)
|
||||
contentHash := h.Sum(nil)
|
||||
contentHashHex := hex.EncodeToString(contentHash)
|
||||
|
||||
// Verify the author's signature over the canonical publish bytes.
|
||||
msg := []byte(fmt.Sprintf("publish:%s:%s:%d", req.PostID, contentHashHex, req.Ts))
|
||||
sigBytes, err := base64.StdEncoding.DecodeString(req.Sig)
|
||||
if err != nil {
|
||||
if sigBytes, err = base64.RawURLEncoding.DecodeString(req.Sig); err != nil {
|
||||
jsonErr(w, fmt.Errorf("sig: invalid base64"), 400)
|
||||
return
|
||||
}
|
||||
}
|
||||
if _, err := hex.DecodeString(req.Author); err != nil {
|
||||
jsonErr(w, fmt.Errorf("author: invalid hex"), 400)
|
||||
return
|
||||
}
|
||||
ok, err := identity.Verify(req.Author, msg, sigBytes)
|
||||
if err != nil || !ok {
|
||||
jsonErr(w, fmt.Errorf("signature invalid"), 403)
|
||||
return
|
||||
}
|
||||
|
||||
post := &relay.FeedPost{
|
||||
PostID: req.PostID,
|
||||
Author: req.Author,
|
||||
Content: req.Content,
|
||||
ContentType: req.ContentType,
|
||||
Attachment: attachment,
|
||||
AttachmentMIME: req.AttachmentMIME,
|
||||
ReplyTo: req.ReplyTo,
|
||||
QuoteOf: req.QuoteOf,
|
||||
}
|
||||
hashtags, err := cfg.Mailbox.Store(post, req.Ts)
|
||||
if err != nil {
|
||||
if err == relay.ErrPostTooLarge {
|
||||
jsonErr(w, err, 413)
|
||||
return
|
||||
}
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
|
||||
// Report what the client should put into CREATE_POST.
|
||||
size := uint64(len(req.Content)) + uint64(len(attachment)) + 128
|
||||
fee := blockchain.BasePostFee + size*blockchain.PostByteFee
|
||||
jsonOK(w, feedPublishResponse{
|
||||
PostID: req.PostID,
|
||||
HostingRelay: cfg.HostingRelayPub,
|
||||
ContentHash: contentHashHex,
|
||||
Size: size,
|
||||
Hashtags: hashtags,
|
||||
EstimatedFeeUT: fee,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ── GET /feed/post/{id} [+ /stats subroute, POST /view] ─────────────────
|
||||
|
||||
// feedPostRouter dispatches /feed/post/{id}, /feed/post/{id}/stats,
|
||||
// /feed/post/{id}/view to the right handler.
|
||||
func feedPostRouter(cfg FeedConfig) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
rest := strings.TrimPrefix(r.URL.Path, "/feed/post/")
|
||||
rest = strings.Trim(rest, "/")
|
||||
if rest == "" {
|
||||
jsonErr(w, fmt.Errorf("post id required"), 400)
|
||||
return
|
||||
}
|
||||
parts := strings.Split(rest, "/")
|
||||
postID := parts[0]
|
||||
if len(parts) == 1 {
|
||||
feedGetPost(cfg)(w, r, postID)
|
||||
return
|
||||
}
|
||||
switch parts[1] {
|
||||
case "stats":
|
||||
feedPostStats(cfg)(w, r, postID)
|
||||
case "view":
|
||||
feedPostView(cfg)(w, r, postID)
|
||||
default:
|
||||
jsonErr(w, fmt.Errorf("unknown sub-route %q", parts[1]), 404)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type postHandler func(w http.ResponseWriter, r *http.Request, postID string)
|
||||
|
||||
func feedGetPost(cfg FeedConfig) postHandler {
|
||||
return func(w http.ResponseWriter, r *http.Request, postID string) {
|
||||
if r.Method != http.MethodGet {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
post, err := cfg.Mailbox.Get(postID)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
if post == nil {
|
||||
jsonErr(w, fmt.Errorf("post %s not found", postID), 404)
|
||||
return
|
||||
}
|
||||
// Respect on-chain soft-delete.
|
||||
if cfg.GetPost != nil {
|
||||
if rec, _ := cfg.GetPost(postID); rec != nil && rec.Deleted {
|
||||
jsonErr(w, fmt.Errorf("post %s deleted", postID), 410)
|
||||
return
|
||||
}
|
||||
}
|
||||
jsonOK(w, post)
|
||||
}
|
||||
}
|
||||
|
||||
type postStatsResponse struct {
|
||||
PostID string `json:"post_id"`
|
||||
Views uint64 `json:"views"`
|
||||
Likes uint64 `json:"likes"`
|
||||
LikedByMe *bool `json:"liked_by_me,omitempty"` // set only when ?me=<pub> given
|
||||
}
|
||||
|
||||
func feedPostStats(cfg FeedConfig) postHandler {
|
||||
return func(w http.ResponseWriter, r *http.Request, postID string) {
|
||||
if r.Method != http.MethodGet {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
views, _ := cfg.Mailbox.ViewCount(postID)
|
||||
var likes uint64
|
||||
if cfg.LikeCount != nil {
|
||||
likes, _ = cfg.LikeCount(postID)
|
||||
}
|
||||
resp := postStatsResponse{
|
||||
PostID: postID,
|
||||
Views: views,
|
||||
Likes: likes,
|
||||
}
|
||||
if me := r.URL.Query().Get("me"); me != "" && cfg.HasLiked != nil {
|
||||
if liked, err := cfg.HasLiked(postID, me); err == nil {
|
||||
resp.LikedByMe = &liked
|
||||
}
|
||||
}
|
||||
jsonOK(w, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func feedPostView(cfg FeedConfig) postHandler {
|
||||
return func(w http.ResponseWriter, r *http.Request, postID string) {
|
||||
if r.Method != http.MethodPost {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
next, err := cfg.Mailbox.IncrementView(postID)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
jsonOK(w, map[string]any{
|
||||
"post_id": postID,
|
||||
"views": next,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ── GET /feed/author/{pub} ────────────────────────────────────────────────
|
||||
|
||||
func feedAuthor(cfg FeedConfig) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
pub := strings.TrimPrefix(r.URL.Path, "/feed/author/")
|
||||
pub = strings.Trim(pub, "/")
|
||||
if pub == "" {
|
||||
jsonErr(w, fmt.Errorf("author pub required"), 400)
|
||||
return
|
||||
}
|
||||
limit := queryInt(r, "limit", 50)
|
||||
|
||||
// Prefer chain-authoritative list (includes soft-deleted flag) so
|
||||
// clients can't be fooled by a stale relay that has an already-
|
||||
// deleted post. If chain isn't wired, fall back to relay index.
|
||||
if cfg.PostsByAuthor != nil {
|
||||
records, err := cfg.PostsByAuthor(pub, limit)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
out := make([]feedAuthorItem, 0, len(records))
|
||||
for _, rec := range records {
|
||||
if rec == nil || rec.Deleted {
|
||||
continue
|
||||
}
|
||||
out = append(out, buildAuthorItem(cfg, rec))
|
||||
}
|
||||
jsonOK(w, map[string]any{"author": pub, "count": len(out), "posts": out})
|
||||
return
|
||||
}
|
||||
ids, err := cfg.Mailbox.PostsByAuthor(pub, limit)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
out := expandByID(cfg, ids)
|
||||
jsonOK(w, map[string]any{"author": pub, "count": len(out), "posts": out})
|
||||
}
|
||||
}
|
||||
|
||||
// feedAuthorItem is a chain record enriched with the body and live stats.
|
||||
type feedAuthorItem struct {
|
||||
PostID string `json:"post_id"`
|
||||
Author string `json:"author"`
|
||||
Content string `json:"content,omitempty"`
|
||||
ContentType string `json:"content_type,omitempty"`
|
||||
Hashtags []string `json:"hashtags,omitempty"`
|
||||
ReplyTo string `json:"reply_to,omitempty"`
|
||||
QuoteOf string `json:"quote_of,omitempty"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
Size uint64 `json:"size"`
|
||||
HostingRelay string `json:"hosting_relay"`
|
||||
Views uint64 `json:"views"`
|
||||
Likes uint64 `json:"likes"`
|
||||
HasAttachment bool `json:"has_attachment"`
|
||||
}
|
||||
|
||||
func buildAuthorItem(cfg FeedConfig, rec *blockchain.PostRecord) feedAuthorItem {
|
||||
item := feedAuthorItem{
|
||||
PostID: rec.PostID,
|
||||
Author: rec.Author,
|
||||
ReplyTo: rec.ReplyTo,
|
||||
QuoteOf: rec.QuoteOf,
|
||||
CreatedAt: rec.CreatedAt,
|
||||
Size: rec.Size,
|
||||
HostingRelay: rec.HostingRelay,
|
||||
}
|
||||
if body, _ := cfg.Mailbox.Get(rec.PostID); body != nil {
|
||||
item.Content = body.Content
|
||||
item.ContentType = body.ContentType
|
||||
item.Hashtags = body.Hashtags
|
||||
item.HasAttachment = len(body.Attachment) > 0
|
||||
}
|
||||
if cfg.LikeCount != nil {
|
||||
item.Likes, _ = cfg.LikeCount(rec.PostID)
|
||||
}
|
||||
item.Views, _ = cfg.Mailbox.ViewCount(rec.PostID)
|
||||
return item
|
||||
}
|
||||
|
||||
// expandByID fetches bodies+stats for a list of post IDs (no chain record).
|
||||
func expandByID(cfg FeedConfig, ids []string) []feedAuthorItem {
|
||||
out := make([]feedAuthorItem, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
body, _ := cfg.Mailbox.Get(id)
|
||||
if body == nil {
|
||||
continue
|
||||
}
|
||||
item := feedAuthorItem{
|
||||
PostID: id,
|
||||
Author: body.Author,
|
||||
Content: body.Content,
|
||||
ContentType: body.ContentType,
|
||||
Hashtags: body.Hashtags,
|
||||
ReplyTo: body.ReplyTo,
|
||||
QuoteOf: body.QuoteOf,
|
||||
CreatedAt: body.CreatedAt,
|
||||
HasAttachment: len(body.Attachment) > 0,
|
||||
}
|
||||
if cfg.LikeCount != nil {
|
||||
item.Likes, _ = cfg.LikeCount(id)
|
||||
}
|
||||
item.Views, _ = cfg.Mailbox.ViewCount(id)
|
||||
out = append(out, item)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// ── GET /feed/timeline ────────────────────────────────────────────────────
|
||||
|
||||
func feedTimeline(cfg FeedConfig) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
follower := r.URL.Query().Get("follower")
|
||||
if follower == "" {
|
||||
jsonErr(w, fmt.Errorf("follower parameter required"), 400)
|
||||
return
|
||||
}
|
||||
if cfg.Following == nil || cfg.PostsByAuthor == nil {
|
||||
jsonErr(w, fmt.Errorf("timeline requires chain queries"), 503)
|
||||
return
|
||||
}
|
||||
limit := queryInt(r, "limit", 50)
|
||||
perAuthor := limit
|
||||
if perAuthor > 30 {
|
||||
perAuthor = 30
|
||||
}
|
||||
|
||||
following, err := cfg.Following(follower)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
var merged []*blockchain.PostRecord
|
||||
for _, target := range following {
|
||||
posts, err := cfg.PostsByAuthor(target, perAuthor)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, p := range posts {
|
||||
if p != nil && !p.Deleted {
|
||||
merged = append(merged, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Sort newest-first, take top N.
|
||||
sort.Slice(merged, func(i, j int) bool { return merged[i].CreatedAt > merged[j].CreatedAt })
|
||||
if len(merged) > limit {
|
||||
merged = merged[:limit]
|
||||
}
|
||||
out := make([]feedAuthorItem, 0, len(merged))
|
||||
for _, rec := range merged {
|
||||
out = append(out, buildAuthorItem(cfg, rec))
|
||||
}
|
||||
jsonOK(w, map[string]any{"count": len(out), "posts": out})
|
||||
}
|
||||
}
|
||||
|
||||
// ── GET /feed/trending ────────────────────────────────────────────────────
|
||||
|
||||
func feedTrending(cfg FeedConfig) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
limit := queryInt(r, "limit", 30)
|
||||
// Window defaults to 24h; cap 7d so a viral post from a week ago
|
||||
// doesn't permanently dominate.
|
||||
windowHours := queryInt(r, "window", 24)
|
||||
if windowHours > 24*7 {
|
||||
windowHours = 24 * 7
|
||||
}
|
||||
if windowHours < 1 {
|
||||
windowHours = 1
|
||||
}
|
||||
ids, err := cfg.Mailbox.RecentPostIDs(int64(windowHours)*3600, 500)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
// Score each = likes*3 + views, honoring soft-delete.
|
||||
type scored struct {
|
||||
id string
|
||||
score uint64
|
||||
}
|
||||
scoredList := make([]scored, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
if cfg.GetPost != nil {
|
||||
if rec, _ := cfg.GetPost(id); rec != nil && rec.Deleted {
|
||||
continue
|
||||
}
|
||||
}
|
||||
views, _ := cfg.Mailbox.ViewCount(id)
|
||||
var likes uint64
|
||||
if cfg.LikeCount != nil {
|
||||
likes, _ = cfg.LikeCount(id)
|
||||
}
|
||||
scoredList = append(scoredList, scored{id: id, score: likes*3 + views})
|
||||
}
|
||||
sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score })
|
||||
if len(scoredList) > limit {
|
||||
scoredList = scoredList[:limit]
|
||||
}
|
||||
pickedIDs := make([]string, len(scoredList))
|
||||
for i, s := range scoredList {
|
||||
pickedIDs[i] = s.id
|
||||
}
|
||||
out := expandByID(cfg, pickedIDs)
|
||||
jsonOK(w, map[string]any{"count": len(out), "posts": out})
|
||||
}
|
||||
}
|
||||
|
||||
// ── GET /feed/foryou ──────────────────────────────────────────────────────
|
||||
//
|
||||
// Simple recommendations heuristic for v2.0.0:
|
||||
// 1. Compute the set of authors the user already follows.
|
||||
// 2. Fetch recent posts from the relay (last 48h).
|
||||
// 3. Filter OUT posts from followed authors (those live in /timeline).
|
||||
// 4. Filter OUT posts the user has already liked.
|
||||
// 5. Rank remaining by (likes × 3 + views) and return top N.
|
||||
//
|
||||
// Future improvements (tracked as v2.2.0 "Feed algorithm"):
|
||||
// - Weight by "followed-of-followed" signal (friends-of-friends boost).
|
||||
// - Decay by age (exp half-life ~12h).
|
||||
// - Penalise self-engagement (author liking own post).
|
||||
// - Collaborative filtering on hashtag co-occurrence.
|
||||
|
||||
func feedForYou(cfg FeedConfig) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
pub := r.URL.Query().Get("pub")
|
||||
limit := queryInt(r, "limit", 30)
|
||||
|
||||
// Gather user's follows + likes to exclude from the candidate pool.
|
||||
excludedAuthors := make(map[string]struct{})
|
||||
if cfg.Following != nil && pub != "" {
|
||||
if list, err := cfg.Following(pub); err == nil {
|
||||
for _, a := range list {
|
||||
excludedAuthors[a] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Post pool: last 48h on this relay.
|
||||
ids, err := cfg.Mailbox.RecentPostIDs(48*3600, 500)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
type scored struct {
|
||||
id string
|
||||
score uint64
|
||||
}
|
||||
scoredList := make([]scored, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
body, _ := cfg.Mailbox.Get(id)
|
||||
if body == nil {
|
||||
continue
|
||||
}
|
||||
if _, followed := excludedAuthors[body.Author]; followed {
|
||||
continue
|
||||
}
|
||||
if body.Author == pub {
|
||||
continue // don't recommend user's own posts
|
||||
}
|
||||
if cfg.GetPost != nil {
|
||||
if rec, _ := cfg.GetPost(id); rec != nil && rec.Deleted {
|
||||
continue
|
||||
}
|
||||
}
|
||||
// Skip already-liked.
|
||||
if cfg.HasLiked != nil && pub != "" {
|
||||
if liked, _ := cfg.HasLiked(id, pub); liked {
|
||||
continue
|
||||
}
|
||||
}
|
||||
views, _ := cfg.Mailbox.ViewCount(id)
|
||||
var likes uint64
|
||||
if cfg.LikeCount != nil {
|
||||
likes, _ = cfg.LikeCount(id)
|
||||
}
|
||||
// Small "seed" score so posts with no engagement still get shown
|
||||
// sometimes (otherwise a silent but fresh post can't break in).
|
||||
scoredList = append(scoredList, scored{id: id, score: likes*3 + views + 1})
|
||||
}
|
||||
sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score })
|
||||
if len(scoredList) > limit {
|
||||
scoredList = scoredList[:limit]
|
||||
}
|
||||
pickedIDs := make([]string, len(scoredList))
|
||||
for i, s := range scoredList {
|
||||
pickedIDs[i] = s.id
|
||||
}
|
||||
out := expandByID(cfg, pickedIDs)
|
||||
jsonOK(w, map[string]any{"count": len(out), "posts": out})
|
||||
}
|
||||
}
|
||||
|
||||
// ── GET /feed/hashtag/{tag} ──────────────────────────────────────────────
|
||||
|
||||
func feedHashtag(cfg FeedConfig) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
jsonErr(w, fmt.Errorf("method not allowed"), 405)
|
||||
return
|
||||
}
|
||||
tag := strings.TrimPrefix(r.URL.Path, "/feed/hashtag/")
|
||||
tag = strings.Trim(tag, "/")
|
||||
if tag == "" {
|
||||
jsonErr(w, fmt.Errorf("tag required"), 400)
|
||||
return
|
||||
}
|
||||
limit := queryInt(r, "limit", 50)
|
||||
ids, err := cfg.Mailbox.PostsByHashtag(tag, limit)
|
||||
if err != nil {
|
||||
jsonErr(w, err, 500)
|
||||
return
|
||||
}
|
||||
out := expandByID(cfg, ids)
|
||||
jsonOK(w, map[string]any{"tag": strings.ToLower(tag), "count": len(out), "posts": out})
|
||||
}
|
||||
}
|
||||
|
||||
// (queryInt helper is shared with the rest of the node HTTP surface;
|
||||
// see api_common.go.)
|
||||
431
relay/feed_mailbox.go
Normal file
431
relay/feed_mailbox.go
Normal file
@@ -0,0 +1,431 @@
|
||||
package relay
|
||||
|
||||
// FeedMailbox — BadgerDB-backed storage for social-feed post bodies.
|
||||
//
|
||||
// Posts are PUBLIC (plaintext) — unlike the E2E inbox envelopes, feed posts
|
||||
// have no recipient key. They live keyed by post ID and can be read by
|
||||
// anyone via GET /feed/post/{id}.
|
||||
//
|
||||
// Storage layout (keys):
|
||||
//
|
||||
// post:<postID> → FeedPost JSON (body + metadata)
|
||||
// post-by-author:<author>:<ts> → postID (chrono index for GET /feed/author)
|
||||
// post-views:<postID> → uint64 big-endian (view counter)
|
||||
// post-hashtag:<tag>:<ts>:<id> → postID (inverted index for #tag search)
|
||||
// post-trending:<score>:<id> → postID (ranked index; score = likes × 2 + views)
|
||||
//
|
||||
// View counts are off-chain because on-chain would mean one tx per view —
|
||||
// financially and architecturally unreasonable. Likes stay on-chain
|
||||
// (provable authorship + anti-Sybil via fee).
|
||||
//
|
||||
// Anti-spam:
|
||||
// - MaxPostBodySize is enforced at Store time.
|
||||
// - Per-sender rate limiting happens at the HTTP layer (withSubmitTxGuards).
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
badger "github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
const (
|
||||
feedPostPrefix = "feedpost:"
|
||||
feedAuthorIdxPrefix = "feedauthor:" // feedauthor:<author>:<ts_20d>:<postID>
|
||||
feedViewPrefix = "feedview:" // feedview:<postID> → uint64
|
||||
feedHashtagPrefix = "feedtag:" // feedtag:<tag>:<ts_20d>:<postID>
|
||||
feedTrendingPrefix = "feedtrend:" // feedtrend:<score_20d_inv>:<postID>
|
||||
|
||||
// MaxPostBodySize is the hard cap on a post's on-wire size. Matches
|
||||
// blockchain.MaxPostSize so the on-chain fee estimate is always
|
||||
// enforceable (no "I claimed 10 KiB but sent 50 KiB" trick).
|
||||
MaxPostBodySize = 256 * 1024 // 256 KiB
|
||||
|
||||
// FeedPostDefaultTTLDays is how long a post body lives before BadgerDB
|
||||
// auto-evicts it. On-chain metadata stays forever, so a reader hitting
|
||||
// a stale post sees the record with a "body unavailable" indicator.
|
||||
// Configurable via the env var DCHAIN_FEED_TTL_DAYS (handled in main.go).
|
||||
FeedPostDefaultTTLDays = 30
|
||||
|
||||
// maxHashtagsPerPost caps how many distinct hashtags we'll index per
|
||||
// post. Prevents a spammer from polluting every tag namespace with one
|
||||
// mega-post.
|
||||
maxHashtagsPerPost = 8
|
||||
|
||||
// trendingHalfLifeSeconds controls how quickly a post's score decays.
|
||||
// Used when computing "trending": recent engagement weighs more than old.
|
||||
trendingHalfLifeSeconds = 12 * 3600 // 12 hours
|
||||
)
|
||||
|
||||
// FeedPost is the off-chain body. On-chain we keep the metadata in
|
||||
// blockchain.PostRecord — here we store the readable payload.
|
||||
//
|
||||
// Why not just put the body on-chain? Size — a 256 KiB post × thousands
|
||||
// per day would bloat the block history. Keeping it in a relay DB with a
|
||||
// TTL gives us ephemerality while still letting on-chain records serve as
|
||||
// the permanent proof of authorship.
|
||||
type FeedPost struct {
|
||||
// Identity (matches on-chain PostRecord.PostID).
|
||||
PostID string `json:"post_id"`
|
||||
Author string `json:"author"` // Ed25519 hex
|
||||
|
||||
// Payload. Content is always plaintext (posts are public). Attachment is
|
||||
// a pre-compressed blob — client is expected to have minimised size
|
||||
// before publish. If empty, the post is text-only.
|
||||
Content string `json:"content"`
|
||||
ContentType string `json:"content_type,omitempty"` // "text/plain" | "text/markdown" | ...
|
||||
Attachment []byte `json:"attachment,omitempty"`
|
||||
AttachmentMIME string `json:"attachment_mime,omitempty"`
|
||||
Hashtags []string `json:"hashtags,omitempty"` // lowercased, without leading #
|
||||
|
||||
// CreatedAt matches the on-chain tx timestamp — we stamp it server-side
|
||||
// at Store() so senders can't back-date.
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
|
||||
// ReplyTo / QuoteOf mirror the on-chain PostRecord fields, included
|
||||
// here so the client can thread without a second RPC.
|
||||
ReplyTo string `json:"reply_to,omitempty"`
|
||||
QuoteOf string `json:"quote_of,omitempty"`
|
||||
}
|
||||
|
||||
// ErrPostTooLarge is returned by Store when the post body exceeds MaxPostBodySize.
|
||||
var ErrPostTooLarge = errors.New("post body exceeds maximum allowed size")
|
||||
|
||||
// FeedMailbox stores feed post bodies.
|
||||
type FeedMailbox struct {
|
||||
db *badger.DB
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
// NewFeedMailbox wraps an already-open Badger DB. TTL controls how long
|
||||
// post bodies live before auto-eviction (on-chain metadata persists
|
||||
// forever independently).
|
||||
func NewFeedMailbox(db *badger.DB, ttl time.Duration) *FeedMailbox {
|
||||
if ttl <= 0 {
|
||||
ttl = time.Duration(FeedPostDefaultTTLDays) * 24 * time.Hour
|
||||
}
|
||||
return &FeedMailbox{db: db, ttl: ttl}
|
||||
}
|
||||
|
||||
// OpenFeedMailbox opens (or creates) a dedicated BadgerDB at dbPath.
|
||||
func OpenFeedMailbox(dbPath string, ttl time.Duration) (*FeedMailbox, error) {
|
||||
opts := badger.DefaultOptions(dbPath).
|
||||
WithLogger(nil).
|
||||
WithValueLogFileSize(128 << 20).
|
||||
WithNumVersionsToKeep(1).
|
||||
WithCompactL0OnClose(true)
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open feed mailbox db: %w", err)
|
||||
}
|
||||
return NewFeedMailbox(db, ttl), nil
|
||||
}
|
||||
|
||||
// Close releases the underlying Badger handle.
|
||||
func (fm *FeedMailbox) Close() error { return fm.db.Close() }
|
||||
|
||||
// Store persists a post body and updates all indices. `createdAt` is the
|
||||
// canonical timestamp (usually from the chain tx) and becomes the
|
||||
// server's view of when the post happened — clients' wall-clock values
|
||||
// are ignored.
|
||||
//
|
||||
// Returns the set of hashtags actually indexed (after dedup + cap).
|
||||
func (fm *FeedMailbox) Store(post *FeedPost, createdAt int64) ([]string, error) {
|
||||
size := estimatePostSize(post)
|
||||
if size > MaxPostBodySize {
|
||||
return nil, ErrPostTooLarge
|
||||
}
|
||||
|
||||
post.CreatedAt = createdAt
|
||||
// Normalise hashtags — the client may or may not have supplied them;
|
||||
// we derive from Content as the authoritative source, then dedup.
|
||||
tags := extractHashtags(post.Content)
|
||||
if len(tags) > maxHashtagsPerPost {
|
||||
tags = tags[:maxHashtagsPerPost]
|
||||
}
|
||||
post.Hashtags = tags
|
||||
|
||||
val, err := json.Marshal(post)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal post: %w", err)
|
||||
}
|
||||
|
||||
err = fm.db.Update(func(txn *badger.Txn) error {
|
||||
// Idempotent on postID — second Store is a no-op.
|
||||
key := []byte(feedPostPrefix + post.PostID)
|
||||
if _, err := txn.Get(key); err == nil {
|
||||
return nil
|
||||
}
|
||||
entry := badger.NewEntry(key, val).WithTTL(fm.ttl)
|
||||
if err := txn.SetEntry(entry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Author chrono index.
|
||||
authorKey := fmt.Sprintf("%s%s:%020d:%s", feedAuthorIdxPrefix, post.Author, createdAt, post.PostID)
|
||||
if err := txn.SetEntry(
|
||||
badger.NewEntry([]byte(authorKey), []byte(post.PostID)).WithTTL(fm.ttl),
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Hashtag inverted index.
|
||||
for _, tag := range tags {
|
||||
tagKey := fmt.Sprintf("%s%s:%020d:%s", feedHashtagPrefix, tag, createdAt, post.PostID)
|
||||
if err := txn.SetEntry(
|
||||
badger.NewEntry([]byte(tagKey), []byte(post.PostID)).WithTTL(fm.ttl),
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
// Get returns the full post body, or nil if not found / evicted.
|
||||
func (fm *FeedMailbox) Get(postID string) (*FeedPost, error) {
|
||||
var p FeedPost
|
||||
err := fm.db.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get([]byte(feedPostPrefix + postID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return item.Value(func(val []byte) error {
|
||||
return json.Unmarshal(val, &p)
|
||||
})
|
||||
})
|
||||
if errors.Is(err, badger.ErrKeyNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// Delete removes a post body and its indices. On-chain soft-delete stays;
|
||||
// this just frees storage. Called by DELETE_POST event handler hook.
|
||||
func (fm *FeedMailbox) Delete(postID string) error {
|
||||
// We need author and createdAt to build index keys — fetch first.
|
||||
post, err := fm.Get(postID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if post == nil {
|
||||
return nil
|
||||
}
|
||||
return fm.db.Update(func(txn *badger.Txn) error {
|
||||
if err := txn.Delete([]byte(feedPostPrefix + postID)); err != nil {
|
||||
return err
|
||||
}
|
||||
authorKey := fmt.Sprintf("%s%s:%020d:%s",
|
||||
feedAuthorIdxPrefix, post.Author, post.CreatedAt, postID)
|
||||
if err := txn.Delete([]byte(authorKey)); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, tag := range post.Hashtags {
|
||||
tagKey := fmt.Sprintf("%s%s:%020d:%s",
|
||||
feedHashtagPrefix, tag, post.CreatedAt, postID)
|
||||
if err := txn.Delete([]byte(tagKey)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// IncrementView bumps the view counter for a post. No-op on missing post.
|
||||
// Returns the new count. Views are ephemeral (tied to the post TTL) —
|
||||
// a fresh relay that gossip-loads an old post starts from 0, which is
|
||||
// acceptable for a non-authoritative metric.
|
||||
func (fm *FeedMailbox) IncrementView(postID string) (uint64, error) {
|
||||
var next uint64
|
||||
err := fm.db.Update(func(txn *badger.Txn) error {
|
||||
key := []byte(feedViewPrefix + postID)
|
||||
var cur uint64
|
||||
if item, err := txn.Get(key); err == nil {
|
||||
_ = item.Value(func(val []byte) error {
|
||||
if len(val) == 8 {
|
||||
cur = binary.BigEndian.Uint64(val)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
} else if !errors.Is(err, badger.ErrKeyNotFound) {
|
||||
return err
|
||||
}
|
||||
next = cur + 1
|
||||
var buf [8]byte
|
||||
binary.BigEndian.PutUint64(buf[:], next)
|
||||
return txn.SetEntry(badger.NewEntry(key, buf[:]).WithTTL(fm.ttl))
|
||||
})
|
||||
return next, err
|
||||
}
|
||||
|
||||
// ViewCount returns the current (off-chain) view count for a post.
|
||||
func (fm *FeedMailbox) ViewCount(postID string) (uint64, error) {
|
||||
var n uint64
|
||||
err := fm.db.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get([]byte(feedViewPrefix + postID))
|
||||
if errors.Is(err, badger.ErrKeyNotFound) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return item.Value(func(val []byte) error {
|
||||
if len(val) == 8 {
|
||||
n = binary.BigEndian.Uint64(val)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
return n, err
|
||||
}
|
||||
|
||||
// PostsByAuthor lists the N most recent post IDs by an author, newest first.
|
||||
// Pure ID listing — callers fetch bodies via Get.
|
||||
func (fm *FeedMailbox) PostsByAuthor(authorPub string, limit int) ([]string, error) {
|
||||
if limit <= 0 || limit > 200 {
|
||||
limit = 50
|
||||
}
|
||||
prefix := []byte(feedAuthorIdxPrefix + authorPub + ":")
|
||||
return fm.reverseIDScan(prefix, limit)
|
||||
}
|
||||
|
||||
// PostsByHashtag lists the N most recent posts tagged with tag (lowercased).
|
||||
func (fm *FeedMailbox) PostsByHashtag(tag string, limit int) ([]string, error) {
|
||||
tag = strings.ToLower(strings.TrimPrefix(tag, "#"))
|
||||
if tag == "" {
|
||||
return nil, nil
|
||||
}
|
||||
if limit <= 0 || limit > 200 {
|
||||
limit = 50
|
||||
}
|
||||
prefix := []byte(feedHashtagPrefix + tag + ":")
|
||||
return fm.reverseIDScan(prefix, limit)
|
||||
}
|
||||
|
||||
// reverseIDScan walks prefix in reverse lex order and returns the value
|
||||
// (postID) of each entry up to limit. Used for newest-first indices.
|
||||
func (fm *FeedMailbox) reverseIDScan(prefix []byte, limit int) ([]string, error) {
|
||||
out := make([]string, 0, limit)
|
||||
err := fm.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = prefix
|
||||
opts.Reverse = true
|
||||
seek := append([]byte{}, prefix...)
|
||||
seek = append(seek, 0xff)
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Seek(seek); it.ValidForPrefix(prefix) && len(out) < limit; it.Next() {
|
||||
item := it.Item()
|
||||
_ = item.Value(func(val []byte) error {
|
||||
out = append(out, string(val))
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return out, err
|
||||
}
|
||||
|
||||
// RecentPostIDs enumerates the most recent posts stored by this relay
|
||||
// across ALL authors. Used by the trending / recommendations endpoints to
|
||||
// seed the candidate pool. maxAgeSeconds bounds the walk (0 = no bound).
|
||||
func (fm *FeedMailbox) RecentPostIDs(maxAgeSeconds int64, limit int) ([]string, error) {
|
||||
if limit <= 0 || limit > 500 {
|
||||
limit = 100
|
||||
}
|
||||
// Can't reuse chrono indices because they're per-author. We scan post:*
|
||||
// and collect, sorted by CreatedAt from the decoded body. This is O(M)
|
||||
// where M = #posts in DB — fine for MVP since TTL-bounded M is small
|
||||
// (~5k posts × 30d TTL on a busy node).
|
||||
type candidate struct {
|
||||
id string
|
||||
ts int64
|
||||
}
|
||||
cutoff := int64(0)
|
||||
if maxAgeSeconds > 0 {
|
||||
cutoff = time.Now().Unix() - maxAgeSeconds
|
||||
}
|
||||
var candidates []candidate
|
||||
prefix := []byte(feedPostPrefix)
|
||||
err := fm.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = prefix
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
err := item.Value(func(val []byte) error {
|
||||
var p FeedPost
|
||||
if err := json.Unmarshal(val, &p); err != nil {
|
||||
return nil // skip corrupt
|
||||
}
|
||||
if p.CreatedAt < cutoff {
|
||||
return nil
|
||||
}
|
||||
candidates = append(candidates, candidate{id: p.PostID, ts: p.CreatedAt})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sort.Slice(candidates, func(i, j int) bool { return candidates[i].ts > candidates[j].ts })
|
||||
if len(candidates) > limit {
|
||||
candidates = candidates[:limit]
|
||||
}
|
||||
out := make([]string, len(candidates))
|
||||
for i, c := range candidates {
|
||||
out[i] = c.id
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// extractHashtags finds #word tokens in text, lowercases, dedups, preserves
|
||||
// first-seen order. Word = [A-Za-z0-9_] chars, length 1..40.
|
||||
func extractHashtags(text string) []string {
|
||||
re := hashtagRegex
|
||||
matches := re.FindAllString(text, -1)
|
||||
seen := make(map[string]struct{}, len(matches))
|
||||
out := make([]string, 0, len(matches))
|
||||
for _, m := range matches {
|
||||
tag := strings.ToLower(strings.TrimPrefix(m, "#"))
|
||||
if len(tag) == 0 || len(tag) > 40 {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[tag]; ok {
|
||||
continue
|
||||
}
|
||||
seen[tag] = struct{}{}
|
||||
out = append(out, tag)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
var hashtagRegex = regexp.MustCompile(`#[A-Za-z0-9_\p{L}]{1,40}`)
|
||||
|
||||
// estimatePostSize returns the on-disk size used for fee calculation.
|
||||
// Matches the client's pre-publish size estimate so fees are predictable.
|
||||
func estimatePostSize(post *FeedPost) uint64 {
|
||||
n := uint64(len(post.Content)) + uint64(len(post.Attachment))
|
||||
// Small overhead for metadata (~120 bytes of JSON scaffolding).
|
||||
n += 128
|
||||
return n
|
||||
}
|
||||
198
relay/feed_mailbox_test.go
Normal file
198
relay/feed_mailbox_test.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func newTestFeedMailbox(t *testing.T) *FeedMailbox {
|
||||
t.Helper()
|
||||
dir, err := os.MkdirTemp("", "dchain-feedtest-*")
|
||||
if err != nil {
|
||||
t.Fatalf("MkdirTemp: %v", err)
|
||||
}
|
||||
fm, err := OpenFeedMailbox(dir, 24*time.Hour)
|
||||
if err != nil {
|
||||
_ = os.RemoveAll(dir)
|
||||
t.Fatalf("OpenFeedMailbox: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
_ = fm.Close()
|
||||
for i := 0; i < 20; i++ {
|
||||
if err := os.RemoveAll(dir); err == nil {
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
})
|
||||
return fm
|
||||
}
|
||||
|
||||
// TestFeedMailboxStoreAndGet: store round-trips content + metadata.
|
||||
func TestFeedMailboxStoreAndGet(t *testing.T) {
|
||||
fm := newTestFeedMailbox(t)
|
||||
post := &FeedPost{
|
||||
PostID: "p1",
|
||||
Author: "authorhex",
|
||||
Content: "Hello #world from #dchain",
|
||||
}
|
||||
tags, err := fm.Store(post, 12345)
|
||||
if err != nil {
|
||||
t.Fatalf("Store: %v", err)
|
||||
}
|
||||
wantTags := []string{"world", "dchain"}
|
||||
if len(tags) != len(wantTags) {
|
||||
t.Fatalf("Store returned %v, want %v", tags, wantTags)
|
||||
}
|
||||
for i := range wantTags {
|
||||
if tags[i] != wantTags[i] {
|
||||
t.Errorf("Store tag[%d]: got %q, want %q", i, tags[i], wantTags[i])
|
||||
}
|
||||
}
|
||||
|
||||
got, err := fm.Get("p1")
|
||||
if err != nil || got == nil {
|
||||
t.Fatalf("Get: got %v err=%v", got, err)
|
||||
}
|
||||
if got.Content != post.Content {
|
||||
t.Errorf("content: got %q, want %q", got.Content, post.Content)
|
||||
}
|
||||
if got.CreatedAt != 12345 {
|
||||
t.Errorf("created_at: got %d, want 12345", got.CreatedAt)
|
||||
}
|
||||
if len(got.Hashtags) != 2 {
|
||||
t.Errorf("hashtags: got %v, want [world dchain]", got.Hashtags)
|
||||
}
|
||||
}
|
||||
|
||||
// TestFeedMailboxTooLarge: rejects over-quota content.
|
||||
func TestFeedMailboxTooLarge(t *testing.T) {
|
||||
fm := newTestFeedMailbox(t)
|
||||
big := make([]byte, MaxPostBodySize+1)
|
||||
post := &FeedPost{
|
||||
PostID: "big1",
|
||||
Author: "a",
|
||||
Attachment: big,
|
||||
}
|
||||
if _, err := fm.Store(post, 0); err != ErrPostTooLarge {
|
||||
t.Fatalf("Store huge post: got %v, want ErrPostTooLarge", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestFeedMailboxHashtagIndex: hashtags are searchable + dedup + case-normalised.
|
||||
func TestFeedMailboxHashtagIndex(t *testing.T) {
|
||||
fm := newTestFeedMailbox(t)
|
||||
|
||||
p1 := &FeedPost{PostID: "p1", Author: "a", Content: "post about #Go"}
|
||||
p2 := &FeedPost{PostID: "p2", Author: "b", Content: "Also #go programming"}
|
||||
p3 := &FeedPost{PostID: "p3", Author: "a", Content: "#Rust too"}
|
||||
|
||||
if _, err := fm.Store(p1, 1000); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := fm.Store(p2, 2000); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := fm.Store(p3, 3000); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// #go is case-insensitive, should return both posts newest-first.
|
||||
ids, err := fm.PostsByHashtag("#Go", 10)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(ids) != 2 || ids[0] != "p2" || ids[1] != "p1" {
|
||||
t.Errorf("PostsByHashtag(Go): got %v, want [p2 p1]", ids)
|
||||
}
|
||||
|
||||
ids, _ = fm.PostsByHashtag("rust", 10)
|
||||
if len(ids) != 1 || ids[0] != "p3" {
|
||||
t.Errorf("PostsByHashtag(rust): got %v, want [p3]", ids)
|
||||
}
|
||||
}
|
||||
|
||||
// TestFeedMailboxViewCounter: increments + reads.
|
||||
func TestFeedMailboxViewCounter(t *testing.T) {
|
||||
fm := newTestFeedMailbox(t)
|
||||
fm.Store(&FeedPost{PostID: "p", Author: "a", Content: "hi"}, 10)
|
||||
|
||||
for i := 1; i <= 5; i++ {
|
||||
n, err := fm.IncrementView("p")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != uint64(i) {
|
||||
t.Errorf("IncrementView #%d: got %d, want %d", i, n, i)
|
||||
}
|
||||
}
|
||||
if n, _ := fm.ViewCount("p"); n != 5 {
|
||||
t.Errorf("ViewCount: got %d, want 5", n)
|
||||
}
|
||||
}
|
||||
|
||||
// TestFeedMailboxByAuthor: author chrono index returns newest first.
|
||||
func TestFeedMailboxByAuthor(t *testing.T) {
|
||||
fm := newTestFeedMailbox(t)
|
||||
fm.Store(&FeedPost{PostID: "old", Author: "a", Content: "one"}, 100)
|
||||
fm.Store(&FeedPost{PostID: "new", Author: "a", Content: "two"}, 500)
|
||||
fm.Store(&FeedPost{PostID: "mid", Author: "a", Content: "three"}, 300)
|
||||
fm.Store(&FeedPost{PostID: "other", Author: "b", Content: "four"}, 400)
|
||||
|
||||
ids, err := fm.PostsByAuthor("a", 10)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
want := []string{"new", "mid", "old"}
|
||||
if len(ids) != len(want) {
|
||||
t.Fatalf("len: got %d, want %d (%v)", len(ids), len(want), ids)
|
||||
}
|
||||
for i := range want {
|
||||
if ids[i] != want[i] {
|
||||
t.Errorf("pos %d: got %q want %q", i, ids[i], want[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestFeedMailboxDelete: removes body + indices.
|
||||
func TestFeedMailboxDelete(t *testing.T) {
|
||||
fm := newTestFeedMailbox(t)
|
||||
fm.Store(&FeedPost{PostID: "x", Author: "a", Content: "doomed #go"}, 100)
|
||||
|
||||
if err := fm.Delete("x"); err != nil {
|
||||
t.Fatalf("Delete: %v", err)
|
||||
}
|
||||
if got, _ := fm.Get("x"); got != nil {
|
||||
t.Errorf("Get after delete: got %v, want nil", got)
|
||||
}
|
||||
if ids, _ := fm.PostsByHashtag("go", 10); len(ids) != 0 {
|
||||
t.Errorf("hashtag index: got %v, want []", ids)
|
||||
}
|
||||
if ids, _ := fm.PostsByAuthor("a", 10); len(ids) != 0 {
|
||||
t.Errorf("author index: got %v, want []", ids)
|
||||
}
|
||||
}
|
||||
|
||||
// TestFeedMailboxRecentIDs: filters by window, sorts newest first.
|
||||
func TestFeedMailboxRecentIDs(t *testing.T) {
|
||||
fm := newTestFeedMailbox(t)
|
||||
now := time.Now().Unix()
|
||||
// p1 1 hour old, p2 5 hours old, p3 50 hours old.
|
||||
fm.Store(&FeedPost{PostID: "p1", Author: "a", Content: "a"}, now-3600)
|
||||
fm.Store(&FeedPost{PostID: "p2", Author: "b", Content: "b"}, now-5*3600)
|
||||
fm.Store(&FeedPost{PostID: "p3", Author: "c", Content: "c"}, now-50*3600)
|
||||
|
||||
// 6-hour window: p1 and p2 only.
|
||||
ids, err := fm.RecentPostIDs(6*3600, 100)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(ids) != 2 {
|
||||
t.Errorf("RecentPostIDs(6h): got %v, want 2 posts", ids)
|
||||
}
|
||||
// Newest first.
|
||||
if ids[0] != "p1" {
|
||||
t.Errorf("first post: got %s, want p1", ids[0])
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user