Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
60f1ff1
Add ignore param to createDocuments for silent duplicate handling
premtsd-code Apr 9, 2026
bee42d4
Add e2e tests for createDocuments ignore mode
premtsd-code Apr 9, 2026
93a9136
Fix Mongo adapter ignore mode: pass ignoreDuplicates to client and fi…
premtsd-code Apr 10, 2026
2906dda
Revert "Fix Mongo adapter ignore mode: pass ignoreDuplicates to clien…
premtsd-code Apr 10, 2026
63d9902
Replace ignore param with skipDuplicates scope guard
premtsd-code Apr 10, 2026
b0a8392
Push skipDuplicates scope guard down to Adapter layer
premtsd-code Apr 12, 2026
d8f647c
Refactor: extract helpers and collapse conditional wraps
premtsd-code Apr 12, 2026
c88c6ac
fix: guard buildUidTenantLookup against empty UID set
premtsd-code Apr 12, 2026
16849fe
Add e2e tests for skipDuplicates edge cases
premtsd-code Apr 13, 2026
c6d566e
SQL adapter: remove redundant skipDuplicates pre-filter
premtsd-code Apr 13, 2026
fd37b69
Mongo adapter: use upsertWithCounts for race-safe accurate counts
premtsd-code Apr 13, 2026
a24358c
Merge remote-tracking branch 'origin/main' into csv-import-upsert-v2
premtsd-code Apr 13, 2026
3b783af
Revert "SQL adapter: remove redundant skipDuplicates pre-filter"
premtsd-code Apr 13, 2026
3a483f2
Mirror: forward only docs the source actually inserted
premtsd-code Apr 13, 2026
eb99cf1
skipDuplicates: simplify by moving pre-filter to orchestrator only
premtsd-code Apr 13, 2026
89e4cf8
skipDuplicates: drop deferred-relationships, inline pre-filter, tight…
premtsd-code Apr 13, 2026
41704eb
Address Jake's review on PR #852
premtsd-code Apr 13, 2026
e9e5e76
Mirror::createDocuments: bound skipDuplicates capture to O($batchSize)
premtsd-code Apr 14, 2026
ae929db
Restore per-tenant grouping for batch existing-doc lookups in tenantP…
premtsd-code Apr 14, 2026
431d378
skipDuplicates: drop pre-filter, rely on adapter-level dedup
premtsd-code Apr 14, 2026
0fd7c33
skipDuplicates: drop intra-batch dedup, trim verbose test comments
premtsd-code Apr 14, 2026
9baaa04
Group skipDuplicates / getInsert* into their logical clusters
premtsd-code Apr 15, 2026
3c85013
Mirror::createDocuments: re-fetch source after skipDuplicates insert
premtsd-code Apr 15, 2026
934ec04
Mirror::createDocuments: pre-filter existing ids + unify skip/non-ski…
premtsd-code Apr 15, 2026
fa0e373
Chunk id-lookup queries to respect RELATION_QUERY_CHUNK_SIZE
premtsd-code Apr 15, 2026
52b189b
Chunk id lookups by \$this->maxQueryValues, not RELATION_QUERY_CHUNK_…
premtsd-code Apr 15, 2026
fbe5117
Address Jake's follow-up review: tighten limits, reuse tenantKey, let…
premtsd-code Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/Database/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ abstract class Adapter

protected bool $alterLocks = false;

protected bool $skipDuplicates = false;

/**
* Run a callback with skipDuplicates enabled.
* Duplicate key errors during createDocuments() will be silently skipped
* instead of thrown. Nestable — saves and restores previous state.
* Pass $enable = false to run the callback without toggling the flag
* (useful for conditional forwarding from wrappers like Pool/Mirror).
*
* @template T
* @param callable(): T $callback
* @param bool $enable
* @return T
*/
public function skipDuplicates(callable $callback, bool $enable = true): mixed
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the $enable flag and check if skipDuplicates instead, otherwise we add the overhead of calling an extra callable when it's false

{
if (!$enable) {
return $callback();
}

$previous = $this->skipDuplicates;
$this->skipDuplicates = true;

try {
return $callback();
} finally {
$this->skipDuplicates = $previous;
}
}

/**
* @var array<string, mixed>
*/
Expand Down
102 changes: 102 additions & 0 deletions src/Database/Adapter/Mongo.php
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,42 @@ public function castingBefore(Document $collection, Document $document): Documen
return $document;
}

