Skip to content

Commit ee17a33

Browse files
committed
Add particle patches to streaming example, using local_value
1 parent 8a7ec62 commit ee17a33

2 files changed

Lines changed: 98 additions & 5 deletions

File tree

examples/10_streaming_read.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include "openPMD/auxiliary/StringManip.hpp"
12
#include <openPMD/openPMD.hpp>
23

34
#include <algorithm>
@@ -52,6 +53,24 @@ int main()
5253
extents[i] = rc.getExtent();
5354
}
5455

56+
auto e_patches = iteration.particles["e"].particlePatches;
57+
for (auto key :
58+
{"numParticles", "numParticlesOffset", "offset", "extent"})
59+
{
60+
for (auto &rc : e_patches[key])
61+
{
62+
std::cout << "Chunks for '" << rc.second.myPath().openPMDPath()
63+
<< "':";
64+
for (auto const &chunk : rc.second.availableChunks())
65+
{
66+
std::cout << "\n\tRank " << chunk.sourceID << "\t"
67+
<< auxiliary::format_vec(chunk.offset) << "\t"
68+
<< auxiliary::format_vec(chunk.extent);
69+
}
70+
std::cout << std::endl;
71+
}
72+
}
73+
5574
// The iteration can be closed in order to help free up resources.
5675
// The iteration's content will be flushed automatically.
5776
// An iteration once closed cannot (yet) be reopened.

examples/10_streaming_write.cpp

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <mpi.h>
12
#include <openPMD/openPMD.hpp>
23

34
#include <algorithm>
@@ -19,8 +20,22 @@ int main()
1920
return 0;
2021
}
2122

23+
int mpi_rank{0}, mpi_size{1};
24+
25+
#if openPMD_HAVE_MPI
26+
MPI_Init(nullptr, nullptr);
27+
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
28+
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
29+
#endif
30+
2231
// open file for writing
23-
Series series = Series("electrons.sst", Access::CREATE, R"(
32+
Series series = Series(
33+
"electrons.sst",
34+
Access::CREATE,
35+
#if openPMD_HAVE_MPI
36+
MPI_COMM_WORLD,
37+
#endif
38+
R"(
2439
{
2540
"adios2": {
2641
"engine": {
@@ -29,11 +44,13 @@ int main()
2944
}
3045
}
3146
}
32-
})");
47+
})"
48+
49+
);
3350

3451
Datatype datatype = determineDatatype<position_t>();
3552
constexpr unsigned long length = 10ul;
36-
Extent global_extent = {length};
53+
Extent global_extent = {mpi_size * length};
3754
Dataset dataset = Dataset(datatype, global_extent);
3855
std::shared_ptr<position_t> local_data(
3956
new position_t[length], [](position_t const *ptr) { delete[] ptr; });
@@ -49,13 +66,66 @@ int main()
4966
Iteration iteration = iterations[i];
5067
Record electronPositions = iteration.particles["e"]["position"];
5168

52-
std::iota(local_data.get(), local_data.get() + length, i * length);
69+
std::iota(
70+
local_data.get(),
71+
local_data.get() + length,
72+
i * length * mpi_size + mpi_rank * length);
5373
for (auto const &dim : {"x", "y", "z"})
5474
{
5575
RecordComponent pos = electronPositions[dim];
5676
pos.resetDataset(dataset);
57-
pos.storeChunk(local_data, Offset{0}, global_extent);
77+
pos.storeChunk(local_data, Offset{length * mpi_rank}, {length});
78+
}
79+
80+
// Use the `local_value` ADIOS2 dataset shape to send a dataset not via
81+
// the data plane, but the control plane of ADIOS2 SST. This is
82+
// advisable for datasets where each rank contributes only a single item
83+
// since the control plane performs data aggregation, thus avoiding
84+
// fully interconnected communication meshes for data that needs to be
85+
// read by each reader. A local value dataset can only contain a single
86+
// item per MPI rank, forming an array of length equal to the MPI size.
87+
88+
auto e_patches = iteration.particles["e"].particlePatches;
89+
auto numParticles = e_patches["numParticles"];
90+
auto numParticlesOffset = e_patches["numParticlesOffset"];
91+
for (auto rc : {&numParticles, &numParticlesOffset})
92+
{
93+
rc->resetDataset(
94+
{Datatype::ULONG,
95+
{Extent::value_type(mpi_size)},
96+
R"(adios2.dataset.shape = "local_value")"});
5897
}
98+
numParticles.storeChunk(
99+
std::make_unique<unsigned long>(10), {size_t(mpi_rank)}, {1});
100+
numParticlesOffset.storeChunk(
101+
std::make_unique<unsigned long>(10 * ((unsigned long)mpi_rank)),
102+
{size_t(mpi_rank)},
103+
{1});
104+
auto offset = e_patches["offset"];
105+
for (auto const &dim : {"x", "y", "z"})
106+
{
107+
auto rc = offset[dim];
108+
rc.resetDataset(
109+
{Datatype::ULONG,
110+
{Extent::value_type(mpi_size)},
111+
R"(adios2.dataset.shape = "local_value")"});
112+
rc.storeChunk(
113+
std::make_unique<unsigned long>((unsigned long)mpi_rank),
114+
{size_t(mpi_rank)},
115+
{1});
116+
}
117+
auto extent = e_patches["extent"];
118+
for (auto const &dim : {"x", "y", "z"})
119+
{
120+
auto rc = extent[dim];
121+
rc.resetDataset(
122+
{Datatype::ULONG,
123+
{Extent::value_type(mpi_size)},
124+
R"(adios2.dataset.shape = "local_value")"});
125+
rc.storeChunk(
126+
std::make_unique<unsigned long>(1), {size_t(mpi_rank)}, {1});
127+
}
128+
59129
iteration.close();
60130
}
61131

@@ -67,6 +137,10 @@ int main()
67137
*/
68138
series.close();
69139

140+
#if openPMD_HAVE_MPI
141+
MPI_Finalize();
142+
#endif
143+
70144
return 0;
71145
#else
72146
std::cout << "The streaming example requires that openPMD has been built "

0 commit comments

Comments
 (0)