Skip to content

Commit 9d5af5d

Browse files
authored
Merge pull request #35 from Lamparter/connections
Initial implementation of virtual event stream handler for Connections
2 parents 66c38b7 + 1021081 commit 9d5af5d

26 files changed

Lines changed: 525 additions & 189 deletions

src/IReadOnlyConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ public interface IReadOnlyConnection
1313
/// <summary>
1414
/// Gets the value of the connection.
1515
/// </summary>
16-
string Value { get; }
16+
Task<string> GetValueAsync(CancellationToken cancellationToken = default);
1717

1818
/// <summary>
19-
/// Raised when <see cref="Value"/> is updated.
19+
/// Raised when the value is updated.
2020
/// </summary>
2121
event EventHandler<string>? ValueUpdated;
2222
}
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Collections.Generic;
2+
13
namespace WindowsAppCommunity.Sdk;
24

35
/// <summary>
@@ -6,17 +8,17 @@ namespace WindowsAppCommunity.Sdk;
68
public interface IReadOnlyConnectionsCollection
79
{
810
/// <summary>
9-
/// The connections associated with this entity.
11+
/// Gets the connections associated with this entity.
1012
/// </summary>
11-
IReadOnlyConnection[] Connections { get; }
13+
IAsyncEnumerable<IReadOnlyConnection> GetConnectionsAsync(CancellationToken cancellationToken = default);
1214

1315
/// <summary>
14-
/// Raised when items are added to the <see cref="Connections"/> collection.
16+
/// Raised when items are added to the collection.
1517
/// </summary>
1618
event EventHandler<IReadOnlyConnection[]>? ConnectionsAdded;
1719

1820
/// <summary>
19-
/// Raised when items are removed from the <see cref="Connections"/> collection.
21+
/// Raised when items are removed from the collection.
2022
/// </summary>
2123
event EventHandler<IReadOnlyConnection[]>? ConnectionsRemoved;
2224
}

src/Models/IConnections.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Collections.Generic;
2-
using Ipfs;
1+
using Ipfs;
32

43
namespace WindowsAppCommunity.Sdk.Models;
54

@@ -11,5 +10,21 @@ public interface IConnections
1110
/// <summary>
1211
/// Represents data about external application connections.
1312
/// </summary>
14-
Dictionary<string, DagCid> Connections { get; set; }
13+
Connection[] Connections { get; set; }
14+
}
15+
16+
/// <summary>
17+
/// Represents a published collection of key-value connection pairs.
18+
/// </summary>
19+
public record Connection
20+
{
21+
/// <summary>
22+
/// A unique identifier for this connection.
23+
/// </summary>
24+
public required string Id { get; init; }
25+
26+
/// <summary>
27+
/// A <see cref="DagCid"/> of the connection value.
28+
/// </summary>
29+
public required DagCid Value { get; set; }
1530
}

src/Models/Project.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public record Project : IEntity, IUserRoleCollection, IAccentColor, IProjectColl
6767
/// <summary>
6868
/// Holds information about project assets that have been published for consumption by an end user, such as a Microsoft Store app, a package on nuget.org, a git repo, etc.
6969
/// </summary>
70-
public Dictionary<string, DagCid> Connections { get; set; } = new();
70+
public Connection[] Connections { get; set; } = [];
7171

7272
/// <summary>
7373
/// A flag that indicates whether the profile has requested to be forgotten.

src/Models/Publisher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public record Publisher : IEntity, ILinkCollection, IProjectCollection, IUserRol
6262
/// <summary>
6363
/// Holds information about publisher assets that have been published for consumption by an end user, such as a Microsoft Store app, a package on nuget.org, a git repo, etc.
6464
/// </summary>
65-
public Dictionary<string, DagCid> Connections { get; set; } = [];
65+
public Connection[] Connections { get; set; } = [];
6666

6767
/// <summary>
6868
/// A flag that indicates whether the profile has requested to be forgotten.

src/Models/User.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public record User : IEntity, IConnections, ILinkCollection, IProjectRoleCollect
2727
/// <summary>
2828
/// Represents application connections added by the user.
2929
/// </summary>
30-
public Dictionary<string, DagCid> Connections { get; set; } = [];
30+
public Connection[] Connections { get; set; } = [];
3131

3232
/// <summary>
3333
/// Represents links to external profiles or resources added by the user.

src/Nomad/ModifiableAccentColor.cs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,24 @@ public class ModifiableAccentColor : NomadKuboEventStreamHandler<ValueUpdateEven
2424
/// <inheritdoc />
2525
public event EventHandler<string?>? AccentColorUpdated;
2626

