From 12d64fe55349a37800d04ce6070a01ea7098dda0 Mon Sep 17 00:00:00 2001 From: cfaur09 Date: Tue, 14 Oct 2025 11:20:24 +0300 Subject: [PATCH 1/7] processes 50 tokens concurrently using ConcurrencyUtils --- src/endpoints/tokens/token.service.ts | 52 ++++++++++++++++----------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/endpoints/tokens/token.service.ts b/src/endpoints/tokens/token.service.ts index 2677ba8fd..7cd687b99 100644 --- a/src/endpoints/tokens/token.service.ts +++ b/src/endpoints/tokens/token.service.ts @@ -21,6 +21,7 @@ import { TokenWithRoles } from "./entities/token.with.roles"; import { TokenWithRolesFilter } from "./entities/token.with.roles.filter"; import { AddressUtils, BinaryUtils, NumberUtils, TokenUtils } from "@multiversx/sdk-nestjs-common"; import { ApiService, ApiUtils } from "@multiversx/sdk-nestjs-http"; +import { ConcurrencyUtils } from "src/utils/concurrency.utils"; import { CacheService } from "@multiversx/sdk-nestjs-cache"; import { IndexerService } from "src/common/indexer/indexer.service"; import { OriginLogger } from "@multiversx/sdk-nestjs-common"; @@ -818,30 +819,39 @@ export class TokenService { CacheInfo.EsdtAssets('').ttl, ); - for (const token of tokens) { - const priceSourcetype = token.assets?.priceSource?.type; - - if (priceSourcetype === TokenAssetsPriceSourceType.dataApi) { - token.price = await this.dataApiService.getEsdtTokenPrice(token.identifier); - } else if (priceSourcetype === TokenAssetsPriceSourceType.customUrl && token.assets?.priceSource?.url) { - const pathToPrice = token.assets?.priceSource?.path ?? "0.usdPrice"; - const tokenData = await this.fetchTokenDataFromUrl(token.assets.priceSource.url, pathToPrice); - - if (tokenData) { - token.price = tokenData; - } - } + await ConcurrencyUtils.executeWithConcurrencyLimit( + tokens, + async (token) => { + try { + const priceSourcetype = token.assets?.priceSource?.type; + + if (priceSourcetype === TokenAssetsPriceSourceType.dataApi) { + token.price = await this.dataApiService.getEsdtTokenPrice(token.identifier); + } else if (priceSourcetype === TokenAssetsPriceSourceType.customUrl && token.assets?.priceSource?.url) { + const pathToPrice = token.assets?.priceSource?.path ?? "0.usdPrice"; + const tokenData = await this.fetchTokenDataFromUrl(token.assets.priceSource.url, pathToPrice); + + if (tokenData) { + token.price = tokenData; + } + } - if (token.price) { - const supply = await this.esdtService.getTokenSupply(token.identifier); - token.supply = supply.totalSupply; - token.circulatingSupply = supply.circulatingSupply; + if (token.price) { + const supply = await this.esdtService.getTokenSupply(token.identifier); + token.supply = supply.totalSupply; + token.circulatingSupply = supply.circulatingSupply; - if (token.circulatingSupply) { - token.marketCap = token.price * NumberUtils.denominateString(token.circulatingSupply, token.decimals); + if (token.circulatingSupply) { + token.marketCap = token.price * NumberUtils.denominateString(token.circulatingSupply, token.decimals); + } + } + } catch (error) { + this.logger.error(`Error processing price/supply for token ${token.identifier}: ${error}`); } - } - } + }, + 50, + 'Token prices and supply calculation' + ); tokens = tokens.sortedDescending( token => token.assets ? 1 : 0, From 9a2dc109b6750261a982661b3451eb858a1e5e8f Mon Sep 17 00:00:00 2001 From: cfaur09 Date: Tue, 14 Oct 2025 11:35:24 +0300 Subject: [PATCH 2/7] run all three batch operations in parallel with increased concurrency --- src/endpoints/tokens/token.service.ts | 71 ++++++++++++++------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/src/endpoints/tokens/token.service.ts b/src/endpoints/tokens/token.service.ts index 7cd687b99..ccec26aed 100644 --- a/src/endpoints/tokens/token.service.ts +++ b/src/endpoints/tokens/token.service.ts @@ -817,6 +817,7 @@ export class TokenService { async token => await this.getTokenAssetsRaw(token.identifier), (token, assets) => token.assets = assets, CacheInfo.EsdtAssets('').ttl, + 50, ); await ConcurrencyUtils.executeWithConcurrencyLimit( @@ -936,41 +937,41 @@ export class TokenService { } private async batchProcessTokens(tokens: TokenDetailed[]) { - await this.cachingService.batchApplyAll( - tokens, - token => CacheInfo.TokenTransactions(token.identifier).key, - async token => await this.getTotalTransactions(token), - (token, result) => { - token.transactions = result?.count; - token.transactionsLastUpdatedAt = result?.lastUpdatedAt; - }, - CacheInfo.TokenTransactions('').ttl, - 10, - ); - - await this.cachingService.batchApplyAll( - tokens, - token => CacheInfo.TokenAccounts(token.identifier).key, - async token => await this.getTotalAccounts(token), - (token, result) => { - token.accounts = result?.count; - token.accountsLastUpdatedAt = result?.lastUpdatedAt; - }, - CacheInfo.TokenAccounts('').ttl, - 10, - ); - - await this.cachingService.batchApplyAll( - tokens, - token => CacheInfo.TokenTransfers(token.identifier).key, - async token => await this.getTotalTransfers(token), - (token, result) => { - token.transfers = result?.count; - token.transfersLastUpdatedAt = result?.lastUpdatedAt; - }, - CacheInfo.TokenTransfers('').ttl, - 10, - ); + await Promise.all([ + this.cachingService.batchApplyAll( + tokens, + token => CacheInfo.TokenTransactions(token.identifier).key, + async token => await this.getTotalTransactions(token), + (token, result) => { + token.transactions = result?.count; + token.transactionsLastUpdatedAt = result?.lastUpdatedAt; + }, + CacheInfo.TokenTransactions('').ttl, + 50, + ), + this.cachingService.batchApplyAll( + tokens, + token => CacheInfo.TokenAccounts(token.identifier).key, + async token => await this.getTotalAccounts(token), + (token, result) => { + token.accounts = result?.count; + token.accountsLastUpdatedAt = result?.lastUpdatedAt; + }, + CacheInfo.TokenAccounts('').ttl, + 50, + ), + this.cachingService.batchApplyAll( + tokens, + token => CacheInfo.TokenTransfers(token.identifier).key, + async token => await this.getTotalTransfers(token), + (token, result) => { + token.transfers = result?.count; + token.transfersLastUpdatedAt = result?.lastUpdatedAt; + }, + CacheInfo.TokenTransfers('').ttl, + 50, + ), + ]); } private async getAllTokensFromApi(): Promise { From fa36664693f9e674e9f7aa3dafc6492d8abfd2bb Mon Sep 17 00:00:00 2001 From: cfaur09 Date: Tue, 14 Oct 2025 11:38:11 +0300 Subject: [PATCH 3/7] add more logs --- src/endpoints/tokens/token.service.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/endpoints/tokens/token.service.ts b/src/endpoints/tokens/token.service.ts index ccec26aed..a81528ef1 100644 --- a/src/endpoints/tokens/token.service.ts +++ b/src/endpoints/tokens/token.service.ts @@ -937,6 +937,8 @@ export class TokenService { } private async batchProcessTokens(tokens: TokenDetailed[]) { + this.logger.log(`Starting batch process for ${tokens.length} tokens`); + await Promise.all([ this.cachingService.batchApplyAll( tokens, @@ -972,6 +974,8 @@ export class TokenService { 50, ), ]); + + this.logger.log(`Batch process for ${tokens.length} tokens finished`); } private async getAllTokensFromApi(): Promise { From 402bfd02cef149f277b899f439494d447cfaaec9 Mon Sep 17 00:00:00 2001 From: cfaur09 Date: Tue, 14 Oct 2025 11:55:20 +0300 Subject: [PATCH 4/7] apply mex prices --- src/endpoints/tokens/token.service.ts | 52 +++++++++++++++++---------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/src/endpoints/tokens/token.service.ts b/src/endpoints/tokens/token.service.ts index a81528ef1..d8ad11fc4 100644 --- a/src/endpoints/tokens/token.service.ts +++ b/src/endpoints/tokens/token.service.ts @@ -1061,29 +1061,45 @@ export class TokenService { try { const indexedTokens = await this.mexTokenService.getMexPricesRaw(); - for (const token of tokens) { - const price = indexedTokens[token.identifier]; - if (price) { - const supply = await this.esdtService.getTokenSupply(token.identifier); - if (token.assets && token.identifier.split('-')[0] === 'EGLDUSDC') { - price.price = price.price / (10 ** 12) * 2; - } + const tokensWithPrices = tokens.filter(token => indexedTokens[token.identifier]); - if (price.isToken) { - token.price = price.price; - token.marketCap = price.price * NumberUtils.denominateString(supply.circulatingSupply, token.decimals); + this.logger.log(`Applying MEX prices for ${tokensWithPrices.length} tokens with parallel supply fetching`); - if (token.totalLiquidity && token.marketCap && (token.totalLiquidity / token.marketCap < LOW_LIQUIDITY_THRESHOLD)) { - token.isLowLiquidity = true; - token.lowLiquidityThresholdPercent = LOW_LIQUIDITY_THRESHOLD * 100; + await ConcurrencyUtils.executeWithConcurrencyLimit( + tokensWithPrices, + async (token) => { + try { + const price = indexedTokens[token.identifier]; + if (!price) { + return; } - } - token.supply = supply.totalSupply; - token.circulatingSupply = supply.circulatingSupply; - } - } + const supply = await this.esdtService.getTokenSupply(token.identifier); + + if (token.assets && token.identifier.split('-')[0] === 'EGLDUSDC') { + price.price = price.price / (10 ** 12) * 2; + } + + if (price.isToken) { + token.price = price.price; + token.marketCap = price.price * NumberUtils.denominateString(supply.circulatingSupply, token.decimals); + + if (token.totalLiquidity && token.marketCap && (token.totalLiquidity / token.marketCap < LOW_LIQUIDITY_THRESHOLD)) { + token.isLowLiquidity = true; + token.lowLiquidityThresholdPercent = LOW_LIQUIDITY_THRESHOLD * 100; + } + } + + token.supply = supply.totalSupply; + token.circulatingSupply = supply.circulatingSupply; + } catch (error) { + this.logger.error(`Error applying MEX price for token ${token.identifier}: ${error}`); + } + }, + 50, + 'MEX prices and supply' + ); } catch (error) { this.logger.error('Could not apply mex tokens prices'); this.logger.error(error); From 66ea7f246acfea5185b70917dac48e4b98e45758 Mon Sep 17 00:00:00 2001 From: cfaur09 Date: Tue, 14 Oct 2025 14:17:53 +0300 Subject: [PATCH 5/7] add logs ( to be removed ) --- src/endpoints/tokens/token.service.ts | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/endpoints/tokens/token.service.ts b/src/endpoints/tokens/token.service.ts index d8ad11fc4..4cef10e38 100644 --- a/src/endpoints/tokens/token.service.ts +++ b/src/endpoints/tokens/token.service.ts @@ -759,10 +759,10 @@ export class TokenService { } this.logger.log(`Starting to fetch all tokens`); + const startFungible = Date.now(); const tokensProperties = await this.esdtService.getAllFungibleTokenProperties(); let tokens = tokensProperties.map(properties => ApiUtils.mergeObjects(new TokenDetailed(), properties)); - - this.logger.log(`Fetched ${tokens.length} fungible tokens`); + this.logger.log(`Fetched ${tokens.length} fungible tokens in ${Date.now() - startFungible}ms`); const allAssets = await this.assetsService.getAllTokenAssets(); @@ -777,10 +777,9 @@ export class TokenService { } this.logger.log(`Starting to fetch all meta tokens`); - + const startMeta = Date.now(); const collections = await this.collectionService.getNftCollections(new QueryPagination({ from: 0, size: 10000 }), { type: [NftType.MetaESDT] }); - - this.logger.log(`Fetched ${collections.length} meta tokens`); + this.logger.log(`Fetched ${collections.length} meta tokens in ${Date.now() - startMeta}ms`); for (const collection of collections) { tokens.push(new TokenDetailed({ @@ -801,16 +800,23 @@ export class TokenService { })); } + const startBatchProcess = Date.now(); await this.batchProcessTokens(tokens); + this.logger.log(`Batch process completed in ${Date.now() - startBatchProcess}ms`); const nonMetaEsdtTokens = tokens.filter(x => x.type !== TokenType.MetaESDT); + this.logger.log(`Applying MEX data for ${nonMetaEsdtTokens.length} non-meta tokens`); + const startMex = Date.now(); await Promise.all([ this.applyMexLiquidity(nonMetaEsdtTokens), this.applyMexPrices(nonMetaEsdtTokens), this.applyMexPairType(nonMetaEsdtTokens), this.applyMexPairTradesCount(nonMetaEsdtTokens), ]); + this.logger.log(`MEX data applied in ${Date.now() - startMex}ms`); + this.logger.log(`Fetching assets for ${tokens.length} tokens`); + const startAssets = Date.now(); await this.cachingService.batchApplyAll( tokens, token => CacheInfo.EsdtAssets(token.identifier).key, @@ -819,7 +825,10 @@ export class TokenService { CacheInfo.EsdtAssets('').ttl, 50, ); + this.logger.log(`Assets fetched in ${Date.now() - startAssets}ms`); + this.logger.log(`Processing price sources and supply for ${tokens.length} tokens`); + const startPriceSupply = Date.now(); await ConcurrencyUtils.executeWithConcurrencyLimit( tokens, async (token) => { @@ -853,7 +862,10 @@ export class TokenService { 50, 'Token prices and supply calculation' ); + this.logger.log(`Price sources and supply processed in ${Date.now() - startPriceSupply}ms`); + this.logger.log(`Sorting and finalizing ${tokens.length} tokens`); + const startFinalize = Date.now(); tokens = tokens.sortedDescending( token => token.assets ? 1 : 0, token => token.marketCap ? 1 : 0, @@ -874,7 +886,9 @@ export class TokenService { marketCap: 0, }); tokens = [...tokens, egldToken]; + this.logger.log(`Sorting and finalization completed in ${Date.now() - startFinalize}ms`); + this.logger.log(`Total tokens processed: ${tokens.length}`); return tokens; } From c37e920bd9da28bd4f3542490bc3114d584910a8 Mon Sep 17 00:00:00 2001 From: cfaur09 Date: Tue, 14 Oct 2025 14:58:46 +0300 Subject: [PATCH 6/7] increse cron time expression --- src/crons/cache.warmer/cache.warmer.service.ts | 2 +- src/endpoints/tokens/token.service.ts | 13 +------------ 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/src/crons/cache.warmer/cache.warmer.service.ts b/src/crons/cache.warmer/cache.warmer.service.ts index 7fbc17c04..0e03f6a00 100644 --- a/src/crons/cache.warmer/cache.warmer.service.ts +++ b/src/crons/cache.warmer/cache.warmer.service.ts @@ -156,7 +156,7 @@ export class CacheWarmerService { await this.invalidateKey(CacheInfo.TransactionPool.key, pool, this.apiConfigService.getTransactionPoolCacheWarmerTtlInSeconds()); } - @Cron(CronExpression.EVERY_MINUTE) + @Cron(CronExpression.EVERY_10_MINUTES) @Lock({ name: 'All Tokens invalidations', verbose: true }) async handleEsdtTokenInvalidations() { const tokens = await this.tokenService.getAllTokensRaw(); diff --git a/src/endpoints/tokens/token.service.ts b/src/endpoints/tokens/token.service.ts index 4cef10e38..3de4fe3f9 100644 --- a/src/endpoints/tokens/token.service.ts +++ b/src/endpoints/tokens/token.service.ts @@ -777,9 +777,7 @@ export class TokenService { } this.logger.log(`Starting to fetch all meta tokens`); - const startMeta = Date.now(); const collections = await this.collectionService.getNftCollections(new QueryPagination({ from: 0, size: 10000 }), { type: [NftType.MetaESDT] }); - this.logger.log(`Fetched ${collections.length} meta tokens in ${Date.now() - startMeta}ms`); for (const collection of collections) { tokens.push(new TokenDetailed({ @@ -800,23 +798,19 @@ export class TokenService { })); } - const startBatchProcess = Date.now(); await this.batchProcessTokens(tokens); - this.logger.log(`Batch process completed in ${Date.now() - startBatchProcess}ms`); const nonMetaEsdtTokens = tokens.filter(x => x.type !== TokenType.MetaESDT); this.logger.log(`Applying MEX data for ${nonMetaEsdtTokens.length} non-meta tokens`); - const startMex = Date.now(); + await Promise.all([ this.applyMexLiquidity(nonMetaEsdtTokens), this.applyMexPrices(nonMetaEsdtTokens), this.applyMexPairType(nonMetaEsdtTokens), this.applyMexPairTradesCount(nonMetaEsdtTokens), ]); - this.logger.log(`MEX data applied in ${Date.now() - startMex}ms`); this.logger.log(`Fetching assets for ${tokens.length} tokens`); - const startAssets = Date.now(); await this.cachingService.batchApplyAll( tokens, token => CacheInfo.EsdtAssets(token.identifier).key, @@ -825,10 +819,8 @@ export class TokenService { CacheInfo.EsdtAssets('').ttl, 50, ); - this.logger.log(`Assets fetched in ${Date.now() - startAssets}ms`); this.logger.log(`Processing price sources and supply for ${tokens.length} tokens`); - const startPriceSupply = Date.now(); await ConcurrencyUtils.executeWithConcurrencyLimit( tokens, async (token) => { @@ -862,10 +854,8 @@ export class TokenService { 50, 'Token prices and supply calculation' ); - this.logger.log(`Price sources and supply processed in ${Date.now() - startPriceSupply}ms`); this.logger.log(`Sorting and finalizing ${tokens.length} tokens`); - const startFinalize = Date.now(); tokens = tokens.sortedDescending( token => token.assets ? 1 : 0, token => token.marketCap ? 1 : 0, @@ -886,7 +876,6 @@ export class TokenService { marketCap: 0, }); tokens = [...tokens, egldToken]; - this.logger.log(`Sorting and finalization completed in ${Date.now() - startFinalize}ms`); this.logger.log(`Total tokens processed: ${tokens.length}`); return tokens; From ce6fe4c4a9be63d2d0610ec422d4189b9df4c398 Mon Sep 17 00:00:00 2001 From: cfaur09 Date: Tue, 14 Oct 2025 17:05:58 +0300 Subject: [PATCH 7/7] set cron to 2 minutes --- src/crons/cache.warmer/cache.warmer.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crons/cache.warmer/cache.warmer.service.ts b/src/crons/cache.warmer/cache.warmer.service.ts index 0e03f6a00..f4cd9bdf2 100644 --- a/src/crons/cache.warmer/cache.warmer.service.ts +++ b/src/crons/cache.warmer/cache.warmer.service.ts @@ -156,7 +156,7 @@ export class CacheWarmerService { await this.invalidateKey(CacheInfo.TransactionPool.key, pool, this.apiConfigService.getTransactionPoolCacheWarmerTtlInSeconds()); } - @Cron(CronExpression.EVERY_10_MINUTES) + @Cron('*/2 * * * *') @Lock({ name: 'All Tokens invalidations', verbose: true }) async handleEsdtTokenInvalidations() { const tokens = await this.tokenService.getAllTokensRaw();