Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 133 additions & 67 deletions src/common/indexer/elastic/elastic.indexer.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { HttpStatus, Injectable } from "@nestjs/common";
import { BinaryUtils } from "@multiversx/sdk-nestjs-common";
import { ElasticQuery, QueryOperator, QueryType, QueryConditionOptions, ElasticSortOrder, ElasticSortProperty, TermsQuery, RangeGreaterThanOrEqual, MatchQuery } from "@multiversx/sdk-nestjs-elastic";
import { ElasticQuery, QueryOperator, QueryType, QueryConditionOptions, ElasticSortOrder, ElasticSortProperty, RangeGreaterThanOrEqual, MatchQuery } from "@multiversx/sdk-nestjs-elastic";
import { IndexerInterface } from "../indexer.interface";
import { ApiConfigService } from "src/common/api-config/api.config.service";
import { CollectionFilter } from "src/endpoints/collections/entities/collection.filter";
Expand Down Expand Up @@ -279,9 +279,7 @@ export class ElasticIndexerService implements IndexerInterface {

const elasticOperations = await this.elasticService.getList('operations', 'txHash', elasticQuery);

for (const operation of elasticOperations) {
this.processTransaction(operation);
}
this.bulkProcessTransactions(elasticOperations);

return elasticOperations;
}
Expand Down Expand Up @@ -338,15 +336,15 @@ export class ElasticIndexerService implements IndexerInterface {
.withMustMatchCondition('type', 'unsigned')
.withPagination({ from: 0, size: transactionHashes.length + 1 })
.withSort([{ name: 'timestamp', order: ElasticSortOrder.ascending }])
.withTerms(new TermsQuery('originalTxHash', transactionHashes));
.withMustMultiShouldCondition(transactionHashes, hash => QueryType.Match('originalTxHash', hash));

return await this.elasticService.getList('operations', 'scHash', elasticQuery);
}

async getAccountsForAddresses(addresses: string[]): Promise<any[]> {
const elasticQuery: ElasticQuery = ElasticQuery.create()
.withPagination({ from: 0, size: addresses.length + 1 })
.withTerms(new TermsQuery('address', addresses));
.withMustMultiShouldCondition(addresses, address => QueryType.Match('address', address));

return await this.elasticService.getList('accounts', 'address', elasticQuery);
}
Expand Down Expand Up @@ -383,9 +381,7 @@ export class ElasticIndexerService implements IndexerInterface {

const results = await this.elasticService.getList('operations', 'hash', elasticQuery);

for (const result of results) {
this.processTransaction(result);
}
this.bulkProcessTransactions(results);

return results;
}
Expand Down Expand Up @@ -548,9 +544,7 @@ export class ElasticIndexerService implements IndexerInterface {

const transactions = await this.elasticService.getList('operations', 'txHash', elasticQuery);

for (const transaction of transactions) {
this.processTransaction(transaction);
}
this.bulkProcessTransactions(transactions);

return transactions;
}
Expand All @@ -561,6 +555,19 @@ export class ElasticIndexerService implements IndexerInterface {
}
}

private bulkProcessTransactions(transactions: any[]) {
if (!transactions || transactions.length === 0) {
return;
}

for (let i = 0; i < transactions.length; i++) {
const transaction = transactions[i];
if (transaction && !transaction.function) {
transaction.function = transaction.operation;
}
}
}