27+
/// <inheritdoc />
28+
public async Task UpdateAccentColorAsync(string? accentColor, CancellationToken cancellationToken)
29+
{
30+
DagCid? valueCid = null;
31+
if (accentColor is not null)
32+
{
33+
Cid cid = await Client.Dag.PutAsync(accentColor, pin: KuboOptions.ShouldPin, cancel: cancellationToken);
34+
valueCid = (DagCid)cid;
35+
}
36+
37+
var updateEvent = new ValueUpdateEvent(null, valueCid, accentColor is null);
38+
39+
var appendedEntry = await AppendNewEntryAsync(targetId: Id, eventId: nameof(UpdateAccentColorAsync), updateEvent, DateTime.UtcNow, cancellationToken);
40+
await ApplyEntryUpdateAsync(appendedEntry, updateEvent, accentColor, cancellationToken);
41+
42+
EventStreamPosition = appendedEntry;
43+
}
44+
2745
/// <inheritdoc />
2846
public override async Task ApplyEntryUpdateAsync(EventStreamEntry<DagCid> eventStreamEntry, ValueUpdateEvent updateEvent, CancellationToken cancellationToken)
2947
{
@@ -70,22 +88,4 @@ public override Task ResetEventStreamPositionAsync(CancellationToken cancellatio
7088

7189
return Task.CompletedTask;
7290
}
73-
74-
/// <inheritdoc />
75-
public async Task UpdateAccentColorAsync(string? accentColor, CancellationToken cancellationToken)
76-
{
77-
DagCid? valueCid = null;
78-
if (accentColor is not null)
79-
{
80-
Cid cid = await Client.Dag.PutAsync(accentColor, pin: KuboOptions.ShouldPin, cancel: cancellationToken);
81-
valueCid = (DagCid)cid;
82-
}
83-
84-
var updateEvent = new ValueUpdateEvent(null, valueCid, accentColor is null);
85-
86-
var appendedEntry = await AppendNewEntryAsync(targetId: Id, eventId: nameof(UpdateAccentColorAsync), updateEvent, DateTime.UtcNow, cancellationToken);
87-
await ApplyEntryUpdateAsync(appendedEntry, updateEvent, accentColor, cancellationToken);
88-
89-
EventStreamPosition = appendedEntry;
90-
}
9191
}

