Skip to content

Commit be21baa

Browse files
committed
nodepp V1.4.0_4
1 parent 0dc8378 commit be21baa

5 files changed

Lines changed: 237 additions & 42 deletions

File tree

examples/7-worker-channel.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#include <nodepp/nodepp.h>
2+
#include <nodepp/worker.h>
3+
#include <nodepp/channel.h>
4+
5+
using namespace nodepp;
6+
7+
channel_t<coroutine_t> ch1, ch2;
8+
9+
int worker_1(){
10+
coStart
11+
12+
while( true ){
13+
14+
ch2.write( coroutine_t( COROUTINE(){
15+
console::log( "worker1 -> worker2", process::now() );
16+
return -1; }) );
17+
18+
if ( !ch1.empty() ){
19+
for( auto x: ch1.read() ){
20+
process::add( x );
21+
}}
22+
23+
coDelay(1000); }
24+
25+
coStop
26+
}
27+
28+
int worker_2(){
29+
coStart
30+
31+
while( true ){
32+
33+
ch1.write( coroutine_t( COROUTINE(){
34+
console::log( "worker2 -> worker1", process::now() );
35+
return -1; }) );
36+
37+
if ( !ch2.empty() ){
38+
for( auto x: ch2.read() ){
39+
process::add( x );
40+
}}
41+
42+
coDelay(1000); }
43+
44+
coStop
45+
}
46+
47+
void onMain(){
48+
49+
worker::add([=](){
50+
process::add([=](){ return worker_1(); });
51+
process::wait();
52+
return -1; });
53+
54+
worker::add([=](){
55+
process::add([=](){ return worker_2(); });
56+
process::wait();
57+
return -1; });
58+
59+
}

include/nodepp/channel.h

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2023 The Nodepp Project Authors. All Rights Reserved.
3+
*
4+
* Licensed under the MIT (the "License"). You may not use
5+
* this file except in compliance with the License. You can obtain a copy
6+
* in the file LICENSE in the source distribution or at
7+
* https://github.com/NodeppOfficial/nodepp/blob/main/LICENSE
8+
*/
9+
10+
/*────────────────────────────────────────────────────────────────────────────*/
11+
12+
#ifndef NODEPP_CHANNEL
13+
#define NODEPP_CHANNEL
14+
15+
/*────────────────────────────────────────────────────────────────────────────*/
16+
17+
#include "any.h"
18+
#include "mutex.h"
19+
#include "worker.h"
20+
21+
/*────────────────────────────────────────────────────────────────────────────*/
22+
23+
namespace nodepp { template< class T > class channel_t {
24+
private:
25+
26+
struct NODE {
27+
/*--------*/ queue_t<T> queue;
28+
ulong limit; mutex_t mut;
29+
}; ptr_t<NODE> obj;
30+
31+
public:
32+
33+
channel_t( ulong limit=0 ) noexcept : obj( new NODE() ){ obj->limit=limit; }
34+
35+
/*─······································································─*/
36+
37+
bool empty() const noexcept { return obj->queue.empty(); }
38+
ulong size() const noexcept { return obj->queue.size (); }
39+
40+
void free () const noexcept { clear(); }
41+
void clear() const noexcept { obj->mut.lock([&](){
42+
obj->queue.clear();
43+
}); }
44+
45+
/*─······································································─*/
46+
47+
template< class... V >
48+
int write( const V&... args ) const noexcept {
49+
int x=0; obj->mut.lock([&](){ iterator::map( [&]( T clb ){
50+
if( obj->limit>0 && obj->queue.size()>=obj->limit )
51+
{ return; } obj->queue.push( clb );
52+
}, args... ); }); return x; }
53+
54+
ptr_t<T> read() const noexcept {
55+
ptr_t<T> out( size() ); obj->mut.lock([&](){
56+
if( obj->queue.empty() ){ return; } int x=0;
57+
obj->queue.map([&]( T& data ){ out[x] = data; x++; });
58+
obj->queue.clear(); }); return out; }
59+
60+
};}
61+
62+
/*────────────────────────────────────────────────────────────────────────────*/
63+
64+
#endif

