Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
git submodule update --init
cd ${GITHUB_WORKSPACE}/flex
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG && sudo make -j$(nproc)
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG -DLABEL_TYPE="uint16_t" && sudo make -j$(nproc)
# package the build artifacts
cd .. && tar -zcf build.tar.gz build

Expand Down Expand Up @@ -502,7 +502,7 @@ jobs:
git submodule update --init
cd flex/engines/graph_db/grin
mkdir build && cd build
cmake .. && sudo make -j$(nproc)
cmake .. -DLABEL_TYPE="uint16_t" && sudo make -j$(nproc)
export FLEX_DATA_DIR=../../../../interactive/examples/modern_graph/
${GITHUB_WORKSPACE}/flex/build/bin/bulk_loader -g ../../../../interactive/examples/modern_graph/graph.yaml -l ../../../../interactive/examples/modern_graph/bulk_load.yaml -d ./data/
rm -r ./data/wal
Expand Down
8 changes: 8 additions & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
# option(LABEL_TYPE "The label type of the graph, valid values: uint16, uint8" "uint8") # The type of the label in the graph
SET(LABEL_TYPE "uint8_t" CACHE STRING "The label type of the graph, valid values: uint16_t, uint8_t")

#print options
message(STATUS "Build test: ${BUILD_TEST}")
Expand Down Expand Up @@ -58,6 +60,12 @@ if(USE_PTHASH)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
endif()

# check label_type in {uint8, uint16}
if(NOT LABEL_TYPE STREQUAL "uint8_t" AND NOT LABEL_TYPE STREQUAL "uint16_t")
message(FATAL_ERROR "LABEL_TYPE must be uint8 or uint16, but got ${LABEL_TYPE}")
endif()
add_compile_definitions(FLEX_LABEL_TYPE=${LABEL_TYPE})

set(DEFAULT_BUILD_TYPE "Release")
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.")
Expand Down
161 changes: 79 additions & 82 deletions flex/engines/graph_db/database/update_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ UpdateTransaction::UpdateTransaction(const GraphDBSession& session,
extra_vertex_properties_[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_;
size_t csr_num = 2 * schema().get_edge_triplet_num();
added_edges_.resize(csr_num);
updated_edge_data_.resize(csr_num);
}
Expand Down Expand Up @@ -560,7 +560,6 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
updated_edge_data;

size_t vertex_label_num = graph.schema().vertex_label_num();
size_t edge_label_num = graph.schema().edge_label_num();

for (label_t idx = 0; idx < vertex_label_num; ++idx) {
if (graph.lf_indexers_[idx].get_type() == PropertyType::kInt64) {
Expand Down Expand Up @@ -603,7 +602,7 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
extra_vertex_properties[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num * vertex_label_num * edge_label_num;
size_t csr_num = 2 * graph.schema().get_edge_triplet_num();
added_edges.resize(csr_num);
updated_edge_data.resize(csr_num);

Expand Down Expand Up @@ -681,16 +680,14 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,

size_t UpdateTransaction::get_in_csr_index(label_t src_label, label_t dst_label,
label_t edge_label) const {
return src_label * vertex_label_num_ * edge_label_num_ +
dst_label * edge_label_num_ + edge_label;
return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label);
}

size_t UpdateTransaction::get_out_csr_index(label_t src_label,
label_t dst_label,
label_t edge_label) const {
return src_label * vertex_label_num_ * edge_label_num_ +
dst_label * edge_label_num_ + edge_label +
vertex_label_num_ * vertex_label_num_ * edge_label_num_;
return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label) +
graph_.schema().get_edge_triplet_num();
}

bool UpdateTransaction::oid_to_lid(label_t label, const Any& oid,
Expand Down Expand Up @@ -820,89 +817,89 @@ void UpdateTransaction::applyVerticesUpdates() {
}

void UpdateTransaction::applyEdgesUpdates() {
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t oe_csr_index =
get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t oe_csr_index = get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}

std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
}

for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;

if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label,
timestamp_, oarc, alloc_);
}
}
if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label, timestamp_,
oarc, alloc_);
}
}
}

