diff --git a/src/crons/cache.warmer/cache.warmer.service.ts b/src/crons/cache.warmer/cache.warmer.service.ts index 7fbc17c04..f4cd9bdf2 100644 --- a/src/crons/cache.warmer/cache.warmer.service.ts +++ b/src/crons/cache.warmer/cache.warmer.service.ts @@ -156,7 +156,7 @@ export class CacheWarmerService { await this.invalidateKey(CacheInfo.TransactionPool.key, pool, this.apiConfigService.getTransactionPoolCacheWarmerTtlInSeconds()); } - @Cron(CronExpression.EVERY_MINUTE) + @Cron('*/2 * * * *') @Lock({ name: 'All Tokens invalidations', verbose: true }) async handleEsdtTokenInvalidations() { const tokens = await this.tokenService.getAllTokensRaw(); diff --git a/src/endpoints/tokens/token.service.ts b/src/endpoints/tokens/token.service.ts index 2677ba8fd..3de4fe3f9 100644 --- a/src/endpoints/tokens/token.service.ts +++ b/src/endpoints/tokens/token.service.ts @@ -21,6 +21,7 @@ import { TokenWithRoles } from "./entities/token.with.roles"; import { TokenWithRolesFilter } from "./entities/token.with.roles.filter"; import { AddressUtils, BinaryUtils, NumberUtils, TokenUtils } from "@multiversx/sdk-nestjs-common"; import { ApiService, ApiUtils } from "@multiversx/sdk-nestjs-http"; +import { ConcurrencyUtils } from "src/utils/concurrency.utils"; import { CacheService } from "@multiversx/sdk-nestjs-cache"; import { IndexerService } from "src/common/indexer/indexer.service"; import { OriginLogger } from "@multiversx/sdk-nestjs-common"; @@ -758,10 +759,10 @@ export class TokenService { } this.logger.log(`Starting to fetch all tokens`); + const startFungible = Date.now(); const tokensProperties = await this.esdtService.getAllFungibleTokenProperties(); let tokens = tokensProperties.map(properties => ApiUtils.mergeObjects(new TokenDetailed(), properties)); - - this.logger.log(`Fetched ${tokens.length} fungible tokens`); + this.logger.log(`Fetched ${tokens.length} fungible tokens in ${Date.now() - startFungible}ms`); const allAssets = await this.assetsService.getAllTokenAssets(); @@ -776,11 +777,8 @@ export class TokenService { } this.logger.log(`Starting to fetch all meta tokens`); - const collections = await this.collectionService.getNftCollections(new QueryPagination({ from: 0, size: 10000 }), { type: [NftType.MetaESDT] }); - this.logger.log(`Fetched ${collections.length} meta tokens`); - for (const collection of collections) { tokens.push(new TokenDetailed({ type: TokenType.MetaESDT, @@ -803,6 +801,8 @@ export class TokenService { await this.batchProcessTokens(tokens); const nonMetaEsdtTokens = tokens.filter(x => x.type !== TokenType.MetaESDT); + this.logger.log(`Applying MEX data for ${nonMetaEsdtTokens.length} non-meta tokens`); + await Promise.all([ this.applyMexLiquidity(nonMetaEsdtTokens), this.applyMexPrices(nonMetaEsdtTokens), @@ -810,39 +810,52 @@ export class TokenService { this.applyMexPairTradesCount(nonMetaEsdtTokens), ]); + this.logger.log(`Fetching assets for ${tokens.length} tokens`); await this.cachingService.batchApplyAll( tokens, token => CacheInfo.EsdtAssets(token.identifier).key, async token => await this.getTokenAssetsRaw(token.identifier), (token, assets) => token.assets = assets, CacheInfo.EsdtAssets('').ttl, + 50, ); - for (const token of tokens) { - const priceSourcetype = token.assets?.priceSource?.type; - - if (priceSourcetype === TokenAssetsPriceSourceType.dataApi) { - token.price = await this.dataApiService.getEsdtTokenPrice(token.identifier); - } else if (priceSourcetype === TokenAssetsPriceSourceType.customUrl && token.assets?.priceSource?.url) { - const pathToPrice = token.assets?.priceSource?.path ?? "0.usdPrice"; - const tokenData = await this.fetchTokenDataFromUrl(token.assets.priceSource.url, pathToPrice); - - if (tokenData) { - token.price = tokenData; - } - } + this.logger.log(`Processing price sources and supply for ${tokens.length} tokens`); + await ConcurrencyUtils.executeWithConcurrencyLimit( + tokens, + async (token) => { + try { + const priceSourcetype = token.assets?.priceSource?.type; + + if (priceSourcetype === TokenAssetsPriceSourceType.dataApi) { + token.price = await this.dataApiService.getEsdtTokenPrice(token.identifier); + } else if (priceSourcetype === TokenAssetsPriceSourceType.customUrl && token.assets?.priceSource?.url) { + const pathToPrice = token.assets?.priceSource?.path ?? "0.usdPrice"; + const tokenData = await this.fetchTokenDataFromUrl(token.assets.priceSource.url, pathToPrice); + + if (tokenData) { + token.price = tokenData; + } + } - if (token.price) { - const supply = await this.esdtService.getTokenSupply(token.identifier); - token.supply = supply.totalSupply; - token.circulatingSupply = supply.circulatingSupply; + if (token.price) { + const supply = await this.esdtService.getTokenSupply(token.identifier); + token.supply = supply.totalSupply; + token.circulatingSupply = supply.circulatingSupply; - if (token.circulatingSupply) { - token.marketCap = token.price * NumberUtils.denominateString(token.circulatingSupply, token.decimals); + if (token.circulatingSupply) { + token.marketCap = token.price * NumberUtils.denominateString(token.circulatingSupply, token.decimals); + } + } + } catch (error) { + this.logger.error(`Error processing price/supply for token ${token.identifier}: ${error}`); } - } - } + }, + 50, + 'Token prices and supply calculation' + ); + this.logger.log(`Sorting and finalizing ${tokens.length} tokens`); tokens = tokens.sortedDescending( token => token.assets ? 1 : 0, token => token.marketCap ? 1 : 0, @@ -864,6 +877,7 @@ export class TokenService { }); tokens = [...tokens, egldToken]; + this.logger.log(`Total tokens processed: ${tokens.length}`); return tokens; } @@ -926,41 +940,45 @@ export class TokenService { } private async batchProcessTokens(tokens: TokenDetailed[]) { - await this.cachingService.batchApplyAll( - tokens, - token => CacheInfo.TokenTransactions(token.identifier).key, - async token => await this.getTotalTransactions(token), - (token, result) => { - token.transactions = result?.count; - token.transactionsLastUpdatedAt = result?.lastUpdatedAt; - }, - CacheInfo.TokenTransactions('').ttl, - 10, - ); + this.logger.log(`Starting batch process for ${tokens.length} tokens`); - await this.cachingService.batchApplyAll( - tokens, - token => CacheInfo.TokenAccounts(token.identifier).key, - async token => await this.getTotalAccounts(token), - (token, result) => { - token.accounts = result?.count; - token.accountsLastUpdatedAt = result?.lastUpdatedAt; - }, - CacheInfo.TokenAccounts('').ttl, - 10, - ); + await Promise.all([ + this.cachingService.batchApplyAll( + tokens, + token => CacheInfo.TokenTransactions(token.identifier).key, + async token => await this.getTotalTransactions(token), + (token, result) => { + token.transactions = result?.count; + token.transactionsLastUpdatedAt = result?.lastUpdatedAt; + }, + CacheInfo.TokenTransactions('').ttl, + 50, + ), + this.cachingService.batchApplyAll( + tokens, + token => CacheInfo.TokenAccounts(token.identifier).key, + async token => await this.getTotalAccounts(token), + (token, result) => { + token.accounts = result?.count; + token.accountsLastUpdatedAt = result?.lastUpdatedAt; + }, + CacheInfo.TokenAccounts('').ttl, + 50, + ), + this.cachingService.batchApplyAll( + tokens, + token => CacheInfo.TokenTransfers(token.identifier).key, + async token => await this.getTotalTransfers(token), + (token, result) => { + token.transfers = result?.count; + token.transfersLastUpdatedAt = result?.lastUpdatedAt; + }, + CacheInfo.TokenTransfers('').ttl, + 50, + ), + ]); - await this.cachingService.batchApplyAll( - tokens, - token => CacheInfo.TokenTransfers(token.identifier).key, - async token => await this.getTotalTransfers(token), - (token, result) => { - token.transfers = result?.count; - token.transfersLastUpdatedAt = result?.lastUpdatedAt; - }, - CacheInfo.TokenTransfers('').ttl, - 10, - ); + this.logger.log(`Batch process for ${tokens.length} tokens finished`); } private async getAllTokensFromApi(): Promise { @@ -1046,29 +1064,45 @@ export class TokenService { try { const indexedTokens = await this.mexTokenService.getMexPricesRaw(); - for (const token of tokens) { - const price = indexedTokens[token.identifier]; - if (price) { - const supply = await this.esdtService.getTokenSupply(token.identifier); - if (token.assets && token.identifier.split('-')[0] === 'EGLDUSDC') { - price.price = price.price / (10 ** 12) * 2; - } + const tokensWithPrices = tokens.filter(token => indexedTokens[token.identifier]); - if (price.isToken) { - token.price = price.price; - token.marketCap = price.price * NumberUtils.denominateString(supply.circulatingSupply, token.decimals); + this.logger.log(`Applying MEX prices for ${tokensWithPrices.length} tokens with parallel supply fetching`); - if (token.totalLiquidity && token.marketCap && (token.totalLiquidity / token.marketCap < LOW_LIQUIDITY_THRESHOLD)) { - token.isLowLiquidity = true; - token.lowLiquidityThresholdPercent = LOW_LIQUIDITY_THRESHOLD * 100; + await ConcurrencyUtils.executeWithConcurrencyLimit( + tokensWithPrices, + async (token) => { + try { + const price = indexedTokens[token.identifier]; + if (!price) { + return; } - } - token.supply = supply.totalSupply; - token.circulatingSupply = supply.circulatingSupply; - } - } + const supply = await this.esdtService.getTokenSupply(token.identifier); + + if (token.assets && token.identifier.split('-')[0] === 'EGLDUSDC') { + price.price = price.price / (10 ** 12) * 2; + } + + if (price.isToken) { + token.price = price.price; + token.marketCap = price.price * NumberUtils.denominateString(supply.circulatingSupply, token.decimals); + + if (token.totalLiquidity && token.marketCap && (token.totalLiquidity / token.marketCap < LOW_LIQUIDITY_THRESHOLD)) { + token.isLowLiquidity = true; + token.lowLiquidityThresholdPercent = LOW_LIQUIDITY_THRESHOLD * 100; + } + } + + token.supply = supply.totalSupply; + token.circulatingSupply = supply.circulatingSupply; + } catch (error) { + this.logger.error(`Error applying MEX price for token ${token.identifier}: ${error}`); + } + }, + 50, + 'MEX prices and supply' + ); } catch (error) { this.logger.error('Could not apply mex tokens prices'); this.logger.error(error);