Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
60f1ff1
Add ignore param to createDocuments for silent duplicate handling
premtsd-code Apr 9, 2026
bee42d4
Add e2e tests for createDocuments ignore mode
premtsd-code Apr 9, 2026
93a9136
Fix Mongo adapter ignore mode: pass ignoreDuplicates to client and fi…
premtsd-code Apr 10, 2026
2906dda
Revert "Fix Mongo adapter ignore mode: pass ignoreDuplicates to clien…
premtsd-code Apr 10, 2026
63d9902
Replace ignore param with skipDuplicates scope guard
premtsd-code Apr 10, 2026
b0a8392
Push skipDuplicates scope guard down to Adapter layer
premtsd-code Apr 12, 2026
d8f647c
Refactor: extract helpers and collapse conditional wraps
premtsd-code Apr 12, 2026
c88c6ac
fix: guard buildUidTenantLookup against empty UID set
premtsd-code Apr 12, 2026
16849fe
Add e2e tests for skipDuplicates edge cases
premtsd-code Apr 13, 2026
c6d566e
SQL adapter: remove redundant skipDuplicates pre-filter
premtsd-code Apr 13, 2026
fd37b69
Mongo adapter: use upsertWithCounts for race-safe accurate counts
premtsd-code Apr 13, 2026
a24358c
Merge remote-tracking branch 'origin/main' into csv-import-upsert-v2
premtsd-code Apr 13, 2026
3b783af
Revert "SQL adapter: remove redundant skipDuplicates pre-filter"
premtsd-code Apr 13, 2026
3a483f2
Mirror: forward only docs the source actually inserted
premtsd-code Apr 13, 2026
eb99cf1
skipDuplicates: simplify by moving pre-filter to orchestrator only
premtsd-code Apr 13, 2026
89e4cf8
skipDuplicates: drop deferred-relationships, inline pre-filter, tight…
premtsd-code Apr 13, 2026
41704eb
Address Jake's review on PR #852
premtsd-code Apr 13, 2026
e9e5e76
Mirror::createDocuments: bound skipDuplicates capture to O($batchSize)
premtsd-code Apr 14, 2026
ae929db
Restore per-tenant grouping for batch existing-doc lookups in tenantP…
premtsd-code Apr 14, 2026
431d378
skipDuplicates: drop pre-filter, rely on adapter-level dedup
premtsd-code Apr 14, 2026
0fd7c33
skipDuplicates: drop intra-batch dedup, trim verbose test comments
premtsd-code Apr 14, 2026
9baaa04
Group skipDuplicates / getInsert* into their logical clusters
premtsd-code Apr 15, 2026
3c85013
Mirror::createDocuments: re-fetch source after skipDuplicates insert
premtsd-code Apr 15, 2026
934ec04
Mirror::createDocuments: pre-filter existing ids + unify skip/non-ski…
premtsd-code Apr 15, 2026
fa0e373
Chunk id-lookup queries to respect RELATION_QUERY_CHUNK_SIZE
premtsd-code Apr 15, 2026
52b189b
Chunk id lookups by \$this->maxQueryValues, not RELATION_QUERY_CHUNK_…
premtsd-code Apr 15, 2026
fbe5117
Address Jake's follow-up review: tighten limits, reuse tenantKey, let…
premtsd-code Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Database/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
*
* @param Document $collection
* @param array<Document> $documents
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
*
* @return array<Document>
*
* @throws DatabaseException
*/
abstract public function createDocuments(Document $collection, array $documents): array;
abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;

