@@ -19,6 +19,16 @@ public class ModifiablePublisherRoleCollection : NomadKuboEventStreamHandler<Val
1919 /// <inheritdoc/>
2020 public required ReadOnlyPublisherRoleCollection Inner { get ; init ; }
2121
22+ /// <summary>
23+ /// A unique identifier for add events, persistent across machines and reruns.
24+ /// </summary>
25+ public string AddPublisherRoleEventId { get ; init ; } = "AddPublisherRoleAsync" ;
26+
27+ /// <summary>
28+ /// A unique identifier for remove events, persistent across machines and reruns.
29+ /// </summary>
30+ public string RemovePublisherRoleEventId { get ; init ; } = "RemovePublisherRoleAsync" ;
31+
2232 /// <summary>
2333 /// The repository to use for getting modifiable or readonly publisher instances.
2434 /// </summary>
@@ -67,95 +77,96 @@ public override async Task ApplyEntryUpdateAsync(EventStreamEntry<DagCid> stream
6777 throw new ArgumentNullException ( "Key or Value in updateEvent cannot be null." ) ;
6878 }
6979
70- switch ( streamEntry . EventId )
80+ if ( streamEntry . EventId == AddPublisherRoleEventId )
7181 {
72- case "AddPublisherRoleAsync" :
73- var publisherId = await Client . Dag . GetAsync < string > ( updateEvent . Key , cancel : cancellationToken ) ;
74- var publisher = await PublisherRepository . GetAsync ( publisherId , cancellationToken ) ;
75- if ( publisher is ModifiablePublisher modifiablePublisher )
76- {
77- var publisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
78- var addedPublisherRole = new ModifiablePublisherRole
79- {
80- InnerPublisher = modifiablePublisher ,
81- Role = publisherRole
82- } ;
83-
84- await ApplyAddPublisherRoleEntryAsync ( streamEntry , updateEvent , addedPublisherRole , cancellationToken ) ;
85- }
86- else if ( publisher is ReadOnlyPublisher readOnlyPublisher )
82+ var publisherId = await Client . Dag . GetAsync < string > ( updateEvent . Key , cancel : cancellationToken ) ;
83+ var publisher = await PublisherRepository . GetAsync ( publisherId , cancellationToken ) ;
84+ if ( publisher is ModifiablePublisher modifiablePublisher )
85+ {
86+ var publisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
87+ var addedPublisherRole = new ModifiablePublisherRole
8788 {
88- var publisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
89- var addedPublisherRole = new ReadOnlyPublisherRole
90- {
91- InnerPublisher = readOnlyPublisher ,
92- Role = publisherRole
93- } ;
94-
95- await ApplyAddPublisherRoleEntryAsync ( streamEntry , updateEvent , addedPublisherRole , cancellationToken ) ;
96- }
97- else
89+ InnerPublisher = modifiablePublisher ,
90+ Role = publisherRole
91+ } ;
92+
93+ await ApplyAddPublisherRoleEntryAsync ( streamEntry , updateEvent , addedPublisherRole , cancellationToken ) ;
94+ }
95+ else if ( publisher is ReadOnlyPublisher readOnlyPublisher )
96+ {
97+ var publisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
98+ var addedPublisherRole = new ReadOnlyPublisherRole
9899 {
99- throw new InvalidOperationException ( "Publisher is of an unsupported type." ) ;
100- }
101- break ;
102-
103- case "RemovePublisherRoleAsync" :
104- var removedPublisherId = await Client . Dag . GetAsync < string > ( updateEvent . Key , cancel : cancellationToken ) ;
105- var removedPublisher = await PublisherRepository . GetAsync ( removedPublisherId , cancellationToken ) ;
106- if ( removedPublisher is ModifiablePublisher modifiableRemovedPublisher )
107- {
108- var removedPublisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
109- var removedPublisherRoleInstance = new ModifiablePublisherRole
110- {
111- InnerPublisher = modifiableRemovedPublisher ,
112- Role = removedPublisherRole
113- } ;
114-
115- await ApplyRemovePublisherRoleEntryAsync ( streamEntry , updateEvent , removedPublisherRoleInstance , cancellationToken ) ;
116- }
117- else if ( removedPublisher is ReadOnlyPublisher readOnlyRemovedPublisher )
100+ InnerPublisher = readOnlyPublisher ,
101+ Role = publisherRole
102+ } ;
103+
104+ await ApplyAddPublisherRoleEntryAsync ( streamEntry , updateEvent , addedPublisherRole , cancellationToken ) ;
105+ }
106+ else
107+ {
108+ throw new InvalidOperationException ( "Publisher is of an unsupported type." ) ;
109+ }
110+
111+ return ;
112+ }
113+
114+ if ( streamEntry . EventId == RemovePublisherRoleEventId )
115+ {
116+ var removedPublisherId = await Client . Dag . GetAsync < string > ( updateEvent . Key , cancel : cancellationToken ) ;
117+ var removedPublisher = await PublisherRepository . GetAsync ( removedPublisherId , cancellationToken ) ;
118+ if ( removedPublisher is ModifiablePublisher modifiableRemovedPublisher )
119+ {
120+ var removedPublisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
121+ var removedPublisherRoleInstance = new ModifiablePublisherRole
118122 {
119- var removedPublisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
120- var removedPublisherRoleInstance = new ReadOnlyPublisherRole
121- {
122- InnerPublisher = readOnlyRemovedPublisher ,
123- Role = removedPublisherRole
124- } ;
125-
126- await ApplyRemovePublisherRoleEntryAsync ( streamEntry , updateEvent , removedPublisherRoleInstance , cancellationToken ) ;
127- }
128- else
123+ InnerPublisher = modifiableRemovedPublisher ,
124+ Role = removedPublisherRole
125+ } ;
126+
127+ await ApplyRemovePublisherRoleEntryAsync ( streamEntry , updateEvent , removedPublisherRoleInstance , cancellationToken ) ;
128+ }
129+ else if ( removedPublisher is ReadOnlyPublisher readOnlyRemovedPublisher )
130+ {
131+ var removedPublisherRole = await Client . Dag . GetAsync < Role > ( updateEvent . Value , cancel : cancellationToken ) ;
132+ var removedPublisherRoleInstance = new ReadOnlyPublisherRole
129133 {
130- throw new InvalidOperationException ( "Publisher is of an unsupported type." ) ;
131- }
132- break ;
133-
134- default :
135- throw new InvalidOperationException ( $ "Unknown event id: { streamEntry . EventId } ") ;
134+ InnerPublisher = readOnlyRemovedPublisher ,
135+ Role = removedPublisherRole
136+ } ;
137+
138+ await ApplyRemovePublisherRoleEntryAsync ( streamEntry , updateEvent , removedPublisherRoleInstance , cancellationToken ) ;
139+ }
140+ else
141+ {
142+ throw new InvalidOperationException ( "Publisher is of an unsupported type." ) ;
143+ }
144+ return ;
136145 }
146+
147+ throw new InvalidOperationException ( $ "Unknown event id: { streamEntry . EventId } ") ;
137148 }
138149
139150 /// <inheritdoc/>
140151 public async Task ApplyAddPublisherRoleEntryAsync ( EventStreamEntry < DagCid > streamEntry , ValueUpdateEvent updateEvent , IReadOnlyPublisherRole publisher , CancellationToken cancellationToken )
141152 {
142153 var roleCid = await Client . Dag . PutAsync ( publisher . Role , pin : KuboOptions . ShouldPin , cancel : cancellationToken ) ;
143- Inner . Inner . Publishers = [ .. Inner . Inner . Publishers , new PublisherRole { PublisherId = publisher . Id , Role = ( DagCid ) roleCid } ] ;
154+ Inner . Inner = [ .. Inner . Inner , new PublisherRole { PublisherId = publisher . Id , Role = ( DagCid ) roleCid } ] ;
144155 PublishersAdded ? . Invoke ( this , [ publisher ] ) ;
145156 }
146157
147158 /// <inheritdoc/>
148159 public async Task ApplyRemovePublisherRoleEntryAsync ( EventStreamEntry < DagCid > streamEntry , ValueUpdateEvent updateEvent , IReadOnlyPublisherRole publisher , CancellationToken cancellationToken )
149160 {
150161 var roleCid = await Client . Dag . PutAsync ( publisher . Role , pin : KuboOptions . ShouldPin , cancel : cancellationToken ) ;
151- Inner . Inner . Publishers = [ .. Inner . Inner . Publishers . Where ( x => x . PublisherId != publisher . Id && x . Role != ( DagCid ) roleCid ) ] ;
162+ Inner . Inner = [ .. Inner . Inner . Where ( x => x . PublisherId != publisher . Id && x . Role != ( DagCid ) roleCid ) ] ;
152163 PublishersRemoved ? . Invoke ( this , [ publisher ] ) ;
153164 }
154165
155166 /// <inheritdoc/>
156167 public override Task ResetEventStreamPositionAsync ( CancellationToken cancellationToken )
157168 {
158- Inner . Inner . Publishers = [ ] ;
169+ Inner . Inner = [ ] ;
159170 return Task . CompletedTask ;
160171 }
161172}
0 commit comments