Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 122 additions & 42 deletions convex/engine/effects/collectionAttempt.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { WorkflowManager } from "@convex-dev/workflow";
import { v } from "convex/values";
import { internal } from "../../_generated/api";
import { components, internal } from "../../_generated/api";
import type { Id } from "../../_generated/dataModel";
import type { MutationCtx } from "../../_generated/server";
import { internalMutation } from "../../_generated/server";
Expand All @@ -13,6 +14,8 @@ import { executeTransition } from "../transition";
import type { CommandSource } from "../types";
import { effectPayloadValidator } from "../validators";

const workflow = new WorkflowManager(components.workflow);

const collectionAttemptEffectValidator = {
...effectPayloadValidator,
entityId: v.id("collectionAttempts"),
Expand Down Expand Up @@ -202,50 +205,38 @@ export const notifyAdmin = internalMutation({
});

/**
* Cross-entity effect: triggers cash ledger reversal cascade.
* Triggered when a collection attempt transitions to `reversed` via PAYMENT_REVERSED.
*
* Iterates all obligationIds in the plan entry, delegating reversal of each
* obligation's ledger entries to postPaymentReversalCascade(). Unlike
* emitPaymentReceived (which tracks partial amounts and breaks early), this
* unconditionally reverses every obligation — partial reversal would leave
* the cash ledger inconsistent.
* Mutation step: executes the per-obligation reversal cascade within a
* durable workflow. Each obligation's reversal is idempotent via
* postingGroupId, so retries are safe.
*
* Each call is idempotent via posting-group deduplication in the cash ledger.
* Return value (including clawbackRequired) is currently discarded —
* payout clawback handling is deferred to ENG-175+.
* Called by `reversalCascadeWorkflow` — not directly by the scheduler.
*/
export const emitPaymentReversed = internalMutation({
args: collectionAttemptEffectValidator,
export const executeReversalCascadeStep = internalMutation({
args: {
entityId: v.id("collectionAttempts"),
source: effectPayloadValidator.source,
reason: v.string(),
effectiveDate: v.string(),
},
handler: async (ctx, args) => {
const { planEntry } = await loadAttemptAndPlanEntry(
ctx,
args,
"emitPaymentReversed"
);

let reason: string;
if (typeof args.payload?.reason === "string") {
reason = args.payload.reason;
} else {
reason = "payment_reversed";
console.warn(
`[emitPaymentReversed] No valid reason in payload for attempt=${args.entityId}. Defaulting to "${reason}".`
const attempt = await ctx.db.get(args.entityId);
if (!attempt) {
throw new Error(
`[executeReversalCascadeStep] Collection attempt not found: ${args.entityId}`
);
}
const planEntry = await ctx.db.get(attempt.planEntryId);
if (!planEntry) {
throw new Error(
`[executeReversalCascadeStep] Plan entry not found: ${attempt.planEntryId} (attempt=${args.entityId})`
);
}

// Prefer effectiveDate from event payload (set at event-receive time);
// fall back to current date if not provided.
const effectiveDate =
typeof args.payload?.effectiveDate === "string"
? args.payload.effectiveDate
: new Date().toISOString().slice(0, 10);

for (const obligationId of planEntry.obligationIds) {
const obligation = await ctx.db.get(obligationId);
if (!obligation) {
throw new Error(
`[emitPaymentReversed] Obligation not found: ${obligationId} ` +
`[executeReversalCascadeStep] Obligation not found: ${obligationId} ` +
`(attempt=${args.entityId}). Cannot complete reversal — ` +
"the cash ledger would be left in an inconsistent state."
);
Expand All @@ -257,20 +248,20 @@ export const emitPaymentReversed = internalMutation({
const shouldCreateCorrective = obligation.status === "settled";

console.info(
`[emitPaymentReversed] Starting reversal cascade for attempt=${args.entityId}, obligation=${obligationId}`
`[executeReversalCascadeStep] Starting reversal cascade for attempt=${args.entityId}, obligation=${obligationId}`
);

const cascadeResult = await postPaymentReversalCascade(ctx, {
attemptId: args.entityId,
obligationId,
mortgageId: obligation.mortgageId,
effectiveDate,
effectiveDate: args.effectiveDate,
source: args.source,
reason,
reason: args.reason,
});

console.info(
`[emitPaymentReversed] Reversal cascade complete for attempt=${args.entityId}, obligation=${obligationId}`
`[executeReversalCascadeStep] Reversal cascade complete for attempt=${args.entityId}, obligation=${obligationId}`
);

// Schedule corrective obligation creation (ENG-180)
Expand All @@ -289,7 +280,7 @@ export const emitPaymentReversed = internalMutation({
);
if (!cashReceivedReversal) {
throw new Error(
"[emitPaymentReversed] No CASH_RECEIVED reversal entry found in cascade result " +
"[executeReversalCascadeStep] No CASH_RECEIVED reversal entry found in cascade result " +
`for attempt=${args.entityId}, obligation=${obligationId}. ` +
"Cannot determine reversedAmount for corrective obligation."
);
Expand All @@ -303,16 +294,105 @@ export const emitPaymentReversed = internalMutation({
{
originalObligationId: obligationId,
reversedAmount,
reason,
reason: args.reason,
postingGroupId: cascadeResult.postingGroupId,
source: args.source,
}
);

console.info(
`[emitPaymentReversed] Scheduled corrective obligation for attempt=${args.entityId}, obligation=${obligationId}`
`[executeReversalCascadeStep] Scheduled corrective obligation for attempt=${args.entityId}, obligation=${obligationId}`
);
}
}
},
});

/**
* Durable workflow for payment reversal cascade.
*
* Wraps executeReversalCascadeStep with automatic retries via the workflow
* component. The cascade is idempotent via postingGroupId, so retries are
* safe and will not create duplicate ledger entries.
*
* Follows the same pattern as hashChainJournalEntry in engine/hashChain.ts.
*/
export const reversalCascadeWorkflow = workflow.define({
args: {
entityId: v.id("collectionAttempts"),
source: effectPayloadValidator.source,
reason: v.string(),
effectiveDate: v.string(),
},
handler: async (step, args) => {
await step.runMutation(
internal.engine.effects.collectionAttempt.executeReversalCascadeStep,
{
entityId: args.entityId,
source: args.source,
reason: args.reason,
effectiveDate: args.effectiveDate,
}
);
},
});

/**
* Cross-entity effect: triggers cash ledger reversal cascade via durable workflow.
* Triggered when a collection attempt transitions to `reversed` via PAYMENT_REVERSED.
*
* Starts a durable workflow (with automatic retries) that iterates all
* obligationIds in the plan entry, delegating reversal of each obligation's
* ledger entries to postPaymentReversalCascade(). Unlike emitPaymentReceived
* (which tracks partial amounts and breaks early), the workflow unconditionally
* reverses every obligation — partial reversal would leave the cash ledger
* inconsistent.
*
* Each call is idempotent via posting-group deduplication in the cash ledger,
* so workflow retries are safe and will not create duplicate entries.
*
* Return value (including clawbackRequired) is currently discarded —
* payout clawback handling is deferred to ENG-175+.
*/
export const emitPaymentReversed = internalMutation({
args: collectionAttemptEffectValidator,
handler: async (ctx, args) => {
let reason: string;
if (typeof args.payload?.reason === "string") {
reason = args.payload.reason;
} else {
reason = "payment_reversed";
console.warn(
`[emitPaymentReversed] No valid reason in payload for attempt=${args.entityId}. Defaulting to "${reason}".`
);
}

// Prefer effectiveDate from event payload (set at event-receive time);
// fall back to current date if not provided.
const effectiveDate =
typeof args.payload?.effectiveDate === "string"
? args.payload.effectiveDate
: new Date().toISOString().slice(0, 10);

// Start durable workflow — the workflow component handles retries
// automatically if the reversal cascade step fails. The cascade is
// idempotent via postingGroupId so retries are safe.
await workflow.start(
ctx,
internal.engine.effects.collectionAttempt.reversalCascadeWorkflow,
{
entityId: args.entityId,
source: args.source,
reason,
effectiveDate,
},
{
startAsync: true,
}
);

console.info(
`[emitPaymentReversed] Started durable reversal cascade workflow for attempt=${args.entityId}`
);
},
});
13 changes: 13 additions & 0 deletions convex/http.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import { httpRouter } from "convex/server";
import { authKit } from "./auth";
import { rotessaWebhook } from "./payments/webhooks/rotessa";
import { stripeWebhook } from "./payments/webhooks/stripe";

const http = httpRouter();

authKit.registerRoutes(http);

http.route({
path: "/webhooks/rotessa",
method: "POST",
handler: rotessaWebhook,
});
http.route({
path: "/webhooks/stripe",
method: "POST",
handler: stripeWebhook,
});

export default http;
120 changes: 120 additions & 0 deletions convex/payments/webhooks/__tests__/handleReversal.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import { createHmac } from "node:crypto";
import { describe, expect, it } from "vitest";
import { verifyRotessaSignature, verifyStripeSignature } from "../verification";

// ── Test helpers ─────────────────────────────────────────────────────

function createTestRotessaSignature(body: string, secret: string): string {
return createHmac("sha256", secret).update(body).digest("hex");
}

function createTestStripeSignature(
body: string,
secret: string,
timestamp?: number
): string {
const ts = timestamp ?? Math.floor(Date.now() / 1000);
return `t=${ts},v1=${createHmac("sha256", secret).update(`${ts}.${body}`).digest("hex")}`;
}

// ── Tests ────────────────────────────────────────────────────────────

describe("verifyRotessaSignature", () => {
const SECRET = "test-rotessa-secret-key";
const BODY = JSON.stringify({
event_type: "transaction.nsf",
data: { transaction_id: "txn_001", amount: 150.0 },
});

it("returns true for valid HMAC-SHA256 signature", () => {
const signature = createTestRotessaSignature(BODY, SECRET);
expect(verifyRotessaSignature(BODY, signature, SECRET)).toBe(true);
});

it("returns false for invalid signature", () => {
const wrongSignature = createTestRotessaSignature(BODY, "wrong-secret");
expect(verifyRotessaSignature(BODY, wrongSignature, SECRET)).toBe(false);
});

it("returns false for empty signature", () => {
// An empty string produces a zero-length buffer, which won't match the
// 32-byte expected buffer. The length check causes an early return false.
expect(verifyRotessaSignature(BODY, "", SECRET)).toBe(false);
});

it("returns false when body has been tampered with", () => {
const signature = createTestRotessaSignature(BODY, SECRET);
const tamperedBody = `${BODY} extra`;
expect(verifyRotessaSignature(tamperedBody, signature, SECRET)).toBe(false);
});

it("returns true for different valid body+signature pairs", () => {
const body2 = '{"event_type":"transaction.returned"}';
const sig2 = createTestRotessaSignature(body2, SECRET);
expect(verifyRotessaSignature(body2, sig2, SECRET)).toBe(true);
});
});

describe("verifyStripeSignature", () => {
const SECRET = "whsec_test_stripe_secret";
const BODY = JSON.stringify({
id: "evt_123",
type: "charge.refunded",
data: { object: { id: "ch_abc", amount: 5000 } },
});

it("returns true for valid stripe-signature header", () => {
const header = createTestStripeSignature(BODY, SECRET);
expect(verifyStripeSignature(BODY, header, SECRET)).toBe(true);
});

it("returns false for invalid signature", () => {
const header = createTestStripeSignature(BODY, "wrong-secret");
expect(verifyStripeSignature(BODY, header, SECRET)).toBe(false);
});

it("returns false for expired timestamp beyond tolerance", () => {
// Timestamp from 10 minutes ago, with 5 min tolerance
const staleTimestamp = Math.floor(Date.now() / 1000) - 600;
const header = createTestStripeSignature(BODY, SECRET, staleTimestamp);
expect(verifyStripeSignature(BODY, header, SECRET, 300)).toBe(false);
});

it("handles missing v1= prefix gracefully", () => {
const ts = Math.floor(Date.now() / 1000);
// Header with no v1= component
const header = `t=${ts}`;
expect(verifyStripeSignature(BODY, header, SECRET)).toBe(false);
});

it("returns false for missing timestamp", () => {
const sig = createHmac("sha256", SECRET)
.update(`12345.${BODY}`)
.digest("hex");
// Header with no t= component
const header = `v1=${sig}`;
expect(verifyStripeSignature(BODY, header, SECRET)).toBe(false);
});

it("returns false for completely malformed header", () => {
expect(verifyStripeSignature(BODY, "garbage-header", SECRET)).toBe(false);
});

it("accepts timestamp within tolerance", () => {
// Timestamp from 2 minutes ago, tolerance 5 minutes
const recentTimestamp = Math.floor(Date.now() / 1000) - 120;
const header = createTestStripeSignature(BODY, SECRET, recentTimestamp);
expect(verifyStripeSignature(BODY, header, SECRET, 300)).toBe(true);
});

it("returns false for future timestamp beyond tolerance", () => {
// Timestamp 10 minutes in the future
const futureTimestamp = Math.floor(Date.now() / 1000) + 600;
const header = createTestStripeSignature(BODY, SECRET, futureTimestamp);
expect(verifyStripeSignature(BODY, header, SECRET, 300)).toBe(false);
});

it("returns false for empty header string", () => {
expect(verifyStripeSignature(BODY, "", SECRET)).toBe(false);
});
});
Loading
Loading