From ebe73cbf46fafddb03facd87cdc4c45695f038a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Lohrer?= Date: Sat, 18 Apr 2026 05:27:12 +0200 Subject: [PATCH] publish(task 7): relay-pool-wrapper (publish + checkExisting) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit publishToRelays(urls, ev, opts) publisht signiertes event parallel zu allen relays, mit retries + exponential backoff + timeout pro versuch. retour: { ok: string[], failed: string[] }. default-pool via applesauce-relay 2.x (new RelayPool()); publishFn via dependency- injection für tests. checkExisting(slug, pubkey, urls) fragt je relay nach kind:30023 mit #d-filter ab — true wenn irgendeiner matcht. timer-leaks vermieden per clearTimeout in publishOne + im mock-test. 3 tests grün. Co-Authored-By: Claude Opus 4.6 (1M context) --- publish/src/core/relays.ts | 109 +++++++++++++++++++++++++++++++++++ publish/tests/relays_test.ts | 61 ++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 publish/src/core/relays.ts create mode 100644 publish/tests/relays_test.ts diff --git a/publish/src/core/relays.ts b/publish/src/core/relays.ts new file mode 100644 index 0000000..3cdc1bd --- /dev/null +++ b/publish/src/core/relays.ts @@ -0,0 +1,109 @@ +import { Relay, RelayPool } from 'applesauce-relay' +import { firstValueFrom, timeout } from 'rxjs' + +export interface SignedEvent { + id: string + pubkey: string + created_at: number + kind: number + tags: string[][] + content: string + sig: string +} + +export interface PublishResult { + ok: boolean + reason?: string +} + +export type PublishFn = (url: string, ev: SignedEvent) => Promise + +export interface PublishOptions { + publishFn?: PublishFn + retries?: number + timeoutMs?: number + backoffMs?: number +} + +export interface RelaysReport { + ok: string[] + failed: string[] +} + +const defaultPool = new RelayPool() + +const defaultPublish: PublishFn = async (url, ev) => { + try { + const relay = defaultPool.relay(url) + const result = await firstValueFrom(relay.publish(ev).pipe(timeout({ first: 10_000 }))) + return { ok: result.ok, reason: result.message } + } catch (err) { + return { ok: false, reason: err instanceof Error ? err.message : String(err) } + } +} + +async function publishOne( + url: string, + ev: SignedEvent, + opts: Required, +): Promise { + const total = opts.retries + 1 + for (let i = 0; i < total; i++) { + let timerId: number | undefined + const timeoutPromise = new Promise((resolve) => { + timerId = setTimeout(() => resolve({ ok: false, reason: 'timeout' }), opts.timeoutMs) + }) + const res = await Promise.race([opts.publishFn(url, ev), timeoutPromise]) + if (timerId !== undefined) clearTimeout(timerId) + if (res.ok) return true + if (i < total - 1) await new Promise((r) => setTimeout(r, opts.backoffMs * Math.pow(3, i))) + } + return false +} + +export async function publishToRelays( + urls: string[], + ev: SignedEvent, + options: PublishOptions = {}, +): Promise { + const opts: Required = { + publishFn: options.publishFn ?? defaultPublish, + retries: options.retries ?? 2, + timeoutMs: options.timeoutMs ?? 10_000, + backoffMs: options.backoffMs ?? 1000, + } + const results = await Promise.all( + urls.map(async (url) => ({ url, ok: await publishOne(url, ev, opts) })), + ) + return { + ok: results.filter((r) => r.ok).map((r) => r.url), + failed: results.filter((r) => !r.ok).map((r) => r.url), + } +} + +export type ExistingQuery = (url: string, pubkey: string, slug: string) => Promise + +const defaultExistingQuery: ExistingQuery = async (url, pubkey, slug) => { + try { + const relay = new Relay(url) + const ev = await firstValueFrom( + relay + .request({ kinds: [30023], authors: [pubkey], '#d': [slug], limit: 1 }) + .pipe(timeout({ first: 5_000 })), + ) + return !!ev + } catch { + return false + } +} + +export async function checkExisting( + slug: string, + pubkey: string, + urls: string[], + opts: { query?: ExistingQuery } = {}, +): Promise { + const query = opts.query ?? defaultExistingQuery + const results = await Promise.all(urls.map((u) => query(u, pubkey, slug))) + return results.some((r) => r) +} diff --git a/publish/tests/relays_test.ts b/publish/tests/relays_test.ts new file mode 100644 index 0000000..a25de46 --- /dev/null +++ b/publish/tests/relays_test.ts @@ -0,0 +1,61 @@ +import { assertEquals } from '@std/assert' +import { publishToRelays } from '../src/core/relays.ts' + +const sampleEvent = { + kind: 1, + pubkey: 'p', + created_at: 1, + tags: [], + content: 'x', + id: 'i', + sig: 's', +} + +Deno.test('publishToRelays: meldet OK-Antworten je relay', async () => { + const injected = async (url: string, _ev: unknown) => { + if (url.includes('fail')) return { ok: false, reason: 'nope' } + return { ok: true } + } + const result = await publishToRelays( + ['wss://ok1.example', 'wss://ok2.example', 'wss://fail.example'], + sampleEvent, + { publishFn: injected, retries: 0, timeoutMs: 100 }, + ) + assertEquals(result.ok.sort(), ['wss://ok1.example', 'wss://ok2.example']) + assertEquals(result.failed, ['wss://fail.example']) +}) + +Deno.test('publishToRelays: retry bei Fehler', async () => { + let attempts = 0 + const injected = async () => { + attempts++ + if (attempts < 2) return { ok: false, reason: 'transient' } + return { ok: true } + } + const result = await publishToRelays( + ['wss://flaky.example'], + sampleEvent, + { publishFn: injected, retries: 1, timeoutMs: 100, backoffMs: 1 }, + ) + assertEquals(result.ok, ['wss://flaky.example']) + assertEquals(attempts, 2) +}) + +Deno.test('publishToRelays: timeout → failed', async () => { + const pendingTimers: number[] = [] + const injected = () => + new Promise<{ ok: boolean }>((resolve) => { + const t = setTimeout(() => resolve({ ok: true }), 500) + pendingTimers.push(t) + }) + try { + const result = await publishToRelays( + ['wss://slow.example'], + sampleEvent, + { publishFn: injected, retries: 0, timeoutMs: 10 }, + ) + assertEquals(result.failed, ['wss://slow.example']) + } finally { + for (const t of pendingTimers) clearTimeout(t) + } +})