/**
* Update Document
Expand Down
93 changes: 91 additions & 2 deletions src/Database/Adapter/Mongo.php
Original file line number Diff line number Diff line change
Expand Up @@ -1460,11 +1460,11 @@ public function castingBefore(Document $collection, Document $document): Documen
* @throws DuplicateException
* @throws DatabaseException
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());

$options = $this->getTransactionOptions();

$records = [];
$hasSequence = null;
$documents = \array_map(fn ($doc) => clone $doc, $documents);
Expand All @@ -1487,6 +1487,95 @@ public function createDocuments(Document $collection, array $documents): array
$records[] = $record;
}

// Pre-filter duplicates within the session to avoid aborting the transaction.
if ($ignore && !empty($records)) {
$existingKeys = [];

try {
if ($this->sharedTables && $this->tenantPerDocument) {
$idsByTenant = [];
foreach ($records as $record) {
$uid = $record['_uid'] ?? '';
if ($uid === '') {
continue;
}
$tenant = $record['_tenant'] ?? $this->getTenant();
$idsByTenant[$tenant][] = $uid;
}

foreach ($idsByTenant as $tenant => $tenantUids) {
$tenantUids = \array_values(\array_unique($tenantUids));
$findOptions = $this->getTransactionOptions([
'projection' => ['_uid' => 1],
'batchSize' => \count($tenantUids),
]);
$filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant];
$response = $this->client->find($name, $filters, $findOptions);
foreach ($response->cursor->firstBatch ?? [] as $doc) {
$existingKeys[$tenant . ':' . $doc->_uid] = true;
}
$cursorId = $response->cursor->id ?? 0;
while ($cursorId != 0) {
$more = $this->client->getMore($cursorId, $name, \count($tenantUids));
foreach ($more->cursor->nextBatch ?? [] as $doc) {
$existingKeys[$tenant . ':' . $doc->_uid] = true;
}
$cursorId = $more->cursor->id ?? 0;
}
}
} else {
$uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records), fn ($v) => $v !== null);
if (!empty($uids)) {
$uidValues = \array_values(\array_unique($uids));
$findOptions = $this->getTransactionOptions([
'projection' => ['_uid' => 1],
'batchSize' => \count($uidValues),
]);
$filters = ['_uid' => ['$in' => $uidValues]];
if ($this->sharedTables) {
$filters['_tenant'] = $this->getTenantFilters($collection->getId());
}
$response = $this->client->find($name, $filters, $findOptions);
foreach ($response->cursor->firstBatch ?? [] as $doc) {
$existingKeys[$doc->_uid] = true;
}
$cursorId = $response->cursor->id ?? 0;
while ($cursorId != 0) {
$more = $this->client->getMore($cursorId, $name, \count($uidValues));
foreach ($more->cursor->nextBatch ?? [] as $doc) {
$existingKeys[$doc->_uid] = true;
}
$cursorId = $more->cursor->id ?? 0;
}
}
}
} catch (MongoException $e) {
throw $this->processException($e);
}

if (!empty($existingKeys)) {
$filteredRecords = [];
$filteredDocuments = [];
$tenantPerDoc = $this->sharedTables && $this->tenantPerDocument;
foreach ($records as $i => $record) {
$uid = $record['_uid'] ?? '';
$key = $tenantPerDoc
? ($record['_tenant'] ?? $this->getTenant()) . ':' . $uid
: $uid;
if (!isset($existingKeys[$key])) {
$filteredRecords[] = $record;
$filteredDocuments[] = $documents[$i];
}
}
$records = $filteredRecords;
$documents = $filteredDocuments;
}

if (empty($records)) {
return [];
}
}

try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
Expand Down
2 changes: 1 addition & 1 deletion src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
return $this->delegate(__FUNCTION__, \func_get_args());
}

public function createDocuments(Document $collection, array $documents): array
/**
 * Create documents by handing the call to delegate().
 *
 * The current method name (__FUNCTION__) and the arguments the caller
 * actually supplied (func_get_args()) are passed along unchanged. Because
 * func_get_args() omits parameters that fell back to their default, an
 * omitted $ignore is not forwarded explicitly.
 *
 * @param Document $collection
 * @param array<Document> $documents
 * @param bool $ignore Not interpreted here; forwarded as-is when supplied.
 *
 * @return array<Document>
 */
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
// NOTE(review): delegate() is defined outside this hunk; presumably it invokes the same-named method on a pooled adapter — confirm.
return $this->delegate(__FUNCTION__, \func_get_args());
}
Expand Down
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
return $document;
}

/**
 * Keyword that opens an INSERT statement.
 *
 * This implementation returns 'INSERT INTO' whether or not $ignore is set;
 * the flag is not read here. In this adapter the ignore behaviour is
 * expressed by the ON CONFLICT clause that getInsertSuffix() builds.
 *
 * NOTE(review): $ignore is presumably kept so the signature matches other
 * adapters that switch keyword on it — confirm against the parent class.
 *
 * @param bool $ignore Unused in this implementation.
 *
 * @return string
 */
protected function getInsertKeyword(bool $ignore): string
{
return 'INSERT INTO';
}

/**
 * Clause appended after the VALUES list of a document INSERT.
 *
 * With $ignore disabled nothing is appended. With it enabled, an
 * ON CONFLICT ... DO NOTHING clause is returned whose conflict target is
 * ("_uid", "_tenant") when shared tables are in use and ("_uid") otherwise.
 *
 * @param bool $ignore Whether conflicting rows should be skipped.
 * @param string $table Not read by this implementation.
 *
 * @return string The clause, or an empty string when $ignore is false.
 */
protected function getInsertSuffix(bool $ignore, string $table): string
{
    if ($ignore) {
        // Default to the plain uid key; widen it to include the tenant
        // column when rows of several tenants share one table.
        $target = '("_uid")';
        if ($this->sharedTables) {
            $target = '("_uid", "_tenant")';
        }

        return 'ON CONFLICT ' . $target . ' DO NOTHING';
    }

    return '';
}

/**
 * Clause appended after the VALUES list of a permissions INSERT.
 *
 * With $ignore disabled nothing is appended. With it enabled, an
 * ON CONFLICT ... DO NOTHING clause is returned whose conflict target is
 * ("_type", "_permission", "_document"), extended with "_tenant" when
 * shared tables are in use.
 *
 * @param bool $ignore Whether conflicting permission rows should be skipped.
 *
 * @return string The clause, or an empty string when $ignore is false.
 */
protected function getInsertPermissionsSuffix(bool $ignore): string
{
    if ($ignore) {
        // Start from the three-column key and switch to the tenant-aware
        // variant when rows of several tenants share one table.
        $target = '("_type", "_permission", "_document")';
        if ($this->sharedTables) {
            $target = '("_type", "_permission", "_document", "_tenant")';
        }

        return 'ON CONFLICT ' . $target . ' DO NOTHING';
    }

    return '';
}

/**
* @param string $tableName
* @param string $columns
Expand Down
Loading
Loading