publish(task 7): relay-pool-wrapper (publish + checkExisting)

publishToRelays(urls, ev, opts) publisht signiertes event parallel zu
allen relays, mit retries + exponential backoff + timeout pro versuch.
retour: { ok: string[], failed: string[] }. default-pool via
applesauce-relay 2.x (new RelayPool()); publishFn via dependency-
injection für tests. checkExisting(slug, pubkey, urls) fragt je relay
nach kind:30023 mit #d-filter ab — true wenn irgendeiner matcht.
timer-leaks vermieden per clearTimeout in publishOne + im mock-test.
3 tests grün.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jörg Lohrer 2026-04-18 05:27:12 +02:00
parent e4518fbf69
commit ebe73cbf46
2 changed files with 170 additions and 0 deletions

109
publish/src/core/relays.ts Normal file
View File

@ -0,0 +1,109 @@
import { Relay, RelayPool } from 'applesauce-relay'
import { firstValueFrom, timeout } from 'rxjs'
export interface SignedEvent {
id: string
pubkey: string
created_at: number
kind: number
tags: string[][]
content: string
sig: string
}
export interface PublishResult {
ok: boolean
reason?: string
}
export type PublishFn = (url: string, ev: SignedEvent) => Promise<PublishResult>
export interface PublishOptions {
publishFn?: PublishFn
retries?: number
timeoutMs?: number
backoffMs?: number
}
export interface RelaysReport {
ok: string[]
failed: string[]
}
const defaultPool = new RelayPool()
const defaultPublish: PublishFn = async (url, ev) => {
try {
const relay = defaultPool.relay(url)
const result = await firstValueFrom(relay.publish(ev).pipe(timeout({ first: 10_000 })))
return { ok: result.ok, reason: result.message }
} catch (err) {
return { ok: false, reason: err instanceof Error ? err.message : String(err) }
}
}
async function publishOne(
url: string,
ev: SignedEvent,
opts: Required<PublishOptions>,
): Promise<boolean> {
const total = opts.retries + 1
for (let i = 0; i < total; i++) {
let timerId: number | undefined
const timeoutPromise = new Promise<PublishResult>((resolve) => {
timerId = setTimeout(() => resolve({ ok: false, reason: 'timeout' }), opts.timeoutMs)
})
const res = await Promise.race([opts.publishFn(url, ev), timeoutPromise])
if (timerId !== undefined) clearTimeout(timerId)
if (res.ok) return true
if (i < total - 1) await new Promise((r) => setTimeout(r, opts.backoffMs * Math.pow(3, i)))
}
return false
}
export async function publishToRelays(
urls: string[],
ev: SignedEvent,
options: PublishOptions = {},
): Promise<RelaysReport> {
const opts: Required<PublishOptions> = {
publishFn: options.publishFn ?? defaultPublish,
retries: options.retries ?? 2,
timeoutMs: options.timeoutMs ?? 10_000,
backoffMs: options.backoffMs ?? 1000,
}
const results = await Promise.all(
urls.map(async (url) => ({ url, ok: await publishOne(url, ev, opts) })),
)
return {
ok: results.filter((r) => r.ok).map((r) => r.url),
failed: results.filter((r) => !r.ok).map((r) => r.url),
}
}
export type ExistingQuery = (url: string, pubkey: string, slug: string) => Promise<boolean>
const defaultExistingQuery: ExistingQuery = async (url, pubkey, slug) => {
try {
const relay = new Relay(url)
const ev = await firstValueFrom(
relay
.request({ kinds: [30023], authors: [pubkey], '#d': [slug], limit: 1 })
.pipe(timeout({ first: 5_000 })),
)
return !!ev
} catch {
return false
}
}
export async function checkExisting(
slug: string,
pubkey: string,
urls: string[],
opts: { query?: ExistingQuery } = {},
): Promise<boolean> {
const query = opts.query ?? defaultExistingQuery
const results = await Promise.all(urls.map((u) => query(u, pubkey, slug)))
return results.some((r) => r)
}

View File

@ -0,0 +1,61 @@
import { assertEquals } from '@std/assert'
import { publishToRelays } from '../src/core/relays.ts'
const sampleEvent = {
kind: 1,
pubkey: 'p',
created_at: 1,
tags: [],
content: 'x',
id: 'i',
sig: 's',
}
Deno.test('publishToRelays: meldet OK-Antworten je relay', async () => {
const injected = async (url: string, _ev: unknown) => {
if (url.includes('fail')) return { ok: false, reason: 'nope' }
return { ok: true }
}
const result = await publishToRelays(
['wss://ok1.example', 'wss://ok2.example', 'wss://fail.example'],
sampleEvent,
{ publishFn: injected, retries: 0, timeoutMs: 100 },
)
assertEquals(result.ok.sort(), ['wss://ok1.example', 'wss://ok2.example'])
assertEquals(result.failed, ['wss://fail.example'])
})
Deno.test('publishToRelays: retry bei Fehler', async () => {
let attempts = 0
const injected = async () => {
attempts++
if (attempts < 2) return { ok: false, reason: 'transient' }
return { ok: true }
}
const result = await publishToRelays(
['wss://flaky.example'],
sampleEvent,
{ publishFn: injected, retries: 1, timeoutMs: 100, backoffMs: 1 },
)
assertEquals(result.ok, ['wss://flaky.example'])
assertEquals(attempts, 2)
})
Deno.test('publishToRelays: timeout → failed', async () => {
const pendingTimers: number[] = []
const injected = () =>
new Promise<{ ok: boolean }>((resolve) => {
const t = setTimeout(() => resolve({ ok: true }), 500)
pendingTimers.push(t)
})
try {
const result = await publishToRelays(
['wss://slow.example'],
sampleEvent,
{ publishFn: injected, retries: 0, timeoutMs: 10 },
)
assertEquals(result.failed, ['wss://slow.example'])
} finally {
for (const t of pendingTimers) clearTimeout(t)
}
})