/**
* Find existing `_uid` values matching the given filters, fully walking the cursor.
*
* @param string $name
* @param array<string, mixed> $filters
* @param int $batchSize Hint: expected max result count — lets MongoDB return everything in one batch
* @return array<int, string>
*
* @throws MongoException
*/
private function findExistingUids(string $name, array $filters, int $batchSize): array
{
$options = $this->getTransactionOptions([
'projection' => ['_uid' => 1],
'batchSize' => $batchSize,
]);

$response = $this->client->find($name, $filters, $options);
$uids = [];

foreach ($response->cursor->firstBatch ?? [] as $doc) {
$uids[] = $doc->_uid;
}

$cursorId = $response->cursor->id ?? 0;
while ($cursorId != 0) {
$more = $this->client->getMore($cursorId, $name, $batchSize);
foreach ($more->cursor->nextBatch ?? [] as $doc) {
$uids[] = $doc->_uid;
}
$cursorId = $more->cursor->id ?? 0;
}

return $uids;
}

/**
* Create Documents in batches
*
Expand Down Expand Up @@ -1487,6 +1523,72 @@ public function createDocuments(Document $collection, array $documents): array
$records[] = $record;
}

// Pre-filter duplicates within the session — MongoDB has no INSERT IGNORE,
// so sending a duplicate would abort the entire transaction.
if ($this->skipDuplicates && !empty($records)) {
$tenantPerDoc = $this->sharedTables && $this->tenantPerDocument;
$recordKey = fn (array $record): string => $tenantPerDoc
? ($record['_tenant'] ?? $this->getTenant()) . ':' . ($record['_uid'] ?? '')
: ($record['_uid'] ?? '');

$existingKeys = [];

try {
if ($tenantPerDoc) {
$idsByTenant = [];
foreach ($records as $record) {
$uid = $record['_uid'] ?? '';
if ($uid === '') {
continue;
}
$tenant = $record['_tenant'] ?? $this->getTenant();
$idsByTenant[$tenant][] = $uid;
}

foreach ($idsByTenant as $tenant => $tenantUids) {
$tenantUids = \array_values(\array_unique($tenantUids));
$filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant];
foreach ($this->findExistingUids($name, $filters, \count($tenantUids)) as $uid) {
$existingKeys[$tenant . ':' . $uid] = true;
}
}
} else {
$uids = \array_values(\array_unique(\array_filter(
\array_map(fn ($r) => $r['_uid'] ?? null, $records),
fn ($v) => $v !== null
)));
if (!empty($uids)) {
$filters = ['_uid' => ['$in' => $uids]];
if ($this->sharedTables) {
$filters['_tenant'] = $this->getTenantFilters($collection->getId());
}
foreach ($this->findExistingUids($name, $filters, \count($uids)) as $uid) {
$existingKeys[$uid] = true;
}
}
}
} catch (MongoException $e) {
throw $this->processException($e);
}

if (!empty($existingKeys)) {
$filteredRecords = [];
$filteredDocuments = [];
foreach ($records as $i => $record) {
if (!isset($existingKeys[$recordKey($record)])) {
$filteredRecords[] = $record;
$filteredDocuments[] = $documents[$i];
}
}
$records = $filteredRecords;
$documents = $filteredDocuments;
}

if (empty($records)) {
return [];
}
}

try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
Expand Down
15 changes: 12 additions & 3 deletions src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public function __construct(UtopiaPool $pool)
public function delegate(string $method, array $args): mixed
{
if ($this->pinnedAdapter !== null) {
return $this->pinnedAdapter->{$method}(...$args);
return $this->pinnedAdapter->skipDuplicates(
fn () => $this->pinnedAdapter->{$method}(...$args),
$this->skipDuplicates
);
}

return $this->pool->use(function (Adapter $adapter) use ($method, $args) {
Expand All @@ -66,7 +69,10 @@ public function delegate(string $method, array $args): mixed
$adapter->setMetadata($key, $value);
}

return $adapter->{$method}(...$args);
return $adapter->skipDuplicates(
fn () => $adapter->{$method}(...$args),
$this->skipDuplicates
);
});
}

Expand Down Expand Up @@ -146,7 +152,10 @@ public function withTransaction(callable $callback): mixed

$this->pinnedAdapter = $adapter;
try {
return $adapter->withTransaction($callback);
return $adapter->skipDuplicates(
fn () => $adapter->withTransaction($callback),
$this->skipDuplicates
);
} finally {
$this->pinnedAdapter = null;
}
Expand Down
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
return $document;
}

protected function getInsertKeyword(): string
{
return 'INSERT INTO';
}

protected function getInsertSuffix(string $table): string
{
if (!$this->skipDuplicates) {
return '';
}

$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

protected function getInsertPermissionsSuffix(): string
{
if (!$this->skipDuplicates) {
return '';
}

$conflictTarget = $this->sharedTables
? '("_type", "_permission", "_document", "_tenant")'
: '("_type", "_permission", "_document")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

/**
* @param string $tableName
* @param string $columns
Expand Down
Loading
Loading