Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/crons/cache.warmer/cache.warmer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class CacheWarmerService {
await this.invalidateKey(CacheInfo.TransactionPool.key, pool, this.apiConfigService.getTransactionPoolCacheWarmerTtlInSeconds());
}

@Cron(CronExpression.EVERY_MINUTE)
@Cron('*/2 * * * *')
@Lock({ name: 'All Tokens invalidations', verbose: true })
async handleEsdtTokenInvalidations() {
const tokens = await this.tokenService.getAllTokensRaw();
Expand Down
188 changes: 111 additions & 77 deletions src/endpoints/tokens/token.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { TokenWithRoles } from "./entities/token.with.roles";
import { TokenWithRolesFilter } from "./entities/token.with.roles.filter";
import { AddressUtils, BinaryUtils, NumberUtils, TokenUtils } from "@multiversx/sdk-nestjs-common";
import { ApiService, ApiUtils } from "@multiversx/sdk-nestjs-http";
import { ConcurrencyUtils } from "src/utils/concurrency.utils";
import { CacheService } from "@multiversx/sdk-nestjs-cache";
import { IndexerService } from "src/common/indexer/indexer.service";
import { OriginLogger } from "@multiversx/sdk-nestjs-common";
Expand Down Expand Up @@ -758,10 +759,10 @@ export class TokenService {
}

this.logger.log(`Starting to fetch all tokens`);
const startFungible = Date.now();
const tokensProperties = await this.esdtService.getAllFungibleTokenProperties();
let tokens = tokensProperties.map(properties => ApiUtils.mergeObjects(new TokenDetailed(), properties));

this.logger.log(`Fetched ${tokens.length} fungible tokens`);
this.logger.log(`Fetched ${tokens.length} fungible tokens in ${Date.now() - startFungible}ms`);

const allAssets = await this.assetsService.getAllTokenAssets();

Expand All @@ -776,11 +777,8 @@ export class TokenService {
}

this.logger.log(`Starting to fetch all meta tokens`);

const collections = await this.collectionService.getNftCollections(new QueryPagination({ from: 0, size: 10000 }), { type: [NftType.MetaESDT] });

this.logger.log(`Fetched ${collections.length} meta tokens`);

for (const collection of collections) {
tokens.push(new TokenDetailed({
type: TokenType.MetaESDT,
Expand All @@ -803,46 +801,61 @@ export class TokenService {
await this.batchProcessTokens(tokens);

const nonMetaEsdtTokens = tokens.filter(x => x.type !== TokenType.MetaESDT);
this.logger.log(`Applying MEX data for ${nonMetaEsdtTokens.length} non-meta tokens`);

await Promise.all([
this.applyMexLiquidity(nonMetaEsdtTokens),
this.applyMexPrices(nonMetaEsdtTokens),
this.applyMexPairType(nonMetaEsdtTokens),
this.applyMexPairTradesCount(nonMetaEsdtTokens),
]);

this.logger.log(`Fetching assets for ${tokens.length} tokens`);
await this.cachingService.batchApplyAll(
tokens,
token => CacheInfo.EsdtAssets(token.identifier).key,
async token => await this.getTokenAssetsRaw(token.identifier),
(token, assets) => token.assets = assets,
CacheInfo.EsdtAssets('').ttl,
50,
);

for (const token of tokens) {
const priceSourcetype = token.assets?.priceSource?.type;

if (priceSourcetype === TokenAssetsPriceSourceType.dataApi) {
token.price = await this.dataApiService.getEsdtTokenPrice(token.identifier);
} else if (priceSourcetype === TokenAssetsPriceSourceType.customUrl && token.assets?.priceSource?.url) {
const pathToPrice = token.assets?.priceSource?.path ?? "0.usdPrice";
const tokenData = await this.fetchTokenDataFromUrl(token.assets.priceSource.url, pathToPrice);

if (tokenData) {
token.price = tokenData;
}
}
this.logger.log(`Processing price sources and supply for ${tokens.length} tokens`);
await ConcurrencyUtils.executeWithConcurrencyLimit(
tokens,
async (token) => {
try {
const priceSourcetype = token.assets?.priceSource?.type;

if (priceSourcetype === TokenAssetsPriceSourceType.dataApi) {
token.price = await this.dataApiService.getEsdtTokenPrice(token.identifier);
} else if (priceSourcetype === TokenAssetsPriceSourceType.customUrl && token.assets?.priceSource?.url) {
const pathToPrice = token.assets?.priceSource?.path ?? "0.usdPrice";
const tokenData = await this.fetchTokenDataFromUrl(token.assets.priceSource.url, pathToPrice);

if (tokenData) {
token.price = tokenData;
}
}

if (token.price) {
const supply = await this.esdtService.getTokenSupply(token.identifier);
token.supply = supply.totalSupply;
token.circulatingSupply = supply.circulatingSupply;
if (token.price) {
const supply = await this.esdtService.getTokenSupply(token.identifier);
token.supply = supply.totalSupply;
token.circulatingSupply = supply.circulatingSupply;

if (token.circulatingSupply) {
token.marketCap = token.price * NumberUtils.denominateString(token.circulatingSupply, token.decimals);
if (token.circulatingSupply) {
token.marketCap = token.price * NumberUtils.denominateString(token.circulatingSupply, token.decimals);
}
}
} catch (error) {
this.logger.error(`Error processing price/supply for token ${token.identifier}: ${error}`);
}
}
}
},
50,
'Token prices and supply calculation'
);

this.logger.log(`Sorting and finalizing ${tokens.length} tokens`);
tokens = tokens.sortedDescending(
token => token.assets ? 1 : 0,
token => token.marketCap ? 1 : 0,
Expand All @@ -864,6 +877,7 @@ export class TokenService {
});
tokens = [...tokens, egldToken];

this.logger.log(`Total tokens processed: ${tokens.length}`);
return tokens;
}

Expand Down Expand Up @@ -926,41 +940,45 @@ export class TokenService {
}

private async batchProcessTokens(tokens: TokenDetailed[]) {
await this.cachingService.batchApplyAll(
tokens,
token => CacheInfo.TokenTransactions(token.identifier).key,
async token => await this.getTotalTransactions(token),
(token, result) => {
token.transactions = result?.count;
token.transactionsLastUpdatedAt = result?.lastUpdatedAt;
},
CacheInfo.TokenTransactions('').ttl,
10,
);
this.logger.log(`Starting batch process for ${tokens.length} tokens`);

await this.cachingService.batchApplyAll(
tokens,
token => CacheInfo.TokenAccounts(token.identifier).key,
async token => await this.getTotalAccounts(token),
(token, result) => {
token.accounts = result?.count;
token.accountsLastUpdatedAt = result?.lastUpdatedAt;
},
CacheInfo.TokenAccounts('').ttl,
10,
);
await Promise.all([
this.cachingService.batchApplyAll(
tokens,
token => CacheInfo.TokenTransactions(token.identifier).key,
async token => await this.getTotalTransactions(token),
(token, result) => {
token.transactions = result?.count;
token.transactionsLastUpdatedAt = result?.lastUpdatedAt;
},
CacheInfo.TokenTransactions('').ttl,
50,
),
this.cachingService.batchApplyAll(
tokens,
token => CacheInfo.TokenAccounts(token.identifier).key,
async token => await this.getTotalAccounts(token),
(token, result) => {
token.accounts = result?.count;
token.accountsLastUpdatedAt = result?.lastUpdatedAt;
},
CacheInfo.TokenAccounts('').ttl,
50,
),
this.cachingService.batchApplyAll(
tokens,
token => CacheInfo.TokenTransfers(token.identifier).key,
async token => await this.getTotalTransfers(token),
(token, result) => {
token.transfers = result?.count;
token.transfersLastUpdatedAt = result?.lastUpdatedAt;
},
CacheInfo.TokenTransfers('').ttl,
50,
),
]);

await this.cachingService.batchApplyAll(
tokens,
token => CacheInfo.TokenTransfers(token.identifier).key,
async token => await this.getTotalTransfers(token),
(token, result) => {
token.transfers = result?.count;
token.transfersLastUpdatedAt = result?.lastUpdatedAt;
},
CacheInfo.TokenTransfers('').ttl,
10,
);
this.logger.log(`Batch process for ${tokens.length} tokens finished`);
}

private async getAllTokensFromApi(): Promise<TokenDetailed[]> {
Expand Down Expand Up @@ -1046,29 +1064,45 @@ export class TokenService {

try {
const indexedTokens = await this.mexTokenService.getMexPricesRaw();
for (const token of tokens) {
const price = indexedTokens[token.identifier];
if (price) {
const supply = await this.esdtService.getTokenSupply(token.identifier);

if (token.assets && token.identifier.split('-')[0] === 'EGLDUSDC') {
price.price = price.price / (10 ** 12) * 2;
}
const tokensWithPrices = tokens.filter(token => indexedTokens[token.identifier]);

if (price.isToken) {
token.price = price.price;
token.marketCap = price.price * NumberUtils.denominateString(supply.circulatingSupply, token.decimals);
this.logger.log(`Applying MEX prices for ${tokensWithPrices.length} tokens with parallel supply fetching`);

if (token.totalLiquidity && token.marketCap && (token.totalLiquidity / token.marketCap < LOW_LIQUIDITY_THRESHOLD)) {
token.isLowLiquidity = true;
token.lowLiquidityThresholdPercent = LOW_LIQUIDITY_THRESHOLD * 100;
await ConcurrencyUtils.executeWithConcurrencyLimit(
tokensWithPrices,
async (token) => {
try {
const price = indexedTokens[token.identifier];
if (!price) {
return;
}
}

token.supply = supply.totalSupply;
token.circulatingSupply = supply.circulatingSupply;
}
}
const supply = await this.esdtService.getTokenSupply(token.identifier);

if (token.assets && token.identifier.split('-')[0] === 'EGLDUSDC') {
price.price = price.price / (10 ** 12) * 2;
}

if (price.isToken) {
token.price = price.price;
token.marketCap = price.price * NumberUtils.denominateString(supply.circulatingSupply, token.decimals);

if (token.totalLiquidity && token.marketCap && (token.totalLiquidity / token.marketCap < LOW_LIQUIDITY_THRESHOLD)) {
token.isLowLiquidity = true;
token.lowLiquidityThresholdPercent = LOW_LIQUIDITY_THRESHOLD * 100;
}
}

token.supply = supply.totalSupply;
token.circulatingSupply = supply.circulatingSupply;
} catch (error) {
this.logger.error(`Error applying MEX price for token ${token.identifier}: ${error}`);
}
},
50,
'MEX prices and supply'
);
} catch (error) {
this.logger.error('Could not apply mex tokens prices');
this.logger.error(error);
Expand Down