From 49acead14bd8816af6c03bd701112a33d976f50b Mon Sep 17 00:00:00 2001 From: pawana_backbase Date: Fri, 20 Feb 2026 12:32:45 +0530 Subject: [PATCH 1/7] portfolio-trading-accounts upsert --- .../configuration/InvestmentClientConfig.java | 7 + .../InvestmentServiceConfiguration.java | 5 +- .../stream/investment/InvestmentData.java | 4 + .../stream/investment/model/Account.java | 13 + .../model/InvestmentPortfolioAccount.java | 12 + .../investment/saga/InvestmentSaga.java | 34 ++ .../service/InvestmentPortfolioService.java | 329 ++++++++++++++++++ 7 files changed, 402 insertions(+), 2 deletions(-) create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/Account.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java index d5c6289c8..09e913129 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java @@ -12,6 +12,7 @@ import com.backbase.investment.api.service.v1.InvestmentProductsApi; import com.backbase.investment.api.service.v1.PaymentsApi; import com.backbase.investment.api.service.v1.PortfolioApi; +import com.backbase.investment.api.service.v1.PortfolioTradingAccountsApi; import com.backbase.stream.clients.config.CompositeApiClientConfig; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.databind.ObjectMapper; @@ -117,6 +118,12 @@ public PaymentsApi paymentsApi(ApiClient investmentApiClient) { return new PaymentsApi(investmentApiClient); } + @Bean + @ConditionalOnMissingBean + public PortfolioTradingAccountsApi portfolioTradingAccountsApi(ApiClient investmentApiClient) { + return new PortfolioTradingAccountsApi(investmentApiClient); + } + @Bean @ConditionalOnMissingBean public CurrencyApi currencyApi(ApiClient investmentApiClient) { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java index a6fbe808c..dc3561f7d 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java @@ -11,6 +11,7 @@ import com.backbase.investment.api.service.v1.InvestmentProductsApi; import com.backbase.investment.api.service.v1.PaymentsApi; import com.backbase.investment.api.service.v1.PortfolioApi; +import com.backbase.investment.api.service.v1.PortfolioTradingAccountsApi; import com.backbase.stream.clients.autoconfigure.DbsApiClientsAutoConfiguration; import com.backbase.stream.investment.saga.InvestmentAssetUniverseSaga; import com.backbase.stream.investment.saga.InvestmentContentSaga; @@ -59,10 +60,10 @@ public CustomIntegrationApiService customIntegrationApiService(ApiClient apiClie @Bean public InvestmentPortfolioService investmentPortfolioService(PortfolioApi portfolioApi, - InvestmentProductsApi investmentProductsApi, PaymentsApi paymentsApi, + InvestmentProductsApi investmentProductsApi, PaymentsApi paymentsApi, PortfolioTradingAccountsApi portfolioTradingAccountsApi, InvestmentIngestionConfigurationProperties configurationProperties) { return new InvestmentPortfolioService(investmentProductsApi, portfolioApi, paymentsApi, - configurationProperties); + portfolioTradingAccountsApi, configurationProperties); } @Bean diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java index e6ab6ea3c..4be21cdb2 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java @@ -4,7 +4,9 @@ import com.backbase.investment.api.service.v1.model.InvestorModelPortfolio; import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; +import com.backbase.investment.api.service.v1.model.PortfolioTradingAccount; import com.backbase.investment.api.service.v1.model.ProductTypeEnum; +import com.backbase.stream.investment.model.InvestmentPortfolioAccount; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -28,6 +30,8 @@ public class InvestmentData { private List portfolioProducts; private InvestmentAssetData investmentAssetData; private List portfolios; + private List investmentPortfolioAccounts; + private List portfolioTradingAccounts; public Map> getClientsByLeExternalId() { Map> clientsByLeExternalId = new HashMap<>(); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/Account.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/Account.java new file mode 100644 index 000000000..135ee52bc --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/Account.java @@ -0,0 +1,13 @@ +package com.backbase.stream.investment.model; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class Account { + private String accountExternalId; + private Boolean isDefault; + private Boolean isInternal; + private String productTypeExternalId; +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java new file mode 100644 index 000000000..de829444f --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java @@ -0,0 +1,12 @@ +package com.backbase.stream.investment.model; + +import java.util.List; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class InvestmentPortfolioAccount { + private String portfolioExternalId; + private List accounts; +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java index 37d4ef3fc..b253ee2c7 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java @@ -1,8 +1,10 @@ package com.backbase.stream.investment.saga; +import com.backbase.investment.api.service.v1.model.PortfolioTradingAccount; import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.InvestmentTask; +import com.backbase.stream.investment.model.InvestmentPortfolioAccount; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentClientService; import com.backbase.stream.investment.service.InvestmentModelPortfolioService; @@ -56,6 +58,7 @@ public class InvestmentSaga implements StreamTaskExecutor { public static final String RESULT_FAILED = "failed"; private static final String INVESTMENT_PRODUCTS = "investment-products"; + private static final String INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS = "investment-portfolio-trading-accounts"; private static final String INVESTMENT_PORTFOLIO_MODELS = "investment-portfolio-models"; private static final String INVESTMENT_PORTFOLIOS = "investment-portfolios"; private static final String PROCESSING_PREFIX = "Processing "; @@ -83,6 +86,7 @@ public Mono executeTask(InvestmentTask streamTask) { .flatMap(this::upsertInvestmentPortfolios) .flatMap(this::upsertInvestmentPortfolioDeposits) .flatMap(this::upsertPortfoliosAllocations) + .flatMap(this::upsertPortfolioTradingAccounts) .doOnSuccess(completedTask -> log.info( "Successfully completed investment saga: taskId={}, taskName={}, state={}", completedTask.getId(), completedTask.getName(), completedTask.getState())) @@ -263,6 +267,36 @@ private Mono upsertInvestmentProducts(InvestmentTask investmentT }); } + private Mono upsertPortfolioTradingAccounts(InvestmentTask investmentTask) { + List portfolioTradingAccounts = investmentTask.getData().getPortfolioTradingAccounts(); + List investmentPortfolioAccounts = investmentTask.getData().getInvestmentPortfolioAccounts(); + int accountsCount = portfolioTradingAccounts.size(); + + log.info("Starting investment portfolio trading accounts upsert: taskId={}, arrangementCount={}", + investmentTask.getId(), accountsCount); + + investmentTask.info(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, null, investmentTask.getName(), + investmentTask.getId(), PROCESSING_PREFIX + accountsCount + " investment portfolio trading accounts"); + + return investmentPortfolioService.upsertPortfolioTradingAccounts(portfolioTradingAccounts, investmentPortfolioAccounts) + .map(products -> { + investmentTask.info(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_CREATED, + investmentTask.getName(), investmentTask.getId(), + UPSERTED_PREFIX + products.size() + " investment portfolio trading accounts"); + log.info("Successfully upserted all investment portfolio trading accounts: taskId={}, productCount={}", + investmentTask.getId(), products.size()); + + return investmentTask; + }) + .doOnError(throwable -> { + log.error("Failed to upsert investment portfolio trading accounts: taskId={}, arrangementCount={}", + investmentTask.getId(), accountsCount, throwable); + investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED, + investmentTask.getName(), investmentTask.getId(), + "Failed to upsert investment portfolio trading accounts: " + throwable.getMessage()); + }); + } + /** * Upserts investment clients for all users in the investment data. * diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java index 209b1231d..6d1804437 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java @@ -3,6 +3,7 @@ import com.backbase.investment.api.service.v1.InvestmentProductsApi; import com.backbase.investment.api.service.v1.PaymentsApi; import com.backbase.investment.api.service.v1.PortfolioApi; +import com.backbase.investment.api.service.v1.PortfolioTradingAccountsApi; import com.backbase.investment.api.service.v1.model.Deposit; import com.backbase.investment.api.service.v1.model.DepositRequest; import com.backbase.investment.api.service.v1.model.DepositTypeEnum; @@ -10,11 +11,14 @@ import com.backbase.investment.api.service.v1.model.InvestorModelPortfolio; import com.backbase.investment.api.service.v1.model.PaginatedDepositList; import com.backbase.investment.api.service.v1.model.PaginatedPortfolioProductList; +import com.backbase.investment.api.service.v1.model.PaginatedPortfolioTradingAccountList; import com.backbase.investment.api.service.v1.model.PatchedPortfolioProductCreateUpdateRequest; import com.backbase.investment.api.service.v1.model.PatchedPortfolioUpdateRequest; import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; import com.backbase.investment.api.service.v1.model.PortfolioProductCreateUpdateRequest; +import com.backbase.investment.api.service.v1.model.PortfolioTradingAccount; +import com.backbase.investment.api.service.v1.model.PortfolioTradingAccountRequest; import com.backbase.investment.api.service.v1.model.ProductTypeEnum; import com.backbase.investment.api.service.v1.model.Status08fEnum; import com.backbase.investment.api.service.v1.model.StatusA3dEnum; @@ -22,6 +26,7 @@ import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolioAccount; import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collection; @@ -69,6 +74,7 @@ public class InvestmentPortfolioService { private final InvestmentProductsApi productsApi; private final PortfolioApi portfolioApi; private final PaymentsApi paymentsApi; + private final PortfolioTradingAccountsApi portfolioTradingAccountsApi; private final InvestmentIngestionConfigurationProperties config; public Mono> upsertPortfolios(List investmentArrangements, @@ -564,6 +570,329 @@ private Mono createDeposit(PortfolioList portfolio, double defaultAmoun }); } + /** + * Upserts portfolio trading accounts by matching them with investment portfolio accounts. + * + *

This method processes a list of portfolio trading accounts and associates each with its + * corresponding portfolio by resolving the portfolio UUID through the investment portfolio accounts. + * + *

Processing flow: + *

    + *
  1. Validates input lists are not null or empty
  2. + *
  3. For each trading account: + *
      + *
    • Resolves the portfolio external ID from investment portfolio accounts
    • + *
    • Fetches the portfolio UUID using the external ID
    • + *
    • Creates or updates the trading account with the resolved portfolio UUID
    • + *
    + *
  4. + *
  5. Failed trading accounts are logged and skipped (not propagated to prevent batch failures)
  6. + *
+ * + * @param portfolioTradingAccounts the trading accounts to upsert (may be null or empty) + * @param investmentPortfolioAccounts the portfolio accounts used to resolve portfolio associations + * @return Mono emitting a list of successfully upserted trading accounts, empty list if input is null/empty + */ + public Mono> upsertPortfolioTradingAccounts( + List portfolioTradingAccounts, + List investmentPortfolioAccounts) { + + log.info("Upserting portfolio trading accounts: count={}", + portfolioTradingAccounts != null ? portfolioTradingAccounts.size() : 0); + + if (portfolioTradingAccounts == null || portfolioTradingAccounts.isEmpty()) { + return Mono.just(List.of()); + } + + return Flux.fromIterable(portfolioTradingAccounts) + .flatMap(account -> upsertPortfolioTradingAccount(account, investmentPortfolioAccounts)) + .collectList(); + } + + /** + * Upserts a single portfolio trading account by resolving its portfolio association. + * + *

This method: + *

    + *
  1. Resolves the portfolio UUID from investment portfolio accounts
  2. + *
  3. Sets the resolved portfolio UUID on the trading account
  4. + *
  5. Creates or updates the trading account via the API
  6. + *
+ * + *

Errors during processing are logged and result in an empty Mono to prevent + * failing the entire batch operation. + * + * @param tradingAccount the trading account to upsert + * @param investmentPortfolioAccounts the portfolio accounts used for portfolio resolution + * @return Mono emitting the upserted trading account, or empty if processing fails + */ + private Mono upsertPortfolioTradingAccount( + PortfolioTradingAccount tradingAccount, List investmentPortfolioAccounts) { + + String externalAccountId = tradingAccount.getExternalAccountId(); + log.debug("Processing trading account: externalAccountId={}", externalAccountId); + + return resolvePortfolioUuid(tradingAccount, investmentPortfolioAccounts) + .flatMap(portfolioUuid -> { + tradingAccount.setPortfolio(portfolioUuid); + PortfolioTradingAccountRequest request = buildTradingAccountRequest(tradingAccount); + return upsertPortfolioTradingAccount(request); + }) + .doOnSuccess(created -> log.info( + "Successfully upserted portfolio trading account: uuid={}, externalAccountId={}, portfolioUuid={}", + created.getUuid(), externalAccountId, created.getPortfolio())) + .doOnError(throwable -> log.error( + "Failed to upsert portfolio trading account: externalAccountId={}", + externalAccountId, throwable)) + .onErrorResume(throwable -> { + log.warn("Skipping trading account due to error: externalAccountId={}", externalAccountId); + return Mono.empty(); + }); + } + + /** + * Resolves the portfolio UUID for a trading account. + * + *

Resolution process: + *

    + *
  1. Finds the portfolio external ID by matching the trading account's external account ID + * with accounts in the investment portfolio accounts list
  2. + *
  3. If found, fetches the corresponding portfolio UUID from the portfolio service
  4. + *
  5. If not found, returns empty and logs a warning
  6. + *
+ * + * @param tradingAccount the trading account requiring portfolio resolution + * @param investmentPortfolioAccounts the list of portfolio accounts to search + * @return Mono emitting the resolved portfolio UUID, or empty if no match found + */ + private Mono resolvePortfolioUuid( + PortfolioTradingAccount tradingAccount, List investmentPortfolioAccounts) { + + String portfolioExternalId = findPortfolioExternalId(tradingAccount, investmentPortfolioAccounts); + + if (portfolioExternalId == null) { + log.warn("No matching portfolio found for trading account: externalAccountId={}", + tradingAccount.getExternalAccountId()); + return Mono.empty(); + } + + log.debug("Resolved portfolioExternalId={} for trading account={}", + portfolioExternalId, tradingAccount.getExternalAccountId()); + + return fetchPortfolioInternalId(portfolioExternalId); + } + + private String findPortfolioExternalId( + PortfolioTradingAccount tradingAccount, List investmentPortfolioAccounts) { + + if (investmentPortfolioAccounts == null || tradingAccount == null) { + return null; + } + + String targetExternalAccountId = tradingAccount.getExternalAccountId(); + if (targetExternalAccountId == null) { + log.warn("Trading account has null externalAccountId"); + return null; + } + + return investmentPortfolioAccounts.stream() + .filter(Objects::nonNull) + .filter(ipa -> ipa.getAccounts() != null) + .flatMap(ipa -> ipa.getAccounts().stream() + .filter(Objects::nonNull) + .filter(account -> targetExternalAccountId.equals(account.getAccountExternalId())) + .map(account -> ipa.getPortfolioExternalId())) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + } + + private Mono fetchPortfolioInternalId(String portfolioExternalId) { + log.debug("Fetching portfolio internal ID: externalId={}", portfolioExternalId); + + return listExistingPortfolios(portfolioExternalId) + .map(PortfolioList::getUuid) + .doOnSuccess(uuid -> log.debug("Resolved portfolio UUID={} for externalId={}", + uuid, portfolioExternalId)) + .doOnError(throwable -> log.error( + "Failed to fetch portfolio internal ID: externalId={}", + portfolioExternalId, throwable)); + } + + private PortfolioTradingAccountRequest buildTradingAccountRequest(PortfolioTradingAccount tradingAccount) { + return new PortfolioTradingAccountRequest() + .portfolio(tradingAccount.getPortfolio()) + .accountId(tradingAccount.getAccountId()) + .externalAccountId(tradingAccount.getExternalAccountId()) + .isDefault(tradingAccount.getIsDefault()) + .isInternal(tradingAccount.getIsInternal()); + } + + /** + * Upserts a portfolio trading account using the provided request. + * + *

Implementation of upsert pattern: + *

    + *
  1. Searches for an existing trading account by external account ID
  2. + *
  3. If found, patches the existing account
  4. + *
  5. If not found, creates a new trading account
  6. + *
+ * + * @param request the trading account request containing all necessary fields + * @return Mono emitting the created or updated trading account + */ + public Mono upsertPortfolioTradingAccount(PortfolioTradingAccountRequest request) { + + return listExistingPortfolioTradingAccounts(request) + .flatMap(existing -> patchExistingPortfolioTradingAccount(existing, request)) + .switchIfEmpty(createPortfolioTradingAccount(request)) + .doOnSuccess(account -> log.info( + "Successfully upserted portfolio trading account: uuid={}, externalAccountId={}", + account.getUuid(), request.getExternalAccountId())) + .doOnError(throwable -> log.error( + "Failed to upsert portfolio trading account: externalAccountId={}", + request.getExternalAccountId(), throwable)); + } + + /** + * Patches an existing portfolio trading account with new values. + * + *

This method attempts to update the existing trading account. If the patch + * operation fails (e.g., due to validation errors or conflicts), it falls back + * to returning the existing account rather than failing the operation. + * + *

Fallback behavior ensures data integrity by preferring existing valid data + * over propagating errors that might cause batch failures. + * + * @param existing the existing trading account to update + * @param request the request containing updated values + * @return Mono emitting the updated trading account, or the existing account if patch fails + */ + private Mono patchExistingPortfolioTradingAccount( + PortfolioTradingAccount existing, PortfolioTradingAccountRequest request) { + + String uuid = existing.getUuid().toString(); + + log.debug("Patching portfolio trading account: uuid={}, externalAccountId={}", + uuid, request.getExternalAccountId()); + + return portfolioTradingAccountsApi.patchPortfolioTradingAccount(uuid, request) + .doOnSuccess(updated -> log.info( + "Successfully patched portfolio trading account: uuid={}", updated.getUuid())) + .doOnError(throwable -> logPortfolioTradingAccountPatchError(uuid, throwable)) + .onErrorResume(WebClientResponseException.class, ex -> { + log.info("Using existing portfolio trading account due to patch failure: uuid={}", uuid); + return Mono.just(existing); + }); + } + + /** + * Creates a new portfolio trading account. + * + *

This method is called when no existing trading account is found during the + * upsert operation. It creates a new trading account with the provided details. + * + * @param request the request containing trading account details + * @return Mono emitting the newly created trading account + */ + public Mono createPortfolioTradingAccount( + PortfolioTradingAccountRequest request) { + + return portfolioTradingAccountsApi.createPortfolioTradingAccount(request) + .doOnSuccess(account -> log.info( + "Created portfolio trading account: uuid={}, externalAccountId={}", + account.getUuid(), request.getExternalAccountId())) + .doOnError(throwable -> logPortfolioTradingAccountCreationError(request.getExternalAccountId(), throwable)); + } + + /** + * Lists existing portfolio trading accounts matching the request criteria. + * + *

This method queries the trading accounts API using the external account ID + * from the request. It validates the result to ensure data consistency: + *

    + *
  • Returns empty if no matching account is found
  • + *
  • Returns the single matching account if exactly one is found
  • + *
  • Throws IllegalStateException if multiple accounts are found (data setup issue)
  • + *
+ * + * @param request the request containing the external account ID to search for + * @return Mono emitting the matching trading account, or empty if not found + * @throws IllegalStateException if more than one trading account is found with the same external account ID + */ + private Mono listExistingPortfolioTradingAccounts(PortfolioTradingAccountRequest request) { + + String externalAccountId = request.getExternalAccountId(); + + return portfolioTradingAccountsApi.listPortfolioTradingAccounts( + 1, null, null, externalAccountId, null, null, null) + .doOnSuccess(accounts -> log.debug( + "List portfolio trading accounts query completed: externalAccountId={}, found={} results", + externalAccountId, accounts != null ? accounts.getResults().size() : 0)) + .doOnError(throwable -> log.error( + "Failed to list existing portfolio trading accounts: externalAccountId={}", + externalAccountId, throwable)) + .flatMap(accounts -> validateAndExtractPortfolioTradingAccount(accounts, externalAccountId)); + } + + /** + * Validates and extracts a single trading account from the search results. + * + *

This method enforces data consistency by: + *

    + *
  • Returning empty for no results (expected case for new accounts)
  • + *
  • Returning the account for exactly one result
  • + *
  • Throwing an exception for multiple results (indicates data setup issue)
  • + *
+ * + * @param accounts the paginated list of trading accounts from the API + * @param externalAccountId the external account ID used in the search (for logging) + * @return Mono emitting the single trading account, or empty if no results + * @throws IllegalStateException if multiple trading accounts are found + */ + private Mono validateAndExtractPortfolioTradingAccount( + PaginatedPortfolioTradingAccountList accounts, String externalAccountId) { + + if (accounts == null || CollectionUtils.isEmpty(accounts.getResults())) { + log.info("No existing portfolio trading account found: externalAccountId={}", externalAccountId); + return Mono.empty(); + } + + int resultCount = accounts.getResults().size(); + if (resultCount > 1) { + log.error("Data setup issue: Found {} portfolio trading accounts with externalAccountId={}, " + + "expected at most 1. Please review trading account configuration.", + resultCount, externalAccountId); + return Mono.error(new IllegalStateException( + String.format("Data setup issue: Found %d portfolio trading accounts with externalAccountId=%s, " + + "expected at most 1. Please review trading account configuration.", + resultCount, externalAccountId))); + } + + PortfolioTradingAccount existingAccount = accounts.getResults().getFirst(); + log.info("Found existing portfolio trading account: uuid={}, externalAccountId={}", + existingAccount.getUuid(), externalAccountId); + return Mono.just(existingAccount); + } + + private void logPortfolioTradingAccountPatchError(String uuid, Throwable throwable) { + if (throwable instanceof WebClientResponseException ex) { + log.warn("PATCH portfolio trading account failed: uuid={}, status={}, body={}", + uuid, ex.getStatusCode(), ex.getResponseBodyAsString()); + } else { + log.warn("PATCH portfolio trading account failed: uuid={}", uuid, throwable); + } + } + + private void logPortfolioTradingAccountCreationError(String externalAccountId, Throwable throwable) { + if (throwable instanceof WebClientResponseException ex) { + log.warn("Portfolio trading account create failed: externalAccountId={}, status={}, body={}", + externalAccountId, ex.getStatusCode(), ex.getResponseBodyAsString()); + } else { + log.warn("Portfolio trading account create failed: externalAccountId={}", externalAccountId, throwable); + } + } + /** * Logs portfolio creation errors with detailed information about the failure. * From aedb524b8af4bdbe502ed16fee11a1a55c8716d9 Mon Sep 17 00:00:00 2001 From: pawana_backbase Date: Tue, 24 Feb 2026 12:37:54 +0530 Subject: [PATCH 2/7] Portfolio trading accounts from sa --- .../stream/investment/InvestmentData.java | 6 +- .../model/InvestmentPortfolioAccount.java | 12 - ...=> InvestmentPortfolioTradingAccount.java} | 5 +- .../investment/saga/InvestmentSaga.java | 10 +- .../service/InvestmentPortfolioService.java | 255 ++++++++---------- 5 files changed, 119 insertions(+), 169 deletions(-) delete mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java rename stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/{Account.java => InvestmentPortfolioTradingAccount.java} (63%) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java index 4be21cdb2..5f5f59c7c 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java @@ -4,9 +4,8 @@ import com.backbase.investment.api.service.v1.model.InvestorModelPortfolio; import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; -import com.backbase.investment.api.service.v1.model.PortfolioTradingAccount; import com.backbase.investment.api.service.v1.model.ProductTypeEnum; -import com.backbase.stream.investment.model.InvestmentPortfolioAccount; +import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -30,8 +29,7 @@ public class InvestmentData { private List portfolioProducts; private InvestmentAssetData investmentAssetData; private List portfolios; - private List investmentPortfolioAccounts; - private List portfolioTradingAccounts; + private List investmentPortfolioTradingAccounts; public Map> getClientsByLeExternalId() { Map> clientsByLeExternalId = new HashMap<>(); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java deleted file mode 100644 index de829444f..000000000 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioAccount.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.backbase.stream.investment.model; - -import java.util.List; -import lombok.Builder; -import lombok.Data; - -@Data -@Builder -public class InvestmentPortfolioAccount { - private String portfolioExternalId; - private List accounts; -} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/Account.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioTradingAccount.java similarity index 63% rename from stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/Account.java rename to stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioTradingAccount.java index 135ee52bc..2833b0a3c 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/Account.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolioTradingAccount.java @@ -5,9 +5,10 @@ @Data @Builder -public class Account { +public class InvestmentPortfolioTradingAccount { + private String portfolioExternalId; + private String accountId; private String accountExternalId; private Boolean isDefault; private Boolean isInternal; - private String productTypeExternalId; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java index b253ee2c7..8dbd1dc29 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java @@ -1,10 +1,9 @@ package com.backbase.stream.investment.saga; -import com.backbase.investment.api.service.v1.model.PortfolioTradingAccount; import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.InvestmentTask; -import com.backbase.stream.investment.model.InvestmentPortfolioAccount; +import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentClientService; import com.backbase.stream.investment.service.InvestmentModelPortfolioService; @@ -268,9 +267,8 @@ private Mono upsertInvestmentProducts(InvestmentTask investmentT } private Mono upsertPortfolioTradingAccounts(InvestmentTask investmentTask) { - List portfolioTradingAccounts = investmentTask.getData().getPortfolioTradingAccounts(); - List investmentPortfolioAccounts = investmentTask.getData().getInvestmentPortfolioAccounts(); - int accountsCount = portfolioTradingAccounts.size(); + List investmentPortfolioTradingAccounts = investmentTask.getData().getInvestmentPortfolioTradingAccounts(); + int accountsCount = investmentPortfolioTradingAccounts.size(); log.info("Starting investment portfolio trading accounts upsert: taskId={}, arrangementCount={}", investmentTask.getId(), accountsCount); @@ -278,7 +276,7 @@ private Mono upsertPortfolioTradingAccounts(InvestmentTask inves investmentTask.info(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, null, investmentTask.getName(), investmentTask.getId(), PROCESSING_PREFIX + accountsCount + " investment portfolio trading accounts"); - return investmentPortfolioService.upsertPortfolioTradingAccounts(portfolioTradingAccounts, investmentPortfolioAccounts) + return investmentPortfolioService.upsertPortfolioTradingAccounts(investmentPortfolioTradingAccounts) .map(products -> { investmentTask.info(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_CREATED, investmentTask.getName(), investmentTask.getId(), diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java index 6d1804437..dc6502e33 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java @@ -26,7 +26,7 @@ import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; -import com.backbase.stream.investment.model.InvestmentPortfolioAccount; +import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collection; @@ -571,143 +571,89 @@ private Mono createDeposit(PortfolioList portfolio, double defaultAmoun } /** - * Upserts portfolio trading accounts by matching them with investment portfolio accounts. + * Upserts portfolio trading accounts derived from the provided investment portfolio accounts. * - *

This method processes a list of portfolio trading accounts and associates each with its - * corresponding portfolio by resolving the portfolio UUID through the investment portfolio accounts. + *

This method constructs {@link PortfolioTradingAccount} instances directly from + * {@link InvestmentPortfolioTradingAccount} data, resolving each account's portfolio UUID + * via the portfolio service before upserting. * *

Processing flow: *

    - *
  1. Validates input lists are not null or empty
  2. - *
  3. For each trading account: + *
  4. Validates input list is not null or empty
  5. + *
  6. For each investment portfolio account: *
      - *
    • Resolves the portfolio external ID from investment portfolio accounts
    • - *
    • Fetches the portfolio UUID using the external ID
    • - *
    • Creates or updates the trading account with the resolved portfolio UUID
    • + *
    • Resolves the portfolio UUID from {@code portfolioExternalId}
    • + *
    • Constructs a {@link PortfolioTradingAccount} with all available fields
    • + *
    • Creates or updates the trading account via the API
    • *
    *
  7. - *
  8. Failed trading accounts are logged and skipped (not propagated to prevent batch failures)
  9. + *
  10. Failed accounts are logged and skipped to prevent batch failures
  11. *
* - * @param portfolioTradingAccounts the trading accounts to upsert (may be null or empty) - * @param investmentPortfolioAccounts the portfolio accounts used to resolve portfolio associations - * @return Mono emitting a list of successfully upserted trading accounts, empty list if input is null/empty + * @param investmentPortfolioTradingAccounts the source accounts containing all required field data + * @return Mono emitting a list of successfully upserted trading accounts, or an empty list if input is null/empty */ public Mono> upsertPortfolioTradingAccounts( - List portfolioTradingAccounts, - List investmentPortfolioAccounts) { + List investmentPortfolioTradingAccounts) { log.info("Upserting portfolio trading accounts: count={}", - portfolioTradingAccounts != null ? portfolioTradingAccounts.size() : 0); + investmentPortfolioTradingAccounts != null ? investmentPortfolioTradingAccounts.size() : 0); - if (portfolioTradingAccounts == null || portfolioTradingAccounts.isEmpty()) { + if (investmentPortfolioTradingAccounts == null || investmentPortfolioTradingAccounts.isEmpty()) { return Mono.just(List.of()); } - return Flux.fromIterable(portfolioTradingAccounts) - .flatMap(account -> upsertPortfolioTradingAccount(account, investmentPortfolioAccounts)) + return Flux.fromIterable(investmentPortfolioTradingAccounts) + .flatMap(this::upsertSingleTradingAccount) .collectList(); } /** - * Upserts a single portfolio trading account by resolving its portfolio association. + * Upserts a single portfolio trading account derived from an investment portfolio account. * *

This method: *

    - *
  1. Resolves the portfolio UUID from investment portfolio accounts
  2. - *
  3. Sets the resolved portfolio UUID on the trading account
  4. + *
  5. Resolves the internal portfolio UUID from the account's {@code portfolioExternalId}
  6. + *
  7. Builds a {@link PortfolioTradingAccountRequest} directly from the source account and resolved UUID
  8. *
  9. Creates or updates the trading account via the API
  10. *
* - *

Errors during processing are logged and result in an empty Mono to prevent - * failing the entire batch operation. + *

Errors are logged and result in an empty Mono to prevent failing the entire batch. * - * @param tradingAccount the trading account to upsert - * @param investmentPortfolioAccounts the portfolio accounts used for portfolio resolution + * @param investmentPortfolioTradingAccount the source account containing all required field data * @return Mono emitting the upserted trading account, or empty if processing fails */ - private Mono upsertPortfolioTradingAccount( - PortfolioTradingAccount tradingAccount, List investmentPortfolioAccounts) { + private Mono upsertSingleTradingAccount( + InvestmentPortfolioTradingAccount investmentPortfolioTradingAccount) { - String externalAccountId = tradingAccount.getExternalAccountId(); + String externalAccountId = investmentPortfolioTradingAccount.getAccountExternalId(); log.debug("Processing trading account: externalAccountId={}", externalAccountId); - return resolvePortfolioUuid(tradingAccount, investmentPortfolioAccounts) - .flatMap(portfolioUuid -> { - tradingAccount.setPortfolio(portfolioUuid); - PortfolioTradingAccountRequest request = buildTradingAccountRequest(tradingAccount); - return upsertPortfolioTradingAccount(request); - }) - .doOnSuccess(created -> log.info( - "Successfully upserted portfolio trading account: uuid={}, externalAccountId={}, portfolioUuid={}", - created.getUuid(), externalAccountId, created.getPortfolio())) - .doOnError(throwable -> log.error( - "Failed to upsert portfolio trading account: externalAccountId={}", - externalAccountId, throwable)) + return fetchPortfolioInternalId(investmentPortfolioTradingAccount.getPortfolioExternalId()) + .map(portfolioUuid -> buildTradingAccountRequest(investmentPortfolioTradingAccount, portfolioUuid)) + .flatMap(this::upsertPortfolioTradingAccount) .onErrorResume(throwable -> { - log.warn("Skipping trading account due to error: externalAccountId={}", externalAccountId); + log.warn("Skipping trading account due to error: externalAccountId={}", externalAccountId, throwable); return Mono.empty(); }); } /** - * Resolves the portfolio UUID for a trading account. + * Fetches the internal portfolio UUID using the portfolio external ID. * - *

Resolution process: - *

    - *
  1. Finds the portfolio external ID by matching the trading account's external account ID - * with accounts in the investment portfolio accounts list
  2. - *
  3. If found, fetches the corresponding portfolio UUID from the portfolio service
  4. - *
  5. If not found, returns empty and logs a warning
  6. - *
+ *

Queries the portfolio service to retrieve the internal UUID + * corresponding to the given external ID. * - * @param tradingAccount the trading account requiring portfolio resolution - * @param investmentPortfolioAccounts the list of portfolio accounts to search - * @return Mono emitting the resolved portfolio UUID, or empty if no match found + * @param portfolioExternalId the external ID of the portfolio + * @return Mono emitting the resolved portfolio UUID + * @throws IllegalStateException if the portfolio cannot be found or multiple results are returned */ - private Mono resolvePortfolioUuid( - PortfolioTradingAccount tradingAccount, List investmentPortfolioAccounts) { - - String portfolioExternalId = findPortfolioExternalId(tradingAccount, investmentPortfolioAccounts); - + private Mono fetchPortfolioInternalId(String portfolioExternalId) { if (portfolioExternalId == null) { - log.warn("No matching portfolio found for trading account: externalAccountId={}", - tradingAccount.getExternalAccountId()); + log.warn("Cannot fetch portfolio internal ID: portfolioExternalId is null"); return Mono.empty(); } - log.debug("Resolved portfolioExternalId={} for trading account={}", - portfolioExternalId, tradingAccount.getExternalAccountId()); - - return fetchPortfolioInternalId(portfolioExternalId); - } - - private String findPortfolioExternalId( - PortfolioTradingAccount tradingAccount, List investmentPortfolioAccounts) { - - if (investmentPortfolioAccounts == null || tradingAccount == null) { - return null; - } - - String targetExternalAccountId = tradingAccount.getExternalAccountId(); - if (targetExternalAccountId == null) { - log.warn("Trading account has null externalAccountId"); - return null; - } - - return investmentPortfolioAccounts.stream() - .filter(Objects::nonNull) - .filter(ipa -> ipa.getAccounts() != null) - .flatMap(ipa -> ipa.getAccounts().stream() - .filter(Objects::nonNull) - .filter(account -> targetExternalAccountId.equals(account.getAccountExternalId())) - .map(account -> ipa.getPortfolioExternalId())) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); - } - - private Mono fetchPortfolioInternalId(String portfolioExternalId) { log.debug("Fetching portfolio internal ID: externalId={}", portfolioExternalId); return listExistingPortfolios(portfolioExternalId) @@ -719,13 +665,32 @@ private Mono fetchPortfolioInternalId(String portfolioExternalId) { portfolioExternalId, throwable)); } - private PortfolioTradingAccountRequest buildTradingAccountRequest(PortfolioTradingAccount tradingAccount) { + /** + * Builds a {@link PortfolioTradingAccountRequest} directly from an {@link InvestmentPortfolioTradingAccount} + * and a resolved portfolio UUID, skipping the intermediate {@link PortfolioTradingAccount} domain object. + * + *

Maps the following fields: + *

    + *
  • {@code accountExternalId} → {@code externalAccountId}
  • + *
  • {@code isDefault} → {@code isDefault}
  • + *
  • {@code isInternal} → {@code isInternal}
  • + *
  • {@code productTypeExternalId} → {@code accountId}
  • + *
  • resolved {@code portfolioUuid} → {@code portfolio}
  • + *
+ * + * @param investmentPortfolioTradingAccount the source account containing field data + * @param portfolioUuid the resolved internal portfolio UUID + * @return the constructed {@link PortfolioTradingAccountRequest} + */ + private PortfolioTradingAccountRequest buildTradingAccountRequest( + InvestmentPortfolioTradingAccount investmentPortfolioTradingAccount, UUID portfolioUuid) { + return new PortfolioTradingAccountRequest() - .portfolio(tradingAccount.getPortfolio()) - .accountId(tradingAccount.getAccountId()) - .externalAccountId(tradingAccount.getExternalAccountId()) - .isDefault(tradingAccount.getIsDefault()) - .isInternal(tradingAccount.getIsInternal()); + .portfolio(portfolioUuid) + .accountId(investmentPortfolioTradingAccount.getAccountId()) + .externalAccountId(investmentPortfolioTradingAccount.getAccountExternalId()) + .isDefault(investmentPortfolioTradingAccount.getIsDefault()) + .isInternal(investmentPortfolioTradingAccount.getIsInternal()); } /** @@ -734,7 +699,7 @@ private PortfolioTradingAccountRequest buildTradingAccountRequest(PortfolioTradi *

Implementation of upsert pattern: *

    *
  1. Searches for an existing trading account by external account ID
  2. - *
  3. If found, patches the existing account
  4. + *
  5. If found, patches the existing account with the new values
  6. *
  7. If not found, creates a new trading account
  8. *
* @@ -755,31 +720,28 @@ public Mono upsertPortfolioTradingAccount(PortfolioTrad } /** - * Patches an existing portfolio trading account with new values. - * - *

This method attempts to update the existing trading account. If the patch - * operation fails (e.g., due to validation errors or conflicts), it falls back - * to returning the existing account rather than failing the operation. + * Patches an existing portfolio trading account with updated values. * - *

Fallback behavior ensures data integrity by preferring existing valid data - * over propagating errors that might cause batch failures. + *

If the patch operation fails (e.g., due to validation errors or conflicts), + * falls back to returning the existing account to preserve data integrity + * and prevent batch failures. * * @param existing the existing trading account to update * @param request the request containing updated values * @return Mono emitting the updated trading account, or the existing account if patch fails */ private Mono patchExistingPortfolioTradingAccount( - PortfolioTradingAccount existing, PortfolioTradingAccountRequest request) { + PortfolioTradingAccount existing, + PortfolioTradingAccountRequest request) { String uuid = existing.getUuid().toString(); - log.debug("Patching portfolio trading account: uuid={}, externalAccountId={}", uuid, request.getExternalAccountId()); return portfolioTradingAccountsApi.patchPortfolioTradingAccount(uuid, request) .doOnSuccess(updated -> log.info( "Successfully patched portfolio trading account: uuid={}", updated.getUuid())) - .doOnError(throwable -> logPortfolioTradingAccountPatchError(uuid, throwable)) + .doOnError(throwable -> logPortfolioTradingAccountError("PATCH", "uuid", uuid, throwable)) .onErrorResume(WebClientResponseException.class, ex -> { log.info("Using existing portfolio trading account due to patch failure: uuid={}", uuid); return Mono.just(existing); @@ -787,40 +749,39 @@ private Mono patchExistingPortfolioTradingAccount( } /** - * Creates a new portfolio trading account. + * Creates a new portfolio trading account via the API. * - *

This method is called when no existing trading account is found during the - * upsert operation. It creates a new trading account with the provided details. + *

Called during the upsert flow when no existing trading account is found. * - * @param request the request containing trading account details + * @param request the request containing all required trading account details * @return Mono emitting the newly created trading account */ - public Mono createPortfolioTradingAccount( - PortfolioTradingAccountRequest request) { + public Mono createPortfolioTradingAccount(PortfolioTradingAccountRequest request) { return portfolioTradingAccountsApi.createPortfolioTradingAccount(request) .doOnSuccess(account -> log.info( "Created portfolio trading account: uuid={}, externalAccountId={}", account.getUuid(), request.getExternalAccountId())) - .doOnError(throwable -> logPortfolioTradingAccountCreationError(request.getExternalAccountId(), throwable)); + .doOnError(throwable -> logPortfolioTradingAccountError( + "CREATE", "externalAccountId", request.getExternalAccountId(), throwable)); } /** - * Lists existing portfolio trading accounts matching the request criteria. + * Lists existing portfolio trading accounts matching the external account ID in the request. * - *

This method queries the trading accounts API using the external account ID - * from the request. It validates the result to ensure data consistency: + *

Validates the result to ensure data consistency: *

    *
  • Returns empty if no matching account is found
  • - *
  • Returns the single matching account if exactly one is found
  • - *
  • Throws IllegalStateException if multiple accounts are found (data setup issue)
  • + *
  • Returns the single matching account if exactly one result is found
  • + *
  • Returns an error if multiple accounts are found (indicates a data setup issue)
  • *
* * @param request the request containing the external account ID to search for * @return Mono emitting the matching trading account, or empty if not found * @throws IllegalStateException if more than one trading account is found with the same external account ID */ - private Mono listExistingPortfolioTradingAccounts(PortfolioTradingAccountRequest request) { + private Mono listExistingPortfolioTradingAccounts( + PortfolioTradingAccountRequest request) { String externalAccountId = request.getExternalAccountId(); @@ -836,22 +797,23 @@ private Mono listExistingPortfolioTradingAccounts(Portf } /** - * Validates and extracts a single trading account from the search results. + * Validates and extracts a single trading account from paginated search results. * - *

This method enforces data consistency by: + *

Enforces data consistency: *

    - *
  • Returning empty for no results (expected case for new accounts)
  • - *
  • Returning the account for exactly one result
  • - *
  • Throwing an exception for multiple results (indicates data setup issue)
  • + *
  • Returns empty for no results — expected for new accounts
  • + *
  • Returns the account for exactly one result
  • + *
  • Returns an error for multiple results — indicates a data setup issue
  • *
* - * @param accounts the paginated list of trading accounts from the API - * @param externalAccountId the external account ID used in the search (for logging) - * @return Mono emitting the single trading account, or empty if no results - * @throws IllegalStateException if multiple trading accounts are found + * @param accounts the paginated list of trading accounts returned by the API + * @param externalAccountId the external account ID used in the search, for logging purposes + * @return Mono emitting the single matching trading account, or empty if no results found + * @throws IllegalStateException if multiple trading accounts are found with the same external account ID */ private Mono validateAndExtractPortfolioTradingAccount( - PaginatedPortfolioTradingAccountList accounts, String externalAccountId) { + PaginatedPortfolioTradingAccountList accounts, + String externalAccountId) { if (accounts == null || CollectionUtils.isEmpty(accounts.getResults())) { log.info("No existing portfolio trading account found: externalAccountId={}", externalAccountId); @@ -875,21 +837,24 @@ private Mono validateAndExtractPortfolioTradingAccount( return Mono.just(existingAccount); } - private void logPortfolioTradingAccountPatchError(String uuid, Throwable throwable) { - if (throwable instanceof WebClientResponseException ex) { - log.warn("PATCH portfolio trading account failed: uuid={}, status={}, body={}", - uuid, ex.getStatusCode(), ex.getResponseBodyAsString()); - } else { - log.warn("PATCH portfolio trading account failed: uuid={}", uuid, throwable); - } - } - - private void logPortfolioTradingAccountCreationError(String externalAccountId, Throwable throwable) { + /** + * Logs errors occurring during portfolio trading account operations. + * + *

Provides enhanced error context for {@link WebClientResponseException}, + * including HTTP status code and response body. For other exceptions, logs + * basic error information. + * + * @param operation a short description of the operation that failed (e.g., "PATCH", "CREATE") + * @param idLabel the label for the identifier (e.g., "uuid", "externalAccountId") + * @param idValue the value of the identifier + * @param throwable the exception that occurred + */ + private void logPortfolioTradingAccountError(String operation, String idLabel, String idValue, Throwable throwable) { if (throwable instanceof WebClientResponseException ex) { - log.warn("Portfolio trading account create failed: externalAccountId={}, status={}, body={}", - externalAccountId, ex.getStatusCode(), ex.getResponseBodyAsString()); + log.warn("Portfolio trading account {} failed: {}={}, status={}, body={}", + operation, idLabel, idValue, ex.getStatusCode(), ex.getResponseBodyAsString()); } else { - log.warn("Portfolio trading account create failed: externalAccountId={}", externalAccountId, throwable); + log.error("Portfolio trading account {} failed: {}={}", operation, idLabel, idValue, throwable); } } From cf460e88ce9143012c5c1b6e5ea3aed74cfae3fe Mon Sep 17 00:00:00 2001 From: pawana_backbase Date: Tue, 24 Feb 2026 19:10:15 +0530 Subject: [PATCH 3/7] Test fixes --- .../service/InvestmentPortfolioService.java | 2 +- .../InvestmentPortfolioServiceTest.java | 381 ++++++++++++++++++ 2 files changed, 382 insertions(+), 1 deletion(-) create mode 100644 stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java index dc6502e33..c41af64d2 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java @@ -710,7 +710,7 @@ public Mono upsertPortfolioTradingAccount(PortfolioTrad return listExistingPortfolioTradingAccounts(request) .flatMap(existing -> patchExistingPortfolioTradingAccount(existing, request)) - .switchIfEmpty(createPortfolioTradingAccount(request)) + .switchIfEmpty(Mono.defer(() -> createPortfolioTradingAccount(request))) .doOnSuccess(account -> log.info( "Successfully upserted portfolio trading account: uuid={}, externalAccountId={}", account.getUuid(), request.getExternalAccountId())) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java new file mode 100644 index 000000000..bbe891209 --- /dev/null +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java @@ -0,0 +1,381 @@ +package com.backbase.stream.investment.service; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import com.backbase.investment.api.service.v1.InvestmentProductsApi; +import com.backbase.investment.api.service.v1.PaymentsApi; +import com.backbase.investment.api.service.v1.PortfolioApi; +import com.backbase.investment.api.service.v1.PortfolioTradingAccountsApi; +import com.backbase.investment.api.service.v1.model.PaginatedPortfolioListList; +import com.backbase.investment.api.service.v1.model.PaginatedPortfolioTradingAccountList; +import com.backbase.investment.api.service.v1.model.PortfolioList; +import com.backbase.investment.api.service.v1.model.PortfolioTradingAccount; +import com.backbase.investment.api.service.v1.model.PortfolioTradingAccountRequest; +import com.backbase.investment.api.service.v1.model.StatusA3dEnum; +import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; +import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +class InvestmentPortfolioServiceTest { + + private InvestmentProductsApi productsApi; + private PortfolioApi portfolioApi; + private PaymentsApi paymentsApi; + private PortfolioTradingAccountsApi portfolioTradingAccountsApi; + private InvestmentIngestionConfigurationProperties config; + private InvestmentPortfolioService service; + + @BeforeEach + void setUp() { + productsApi = Mockito.mock(InvestmentProductsApi.class); + portfolioApi = Mockito.mock(PortfolioApi.class); + paymentsApi = Mockito.mock(PaymentsApi.class); + portfolioTradingAccountsApi = Mockito.mock(PortfolioTradingAccountsApi.class); + config = Mockito.mock(InvestmentIngestionConfigurationProperties.class); + when(config.getPortfolioActivationPastMonths()).thenReturn(6); + service = new InvestmentPortfolioService( + productsApi, portfolioApi, paymentsApi, portfolioTradingAccountsApi, config); + } + + // ----------------------------------------------------------------------- + // upsertPortfolioTradingAccount — patch path + // ----------------------------------------------------------------------- + + @Test + void upsertPortfolioTradingAccount_existingAccount_patchesAndReturns() { + UUID existingUuid = UUID.randomUUID(); + UUID portfolioUuid = UUID.randomUUID(); + + PortfolioTradingAccount existing = Mockito.mock(PortfolioTradingAccount.class); + when(existing.getUuid()).thenReturn(existingUuid); + when(existing.getExternalAccountId()).thenReturn("EXT-001"); + + PortfolioTradingAccount patched = Mockito.mock(PortfolioTradingAccount.class); + when(patched.getUuid()).thenReturn(existingUuid); + when(patched.getExternalAccountId()).thenReturn("EXT-001"); + + PortfolioTradingAccountRequest request = new PortfolioTradingAccountRequest() + .externalAccountId("EXT-001") + .accountId("ACC-001") + .portfolio(portfolioUuid) + .isDefault(false) + .isInternal(false); + + PaginatedPortfolioTradingAccountList accountList = new PaginatedPortfolioTradingAccountList() + .results(List.of(existing)); + + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-001"), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(accountList)); + + when(portfolioTradingAccountsApi.patchPortfolioTradingAccount( + eq(existingUuid.toString()), any())) + .thenReturn(Mono.just(patched)); + + StepVerifier.create(service.upsertPortfolioTradingAccount(request)) + .expectNextMatches(a -> a.getUuid().equals(existingUuid)) + .verifyComplete(); + } + + @Test + void upsertPortfolioTradingAccount_noExistingAccount_createsNew() { + UUID newUuid = UUID.randomUUID(); + UUID portfolioUuid = UUID.randomUUID(); + + PortfolioTradingAccount created = Mockito.mock(PortfolioTradingAccount.class); + when(created.getUuid()).thenReturn(newUuid); + when(created.getExternalAccountId()).thenReturn("EXT-002"); + + PortfolioTradingAccountRequest request = new PortfolioTradingAccountRequest() + .externalAccountId("EXT-002") + .accountId("ACC-002") + .portfolio(portfolioUuid) + .isDefault(false) + .isInternal(false); + + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-002"), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(new PaginatedPortfolioTradingAccountList().results(List.of()))); + + when(portfolioTradingAccountsApi.createPortfolioTradingAccount((request))) + .thenReturn(Mono.just(created)); + + StepVerifier.create(service.upsertPortfolioTradingAccount(request)) + .expectNextMatches(a -> a.getUuid().equals(newUuid)) + .verifyComplete(); + } + + @Test + void upsertPortfolioTradingAccount_patchFails_withWebClientException_fallsBackToExisting() { + UUID existingUuid = UUID.randomUUID(); + UUID portfolioUuid = UUID.randomUUID(); + + PortfolioTradingAccount existing = Mockito.mock(PortfolioTradingAccount.class); + when(existing.getUuid()).thenReturn(existingUuid); + when(existing.getExternalAccountId()).thenReturn("EXT-003"); + + PortfolioTradingAccountRequest request = new PortfolioTradingAccountRequest() + .externalAccountId("EXT-003") + .accountId("ACC-003") + .portfolio(portfolioUuid) + .isDefault(false) + .isInternal(false); + + PaginatedPortfolioTradingAccountList accountList = new PaginatedPortfolioTradingAccountList() + .results(List.of(existing)); + + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-003"), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(accountList)); + + when(portfolioTradingAccountsApi.patchPortfolioTradingAccount( + (existingUuid.toString()), (request))) + .thenReturn(Mono.error(WebClientResponseException.create( + HttpStatus.BAD_REQUEST.value(), "Bad Request", + HttpHeaders.EMPTY, null, StandardCharsets.UTF_8))); + + when(portfolioTradingAccountsApi.createPortfolioTradingAccount(any())) + .thenReturn(Mono.just(existing)); + + StepVerifier.create(service.upsertPortfolioTradingAccount(request)) + .expectNextMatches(a -> a.getUuid().equals(existingUuid)) + .verifyComplete(); + } + + @Test + void upsertPortfolioTradingAccount_patchFails_withNonWebClientException_propagatesError() { + UUID existingUuid = UUID.randomUUID(); + UUID portfolioUuid = UUID.randomUUID(); + + PortfolioTradingAccount existing = Mockito.mock(PortfolioTradingAccount.class); + when(existing.getUuid()).thenReturn(existingUuid); + when(existing.getExternalAccountId()).thenReturn("EXT-004"); + + PortfolioTradingAccountRequest request = new PortfolioTradingAccountRequest() + .externalAccountId("EXT-004") + .accountId("ACC-004") + .portfolio(portfolioUuid) + .isDefault(false) + .isInternal(false); + + PaginatedPortfolioTradingAccountList accountList = new PaginatedPortfolioTradingAccountList() + .results(List.of(existing)); + + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-004"), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(accountList)); + + when(portfolioTradingAccountsApi.patchPortfolioTradingAccount( + (existingUuid.toString()), (request))) + .thenReturn(Mono.error(new RuntimeException("Unexpected error"))); + + StepVerifier.create(service.upsertPortfolioTradingAccount(request)) + .expectErrorMatches(e -> e instanceof RuntimeException + && e.getMessage().equals("Unexpected error")) + .verify(); + } + + @Test + void upsertPortfolioTradingAccount_multipleExistingAccounts_returnsError() { + UUID portfolioUuid = UUID.randomUUID(); + + PortfolioTradingAccount acc1 = Mockito.mock(PortfolioTradingAccount.class); + when(acc1.getUuid()).thenReturn(UUID.randomUUID()); + PortfolioTradingAccount acc2 = Mockito.mock(PortfolioTradingAccount.class); + when(acc2.getUuid()).thenReturn(UUID.randomUUID()); + + PortfolioTradingAccountRequest request = new PortfolioTradingAccountRequest() + .externalAccountId("EXT-DUP") + .accountId("ACC-DUP") + .portfolio(portfolioUuid) + .isDefault(false) + .isInternal(false); + + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-DUP"), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(new PaginatedPortfolioTradingAccountList() + .results(List.of(acc1, acc2)))); + + // Defensive stub — prevents NPE if switchIfEmpty is accidentally reached + when(portfolioTradingAccountsApi.createPortfolioTradingAccount(any())) + .thenReturn(Mono.error(new IllegalStateException("should not be called"))); + + StepVerifier.create(service.upsertPortfolioTradingAccount(request)) + .expectErrorMatches(e -> e instanceof IllegalStateException + && e.getMessage().contains("Found 2 portfolio trading accounts")) + .verify(); + } + + // ----------------------------------------------------------------------- + // upsertPortfolioTradingAccounts — batch resilience + // ----------------------------------------------------------------------- + + @Test + void upsertPortfolioTradingAccounts_nullInput_returnsEmptyList() { + StepVerifier.create(service.upsertPortfolioTradingAccounts(null)) + .expectNext(List.of()) + .verifyComplete(); + } + + @Test + void upsertPortfolioTradingAccounts_emptyInput_returnsEmptyList() { + StepVerifier.create(service.upsertPortfolioTradingAccounts(List.of())) + .expectNext(List.of()) + .verifyComplete(); + } + + @Test + void upsertPortfolioTradingAccounts_singleFailure_doesNotStopBatch() { + UUID portfolioUuid1 = UUID.randomUUID(); + UUID portfolioUuid2 = UUID.randomUUID(); + String externalId1 = "PORTFOLIO-EXT-001"; + String externalId2 = "PORTFOLIO-EXT-002"; + + // Mock portfolio lookups — uses externalId, not uuid setter + mockPortfolioFound(externalId1, portfolioUuid1); + mockPortfolioFound(externalId2, portfolioUuid2); + + // Account 1: list returns empty → create fails + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-FAIL"), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(new PaginatedPortfolioTradingAccountList().results(List.of()))); + when(portfolioTradingAccountsApi.createPortfolioTradingAccount( + argThat(r -> r != null && "EXT-FAIL".equals(r.getExternalAccountId())))) + .thenReturn(Mono.error(new RuntimeException("Create failed"))); + + // Account 2: list returns empty → create succeeds + UUID createdUuid = UUID.randomUUID(); + PortfolioTradingAccount created = Mockito.mock(PortfolioTradingAccount.class); + when(created.getUuid()).thenReturn(createdUuid); + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-OK"), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(new PaginatedPortfolioTradingAccountList().results(List.of()))); + when(portfolioTradingAccountsApi.createPortfolioTradingAccount( + argThat(r -> r != null && "EXT-OK".equals(r.getExternalAccountId())))) + .thenReturn(Mono.just(created)); + + List input = List.of( + buildTradingAccountInput("EXT-FAIL", externalId1), + buildTradingAccountInput("EXT-OK", externalId2) + ); + + StepVerifier.create(service.upsertPortfolioTradingAccounts(input)) + .expectNextMatches(list -> list.size() == 1 && list.get(0).getUuid().equals(createdUuid)) + .verifyComplete(); + } + + @Test + void upsertPortfolioTradingAccounts_allFail_returnsEmptyList() { + String externalId = "PORTFOLIO-EXT-ALL-FAIL"; + UUID portfolioUuid = UUID.randomUUID(); + + mockPortfolioFound(externalId, portfolioUuid); + + when(portfolioTradingAccountsApi.listPortfolioTradingAccounts( + eq(1), isNull(), isNull(), eq("EXT-ALL-FAIL"), isNull(), isNull(), isNull())) + .thenReturn(Mono.error(new RuntimeException("List failed"))); + + List input = List.of( + buildTradingAccountInput("EXT-ALL-FAIL", externalId) + ); + + StepVerifier.create(service.upsertPortfolioTradingAccounts(input)) + .expectNextMatches(List::isEmpty) + .verifyComplete(); + } + + // ----------------------------------------------------------------------- + // createPortfolioTradingAccount — direct creation + // ----------------------------------------------------------------------- + + @Test + void createPortfolioTradingAccount_success_returnsCreatedAccount() { + UUID portfolioUuid = UUID.randomUUID(); + UUID newUuid = UUID.randomUUID(); + + PortfolioTradingAccount created = Mockito.mock(PortfolioTradingAccount.class); + when(created.getUuid()).thenReturn(newUuid); + when(created.getExternalAccountId()).thenReturn("EXT-NEW"); + + PortfolioTradingAccountRequest request = new PortfolioTradingAccountRequest() + .externalAccountId("EXT-NEW") + .accountId("ACC-NEW") + .portfolio(portfolioUuid) + .isDefault(false) + .isInternal(false); + + when(portfolioTradingAccountsApi.createPortfolioTradingAccount((request))) + .thenReturn(Mono.just(created)); + + StepVerifier.create(service.createPortfolioTradingAccount(request)) + .expectNextMatches(a -> a.getUuid().equals(newUuid)) + .verifyComplete(); + } + + @Test + void createPortfolioTradingAccount_apiFails_propagatesError() { + UUID portfolioUuid = UUID.randomUUID(); + + PortfolioTradingAccountRequest request = new PortfolioTradingAccountRequest() + .externalAccountId("EXT-ERR") + .accountId("ACC-ERR") + .portfolio(portfolioUuid) + .isDefault(false) + .isInternal(false); + + when(portfolioTradingAccountsApi.createPortfolioTradingAccount((request))) + .thenReturn(Mono.error(new RuntimeException("Creation failed"))); + + StepVerifier.create(service.createPortfolioTradingAccount(request)) + .expectErrorMatches(e -> e instanceof RuntimeException + && e.getMessage().equals("Creation failed")) + .verify(); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private PortfolioList buildPortfolioList(UUID portfolioUuid, String externalId, OffsetDateTime activated) { + PortfolioList portfolio = Mockito.mock(PortfolioList.class); + when(portfolio.getUuid()).thenReturn(portfolioUuid); + when(portfolio.getExternalId()).thenReturn(externalId); + when(portfolio.getActivated()).thenReturn(activated); + when(portfolio.getStatus()).thenReturn(StatusA3dEnum.ACTIVE); + return portfolio; + } + + private void mockPortfolioFound(String externalId, UUID portfolioUuid) { + PortfolioList portfolioList = buildPortfolioList(portfolioUuid, externalId, OffsetDateTime.now().minusMonths(6)); + PaginatedPortfolioListList paginatedList = Mockito.mock(PaginatedPortfolioListList.class); + when(paginatedList.getResults()).thenReturn(List.of(portfolioList)); + + when(portfolioApi.listPortfolios(isNull(), isNull(), isNull(), + isNull(), eq(externalId), isNull(), isNull(), eq(1), + isNull(), isNull(), isNull(), isNull())) + .thenReturn(Mono.just(paginatedList)); + } + + private InvestmentPortfolioTradingAccount buildTradingAccountInput(String externalAccountId, + String portfolioExternalId) { + return InvestmentPortfolioTradingAccount.builder() + .accountExternalId(externalAccountId) + .portfolioExternalId(portfolioExternalId) + .accountId("ACC-" + externalAccountId) + .isDefault(false) + .isInternal(false) + .build(); + } +} \ No newline at end of file From 87e3010e63796536c3a352c340cef3aa83256879 Mon Sep 17 00:00:00 2001 From: pawana_backbase Date: Tue, 24 Feb 2026 19:27:32 +0530 Subject: [PATCH 4/7] Changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2066b02f..55aadee9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [9.7.0] +### Added + - Implement Portfolio Trading Account Creation; + ## [9.6.0] ### Added - Implement for Document ingestion; From 367727c6798b277043a4024a4451359ec91364d2 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Wed, 25 Feb 2026 09:50:49 +0200 Subject: [PATCH 5/7] fix Asset patch request to set new values --- .../investment/service/InvestmentAssetUniverseService.java | 2 +- .../resttemplate/InvestmentRestAssetUniverseService.java | 6 +++--- .../service/InvestmentAssetUniverseServiceTest.java | 4 +++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java index 989bcb76a..ea8288b22 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java @@ -104,7 +104,7 @@ public Mono getOrCreateAsset(com.backbase. // If asset exists, log and return it .flatMap(a -> { log.info("Asset already exists with Asset Identifier : {}", assetIdentifier); - return investmentRestAssetUniverseService.patchAsset(a, asset).thenReturn(a); + return investmentRestAssetUniverseService.patchAsset(a, asset, categoryIdByCode).thenReturn(a); }) .map(assetMapper::map) // If Mono is empty (asset not found), create the asset diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java index fafadc3e3..b0f332c5f 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java @@ -64,14 +64,14 @@ public Mono createAsset(com.backbase.strea } public Mono patchAsset(Asset existAsset, - com.backbase.stream.investment.Asset asset) { + com.backbase.stream.investment.Asset asset, Map categoryIdByCode) { String assetUuid = existAsset.getUuid().toString(); log.info( "Starting asset update: assetUuid={}, assetName='{}', logoFile='{}'", assetUuid, asset.getName(), asset.getLogo()); - - return Mono.defer(() -> Mono.just(patchAsset(assetUuid, null, asset.getLogoFile()))) + OASAssetRequestDataRequest assetRequestDataRequest = assetMapper.mapAsset(asset, categoryIdByCode); + return Mono.defer(() -> Mono.just(patchAsset(assetUuid, assetRequestDataRequest, asset.getLogoFile()))) .map(patchedAsset -> { log.info( "Logo attached successfully to asset:assetUuid={}, assetName='{}', logoFile='{}'", assetUuid, diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java index ca16528a1..5733244d3 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java @@ -6,6 +6,7 @@ import com.backbase.investment.api.service.v1.model.MarketRequest; import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -99,7 +100,8 @@ void getOrCreateAsset_assetExists() { ArgumentMatchers.any(), ArgumentMatchers.any())) .thenReturn(Mono.just(existingAsset)); - Mockito.when(investmentRestAssetUniverseService.patchAsset(ArgumentMatchers.any(), ArgumentMatchers.any())) + Mockito.when(investmentRestAssetUniverseService.patchAsset(ArgumentMatchers.any(), ArgumentMatchers.any(), + HashMap.newHashMap(1))) .thenReturn(Mono.just(asset)); Mockito.when(investmentRestAssetUniverseService.createAsset( ArgumentMatchers.any(), From cd55f1fe1232180bab9258dc9fa9db671f34a8f2 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Wed, 25 Feb 2026 14:49:04 +0200 Subject: [PATCH 6/7] fix execution flow --- .../backbase/stream/investment/saga/InvestmentSaga.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java index 8dbd1dc29..4274a1b0c 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java @@ -1,5 +1,6 @@ package com.backbase.stream.investment.saga; +import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.InvestmentTask; @@ -83,9 +84,9 @@ public Mono executeTask(InvestmentTask streamTask) { .flatMap(this::upsertClients) .flatMap(this::upsertInvestmentProducts) .flatMap(this::upsertInvestmentPortfolios) + .flatMap(this::upsertPortfolioTradingAccounts) .flatMap(this::upsertInvestmentPortfolioDeposits) .flatMap(this::upsertPortfoliosAllocations) - .flatMap(this::upsertPortfolioTradingAccounts) .doOnSuccess(completedTask -> log.info( "Successfully completed investment saga: taskId={}, taskName={}, state={}", completedTask.getId(), completedTask.getName(), completedTask.getState())) @@ -137,6 +138,11 @@ private Mono upsertPortfoliosAllocations(InvestmentTask investme data.getPortfolioProducts(), investmentTask.getData().getInvestmentAssetData())) .collectList() + .onErrorResume(throwable -> { + log.error("Allocation generation failed: for portfolios:{} taskIds={}", + data.getPortfolios().stream().map(PortfolioList::getUuid).toList(), investmentTask.getId(), throwable); + return Mono.empty(); + }) .map(o -> investmentTask) ); } From a3595dcf45d875578ac25c0813fa582c40c4cef4 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Wed, 25 Feb 2026 15:00:52 +0200 Subject: [PATCH 7/7] fix execution flow --- .../stream/investment/saga/InvestmentSaga.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java index 4274a1b0c..11d9469c1 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java @@ -138,10 +138,13 @@ private Mono upsertPortfoliosAllocations(InvestmentTask investme data.getPortfolioProducts(), investmentTask.getData().getInvestmentAssetData())) .collectList() - .onErrorResume(throwable -> { - log.error("Allocation generation failed: for portfolios:{} taskIds={}", - data.getPortfolios().stream().map(PortfolioList::getUuid).toList(), investmentTask.getId(), throwable); - return Mono.empty(); + .doOnError(throwable -> { + log.error("Allocation generation failed for portfolios:{} taskId={}", + data.getPortfolios().stream().map(PortfolioList::getUuid).toList(), investmentTask.getId(), + throwable); + investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED, + investmentTask.getName(), investmentTask.getId(), + "Failed to upsert investment portfolio trading accounts: " + throwable.getMessage()); }) .map(o -> investmentTask) ); @@ -273,7 +276,8 @@ private Mono upsertInvestmentProducts(InvestmentTask investmentT } private Mono upsertPortfolioTradingAccounts(InvestmentTask investmentTask) { - List investmentPortfolioTradingAccounts = investmentTask.getData().getInvestmentPortfolioTradingAccounts(); + List investmentPortfolioTradingAccounts = investmentTask.getData() + .getInvestmentPortfolioTradingAccounts(); int accountsCount = investmentPortfolioTradingAccounts.size(); log.info("Starting investment portfolio trading accounts upsert: taskId={}, arrangementCount={}",