include/nodepp/coroutine.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ namespace nodepp { class coroutine_t {
3030

3131
public:
3232

33-
coroutine_t( T callback ) noexcept : obj( 0UL, NODE() ) { obj->callback = callback; }
33+
template< class T >
34+
coroutine_t( const T& callback ) noexcept : obj( new NODE() ) { obj->callback = callback; }
3435

35-
coroutine_t() noexcept : obj( 0UL, NODE() ) { obj->alive = 0; }
36+
coroutine_t() noexcept : obj( new NODE() ) { obj->alive = 0; }
3637

3738
/*─······································································─*/
3839

main.cpp

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,59 @@
11
#include <nodepp/nodepp.h>
22
#include <nodepp/worker.h>
3-
#include <nodepp/http.h>
4-
#include <nodepp/date.h>
5-
6-
/*────────────────────────────────────────────────────────────────────────────*/
3+
#include <nodepp/channel.h>
74

85
using namespace nodepp;
96

10-
/*────────────────────────────────────────────────────────────────────────────*/
7+
channel_t<coroutine_t> ch1, ch2;
8+
9+
int worker_1(){
10+
coStart
1111

12-
string_t format( string_t method, string_t path, string_t data ) {
13-
return regex::format( ".${0}.${1}.${2}.",
14-
!method.empty() ? encoder::base64::atob(method) : "",
15-
!path .empty() ? encoder::base64::atob(path) : "",
16-
!data .empty() ? encoder::base64::atob(data) : ""
17-
);
18-
}
12+
while( true ){
13+
14+
ch2.write( coroutine_t( COROUTINE(){
15+
console::log( "worker1 -> worker2", process::now() );
16+
return -1; }) );
1917

20-
/*────────────────────────────────────────────────────────────────────────────*/
18+
if ( !ch1.empty() ){
19+
for( auto x: ch1.read() ){
20+
process::add( x );
21+
}}
2122

22-
void next( string_t message ) {
23-
24-
queue_t<ulong> raw;
23+
coDelay(1000); }
2524

26-
for( ulong x=0; x<message.size(); x++ ){
27-
if ( message[x] != '.' ){ continue; } raw.push( x ); }
25+
coStop
26+
}
2827

29-
auto idx = array_t<ulong>( raw.data() );
28+
int worker_2(){
29+
coStart
3030

31-
while( !idx.empty() ){ auto tmp = idx.splice( 0, 4 );
32-
if ( tmp.size()<4 ){ break; }
31+
while( true ){
3332

34-
auto method = encoder::base64::btoa( message.slice_view( tmp[0]+1, tmp[1] ) );
35-
auto path = encoder::base64::btoa( message.slice_view( tmp[1]+1, tmp[2] ) );
36-
auto msg = encoder::base64::btoa( message.slice_view( tmp[2]+1, tmp[3] ) );
33+
ch1.write( coroutine_t( COROUTINE(){
34+
console::log( "worker2 -> worker1", process::now() );
35+
return -1; }) );
3736

38-
console::log(
39-
tmp[0], tmp[3],
40-
"mth:", method,
41-
"pth:", path ,
42-
"msg:", msg
43-
);
37+
if ( !ch2.empty() ){
38+
for( auto x: ch2.read() ){
39+
process::add( x );
40+
}}
4441

45-
}
42+
coDelay(1000); }
4643

44+
coStop
4745
}
4846

49-
/*────────────────────────────────────────────────────────────────────────────*/
50-
5147
void onMain(){
5248

53-
next(
54-
format( "aa", "bb", "cc" )+
55-
format( "aa", "bb", "cc" )+
56-
format( "aa", "bb", "cc" )
57-
);
49+
worker::add([=](){
50+
process::add([=](){ return worker_1(); });
51+
process::wait();
52+
return -1; });
5853

59-
}
54+
worker::add([=](){
55+
process::add([=](){ return worker_2(); });
56+
process::wait();
57+
return -1; });
6058

61-
/*────────────────────────────────────────────────────────────────────────────*/
59+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#include <nodepp/nodepp.h>
2+
#include <nodepp/worker.h>
3+
#include <nodepp/event.h>
4+
5+
using namespace nodepp;
6+
7+
event_t<coroutine_t> ev1, ev2;
8+
9+
int worker_1(){
10+
static queue_t<coroutine_t> que;
11+
static mutex_t /*--------*/ mut;
12+
coStart
13+
14+
ev1.on([=]( coroutine_t clb ){ mut.lock([&](){
15+
que.push( clb );
16+
}); });
17+
18+
while( true ){
19+
20+
ev2.emit( coroutine_t( COROUTINE(){
21+
console::log( "worker1 -> worker2", process::now() );
22+
return -1; }) );
23+
24+
if(!que.empty() ){ mut.lock([&](){
25+
que.map([=]( coroutine_t clb ){
26+
process::add( clb );
27+
}); que.clear();
28+
}); }
29+
30+
coDelay(1000); }
31+
32+
coStop
33+
}
34+
35+
int worker_2(){
36+
static queue_t<coroutine_t> que;
37+
static mutex_t /*--------*/ mut;
38+
coStart
39+
40+
ev2.on([=]( coroutine_t clb ){ mut.lock([&](){
41+
que.push( clb );
42+
}); });
43+
44+
while( true ){
45+
46+
ev1.emit( coroutine_t( COROUTINE(){
47+
console::log( "worker2 -> worker1", process::now() );
48+
return -1; }) );
49+
50+
if(!que.empty() ){ mut.lock([&](){
51+
que.map([=]( coroutine_t clb ){
52+
process::add( clb );
53+
}); que.clear();
54+
}); }
55+
56+
coDelay(1000); }
57+
58+
coStop
59+
}
60+
61+
void onMain(){
62+
63+
worker::add([=](){
64+
process::add([=](){ return worker_1(); });
65+
process::wait();
66+
return -1; });
67+
68+
worker::add([=](){
69+
process::add([=](){ return worker_2(); });
70+
process::wait();
71+
return -1; });
72+
73+
}

0 commit comments

Comments
 (0)