src/Nomad/ModifiableConnection.cs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using CommunityToolkit.Diagnostics;
2+
using Ipfs;
3+
using OwlCore.ComponentModel;
4+
using OwlCore.Nomad;
5+
using OwlCore.Nomad.Kubo;
6+
using OwlCore.Nomad.Kubo.Events;
7+
8+
namespace WindowsAppCommunity.Sdk.Nomad
9+
{
10+
/// <summary>
11+
/// Represents a modifiable connection.
12+
/// </summary>
13+
public class ModifiableConnection : NomadKuboEventStreamHandler<ValueUpdateEvent>, IModifiableConnection, IDelegable<ReadOnlyConnection>
14+
{
15+
/// <inheritdoc/>
16+
public required ReadOnlyConnection Inner { get; init; }
17+
18+
/// <inheritdoc/>
19+
public string Id => Inner.Id;
20+
21+
/// <inheritdoc/>
22+
public event EventHandler<string>? ValueUpdated;
23+
24+
/// <inheritdoc/>
25+
public async Task UpdateValueAsync(string newValue, CancellationToken cancellationToken = default)
26+
{
27+
cancellationToken.ThrowIfCancellationRequested();
28+
29+
var dagCid = (DagCid)await Client.Dag.PutAsync(newValue, pin: KuboOptions.ShouldPin, cancel: cancellationToken);
30+
var updateEvent = new ValueUpdateEvent(null, dagCid, false);
31+
32+
var appendedEntry = await AppendNewEntryAsync(targetId: Id, eventId: nameof(UpdateValueAsync), updateEvent, DateTime.UtcNow, cancellationToken);
33+
await ApplyEntryUpdateAsync(appendedEntry, updateEvent, newValue, cancellationToken);
34+
35+
EventStreamPosition = appendedEntry;
36+
}
37+
38+
/// <inheritdoc/>
39+
public Task<string> GetValueAsync(CancellationToken cancellationToken = default) => Inner.GetValueAsync(cancellationToken);
40+
41+
/// <inheritdoc />
42+
public override async Task ApplyEntryUpdateAsync(EventStreamEntry<DagCid> eventStreamEntry, ValueUpdateEvent updateEvent, CancellationToken cancellationToken)
43+
{
44+
cancellationToken.ThrowIfCancellationRequested();
45+
46+
if (eventStreamEntry.TargetId != Id)
47+
return;
48+
49+
Guard.IsNotNull(updateEvent.Value);
50+
var updatedValue = await Client.Dag.GetAsync<string>(updateEvent.Value, cancel: cancellationToken);
51+
52+
Guard.IsNotNull(updatedValue);
53+
await ApplyEntryUpdateAsync(eventStreamEntry, updateEvent, updatedValue, cancellationToken);
54+
}
55+
56+
/// <summary>
57+
/// Applies an event stream update event and raises the relevant events.
58+
/// </summary>
59+
/// <param name="eventStreamEntry">The event stream entry to apply.</param>
60+
/// <param name="updateEvent">The update event to apply.</param>
61+
/// <param name="newValue">The resolved new value data for this event.</param>
62+
/// <param name="cancellationToken">A token that can be used to cancel the ongoing operation.</param>
63+
public Task ApplyEntryUpdateAsync(EventStreamEntry<DagCid> eventStreamEntry, ValueUpdateEvent updateEvent, string newValue, CancellationToken cancellationToken)
64+
{
65+
cancellationToken.ThrowIfCancellationRequested();
66+
67+
if (eventStreamEntry.EventId is not nameof(UpdateValueAsync))
68+
return Task.CompletedTask;
69+
70+
Guard.IsNotNull(updateEvent.Value);
71+
72+
Inner.Inner.Value = updateEvent.Value;
73+
ValueUpdated?.Invoke(this, newValue);
74+
75+
return Task.CompletedTask;
76+
}
77+
78+
/// <inheritdoc />
79+
public override Task ResetEventStreamPositionAsync(CancellationToken cancellationToken)
80+
{
81+
EventStreamPosition = null;
82+
return Task.CompletedTask;
83+
}
84+
}
85+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using CommunityToolkit.Diagnostics;
4+
using Ipfs;
5+
using OwlCore.ComponentModel;
6+
using OwlCore.Kubo;
7+
using OwlCore.Nomad;
8+
using OwlCore.Nomad.Kubo;
9+
using OwlCore.Nomad.Kubo.Events;
10+
using WindowsAppCommunity.Sdk.Models;
11+
12+
namespace WindowsAppCommunity.Sdk.Nomad;
13+
14+
/// <summary>
15+
/// Represents a modifiable connection collection.
16+
/// </summary>
17+
public class ModifiableConnectionCollection : NomadKuboEventStreamHandler<ValueUpdateEvent>, IModifiableConnectionsCollection, IDelegable<ReadOnlyConnectionCollection>
18+
{
19+
/// <summary>
20+
/// Raised when items are added to the collection.
21+
/// </summary>
22+
public event EventHandler<IReadOnlyConnection[]>? ConnectionsAdded;
23+
24+
/// <summary>
25+
/// Raised when items are removed from the collection.
26+
/// </summary>
27+
public event EventHandler<IReadOnlyConnection[]>? ConnectionsRemoved;
28+
29+
/// <summary>
30+
/// The unique identifier for this collection.
31+
/// </summary>
32+
public required string Id { get; init; }
33+
34+
/// <summary>
35+
/// The inner read-only connections collection.
36+
/// </summary>
37+
public required ReadOnlyConnectionCollection Inner { get; init; }
38+
39+
/// <inheritdoc />
40+
public IAsyncEnumerable<IReadOnlyConnection> GetConnectionsAsync(CancellationToken cancellationToken = default) => Inner.GetConnectionsAsync(cancellationToken);
41+
42+
/// <summary>
43+
/// Adds a connection to this entity.
44+
/// </summary>
45+
public async Task AddConnectionAsync(IReadOnlyConnection connection, CancellationToken cancellationToken)
46+
{
47+
var value = await connection.GetValueAsync(cancellationToken);
48+
49+
var keyCid = (DagCid)await Client.Dag.PutAsync(connection.Id, pin: KuboOptions.ShouldPin, cancel: cancellationToken);
50+
var valueCid = (DagCid)await Client.Dag.PutAsync(value, pin: KuboOptions.ShouldPin, cancel: cancellationToken);
51+
52+
var updateEvent = new ValueUpdateEvent(keyCid, valueCid, Unset:false);
53+
54+
var appendedEntry = await AppendNewEntryAsync(targetId: Id, eventId: nameof(AddConnectionAsync), updateEvent, DateTime.UtcNow, cancellationToken);
55+
await ApplyEntryUpdateAsync(appendedEntry, updateEvent, new Connection { Id = connection.Id, Value = valueCid }, connection, cancellationToken);
56+
57+
EventStreamPosition = appendedEntry;
58+
}
59+
60+
/// <summary>
61+
/// Removes a connection from this entity.
62+
/// </summary>
63+
public async Task RemoveConnectionAsync(IReadOnlyConnection connection, CancellationToken cancellationToken)
64+
{
65+
var existing = Inner.Inner.Connections.FirstOrDefault(x => x.Id == connection.Id);
66+
if (existing == null)
67+
throw new ArgumentException("Connection not found in the collection.", nameof(connection));
68+
69+
var value = await connection.GetValueAsync(cancellationToken);
70+
71+
var keyCid = (DagCid)await Client.Dag.PutAsync(connection.Id, pin: KuboOptions.ShouldPin, cancel: cancellationToken);
72+
var valueCid = (DagCid)await Client.Dag.PutAsync(value, pin: KuboOptions.ShouldPin, cancel: cancellationToken);
73+
74+
var updateEvent = new ValueUpdateEvent(keyCid, valueCid, Unset: true);
75+
76+
var appendedEntry = await AppendNewEntryAsync(targetId: Id, eventId: nameof(RemoveConnectionAsync), updateEvent, DateTime.UtcNow, cancellationToken);
77+
await ApplyEntryUpdateAsync(appendedEntry, updateEvent, new Connection { Id = connection.Id, Value = valueCid }, connection, cancellationToken);
78+
79+
EventStreamPosition = appendedEntry;
80+
}
81+
82+
/// <summary>
83+
/// Applies an event stream update event and raises the relevant events.
84+
/// </summary>
85+
/// <remarks>
86+
/// This method will call <see cref="ReadOnlyConnectionCollection.GetAsync(string, CancellationToken)"/> and create a new instance to pass to the event handlers.
87+
/// <para/>
88+
/// If already have a resolved instance of <see cref="Connection"/>, you should call <see cref="ApplyEntryUpdateAsync(EventStreamEntry{DagCid}, ValueUpdateEvent, Connection, IReadOnlyConnection?, CancellationToken)"/> instead.
89+
/// </remarks>
90+
/// <param name="eventStreamEntry">The event stream entry to apply.</param>
91+
/// <param name="updateEvent">The update event to apply.</param>
92+
/// <param name="cancellationToken">A token that can be used to cancel the ongoing operation.</param>
93+
public override async Task ApplyEntryUpdateAsync(EventStreamEntry<DagCid> eventStreamEntry, ValueUpdateEvent updateEvent, CancellationToken cancellationToken)
94+
{
95+
cancellationToken.ThrowIfCancellationRequested();
96+
97+
if (eventStreamEntry.TargetId != Id)
98+
return;
99+
100+
Guard.IsNotNull(updateEvent.Value);
101+
var (connection, _) = await Client.ResolveDagCidAsync<Connection>(updateEvent.Value.Value, nocache: !KuboOptions.UseCache, cancellationToken);
102+
103+
Guard.IsNotNull(connection);
104+
await ApplyEntryUpdateAsync(eventStreamEntry, updateEvent, connection, null, cancellationToken);
105+
}
106+
107+
/// <summary>
108+
/// Applies an event stream update event and raises the relevant events.
109+
/// </summary>
110+
/// <param name="eventStreamEntry">The event stream entry to apply.</param>
111+
/// <param name="updateEvent">The update event to apply.</param>
112+
/// <param name="connection">The resolved connection data for this event.</param>
113+
/// <param name="connectionInstance">The existing instance of <see cref="IReadOnlyConnection"/> to emit on raised collection add/remove events.</param>
114+
/// <param name="cancellationToken">A token that can be used to cancel the ongoing operation.</param>
115+
public async Task ApplyEntryUpdateAsync(EventStreamEntry<DagCid> eventStreamEntry, ValueUpdateEvent updateEvent, Connection connection, IReadOnlyConnection? connectionInstance, CancellationToken cancellationToken)
116+
{
117+
cancellationToken.ThrowIfCancellationRequested();
118+
119+
switch (eventStreamEntry.EventId)
120+
{
121+
case nameof(AddConnectionAsync):
122+
{
123+
Inner.Inner.Connections = [.. Inner.Inner.Connections, connection];
124+
var connectionFile = connectionInstance ??= await Inner.GetAsync(connection.Id, cancellationToken);
125+
ConnectionsAdded?.Invoke(this, [connectionFile]);
126+
break;
127+
}
128+
case nameof(RemoveConnectionAsync):
129+
{
130+
Inner.Inner.Connections = [.. Inner.Inner.Connections.Except([connection])];
131+
var connectionFile = connectionInstance ??= await Inner.GetAsync(connection.Id, cancellationToken);
132+
ConnectionsRemoved?.Invoke(this, [connectionFile]);
133+
break;
134+
}
135+
}
136+
}
137+
138+
/// <inheritdoc/>
139+
public override Task ResetEventStreamPositionAsync(CancellationToken cancellationToken)
140+
{
141+
cancellationToken.ThrowIfCancellationRequested();
142+
143+
// TODO: Reset inner virtual event stream handlers
144+
// Connections, Links, Images
145+
throw new NotImplementedException();
146+
}
147+
}

0 commit comments

Comments
 (0)