for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t ie_csr_index =
get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t ie_csr_index = get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions flex/engines/graph_db/grin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ set(GRIN_READER_VERSION ${GRIN_READER_MAJOR_VERSION}.${GRIN_READER_MINOR_VERSION

project(grin_reader LANGUAGES C CXX VERSION ${GRIN_READER_VERSION})

# This option should be aligned with flex/CMakeLists.txt
SET(LABEL_TYPE "uint8_t" CACHE STRING "The label type of the graph, valid values: uint16_t, uint8_t")

# check label_type in {uint8, uint16}
if(NOT LABEL_TYPE STREQUAL "uint8_t" AND NOT LABEL_TYPE STREQUAL "uint16_t")
message(FATAL_ERROR "LABEL_TYPE must be uint8 or uint16, but got ${LABEL_TYPE}")
endif()
add_compile_definitions(FLEX_LABEL_TYPE=${LABEL_TYPE})

# Set flags
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand Down
7 changes: 4 additions & 3 deletions flex/engines/graph_db/grin/src/property/property.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,12 @@ void grin_destroy_edge_property(GRIN_GRAPH g, GRIN_EDGE_PROPERTY ep) {}
GRIN_DATATYPE grin_get_edge_property_datatype(GRIN_GRAPH g,
GRIN_EDGE_PROPERTY ep) {
auto _g = static_cast<GRIN_GRAPH_T*>(g);
auto src_label_i = (ep >> 16) & 0xff;
auto edge_triplet_tuple = _g->g.schema().get_edge_triplet(ep);
auto src_label_i = std::get<0>(edge_triplet_tuple);
auto dst_label_i = std::get<1>(edge_triplet_tuple);
auto edge_label_i = std::get<2>(edge_triplet_tuple);
const auto& src_label = _g->g.schema().get_vertex_label_name(src_label_i);
auto dst_label_i = (ep >> 8) & 0xff;
const auto& dst_label = _g->g.schema().get_vertex_label_name(dst_label_i);
auto edge_label_i = ep & 0xff;
const auto& edge_label = _g->g.schema().get_edge_label_name(edge_label_i);
const auto& type =
_g->g.schema().get_edge_properties(src_label, dst_label, edge_label);
Expand Down
8 changes: 5 additions & 3 deletions flex/engines/graph_db/grin/src/property/propertylist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ GRIN_EDGE_PROPERTY_LIST grin_get_edge_property_list_by_type(GRIN_GRAPH g,
GRIN_EDGE_PROPERTY_LIST_T* p = new GRIN_EDGE_PROPERTY_LIST_T();

auto _g = static_cast<GRIN_GRAPH_T*>(g);
auto src_label_i = et >> 16;
auto edge_triplet_tuple = _g->g.schema().get_edge_triplet(et);
auto src_label_i = std::get<0>(edge_triplet_tuple);
auto dst_label_i = std::get<1>(edge_triplet_tuple);
auto edge_label_i = std::get<2>(edge_triplet_tuple);

auto src_label = _g->g.schema().get_vertex_label_name(src_label_i);
auto dst_label_i = (et >> 8) & (0xff);
auto dst_label = _g->g.schema().get_vertex_label_name(dst_label_i);
auto edge_label_i = et & 0xff;
auto edge_label = _g->g.schema().get_edge_label_name(edge_label_i);
auto sz = _g->g.schema()
.get_edge_properties(src_label, dst_label, edge_label)
Expand Down
8 changes: 4 additions & 4 deletions flex/engines/graph_db/grin/src/property/topology.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ size_t grin_get_vertex_num_by_type(GRIN_GRAPH g, GRIN_VERTEX_TYPE vt) {

#ifdef GRIN_WITH_EDGE_PROPERTY
size_t grin_get_edge_num_by_type(GRIN_GRAPH g, GRIN_EDGE_TYPE et) {
auto src_label = et >> 16;
auto dst_label = (et >> 8) & (0xff);
auto edge_label = et & (0xff);
auto _g = static_cast<GRIN_GRAPH_T*>(g);

auto edge_triplet_tuple = _g->g.schema().get_edge_triplet(et);
auto src_label = std::get<0>(edge_triplet_tuple);
auto dst_label = std::get<1>(edge_triplet_tuple);
auto edge_label = std::get<2>(edge_triplet_tuple);
auto oe = _g->g.get_oe_csr(src_label, dst_label, edge_label);
auto vertex_num = _g->g.vertex_num(src_label);
size_t edge_num = 0;
Expand Down
Loading