private buildTokenFilter(query: ElasticQuery, filter: TokenFilter): ElasticQuery {
if (filter.includeMetaESDT === true) {
query = query.withMustMultiShouldCondition([TokenType.FungibleESDT, TokenType.MetaESDT], type => QueryType.Match('type', type));
Expand Down Expand Up @@ -616,9 +623,7 @@ export class ElasticIndexerService implements IndexerInterface {

const results = await this.elasticService.getList('operations', 'hash', elasticQuerySc);

for (const result of results) {
this.processTransaction(result);
}
this.bulkProcessTransactions(results);

return results;
}
Expand All @@ -632,9 +637,11 @@ export class ElasticIndexerService implements IndexerInterface {
return [];
}

const maxSize = Math.min(hashes.length * 10, 1000);

const elasticQuery = ElasticQuery.create()
.withMustMatchCondition('type', 'unsigned')
.withPagination({ from: 0, size: 10000 })
.withPagination({ from: 0, size: maxSize })
.withSort([{ name: 'timestamp', order: ElasticSortOrder.ascending }])
.withMustMultiShouldCondition(hashes, hash => QueryType.Match('originalTxHash', hash));

Expand All @@ -646,19 +653,19 @@ export class ElasticIndexerService implements IndexerInterface {
return [];
}

const queries = identifiers.map((identifier) => QueryType.Match('identifier', identifier, QueryOperator.AND));

let elasticQuery = ElasticQuery.create();

if (pagination) {
elasticQuery = elasticQuery.withPagination(pagination);
}

elasticQuery = elasticQuery
.withSort([{ name: "balanceNum", order: ElasticSortOrder.descending }])
.withSort([
{ name: "balanceNum", order: ElasticSortOrder.descending },
{ name: 'timestamp', order: ElasticSortOrder.descending },
])
.withCondition(QueryConditionOptions.mustNot, [QueryType.Match('address', 'pending')])
.withCondition(QueryConditionOptions.should, queries)
.withSort([{ name: 'timestamp', order: ElasticSortOrder.descending }]);
.withMustMultiShouldCondition(identifiers, identifier => QueryType.Match('identifier', identifier, QueryOperator.AND));

return await this.elasticService.getList('accountsesdt', 'identifier', elasticQuery);
}
Expand All @@ -668,19 +675,19 @@ export class ElasticIndexerService implements IndexerInterface {
return [];
}

const queries = identifiers.map((identifier) => QueryType.Match('collection', identifier, QueryOperator.AND));

let elasticQuery = ElasticQuery.create();

if (pagination) {
elasticQuery = elasticQuery.withPagination(pagination);
}

elasticQuery = elasticQuery
.withSort([{ name: "balanceNum", order: ElasticSortOrder.descending }])
.withSort([
{ name: "balanceNum", order: ElasticSortOrder.descending },
{ name: 'timestamp', order: ElasticSortOrder.descending },
])
.withCondition(QueryConditionOptions.mustNot, [QueryType.Match('address', 'pending')])
.withCondition(QueryConditionOptions.should, queries)
.withSort([{ name: 'timestamp', order: ElasticSortOrder.descending }]);
.withMustMultiShouldCondition(identifiers, identifier => QueryType.Match('collection', identifier, QueryOperator.AND));

return await this.elasticService.getList('accountsesdt', 'identifier', elasticQuery);
}
Expand Down Expand Up @@ -711,11 +718,22 @@ export class ElasticIndexerService implements IndexerInterface {
]);
}

let elasticNfts = await this.elasticService.getList('tokens', 'identifier', elasticQuery);
if (elasticNfts.length === 0 && identifier !== undefined) {
elasticNfts = await this.elasticService.getList('accountsesdt', 'identifier', ElasticQuery.create().withMustMatchCondition('identifier', identifier, QueryOperator.AND));
if (identifier !== undefined) {
const [tokensResult, accountsResult] = await Promise.all([
this.elasticService.getList('tokens', 'identifier', elasticQuery).catch(() => []),
this.elasticService.getList('accountsesdt', 'identifier',
ElasticQuery.create()
.withMustMatchCondition('identifier', identifier, QueryOperator.AND)
.withPagination(pagination)
).catch(() => []),
]);

const elasticNfts = tokensResult.length > 0 ? tokensResult : accountsResult;
return elasticNfts;
} else {
const elasticNfts = await this.elasticService.getList('tokens', 'identifier', elasticQuery);
return elasticNfts;
}
return elasticNfts;
}

async getTransactionBySenderAndNonce(sender: string, nonce: number): Promise<any[]> {
Expand Down Expand Up @@ -819,52 +837,100 @@ export class ElasticIndexerService implements IndexerInterface {
}
}

const elasticQuery = ElasticQuery.create()
.withMustExistCondition('identifier')
.withMustMatchCondition('address', address)
.withPagination({ from: 0, size: 0 })
.withMustMatchCondition('token', filter.collection, QueryOperator.AND)
.withMustMultiShouldCondition(filter.identifiers, identifier => QueryType.Match('token', identifier, QueryOperator.AND))
.withSearchWildcardCondition(filter.search, ['token', 'name'])
.withMustMultiShouldCondition(filterTypes, type => QueryType.Match('type', type))
.withMustMultiShouldCondition(filter.subType, subType => QueryType.Match('type', subType))
.withExtra({
aggs: {
collections: {
composite: {
size: 10000,
sources: [
{
collection: {
terms: {
field: 'token',
},
},
},
],
const data: { collection: string, count: number, balance: number }[] = [];
let afterKey: any = null;
let remainingToSkip = pagination.from;
let remainingToCollect = pagination.size;

while (data.length < pagination.size) {
const batchSize = Math.min(1000, remainingToSkip + remainingToCollect);

const compositeAgg: any = {
size: batchSize,
sources: [
{
collection: {
terms: {
field: 'token',
order: 'asc',
},
},
aggs: {
balance: {
sum: {
field: 'balanceNum',
},
],
};

if (afterKey) {
compositeAgg.after = afterKey;
}

const elasticQuery = ElasticQuery.create()
.withMustExistCondition('identifier')
.withMustMatchCondition('address', address)
.withPagination({ from: 0, size: 0 })
.withMustMatchCondition('token', filter.collection, QueryOperator.AND)
.withMustMultiShouldCondition(filter.identifiers, identifier => QueryType.Match('token', identifier, QueryOperator.AND))
.withSearchWildcardCondition(filter.search, ['token', 'name'])
.withMustMultiShouldCondition(filterTypes, type => QueryType.Match('type', type))
.withMustMultiShouldCondition(filter.subType, subType => QueryType.Match('type', subType))
.withExtra({
aggs: {
collections: {
composite: compositeAgg,
aggs: {
balance: {
sum: {
field: 'balanceNum',
},
},
},
},
},
},
});
});

const result = await this.elasticService.post(`${this.apiConfigService.getElasticUrl()}/accountsesdt/_search`, elasticQuery.toJson());
const result = await this.elasticService.post(`${this.apiConfigService.getElasticUrl()}/accountsesdt/_search`, elasticQuery.toJson());
const buckets = result?.data?.aggregations?.collections?.buckets || [];

if (buckets.length === 0) {
break;
}

const buckets = result?.data?.aggregations?.collections?.buckets;
const batchData: { collection: string, count: number, balance: number }[] = buckets.map((bucket: any) => ({
collection: bucket.key.collection,
count: bucket.doc_count,
balance: bucket.balance.value,
}));

let data: { collection: string, count: number, balance: number }[] = buckets.map((bucket: any) => ({
collection: bucket.key.collection,
count: bucket.doc_count,
balance: bucket.balance.value,
}));
if (remainingToSkip > 0) {
const skipFromBatch = Math.min(remainingToSkip, batchData.length);
remainingToSkip -= skipFromBatch;

if (remainingToSkip === 0) {
const collectFromBatch = Math.min(remainingToCollect, batchData.length - skipFromBatch);
data.push(...batchData.slice(skipFromBatch, skipFromBatch + collectFromBatch));
remainingToCollect -= collectFromBatch;
}
} else {
const collectFromBatch = Math.min(remainingToCollect, batchData.length);
data.push(...batchData.slice(0, collectFromBatch));
remainingToCollect -= collectFromBatch;
}

if (remainingToCollect === 0) {
break;
}

const aggregations = result?.data?.aggregations?.collections;
if (aggregations?.after_key) {
afterKey = aggregations.after_key;
} else {
break;
}

if (buckets.length < batchSize) {
break;
}
}

data = data.slice(pagination.from, pagination.from + pagination.size);
return data;
}

Expand Down
Loading