From e1cfb677ff796ff0cf1165e0c6e6470e4c230e80 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Wed, 14 Jan 2026 09:22:42 +0200 Subject: [PATCH 1/5] Fix indexer retrial errors, do not retry for every error and fix the issue related to the same nonce used for every retry --- .../Indexer/processors/BaseProcessor.ts | 136 +++++++++++------- .../processors/MetadataEventProcessor.ts | 17 ++- 2 files changed, 95 insertions(+), 58 deletions(-) diff --git a/src/components/Indexer/processors/BaseProcessor.ts b/src/components/Indexer/processors/BaseProcessor.ts index b8833b429..0d8a282ed 100644 --- a/src/components/Indexer/processors/BaseProcessor.ts +++ b/src/components/Indexer/processors/BaseProcessor.ts @@ -28,6 +28,7 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' with { type: 'json' } import { fetchTransactionReceipt } from '../../core/utils/validateOrders.js' import { withRetrial } from '../utils.js' +import { OceanNodeKeys } from '../../../@types/OceanNode.js' export abstract class BaseEventProcessor { protected networkId: number @@ -205,6 +206,31 @@ export abstract class BaseEventProcessor { return true } + private async getNonce(decryptorURL: string, keys: OceanNodeKeys) { + try { + if (URLUtils.isValidUrl(decryptorURL)) { + INDEXER_LOGGER.logMessage( + `decryptDDO: Making HTTP request for nonce. DecryptorURL: ${decryptorURL}` + ) + const nonceResponse = await axios.get( + `${decryptorURL}/api/services/nonce?userAddress=${keys.ethAddress}`, + { timeout: 20000 } + ) + return nonceResponse.status === 200 && nonceResponse.data + ? String(parseInt(nonceResponse.data.nonce) + 1) + : Date.now().toString() + } else { + return Date.now().toString() + } + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `decryptDDO: Error getting nonce, using timestamp: ${err.message}` + ) + return Date.now().toString() + } + } + protected async decryptDDO( decryptorURL: string, flag: string, @@ -224,75 +250,78 @@ export abstract class BaseEventProcessor { ) const config = await getConfiguration() const { keys } = config - let nonce: string - try { - if (URLUtils.isValidUrl(decryptorURL)) { - INDEXER_LOGGER.logMessage( - `decryptDDO: Making HTTP request for nonce. DecryptorURL: ${decryptorURL}` - ) - const nonceResponse = await axios.get( - `${decryptorURL}/api/services/nonce?userAddress=${keys.ethAddress}`, - { timeout: 20000 } - ) - nonce = - nonceResponse.status === 200 && nonceResponse.data - ? String(parseInt(nonceResponse.data.nonce) + 1) - : Date.now().toString() - } else { - nonce = Date.now().toString() - } - } catch (err) { - INDEXER_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `decryptDDO: Error getting nonce, using timestamp: ${err.message}` - ) - nonce = Date.now().toString() - } const nodeId = keys.peerId.toString() - const wallet: ethers.Wallet = new ethers.Wallet(process.env.PRIVATE_KEY as string) - const useTxIdOrContractAddress = txId || contractAddress - const message = String( - useTxIdOrContractAddress + keys.ethAddress + chainId.toString() + nonce - ) - const messageHash = ethers.solidityPackedKeccak256( - ['bytes'], - [ethers.hexlify(ethers.toUtf8Bytes(message))] - ) + const createSignature = async () => { + const nonce: string = await this.getNonce(decryptorURL, keys) + INDEXER_LOGGER.logMessage( + `decryptDDO: Fetched fresh nonce ${nonce} for decrypt attempt` + ) - const messageHashBytes = ethers.getBytes(messageHash) - const signature = await wallet.signMessage(messageHashBytes) + const message = String( + useTxIdOrContractAddress + keys.ethAddress + chainId.toString() + nonce + ) + const messageHash = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message))] + ) + const messageHashBytes = ethers.getBytes(messageHash) + const signature = await wallet.signMessage(messageHashBytes) - const recoveredAddress = ethers.verifyMessage(messageHashBytes, signature) - INDEXER_LOGGER.logMessage( - `decryptDDO: recovered address: ${recoveredAddress}, expected: ${keys.ethAddress}` - ) + const recoveredAddress = ethers.verifyMessage(messageHashBytes, signature) + INDEXER_LOGGER.logMessage( + `decryptDDO: recovered address: ${recoveredAddress}, expected: ${keys.ethAddress}` + ) + + return { nonce, signature } + } if (URLUtils.isValidUrl(decryptorURL)) { try { - const payload = { - transactionId: txId, - chainId, - decrypterAddress: keys.ethAddress, - dataNftAddress: contractAddress, - signature, - nonce - } const response = await withRetrial(async () => { + const { nonce, signature } = await createSignature() + + const payload = { + transactionId: txId, + chainId, + decrypterAddress: keys.ethAddress, + dataNftAddress: contractAddress, + signature, + nonce + } try { const res = await axios({ method: 'post', url: `${decryptorURL}/api/services/decrypt`, data: payload, - timeout: 30000 + timeout: 30000, + validateStatus: (status) => { + return ( + (status >= 200 && status < 300) || status === 400 || status === 403 + ) + } }) + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_INFO, + `Decrypt request successful. Status: ${res.status}, ${res.statusText}` + ) + + if (res.status === 400 || res.status === 403) { + // Return error response, to avoid retry for unnecessary errors + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `bProvider exception on decrypt DDO. Status: ${res.status}, ${res.statusText}` + ) + return res + } + if (res.status !== 200 && res.status !== 201) { const message = `bProvider exception on decrypt DDO. Status: ${res.status}, ${res.statusText}` INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, message) - throw new Error(message) // do NOT retry + throw new Error(message) // Retry 5XX errors } return res } catch (err: any) { @@ -313,6 +342,10 @@ export abstract class BaseEventProcessor { } }) + if (response.status === 400 || response.status === 403) { + throw new Error(`Provider validation failed: ${response.statusText}`) + } + let responseHash if (response.data instanceof Object) { responseHash = create256Hash(JSON.stringify(response.data)) @@ -334,6 +367,9 @@ export abstract class BaseEventProcessor { } else { const node = OceanNode.getInstance(config, await getDatabase()) if (nodeId === decryptorURL) { + // Fetch nonce and signature for local node path + const { nonce, signature } = await createSignature() + const decryptDDOTask: DecryptDDOCommand = { command: PROTOCOL_COMMANDS.DECRYPT_DDO, transactionId: txId, @@ -358,6 +394,8 @@ export abstract class BaseEventProcessor { } } else { try { + const { nonce, signature } = await createSignature() + const p2pNode = await node.getP2PNode() let isBinaryContent = false const sink = async function (source: any) { diff --git a/src/components/Indexer/processors/MetadataEventProcessor.ts b/src/components/Indexer/processors/MetadataEventProcessor.ts index 8e8c754b4..606612a86 100644 --- a/src/components/Indexer/processors/MetadataEventProcessor.ts +++ b/src/components/Indexer/processors/MetadataEventProcessor.ts @@ -49,10 +49,16 @@ export class MetadataEventProcessor extends BaseEventProcessor { ERC721Template.abi, eventName ) + const metadata = decodedEventData.args[4] + INDEXER_LOGGER.logMessage( + `Decoded metadata event ${JSON.stringify(metadata)}...`, + true + ) const metadataHash = decodedEventData.args[5] const flag = decodedEventData.args[3] const owner = decodedEventData.args[0] + // TODO: Decrypt DDO only for the case where REVOKED is updated to ACTIVE const ddo = await this.decryptDDO( decodedEventData.args[2], flag, @@ -208,17 +214,10 @@ export class MetadataEventProcessor extends BaseEventProcessor { if (eventName === EVENTS.METADATA_UPDATED) { if (!previousDdoInstance) { INDEXER_LOGGER.logMessage( - `Previous DDO with did ${ddoInstance.getDid()} was not found the database. Maybe it was deleted/hidden to some violation issues`, + `Previous DDO with did ${ddoInstance.getDid()} was not found the database`, true ) - await ddoState.update( - this.networkId, - did, - event.address, - event.transactionHash, - false, - `Previous DDO with did ${ddoInstance.getDid()} was not found the database. Maybe it was deleted/hidden to some violation issues` - ) + // Consider make the UPDATE flow to work as an upsert return } const [isUpdateable, error] = this.isUpdateable( From 547dedeb4c579d773994e8b2d99c322de3295d4c Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Wed, 14 Jan 2026 18:36:49 +0200 Subject: [PATCH 2/5] Remove ddo index for Deprecated and Revoked states, allow decrypt for End of life metadata state --- .../processors/MetadataEventProcessor.ts | 23 +++++++++++++++++++ src/components/core/handler/ddoHandler.ts | 8 +------ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/components/Indexer/processors/MetadataEventProcessor.ts b/src/components/Indexer/processors/MetadataEventProcessor.ts index 606612a86..54a99dfde 100644 --- a/src/components/Indexer/processors/MetadataEventProcessor.ts +++ b/src/components/Indexer/processors/MetadataEventProcessor.ts @@ -58,7 +58,30 @@ export class MetadataEventProcessor extends BaseEventProcessor { const metadataHash = decodedEventData.args[5] const flag = decodedEventData.args[3] const owner = decodedEventData.args[0] +<<<<<<< Updated upstream // TODO: Decrypt DDO only for the case where REVOKED is updated to ACTIVE +======= + + const dataNftAddress = ethers.getAddress(event.address) + + const templateContract = new ethers.Contract( + dataNftAddress, + ERC721Template.abi, + signer + ) + const metaData = await templateContract.getMetaData() + const metaDataState = Number(metaData[2]) + + if ([MetadataStates.DEPRECATED, MetadataStates.REVOKED].includes(metaDataState)) { + INDEXER_LOGGER.logMessage( + `Metadata state is ${metaDataState}, DDO is deleted`, + true + ) + await ddoState.delete(did) + return + } + +>>>>>>> Stashed changes const ddo = await this.decryptDDO( decodedEventData.args[2], flag, diff --git a/src/components/core/handler/ddoHandler.ts b/src/components/core/handler/ddoHandler.ts index 6573514e1..df77a25be 100644 --- a/src/components/core/handler/ddoHandler.ts +++ b/src/components/core/handler/ddoHandler.ts @@ -280,13 +280,7 @@ export class DecryptDdoHandler extends CommandHandler { ) const metaData = await templateContract.getMetaData() const metaDataState = Number(metaData[2]) - if ( - [ - MetadataStates.END_OF_LIFE, - MetadataStates.DEPRECATED, - MetadataStates.REVOKED - ].includes(metaDataState) - ) { + if ([MetadataStates.DEPRECATED, MetadataStates.REVOKED].includes(metaDataState)) { CORE_LOGGER.logMessage(`Decrypt DDO: error metadata state ${metaDataState}`, true) return { stream: null, From 15daa77fb3b2efaf002b1849dd1b8f3a08b01fde Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Wed, 14 Jan 2026 18:38:21 +0200 Subject: [PATCH 3/5] fixed conflict --- src/components/Indexer/processors/MetadataEventProcessor.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/components/Indexer/processors/MetadataEventProcessor.ts b/src/components/Indexer/processors/MetadataEventProcessor.ts index 54a99dfde..f919a4bc9 100644 --- a/src/components/Indexer/processors/MetadataEventProcessor.ts +++ b/src/components/Indexer/processors/MetadataEventProcessor.ts @@ -58,9 +58,6 @@ export class MetadataEventProcessor extends BaseEventProcessor { const metadataHash = decodedEventData.args[5] const flag = decodedEventData.args[3] const owner = decodedEventData.args[0] -<<<<<<< Updated upstream - // TODO: Decrypt DDO only for the case where REVOKED is updated to ACTIVE -======= const dataNftAddress = ethers.getAddress(event.address) @@ -81,7 +78,6 @@ export class MetadataEventProcessor extends BaseEventProcessor { return } ->>>>>>> Stashed changes const ddo = await this.decryptDDO( decodedEventData.args[2], flag, From a9cc9f7ba6a015ec759ad57b5bca07a4fbcdc46e Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Wed, 14 Jan 2026 19:29:21 +0200 Subject: [PATCH 4/5] small fixes --- .../Indexer/processors/MetadataEventProcessor.ts | 12 +++++------- src/components/core/handler/ddoHandler.ts | 1 + 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/components/Indexer/processors/MetadataEventProcessor.ts b/src/components/Indexer/processors/MetadataEventProcessor.ts index f919a4bc9..ac4108965 100644 --- a/src/components/Indexer/processors/MetadataEventProcessor.ts +++ b/src/components/Indexer/processors/MetadataEventProcessor.ts @@ -13,7 +13,7 @@ import { INDEXER_LOGGER } from '../../../utils/logging/common.js' import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' import { asyncCallWithTimeout } from '../../../utils/util.js' import { PolicyServer } from '../../policyServer/index.js' -import { wasNFTDeployedByOurFactory, getPricingStatsForDddo } from '../utils.js' +import { wasNFTDeployedByOurFactory, getPricingStatsForDddo, getDid } from '../utils.js' import { BaseEventProcessor } from './BaseProcessor.js' import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC721Template.sol/ERC721Template.json' with { type: 'json' } import { Purgatory } from '../purgatory.js' @@ -51,16 +51,14 @@ export class MetadataEventProcessor extends BaseEventProcessor { ) const metadata = decodedEventData.args[4] - INDEXER_LOGGER.logMessage( - `Decoded metadata event ${JSON.stringify(metadata)}...`, - true - ) const metadataHash = decodedEventData.args[5] const flag = decodedEventData.args[3] const owner = decodedEventData.args[0] const dataNftAddress = ethers.getAddress(event.address) + did = getDid(event.address, chainId) + const templateContract = new ethers.Contract( dataNftAddress, ERC721Template.abi, @@ -71,9 +69,10 @@ export class MetadataEventProcessor extends BaseEventProcessor { if ([MetadataStates.DEPRECATED, MetadataStates.REVOKED].includes(metaDataState)) { INDEXER_LOGGER.logMessage( - `Metadata state is ${metaDataState}, DDO is deleted`, + `Delete DDO because Metadata state is ${metaDataState}`, true ) + await ddoDatabase.delete(did) await ddoState.delete(did) return } @@ -236,7 +235,6 @@ export class MetadataEventProcessor extends BaseEventProcessor { `Previous DDO with did ${ddoInstance.getDid()} was not found the database`, true ) - // Consider make the UPDATE flow to work as an upsert return } const [isUpdateable, error] = this.isUpdateable( diff --git a/src/components/core/handler/ddoHandler.ts b/src/components/core/handler/ddoHandler.ts index df77a25be..36e8de4f6 100644 --- a/src/components/core/handler/ddoHandler.ts +++ b/src/components/core/handler/ddoHandler.ts @@ -294,6 +294,7 @@ export class DecryptDdoHandler extends CommandHandler { if ( ![ MetadataStates.ACTIVE, + MetadataStates.END_OF_LIFE, MetadataStates.ORDERING_DISABLED, MetadataStates.UNLISTED ].includes(metaDataState) From a80babfa10cd5523e839496749d9fab9ae51078c Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Thu, 15 Jan 2026 09:46:45 +0200 Subject: [PATCH 5/5] adapt revoked and deprecated logic --- .../processors/MetadataEventProcessor.ts | 43 ++++++++++++++++--- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/src/components/Indexer/processors/MetadataEventProcessor.ts b/src/components/Indexer/processors/MetadataEventProcessor.ts index ac4108965..89389b208 100644 --- a/src/components/Indexer/processors/MetadataEventProcessor.ts +++ b/src/components/Indexer/processors/MetadataEventProcessor.ts @@ -65,16 +65,47 @@ export class MetadataEventProcessor extends BaseEventProcessor { signer ) const metaData = await templateContract.getMetaData() - const metaDataState = Number(metaData[2]) + const metadataState = Number(metaData[2]) - if ([MetadataStates.DEPRECATED, MetadataStates.REVOKED].includes(metaDataState)) { + if ([MetadataStates.DEPRECATED, MetadataStates.REVOKED].includes(metadataState)) { INDEXER_LOGGER.logMessage( - `Delete DDO because Metadata state is ${metaDataState}`, + `Delete DDO because Metadata state is ${metadataState}`, true ) - await ddoDatabase.delete(did) - await ddoState.delete(did) - return + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected MetadataState changed for ${did}, but it does not exists.` + ) + return + } + + const ddoInstance = DDOManager.getDDOClass(ddo) + + INDEXER_LOGGER.logMessage( + `DDO became non-visible from ${ + ddoInstance.getAssetFields().indexedMetadata.nft.state + } to ${metadataState}` + ) + + const shortDdoInstance = DDOManager.getDDOClass({ + id: ddo.id, + version: 'deprecated', + chainId, + nftAddress: ddo.nftAddress, + indexedMetadata: { + nft: { + state: metadataState + } + } + }) + + const savedDDO = await this.createOrUpdateDDO( + shortDdoInstance, + EVENTS.METADATA_STATE + ) + return savedDDO } const ddo = await this.decryptDDO(