Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit d334d8b

Browse files
authored
Remote registry minimized memory usage + node cluster increase metadata size (#272)
1 parent 34c2fb9 commit d334d8b

File tree

9 files changed

+198
-18
lines changed

9 files changed

+198
-18
lines changed

‎packages/cluster-nodejs/package.json‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"build-rollup": "rollup -c",
1717
"lint": "tslint '{src,tests}/**/*.{ts,tsx}' --fix",
1818
"prettier": "prettier --write '{src,tests}/**/*.{ts,tsx}'",
19-
"test": "jest --config jest.config.js"
19+
"test": "jest --config jest.config.js --forceExit"
2020
},
2121
"author": "Scalecube (https://github.com/scalecube/scalecube-js)",
2222
"devDependencies": {

‎packages/cluster-nodejs/src/Cluster/JoinCluster.ts‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export const joinCluster: ClusterApi.JoinCluster = (options: ClusterApi.ClusterO
2626
pingReqTimeout: 60, // optional
2727
pingReqGroupSize: 3, // optional
2828
suspectTimeout: 60, // optional
29-
udp: { maxDgramSize: 512 }, // optional
29+
udp: { maxDgramSize: 4096 }, // optional
3030
preferCurrentMeta: true, // optional
3131
};
3232

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import { joinCluster } from '../../src';
2+
import { ClusterEvent } from '@scalecube/api/lib/cluster';
3+
import { getFullAddress } from '@scalecube/utils';
4+
5+
describe('Member metadata size limit', () => {
6+
test('metadata size should not be limited', (done) => {
7+
const address1 = {
8+
protocol: 'ws',
9+
host: '127.0.0.1',
10+
port: 8125,
11+
path: '',
12+
};
13+
14+
const address2 = {
15+
protocol: 'ws',
16+
host: '127.0.0.1',
17+
port: 8126,
18+
path: '',
19+
};
20+
21+
const publish: any[] = [
22+
{
23+
'ws://192.168.0.1:1000/some/path': {
24+
greeting: {
25+
greet: 0,
26+
greet1: 0,
27+
greet2: 0,
28+
greet3: 0,
29+
greet4: 0,
30+
greet5: 0,
31+
greet6: 0,
32+
greet7: 0,
33+
greet8: 0,
34+
},
35+
stream: {
36+
greet0: 1,
37+
greet1: 1,
38+
greet2: 1,
39+
greet3: 1,
40+
greet4: 1,
41+
greet5: 1,
42+
greet6: 1,
43+
greet7: 1,
44+
greet8: 1,
45+
},
46+
service1: {
47+
greet: 0,
48+
greet1: 0,
49+
greet2: 0,
50+
greet3: 0,
51+
greet4: 0,
52+
greet5: 0,
53+
greet6: 0,
54+
greet7: 0,
55+
greet8: 0,
56+
},
57+
service2: {
58+
greet: 0,
59+
greet1: 0,
60+
greet2: 0,
61+
greet3: 0,
62+
greet4: 0,
63+
greet5: 0,
64+
greet6: 0,
65+
greet7: 0,
66+
greet8: 0,
67+
},
68+
},
69+
},
70+
];
71+
72+
joinCluster({
73+
address: address1,
74+
itemsToPublish: publish,
75+
});
76+
77+
const node2 = joinCluster({
78+
address: address2,
79+
seedAddress: [address1],
80+
itemsToPublish: [],
81+
});
82+
node2.listen$().subscribe((res: ClusterEvent) => {
83+
expect(res).toMatchObject({
84+
from: getFullAddress(address1),
85+
items: publish,
86+
type: 'ADDED',
87+
});
88+
done();
89+
});
90+
});
91+
});

‎packages/scalecube-microservice/src/Microservices/MicroserviceInstance.ts‎

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,35 @@
11
import { DiscoveryApi, MicroserviceApi } from '@scalecube/api';
22
import { saveToLogs } from '@scalecube/utils';
3-
import { tap } from 'rxjs/operators';
3+
import { map,tap } from 'rxjs/operators';
44
import { GetServiceFactoryOptions, SetMicroserviceInstanceOptions } from '../helpers/types';
55
import { createProxy } from '../Proxy/createProxy';
66
import { destroy } from './Destroy';
77
import { createServiceCall } from '../ServiceCall/ServiceCall';
8+
import { ServiceDiscoveryEvent } from '@scalecube/api/lib/discovery';
9+
import { restore } from './endpointsUtil';
810

911
export const setMicroserviceInstance = (options: SetMicroserviceInstanceOptions): MicroserviceApi.Microservice => {
1012
const { transportClient, serverStop, discoveryInstance, debug, defaultRouter, microserviceContext } = options;
1113

1214
const { remoteRegistry } = microserviceContext;
1315

14-
discoveryInstance &&
15-
discoveryInstance
16-
.discoveredItems$()
17-
.pipe(printLogs(microserviceContext.whoAmI, debug))
18-
.subscribe(remoteRegistry.update);
19-
2016
const serviceFactoryOptions = getServiceFactoryOptions({
2117
microserviceContext,
2218
transportClient,
2319
defaultRouter,
2420
});
21+
22+
discoveryInstance &&
23+
discoveryInstance
24+
.discoveredItems$()
25+
.pipe(
26+
map((i: ServiceDiscoveryEvent) => ({
27+
type: i.type,
28+
items: restore(i.items[0]),
29+
})),
30+
printLogs(microserviceContext.whoAmI, debug)
31+
)
32+
.subscribe(remoteRegistry.update);
2533
return Object.freeze({
2634
destroy: () =>
2735
destroy({

‎packages/scalecube-microservice/src/Microservices/Microservices.ts‎

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { flatteningServices } from '../helpers/serviceData';
1010
import { getServiceFactoryOptions, setMicroserviceInstance } from './MicroserviceInstance';
1111
import { ROUTER_NOT_PROVIDED } from '../helpers/constants';
1212
import { loggerUtil } from '../helpers/logger';
13+
import { minimized } from './endpointsUtil';
1314

1415
export const createMicroservice: MicroserviceApi.CreateMicroservice = (
1516
options: MicroserviceApi.MicroserviceOptions
@@ -85,7 +86,7 @@ export const createMicroservice: MicroserviceApi.CreateMicroservice = (
8586

8687
const discoveryInstance = createDiscovery({
8788
address: fallBackAddress,
88-
itemsToPublish: endPointsToPublishInCluster,
89+
itemsToPublish: [minimized(endPointsToPublishInCluster)],
8990
seedAddress,
9091
cluster,
9192
debug,
@@ -129,13 +130,8 @@ const createMicroserviceContext = ({ address, debug }: MicroserviceContextOption
129130
};
130131

131132
const multiSeedSupport = (seedAddress: string | Address | string[] | Address[]) => {
132-
let seeds = [];
133133
if (!check.isArray(seedAddress)) {
134-
seeds = check.isString(seedAddress) ? [getAddress(seedAddress as string)] : [seedAddress];
135-
} else {
136-
seeds = (seedAddress as []).map((val: string | Address) => {
137-
return check.isString(val) ? getAddress(val as string) : val;
138-
});
134+
return check.isString(seedAddress) ? [getAddress(seedAddress as string)] : [seedAddress];
139135
}
140-
return seeds;
136+
return (seedAddressas[]).map((val: string|Address)=>(check.isString(val) ? getAddress(valasstring) : val));
141137
};
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/// This util is minimize and restore Endpoint[]
2+
/// use for optimize endpoints transport
3+
import { MicroserviceApi } from '@scalecube/api';
4+
import { getAddress, getFullAddress } from '@scalecube/utils';
5+
6+
const eAsyncModel: { [key: number]: MicroserviceApi.AsyncModel } = {
7+
0: 'requestResponse',
8+
1: 'requestStream',
9+
};
10+
const sAsyncModel: { [key: string]: keyof typeof eAsyncModel } = {
11+
requestResponse: 0,
12+
requestStream: 1,
13+
};
14+
15+
export interface Endpoints {
16+
[address: string]: {
17+
[serviceName: string]: {
18+
[methodName: string]: keyof typeof eAsyncModel;
19+
};
20+
};
21+
}
22+
23+
export function minimized(endpoints: MicroserviceApi.Endpoint[]): Endpoints {
24+
const res: Endpoints = {};
25+
endpoints.forEach((e) => {
26+
res[getFullAddress(e.address)] = res[getFullAddress(e.address)] || {};
27+
res[getFullAddress(e.address)][e.serviceName] = res[getFullAddress(e.address)][e.serviceName] || {};
28+
res[getFullAddress(e.address)][e.serviceName][e.methodName] = sAsyncModel[e.asyncModel];
29+
});
30+
return res;
31+
}
32+
33+
export function restore(endpoints: Endpoints): MicroserviceApi.Endpoint[] {
34+
const res: MicroserviceApi.Endpoint[] = [];
35+
for (const address in endpoints) {
36+
for (const service in endpoints[address]) {
37+
for (const method in endpoints[address][service]) {
38+
res.push({
39+
asyncModel: eAsyncModel[endpoints[address][service][method]],
40+
methodName: method,
41+
serviceName: service,
42+
address: getAddress(address),
43+
qualifier: `${service}/${method}`,
44+
});
45+
}
46+
}
47+
}
48+
return res;
49+
}

‎packages/scalecube-microservice/tests/helper.ts‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import EventEmitter = require('events');
2+
// @ts-ignore
23
const myEmitter = new EventEmitter();
34

45
// @ts-ignore
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { restore, minimized } from '../../../src/Microservices/endpointsUtil';
2+
import { AsyncModel } from '@scalecube/api/lib/microservice';
3+
4+
describe('endpointsUtil', () => {
5+
test('Given endpoints when minimized & restore it should be the same', () => {
6+
const endpoints = [
7+
{
8+
qualifier: 'GreetingService/hello',
9+
serviceName: 'GreetingService',
10+
methodName: 'hello',
11+
asyncModel: 'requestResponse' as AsyncModel,
12+
address: {
13+
protocol: 'pm',
14+
host: 'defaultHost',
15+
port: 8080,
16+
path: 'B',
17+
},
18+
},
19+
{
20+
qualifier: 'GreetingService/greet$',
21+
serviceName: 'GreetingService',
22+
methodName: 'greet$',
23+
asyncModel: 'requestStream' as AsyncModel,
24+
address: {
25+
protocol: 'pm',
26+
host: 'defaultHost',
27+
port: 8080,
28+
path: 'B',
29+
},
30+
},
31+
];
32+
33+
expect(endpoints).toEqual(restore(minimized(endpoints)));
34+
});
35+
});

‎packages/utils/src/address.ts‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export const getAddress = (address: string): Address => {
3737
address = buildAddress({ key: 'host', optionalValue: 'defaultHost', delimiter: ':', str: address, newAddress });
3838
address = buildAddress({ key: 'port', optionalValue: 8080, delimiter: '/', str: address, newAddress });
3939
newAddress.path = address;
40-
40+
newAddress.port=typeofnewAddress.port==='string' ? parseInt(newAddress.port,10) : newAddress.port;
4141
return newAddress as Address;
4242
};
4343

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /