From 851e663961279509e653e0f40d0a7e5a278dcaac Mon Sep 17 00:00:00 2001 From: Daniel M Date: Thu, 25 Sep 2025 09:24:52 -0400 Subject: [PATCH 1/7] add dynamodb client to pom.xml --- Server/pom.xml | 151 ++++++----- Server/src/config/config.xml | 9 +- .../msgtracking/DynamoDbTrackingModule.java | 50 ++++ pom.xml | 247 ++++++++++-------- 4 files changed, 268 insertions(+), 189 deletions(-) create mode 100644 Server/src/main/java/org/openas2/processor/msgtracking/DynamoDbTrackingModule.java diff --git a/Server/pom.xml b/Server/pom.xml index 86febd97..c2d099f1 100644 --- a/Server/pom.xml +++ b/Server/pom.xml @@ -43,13 +43,13 @@ org.apache.maven.plugins maven-dependency-plugin - - - - properties - - - + + + + properties + + + org.apache.maven.plugins @@ -79,10 +79,10 @@ - - - - + + + + @@ -99,16 +99,16 @@ package - - - - - - + + + + + @@ -120,8 +120,8 @@ + file="${project.build.directory}/${project.build.finalName}.jar" + todir="${package.assembly.dir}/lib" verbose="true"/> @@ -138,14 +138,14 @@ - + - - + destfile="${project.basedir}/dist/${project.dist.package.name}" + update="true"> + + @@ -160,18 +160,18 @@ org.apache.maven.plugins maven-jar-plugin - - - - - - true - - - org.openas2.app.OpenAS2Server - - - + + + + + + true + + + org.openas2.app.OpenAS2Server + + + org.apache.maven.plugins @@ -188,6 +188,15 @@ + + com.amazonaws + aws-java-sdk-dynamodb + + + com.amazonaws + DynamoDBLocal + test + org.dom4j dom4j @@ -212,26 +221,26 @@ commons-cli commons-cli - - org.slf4j - slf4j-api - - - ch.qos.logback - logback-classic - + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + com.sun.mail jakarta.mail - + com.h2database h2 - - com.zaxxer - HikariCP - + + com.zaxxer + HikariCP + org.bouncycastle bcpg-jdk18on @@ -258,31 +267,31 @@ jackson-databind jar - - com.fasterxml.jackson.module - jackson-module-jaxb-annotations - + + com.fasterxml.jackson.module + jackson-module-jaxb-annotations + org.glassfish.jersey.media jersey-media-json-jackson jar - - org.glassfish.jersey.inject - jersey-hk2 + + org.glassfish.jersey.inject + jersey-hk2 jakarta.xml.bind jakarta.xml.bind-api - - jakarta.ws.rs - jakarta.ws.rs-api - - - jakarta.annotation - jakarta.annotation-api + + jakarta.ws.rs + jakarta.ws.rs-api + + + jakarta.annotation + jakarta.annotation-api org.junit.jupiter @@ -294,15 +303,15 @@ hamcrest test - - org.mockito - mockito-junit-jupiter - test - + + org.mockito + mockito-junit-jupiter + test + org.mockito mockito-core test - + diff --git a/Server/src/config/config.xml b/Server/src/config/config.xml index a242e8d6..9070b923 100644 --- a/Server/src/config/config.xml +++ b/Server/src/config/config.xml @@ -26,7 +26,8 @@ module.AS2SenderModule.enabled="true" module.AS2SenderModule.readtimeout="60000" module.MDNSenderModule.enabled="true" - module.DbTrackingModule.enabled="true" + module.DbTrackingModule.enabled="false" + module.DynamoDbTrackingModule.enabled="true" module.MDNFileModule.enabled="true" module.MDNFileModule.filename="$properties.storageBaseDir$/$mdn.msg.sender.as2_id$-$mdn.msg.receiver.as2_id$/mdn/$date.yyyy-MM-dd$/$mdn.msg.headers.message-id$" module.MDNFileModule.tempdir="$properties.storageBaseDir$/temp" @@ -61,6 +62,7 @@ email.smtppwd="mySmtpPwd" email.subject="$exception.name$: $exception.message$" email.bodytemplate="%home%/emailtemplate.txt" + msg_tracking.use_dynamo_db="true" msg_tracking.use_embedded_db="true" msg_tracking.force_load_jdbc_driver="false" msg_tracking.db_user="sa" @@ -109,7 +111,7 @@ ssl_protocol="TLS" ssl_keystore="$properties.ssl_keystore$" ssl_keystore_password="$properties.ssl_keystore_password$" - userid="$properties.restapi.command.processor.userid$" + userid="$properties.restapi.command.processor.userid$" password="$properties.restapi.command.processor.password$" /> + failures) { + return false; + } + + @Override + protected void persist(Message msg, Map map) { + + } +} diff --git a/pom.xml b/pom.xml index 85cc5298..8c60b327 100644 --- a/pom.xml +++ b/pom.xml @@ -34,10 +34,23 @@ 11 + 1.12.791 + + com.amazonaws + aws-java-sdk-bom + ${aws.java.sdk.version} + pom + import + + + com.amazonaws + DynamoDBLocal + [1.11,2.0) + org.osgi org.osgi.core @@ -107,105 +120,105 @@ org.junit.jupiter - junit-jupiter - 5.13.4 + junit-jupiter + 5.13.4 test org.mockito mockito-core - - 5.18.0 + + 5.18.0 + test + + + org.mockito + mockito-junit-jupiter + + 5.18.0 + test + + + + org.hamcrest + hamcrest + 3.0 test - - org.mockito - mockito-junit-jupiter - - 5.18.0 - test - - - - org.hamcrest - hamcrest - 3.0 - test - commons-io commons-io 2.20.0 - - org.slf4j - slf4j-api - 2.0.17 - - - ch.qos.logback - logback-classic - 1.5.18 - - - jakarta.ws.rs - jakarta.ws.rs-api - 3.1.0 - - - jakarta.annotation - jakarta.annotation-api - 3.0.0 - - - org.glassfish.jersey.containers - jersey-container-grizzly2-http - 3.1.10 - jar - - - com.fasterxml.jackson.core - jackson-databind - 2.19.2 - jar - - - com.fasterxml.jackson.module - jackson-module-jaxb-annotations - 2.19.2 - - - org.glassfish.jersey.media - jersey-media-json-jackson - 3.1.10 - jar - - - org.glassfish.jersey.inject - jersey-hk2 - 3.1.10 - + + org.slf4j + slf4j-api + 2.0.17 + + + ch.qos.logback + logback-classic + 1.5.18 + + + jakarta.ws.rs + jakarta.ws.rs-api + 3.1.0 + + + jakarta.annotation + jakarta.annotation-api + 3.0.0 + + + org.glassfish.jersey.containers + jersey-container-grizzly2-http + 3.1.10 + jar + + + com.fasterxml.jackson.core + jackson-databind + 2.19.2 + jar + + + com.fasterxml.jackson.module + jackson-module-jaxb-annotations + 2.19.2 + + + org.glassfish.jersey.media + jersey-media-json-jackson + 3.1.10 + jar + + + org.glassfish.jersey.inject + jersey-hk2 + 3.1.10 + jakarta.xml.bind jakarta.xml.bind-api 4.0.2 - com.sun.xml.bind - jaxb-core - 4.0.5 - - - com.sun.xml.bind - jaxb-impl - 4.0.5 - - - com.zaxxer - HikariCP - 7.0.0 - + com.sun.xml.bind + jaxb-core + 4.0.5 + + + com.sun.xml.bind + jaxb-impl + 4.0.5 + + + com.zaxxer + HikariCP + 7.0.0 + @@ -284,15 +297,15 @@ - - org.codehaus.mojo - versions-maven-plugin - 2.16.2 - + + org.codehaus.mojo + versions-maven-plugin + 2.16.2 + false file://${session.executionRootDirectory}/version-rules.xml - - + + org.apache.maven.plugins maven-antrun-plugin @@ -406,26 +419,28 @@ - - - com.github.spotbugs - spotbugs-maven-plugin - 4.8.5.0 - - Max - medium - true - ${session.executionRootDirectory}/spotbugs-security-include.xml - ${session.executionRootDirectory}/spotbugs-security-exclude.xml - - - com.h3xstream.findsecbugs - findsecbugs-plugin - 1.13.0 - - - - + + + com.github.spotbugs + spotbugs-maven-plugin + 4.8.5.0 + + Max + medium + true + ${session.executionRootDirectory}/spotbugs-security-include.xml + + ${session.executionRootDirectory}/spotbugs-security-exclude.xml + + + + com.h3xstream.findsecbugs + findsecbugs-plugin + 1.13.0 + + + + @@ -450,11 +465,11 @@ org.apache.maven.plugins maven-javadoc-plugin 3.6.3 - - true - true - false - + + true + true + false + attach-javadocs @@ -469,10 +484,10 @@ maven-gpg-plugin 3.2.4 - - --pinentry-mode - loopback - + + --pinentry-mode + loopback + From 686b1ab63315b7e2e11a2f01b563616da6d6f1a5 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Thu, 25 Sep 2025 09:26:44 -0400 Subject: [PATCH 2/7] add dynamodb client to pom.xml --- Server/src/config/config.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Server/src/config/config.xml b/Server/src/config/config.xml index 9070b923..4477d461 100644 --- a/Server/src/config/config.xml +++ b/Server/src/config/config.xml @@ -26,8 +26,8 @@ module.AS2SenderModule.enabled="true" module.AS2SenderModule.readtimeout="60000" module.MDNSenderModule.enabled="true" - module.DbTrackingModule.enabled="false" - module.DynamoDbTrackingModule.enabled="true" + module.DbTrackingModule.enabled="true" + module.DynamoDbTrackingModule.enabled="false" module.MDNFileModule.enabled="true" module.MDNFileModule.filename="$properties.storageBaseDir$/$mdn.msg.sender.as2_id$-$mdn.msg.receiver.as2_id$/mdn/$date.yyyy-MM-dd$/$mdn.msg.headers.message-id$" module.MDNFileModule.tempdir="$properties.storageBaseDir$/temp" From cd8bb3d278a67c2890636817edd3227bfd0901e1 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Thu, 25 Sep 2025 09:27:00 -0400 Subject: [PATCH 3/7] add dynamodb client to pom.xml --- Server/src/config/config.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/Server/src/config/config.xml b/Server/src/config/config.xml index 4477d461..71b4f989 100644 --- a/Server/src/config/config.xml +++ b/Server/src/config/config.xml @@ -62,7 +62,6 @@ email.smtppwd="mySmtpPwd" email.subject="$exception.name$: $exception.message$" email.bodytemplate="%home%/emailtemplate.txt" - msg_tracking.use_dynamo_db="true" msg_tracking.use_embedded_db="true" msg_tracking.force_load_jdbc_driver="false" msg_tracking.db_user="sa" From fab6fd9f51300d27c13729b520daa70b33219d2b Mon Sep 17 00:00:00 2001 From: Daniel M Date: Sat, 8 Nov 2025 18:07:16 +1200 Subject: [PATCH 4/7] add dynamodb client to pom.xml --- Server/pom.xml | 14 +- Server/src/config/as2_certs.p12 | Bin 7668 -> 7668 bytes Server/src/config/config.xml | 11 +- .../msgtracking/DynamoDBHandler.java | 210 ++++++++ .../msgtracking/DynamoDBTrackingModule.java | 487 ++++++++++++++++++ .../msgtracking/DynamoDbTrackingModule.java | 50 -- .../msgtracking/MessageMetadata.java | 239 +++++++++ pom.xml | 31 +- 8 files changed, 969 insertions(+), 73 deletions(-) create mode 100644 Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java create mode 100644 Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java delete mode 100644 Server/src/main/java/org/openas2/processor/msgtracking/DynamoDbTrackingModule.java create mode 100644 Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java diff --git a/Server/pom.xml b/Server/pom.xml index c2d099f1..dcce6ac8 100644 --- a/Server/pom.xml +++ b/Server/pom.xml @@ -188,15 +188,15 @@ - - com.amazonaws - aws-java-sdk-dynamodb + + software.amazon.awssdk + dynamodb - com.amazonaws - DynamoDBLocal - test - + software.amazon.awssdk + dynamodb-enhanced + + org.dom4j dom4j diff --git a/Server/src/config/as2_certs.p12 b/Server/src/config/as2_certs.p12 index 47d0232a47a7bde32c6ffcacbafb9274df9980dd..42e850f6584444d26f0fbb9464c8149b4c167c7d 100644 GIT binary patch delta 7176 zcmV+j9QWh&JM=q{b`(oO1}t+Jw73ZI=gTeTc-`l}Y2A};5hQ;F_1Gh_JOA+~4dF3S z;Vq~Hf&|do_h~Ty4fPJ1B1A7*gLy>6qgWK+#n zArDfm0qxmjAz@e#PXPq6Lm6(7QEc$tJJXYeI1>9gpKpI&OSid4!qHwys$&xaiz)2F zvHs9|>L|h6L~-9odD(ZIqU-B#qV}B94~p(9OUOx1tlFQHQ^%*xN`*gHB>a}3&pazM`)xp`bmXjQX&p?DjOp566M#}~Wh(25C zkdlS{)+>J@whkX>iE|NzXdBC-UFNBqPu?b+Oj3r=HhXf-jVIP(W-d zK+!=_R&(?8J)lw^8PZCXkO4e-UakVI`9)evT8e*U$k3v=>SS)tc}4_TIhCNWIRJWy zCAO0+`zx{e+O+p=hv?#!94AknJ(~uZ%?4iJU(o{u*GF(A_{~x57*L@C4ntKOF)0%Y z5Dg@18hzLZjmC`~-&4QLPuAH&ZR|ui#nuKDex_I%0=;zNi|#DgX+Jdgi+?g6SW z?rq{55o2^@d@%8{gt9BrI;yfP3POjq^52Y}Qov1Q942N8ua!FVwV#oOrpN28mTHP$ zypgy=+K?(s17)nB;h66TM;y;bIaL-ge6xQ`37nAfJDefjq=9j*;vTKWc!MbVmtnC{0DynC z9;sO%KVmbe5&~R1RA_$XsH=F)_tt7MRHR3U5{C?3m1We3sh+@YqLy0^q?)f!O1w~G zIq`@51%<;hUai=30=8YC>Gzv2^d_)VEGFOJ>$bm^v_%Osfq{wV$Gm)UBHS77k9Gkz zdsBR%O|X!bLIDY1oeUwB`SHfT(D{EK*iBH8ubvi^U~;WM+<8l+e^eQlIRHxB;@H^n z9%#U_>uS<^1XA`(M=SOF_g_rytM%col2Y{q@RE7V0kD4~A?Yzo5-Ll^|I`U<7KANl(NLgIF5ORz)D)3VgWCWou4WT`KK zF-9;U1_>&LNQUV{wbKMVAdFdVN`5b#*b1P^K` zehc;ZOmYmBKi{?YBc+>Aole{=hNg)8Xj?U1Aqc7TWh$Qv2n>IyzyWHbW*Ptqif3z} ztwCnl6hN5NJzeDN`f`{t6@!DLlx|e}d9`IUB3brv0|-xRHd#<82&Urm-k2Ez$QU+k zT%8HebAyFOEtLRV((Ve~PJpffe3^h)z6y4!O3a+~#&ncRM8d-;3T3q7#8#vHwXTpA zf&|W&iyuVJ$@G6ww8u@KdB?sZ1-7%$BV!vajr&PRNIJ_6vRvH`d&sP75{FtiJ_ZQe zB9CVK1*SX)Nn)}D3(L=?ghfGJXL_i1Keee#D}=F>k7E?Px&2p3GRJWw;1^TIt-yW~ z(5){&sJv78T?f4-YBu&28`r2>M?h)7KaZ;BL&r}w+U$P_Ijw4#h67odrC#`@1Put< z3P`d5`{#}?XG2`bl%6z$B0uF9eph}w-D9Y)$ zv@*S$y-a^jVF>KATuvKAHgUV94@K}>bs?xM%j3ALl`Q88ydHTIp;BH5Spy+>Kb>L2 z_a??%N)13`vZr4^u6X2@;&)4O8Hep1+xCF0yi;{j1b4!Fh{#3vir@e{QJ#FO;tKp= ztorD^i8o*U7j4j-7Q!B^0yQ79njp5wqBN61eHef5(vKt!5gdogLDF(q_l(VjzMXY@ z!89$yEH58%>V})?T2IT19kaO!IjNaL%3gJdFN(1E$@#GTEe$JZQa8Yw3pn}AFS_yg zrD%&~2!fT`dT*ss4azMWbZ8WQ9?*9u52kjyq*y+RiI3;KSN|}H6B>t9=CWYie-yN$ zsaJnXy#0O`!=pT>41ktUf{?Ddma4X)I*$xMqw+uObRtwIU|QiAAAb1p)R~0FKVkU> zi*HDTt}Haq6&}1&if`q_L8XZ|e-}MYrD@&E2?q)xlwM>cZJS1(4K9hVH27V}2GYO0{CXyEp49d^9RiuALrdadH>uI}l;2XkDXquy>-H=?_47?;W zp!IK2@RT~zWNltdj}wjL!iD+OKutI@>{9V;gYs;BZN3P$*xb`2$?oSbLl#MN!us;c zc5}WM1bVn?yBMw8sa)e3D6%QK(cIye`49P^S<+b2&WiJoLcwF&(344G5z$P`>C%5A zLota5iyiYq(q+4XoegZ&8wo)hsr_E&i^?+q+jCXS^5{Z%j#k&s^xO=Gg{-dzVWeW>PJe2iJ9s<2iBvq zOLrTNJWi3PzYQ?qV+%*ZMa0dv2?lLx=&Ihlcu2R`VMp}m_XJaR9rvd_m-g;_Cd6%! zq43vIGF9eDWK~RCdF|}*+EQ6oD9h=ugz89fojIT}vHGc9=Z^&u9Qg9>PIHBwbV_c! zn7U|zIU06A)MEfad;c+(1f6{L8v#e@P+ebUlP?Q94>mG2I59XiFfcJSFoFe_kwG;S zxps7H=qOzP(H3Mc=rY)?JE^AklWh?se+4{Jpj%L46Fpi@RB zMKAi#N?-_;d*PBbA()6dgF8E5oYAXho|pGi62oNm z5>#@P@-yhmMwYx6crwSYpU%s%p)6s=?*1Q&O+F z`t3XpES|*OXa-bAVtZ_Re<-XZ?g|9h?RT)MH=x}28(X|cq9#DtYIthjk*G?khPGLx zLUGg=Gsks9Z({*NiUpsWe4`%j=c+^rPDP;n!laJUkV_X-!E@DcOB*q&^$?9rj_(ge z(nkbZDq!Iv6ji?cgtAo{$$zN{u}V#vV|V)1I4Y_D0TS>bb6ooFf6G(g-t!Z=-9_q= zDP=IItv+4sYlz6kBVkfUhv3O6UVsO?3c8`CA?G7{}n>Q>E3zZb2-}pTc>vJ zIqi4cGN*h^t(?{Ak6U?>CIF$WRzA>(uvdoMoOM>S8^OY~d0*D&ZHnLDLH=n)5>9sR zthIC&>GtR)jP_^m^`=N@#s`z+P^Ysa-0zXEsS3)B=j(3)e^b8qS`hlz<}_<}uo9K4 zZ;sYXhwV!L;LSO0edhL~6=9=A;Al)Iq{b(UMF&GnBCPMN5-KAhBt3Y0S890Sd*bhw zanTwqg9H}w5TEr`a@1?%XL0CFenc=oDpwxOZreoQg>HSfPclcbG|%7m9sf|@$dR~T zZFKF1d{J1qe}8=*QQS^x4d*5Ky1*DLI%<;Iq7$45M{P` zK`EL3P#?gI`BEIX_Zcs1j-$^HIqB&T@JIIQjs&=bPNX}@)z%c`O~ZZLAIf*8lo<> zL66>y+xl_H8Iy0Vavxd!ChM4Win1b+78h)(RP_j?WW#mrw4QG%wZ9$b-O))mkMShW zA6Z=ZKz8&9`lGNwfTE(ZBcOeW`0SM@6(yTMA^Dm=DXsP$P&kt>3nG*35E%9Qbn7|61oA_<7%p?@kp zj&w3aIo)U3UU7Iv@mV|}WKKgT3?YhbiM;%J4xcb#cM?i;>i}|U{t9LiYS=jCh}l;j zhNeddC%3VG4!n^POVPw_y|QuICZ$az>EYBuNG$?0dj?<`k(D-F$Mam})H6e*wCqh` z9*}4L+4#`d=j;q+umG774CrZ1E3D-%v1{s{92TV@^9ZvoHpM`S{+If5+UtQ)SKY*l zt#BsAY5zU4BVq6rW$r?}#O>I@cvP046L`fHs$IU4u2#>{+} zcoZkuIs9P;5PaYPyVxhBJbJhu&X8e70sIMB4F|Oqj~(b(r+ATp*rC6sI{<7sO288U zNX>$3at-tC*FV5z&|{yirOZ$l>nQm&TJ+bVh9?bs3g(V65~wdtwNeGiy#H;pgg$sD zL&M;ITdkY%-4z{^1l$I_i4HdDNbJS%(-=XCDuKhgbV&zX%kSLDZ2S3_VRnIe^0g&F zF03%E-#vWPx0}y@XIUlxJhDZE@Qxru;b_KjWKwSqXwe@NKMXwgnmzjIEBc9utyc>R zJcV1}YI}W#Z+DGmFg|bHc4N=)8Kw+^T|Rh!z}d(XIFTwHKX3Tcw!D5cTa$?1x?GBK z8=XWU+Q?O#?T`#EeQ5N+Apju^yNL%Si+W8CG(Cjj{<>B8Y&2CRUn+d1cPRNS zol)0LJ*=B$B2Q1m_HMDe?J1(NOfinPEsR~=U;;M=j+8BEY5XzvJ}ZXwiK*RH8;9`R%o8jYafyZW_)N52S!XW5}FbHDnCBuqYxV73&WITW>9wdBf(Jkap2zjw^3O7wJ#}J%# zVrs$}W{O%v?h2~c0lhpnYoBs|+*jB0_PfC zWrGwraUH&OlQ-HZUc>EpLImF?@I8rhz7Ny?NsSvRQ%C8t7wfYNp|qSmIN63j(2 zvoG$~kA}L^o$uKWv<{%cZuu#iB5bRd0ev&1OgPSDvfje;aL)_tQDh5j<7pL>2``VF z*g3blVzfBMEo$+UVftf#Ot?ndin6T^c{lxyC?o7sGyWrE^PE*P>BIR<$&&bVtLfP+ zy7t96fxt`b$KJgQr++!DD_Xm=^Gvmcm@gkq__t$2t#pD)wELUtIO4y>)eHID#Xp0S zY2>1^H&&yC(tM23VoU5+lr#CgP`}smhZot9na%Q|@~6w{qP)X@6ePk4yA&Fa9EGAR zM#REtQkx5;vbowvO=M|5!)QnZnaOo?JRkn->6%_WdnkT#C}$04RzO`)*c;Iw0yUWVuwh%yTXvO%E$TV;F)9Dth`Ae1Gq^?$E`IvVt3}y>9ql{!%u7(vDy(OCQKUTi>t2DTSlK z&aJSm8K?%5`34;;4#{& zyGI?)_Sj;7iFUClGcGTgzV_m!Etv99uXV&E;U!=a?iHMIxemF7nqKz zbh}yXb_#RNjn-?qQ}BXFFY#AgZcxeWe^;8YnrjPhAdT@9TH`-{yc;^fXyhbwyr{`j zDK!w$V-cMDW%Pp$%v;5hG=r49p|g8FXT_U;1uDI*vjjAMs{EPa{A_+3^)I2|&*va+ zN$&DU2V2|mOrr~+$1pt)@;05gvVuy7&m-ZtXW6AkmT>a9A*l8avkuv2=)J~l;~uNX zJ9=?j(9kTxL17&9jxt}>@bQ|KC5+V@$c&1Cb2-kmP0vQ;(u{YExzEH&o2PqThKeEXb`f2Eb?h8r3zC#96NBpHR6?pPy2R7l~bM09IsRu(V&S zx6%*aiTqG?fIoLiw-*u16WZQCQT8(MJoD)^5M0$EVA`9VPOs^%Ub+*kM{OubwyyQ6 zvokA|DMSQh$g=fM-6(3CEI^<+>Xhhzt@+EkIJjU(tO;-cA_Wxh-B-P1r{JVMEzKL; z?niM$y*?~DJvfLueYz8Co@;(V#Y4m95B&$Po$4F1 zB=q)EVeI@Yr2Bf*Ou%pkj2z`=!2cOuD5HV0#5>oyuoc@RT>K1f(PA8d%#VC_ zcVM=L+IL_)ZH{(pZCeCctwzdR8t6Ls-d_&CQX5rI z9Zk_D2hOYZ2frcLp&Ojpf>uWf!Td%IV})`I_zmalA@v3viBSIfg~?~E>W*jiMI01b zT42?vlvIijpqk09{pyjqZ|In=vw`Ekx6R*+PB|p*65Rn-G{G7A$OtV#^2f(rD!%{A z?V`5+WiU)p<4{}0wf_o#Sv7nsr5!A;KAku|P~KcFE3-70$m8<3bO10rwLpQDT@hv9 zKFtzbX;Fr2;nXOv?~IH;vj|1#5gsg&Nt;(2_*eLSZ&-prj2y0KAf1s8JPB)g8r&wo zMSToclkz<~=wlc5 zzsq(6f&|b2uwF*S?oyn9QxUcIqL(nal{*~Jn6Od7@_pG|T>Q7{av*TmxH%j-Rf~T( zy6-C84p|VNb5cn^k-w^|;^=I~GZyNiIJG2Gv0Xgn&b!}sJJx4qW{APb63j(mk^HWc z#pU*o1UNoUh(LeZzgO_9U<>ElpDxb(lG-?XD zJ)9LH=d_wCvUfAWC-R_0B?C@?XVre?qgoOnv1sEQebXPUFrZXId*So`edTjbM^i#5 zII)Nafq7ncQzokhbR-8oi0AE2?YXL*NU4D(zGl7k{vChJr=Ud;`4(iSG#x%x!pgTg zIvjzK|L~)e_=M5%-L0zN%PwI zw7ZNLi;FtNyJ!I@__H9}p-PSJ`@s+ia<)DUBNs}!(HWb2e&{?GEMF=h<F+@ z_T12(;cFMj&q8csQa%6k0Qo4p`2$C}-FTWi{kVSv4v7V+!?XHrfBpPnUMrgZX+zI> z=DO*mZo_URI%e1D z_0-07%WEVJghBrVj!%z7Hf7NFBpAqAgdVgnDD@Yw6OBYpqK(7`+}$T9@P>#Qvt=?S zH5z{q5L7jw!!M*HOhmHG!bQm=82TZ{(pdjEDpzjx+{v&eF-S25I@ph$y&o9k(G9~< z7fzdgIBEoImmS^?Vb{WGAPKfI7t1^8KjQHp2|8BLz6w;UYF=Ay?7E9f?V269#wjUl z$IBR#b+?$NZZ`nbBR;;-o$1!X7e1#?vPpmcMeU_W-YG|{EIK2yxg|-i5w7NiBcw zf8wG|Hz;}XSJNLA@?iFqHW|oMhSvsYQ-MJ9zIpX^jVQ?lq>Ok91gP0INpxa;i_n*i zoY1_ctn6FP8Pm`OsB4jhA_&KKAc&LNQU&AO}q>=K9BH4Qsc z<+|BaUV-Xi>11nMs6ibAj-Z5Ese-_h(jXFhUTk+LQ@zo=3IPpp{ait3-nd}98)Jl05~Zk zyU6dFpRRv`c*L&)R>$}YF=3x{FYL%Ls{otD$g8oG;S7}FUEOB^tS@!3IQys!!(+qq z#X=Ur-}v=j&|CP7Xe)upjY(%J2>OIDG6FskP&9|sKkJig^DfGoI?Su{^XHl{njlqS z3L77}vg~$uCOCh2ex0Hw_hZJ&jI})VMogkXVGu$A?Q3i|Xe5Y=%qK6yRqQ`p5e@B9 zP!fjfg$09+`&@EZAH^5H25c+anyuDMZpWN(ZsO+j+lY}#viGT-UHdKfwY@^X)oHJS z#@#Zl$cmBc7{~rU6P3V|_%vy&2yiW}%5|K2I$==Qz#5Mm%Y*~g_7dU!{K(Vg`>Uq=>^ z>CAI@Ia;xG#9xAuchPNQfhLC)Webr2?rPL=im<}cISb-V8|u3;0o|e`%WMLo%iUIw zCT73+y6b;(PCdu5lmxL>w*pMA2xa6+B4_6_Ps@uw2&XvX?X3as{s}B09%G;i@Fy<> zZLp_X0d5%Z48;+RG502R0;t;rCF=IuUJ)MAfE+sw>v9GW&$5?eohJ&yy*(!o6WaEp-V7 zoxK8)iLd)+=nj!-Z87%*k2znSGDKaL zJdYv_LlcZb;I|Q!_`QLdQ8F~qE9X3&S~TAvlP?Q94>dG1Ff}qXH83(YFoFe_kwG;S zluY}}qS^JSnNrLkt|Na@Q}by(lWh?sf2Xz*NjAD#$42Lm&R?%ufdqmC&~9X9VMvW~ z4Rj2^6HhHbOOEbk6nVuy;B7GcPL>UK%QmX@5X_L);%>ynPkg)L4xuGMX$F6~N)H;r zUt-FESgt&<)kG#F2y(9-`)?@vp@J5s4Y6sv*QCYc`(B2@V@XC%kqNDh}WQPO6{bj^Cabv!{xbt zacU7A+jpHG0oO)qAj}Z`PR1I!h8!b@iMbcxEgXkUld=#3a9DzBW58lqOC0wq6R=w} zxLvr?&I8&p5x6vgaU{DehJ9-r}E9JmcmUHbjT3j65gyCf~G7gSc~P zwmzE`4@Q}z!mupkr1{sTf3LXwBuqhzf9f{OXlkT5$qFpEDB0E!2zyrbD>_Kp>|J#^ zr^to0Ho2Qdfl;(VzKCu&x&PVGt`ey;!{`*G(^s@;4pu<*;)*MmwXQ{@VQGm|MUOw{ zL25`Vp+EtpdHe(q+jEQcV5Wv&l4eDn$c=d~Z*A*Tuw~W5;dT`Ye>>_|OD<=0fZQv$ z=!_f27>Ykn;01xjdmTu_aMms$O~^&PLLIGyeL0`zj;6n(B3LdvLCu#Aj!hkho;bvd z0+b8-|HX-?KH8iwtV= z$GWIb;r0C*^i^|(Uousnu&`lL;;e)WEv4DqHG%vhA%c>le`y`sdEUY$jCq*=c`vXp zBlomAe@}O~wl+vQ^r+ttqD-gMw$^k*8Kvt31<+vGOgs*NOd?BRH=KmoxG8FbQ6*}S zdnvSKi2yw%s(sl=aU=f64dmW{DvP$lMH zdIMl6`Z$6^i&S`chF^lECek?@>xUVW$u;;Vr9^v$ZKnTBajGXBxZz9&V#JfvCrU%M!~MwMEDERC}ENTfUA{UUr$D*Vonvpp4Z zL8_4hj)EkBr|_+eG6S>WV1D+h?OCErXLGa_2KVJ->cE2wsUMRs3nG*35E%cnr|@xCVP-wy16%Mh^ijHMrsa0rUgNB zIbOo#H8{q35mPw=IV|D^g90sw8wa|f7l+%~MG0@!d$EH))ZDv@0p-FpSDKWzFVR$@ z;m04u#yV=Cmg+vD4FkhC0Lm!RLH9(qs9=09Y%O+w&7Ff~3NfK6k%oZup@98a7`|%U zmv=0Ziy-kM4(YUHNy>fd!AgSoS=dtpan-iVXTWOorD*m;!-a;LP0^|mkm(7(DJ7=U z{~N5nvwgkOdb%+Vk>h)+mYTGf8*bFGQ`wFO)V`c$rAiX!a&-XFcq#55oW-ck!!Fr~ z+qAcT&RuXqg1vhG+CxrHMKrA5QJRx1PbZpFDDm@RI@Ke=qOFm}m@sb3xlXKppGl4H z27QU`Vp*gr*1U$%6Bc!C6qTb!l)Q;CA!*-DzEsEc$#&?kslutsa)Sn<=@k(?Ml^S+ z<}$U92h=YugFA#$OD)Z*7(fVf0N8d(#8$n3DD_!d|0%qaiQUaqW=m9&riTIBlgly} z%4sYJ^Vl{KlQfXE=QJ;Aabf;SUrTikgS3;30I4x0Bs^}?%h&X8rq72D8lZQ^1mh4D zX9O~OD&dKR%-tm6&+n^JEv+JW!R7z6Klxb_A{xCU{ChSi?(8RYxB(U~Ej3sIMeE{! zS+_tZ>|`}=1A*;A@!cB9Y0ECJJP^D8+q3>ADp8|WQnm)n_E5DMaweSdA!??!Jl+Mp+Pvj%M}N~7%jDf$cp~$`N#DaKYs=R)s+GUgr{az zM+%TIMH((Bg@vk*aQer+G|%Z+MPc#^={%`RVL$N zUDIT&WGhLt=mbYUXBXO23>4+&sk2Jop+C!6P^LS0F14VBxQSvkFMm@I2eJ^*YM1$V z2jcq$TMU!ujSd4G_qH`sPU+HZi>v5VxQhsxG|&Z{o!Q=2eKg0Tu8nx9_-y%w_rHF|s;FF-Krhj0*t~YBbQ8uMJrq75xofGS;9f z>}DjgYy5unh$E}7MbBu5cA0i!i-(k-Xgu{9QHMx8;sYw`biWsbvm;?=#{GR^U|!Zm z05ZV*voWAQ64|mbq&fQ`g<&uAucfPEbglj9wq-503c5tU!yBW2{q$j%Z6F`2pFhwq z>qERuPeDMfqfmd2M7B;!M;&5s8bDODg`Y=S}xXyB5!6 z$8V6iJo})`C`&7U2Yje&QP?{wP`-(K_xEuhnVSV9;?nk3(S_!J#i^tJU7!ov?$AB~ z*>ui8A75xI-tfixZv$dvF7I}ooN~wYvU+0PY-l((1Mr#yCZ6$0>?~iHZ-rS=~I!1gbXoQ z>LB8u0Lu4&%ps>)BTcLB$pI>-ybg!|X#m~(n|Up<@GVwkmOk5ux>WrwOU@}VkYzIl z<0q;S+krXqhmWPUm1Xv7{sB8~2}!k;>k}Z8p|*@p)}b(Ms1${-m6ITq1?Z*uYO!wx9@vhEQvZOPRfYycrG)gJy)?#0UO{@%w=te}BXp zIw_y@iz=Axs^bCG-l5E?zGZZiqHaYsF**n{{>cWVrsB2xw=_gRPe&6K=F4Hap_*Lg zgJRYvqM1rSINh6zX7}!9YER37N?)K%t!s`$s4|F7!80kmkm4I%fqpTi zUpvtRZwp@dHU?q$JeKWIf#~IJ8t5)th%d>;4ywC6pJ_WcwrgDFg@upiuZ;F{ZE7>u z{&OuL+>SrW{Sb^?t$@b&DJT(;^a>o&#)!Os$5Y7fT)fS$^yoGbzL?G(PhJxoVW_+% z`(GFRNk6ui9LuJrg5P4KL|?0v^Io7i?o?%6`?g`yiRl~RIe4mGn`mn%5TC$Hhv3!Y z0p@_4T}oxiEIR!;p?fo6IDHc&4N`C4^g{!1YZMje94TJGGsqtlGFXD660{0lb!Fy% zQE|gs^mG^{6=BWvqH8u{9i`YThZ`Q=ybe`n(*!K9ma>Ha=|^wG=L+>2y^8^&X=?&HwG8`u!7D@fk5-~Z0G9wuX;-uCH-{t3nZJvN4w#Y*H$YBLMuFEPxcHp z0&g;p?Q+Pwm>VM*Icg76)S6;dJU!=A3Jk@5kiXa4sN#6ArUgID*;^LXg=&R=>t3FE zxbJ(g9Uw^1`@mUj)G6u8Yka3_`CJ3_lgL`J*tlx~GFB zBWcm?^q3nkJK!v9ZNS|zKrT!BhH2d2eyOy6~5Bv`xItd%Kt4(g?o2^hqDPEFs zelU)3zWSi-6oK&N!@NR_iu(5Qj4t5<< zhi=LP*A@9`4pvEIt6D!!KcpTH&o-re06`3U^zA;CFII;Qk2(`EMEbF`-44ZY}M9&a@pzRd#`Og;>IqJ-oCJsz#VC(E_- zyB7C$10bSKwL3Xbr}hT*`s008MdC4tNXWUPWM?HcFikKqFbxI?V1`HmWdj5P0R;dA zAa8_XBL}3gnz!esB3i+>b5-_PE1icD?yl%LxPYUm^8^&_uo*0x7m&;7loP9|wR_RL K2@)#;0w)mBimN67 diff --git a/Server/src/config/config.xml b/Server/src/config/config.xml index 71b4f989..42395c3d 100644 --- a/Server/src/config/config.xml +++ b/Server/src/config/config.xml @@ -75,6 +75,9 @@ msg_tracking.tcp_server_start="true" msg_tracking.tcp_server_port="9092" msg_tracking.tcp_server_password="openas2" + msg_tracking.aws_region="us-east-1" + msg_tracking.dynamodb_table_name="as2db-dev" + msg_tracking.consistent_read="true" reject_unsigned_messages="false" pollerConfigBase.outboxdir="$properties.storageBaseDir$/outbox/$partnership.receiver.as2_id$" pollerConfigBase.errordir="$properties.storageBaseDir$/outbox/error/$date.YYYY$-$date.MM$-$date.dd$/$partnership.receiver.as2_id$" @@ -151,9 +154,11 @@ tcp_server_start="$properties.msg_tracking.tcp_server_start$" tcp_server_port="$properties.msg_tracking.tcp_server_port$" tcp_server_password="$properties.msg_tracking.tcp_server_password$"/> - + params) + throws OpenAS2Exception { + + // Extract DynamoDB-specific parameters + tableName = params.get("table_name"); + if (tableName == null || tableName.isEmpty()) { + tableName = "msg_metadata"; // Default table name + } + + awsRegion = params.get("aws_region"); + if (awsRegion == null || awsRegion.isEmpty()) { + awsRegion = "us-east-1"; // Default region + logger.warn("No aws_region parameter specified, defaulting to us-east-1"); + } + + // Create the client + createConnectionPool(jdbcConnectString, dbUser, dbPwd); + + // If custom endpoint is specified (for local DynamoDB testing) + String endpoint = params.get("dynamodb_endpoint"); + if (endpoint != null && !endpoint.isEmpty()) { + logger.info("Using custom DynamoDB endpoint: " + endpoint); + dynamoDbClient.close(); + dynamoDbClient = DynamoDbClient.builder() + .region(Region.of(awsRegion)) + .endpointOverride(URI.create(endpoint)) + .credentialsProvider(dbUser != null && dbPwd != null + ? StaticCredentialsProvider.create(AwsBasicCredentials.create(dbUser, dbPwd)) + : DefaultCredentialsProvider.create()) + .build(); + } + + logger.info("DynamoDB handler started with table: " + tableName + " in region: " + awsRegion); + } + + /** + * Stops the DynamoDB handler and releases resources. + */ + @Override + public void stop() { + destroyConnectionPool(); + } + + /** + * Closes the DynamoDB client and releases resources. + */ + @Override + public void destroyConnectionPool() { + if (dynamoDbClient != null) { + try { + dynamoDbClient.close(); + logger.info("DynamoDB client closed successfully"); + } catch (Exception e) { + logger.error("Error closing DynamoDB client", e); + } finally { + dynamoDbClient = null; + } + } + } + + /** + * Returns the DynamoDB client. + * Note: This method signature is maintained for interface compatibility, + * but returns null as DynamoDB doesn't use JDBC connections. + * Use getDynamoDbClient() instead. + * + * @return null (DynamoDB doesn't use JDBC Connection) + * @throws SQLException Not thrown + * @throws OpenAS2Exception if client is not initialized + */ + @Override + public Connection getConnection() throws SQLException, OpenAS2Exception { + if (dynamoDbClient == null) { + throw new OpenAS2Exception("DynamoDB client not initialized"); + } + // Return null as DynamoDB doesn't use JDBC connections + // The actual DynamoDB client can be accessed via getDynamoDbClient() + return null; + } + + /** + * Gets the DynamoDB client for performing operations. + * + * @return the DynamoDB client + * @throws OpenAS2Exception if client is not initialized + */ + public DynamoDbClient getDynamoDbClient() throws OpenAS2Exception { + if (dynamoDbClient == null) { + throw new OpenAS2Exception("DynamoDB client not initialized"); + } + return dynamoDbClient; + } + + /** + * Gets the configured table name. + * + * @return the DynamoDB table name + */ + public String getTableName() { + return tableName; + } + + /** + * Shuts down the DynamoDB client. + * + * @param connectString Not used for DynamoDB + * @return true if shutdown was successful + * @throws SQLException Not thrown + * @throws OpenAS2Exception if shutdown fails + */ + @Override + public boolean shutdown(String connectString) throws SQLException, OpenAS2Exception { + destroyConnectionPool(); + return true; + } +} diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java new file mode 100644 index 00000000..408cb5a5 --- /dev/null +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java @@ -0,0 +1,487 @@ +/* Copyright Uhuru Technology 2016 https://www.uhurutechnology.com + * Distributed under the GPLv3 license or a commercial license must be acquired. + */ +package org.openas2.processor.msgtracking; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.openas2.OpenAS2Exception; +import org.openas2.Session; +import org.openas2.message.Message; +import org.openas2.util.DateUtil; + +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient; +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable; +import software.amazon.awssdk.enhanced.dynamodb.Key; +import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest; +import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional; +import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest; +import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * DynamoDB-based message tracking module. + * Stores AS2 message metadata in Amazon DynamoDB. + */ +public class DynamoDBTrackingModule extends BaseMsgTrackingModule { + + public static final String PARAM_AWS_REGION = "aws_region"; + public static final String PARAM_AWS_ACCESS_KEY = "aws_access_key_id"; + public static final String PARAM_AWS_SECRET_KEY = "aws_secret_access_key"; + public static final String PARAM_TABLE_NAME = "table_name"; + public static final String PARAM_DYNAMODB_ENDPOINT = "dynamodb_endpoint"; + public static final String PARAM_CONSISTENT_READ = "consistent_read"; + + private static final Logger logger = LoggerFactory.getLogger(DynamoDBTrackingModule.class); + private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); + + private DynamoDBHandler dbHandler = null; + private DynamoDbEnhancedClient enhancedClient = null; + private DynamoDbTable table = null; + private String tableName = "msg_metadata"; + private boolean consistentRead = false; // Optional: defaults to eventually consistent if not specified + private boolean isRunning = false; + + @Override + public void init(Session session, Map options) throws OpenAS2Exception { + super.init(session, options); + tableName = getParameter(PARAM_TABLE_NAME, "msg_metadata"); + + // Configure read consistency (optional - defaults to eventually consistent if not specified) + String consistentReadParam = getParameter(PARAM_CONSISTENT_READ, null); + if (consistentReadParam != null) { + consistentRead = Boolean.parseBoolean(consistentReadParam); + } + // else: remains false (eventually consistent, DynamoDB default) + + if (logger.isInfoEnabled()) { + logger.info("DynamoDB tracking module initialized with consistent_read=" + consistentRead + + (consistentReadParam == null ? " (not specified, using DynamoDB default)" : " (explicitly configured)")); + } + } + + @Override + protected void persist(Message msg, Map map) { + try { + // Check if record exists + String msgId = map.get(FIELDS.MSG_ID); + if (msgId == null || msgId.isEmpty()) { + logger.error("Cannot persist record without MSG_ID: " + map); + return; + } + + MessageMetadata existingMetadata = getMessageMetadata(msgId); + boolean isUpdate = (existingMetadata != null); + + MessageMetadata metadata = mapToMetadata(map, existingMetadata, isUpdate); + + // Put item to DynamoDB (will insert or update) + table.putItem(metadata); + + if (logger.isDebugEnabled()) { + logger.debug((isUpdate ? "Updated" : "Created") + " tracking record in DynamoDB: " + metadata); + } + + } catch (Exception e) { + msg.setLogMsg("Failed to persist tracking event to DynamoDB: " + + org.openas2.util.Logging.getExceptionMsg(e) + " ::: Data map: " + map); + logger.error(msg.getLogMsg(), e); + } + } + + /** + * Lists all messages from DynamoDB. + * Note: Uses scan operation which can be expensive for large tables. + * Consider implementing pagination for production use. + * + * @return list of message metadata records + */ + public ArrayList> listMessages() { + ArrayList> rows = new ArrayList<>(); + + try { + // Scan the table (consider using pagination for large datasets) + ScanEnhancedRequest scanRequest = ScanEnhancedRequest.builder() + .limit(1000) // Limit to 1000 items + .build(); + + table.scan(scanRequest).items().forEach(metadata -> { + HashMap row = metadataToMap(metadata); + rows.add(row); + }); + + if (logger.isDebugEnabled()) { + logger.debug("Listed " + rows.size() + " messages from DynamoDB"); + } + + } catch (DynamoDbException e) { + logger.error("Failed to list messages from DynamoDB", e); + } + + return rows; + } + + /** + * Retrieves a specific message by ID from DynamoDB. + * Uses strongly consistent reads if configured. + * + * @param msgId the message ID to retrieve + * @return map containing message metadata, or empty map if not found + */ + public HashMap showMessage(String msgId) { + HashMap row = new HashMap<>(); + + try { + Key key = Key.builder() + .partitionValue(msgId) + .build(); + + GetItemEnhancedRequest request = GetItemEnhancedRequest.builder() + .key(key) + .consistentRead(consistentRead) + .build(); + + MessageMetadata metadata = table.getItem(request); + + if (metadata != null) { + row = metadataToMap(metadata); + if (logger.isTraceEnabled()) { + logger.trace("Retrieved message from DynamoDB: " + msgId + " (consistentRead=" + consistentRead + ")"); + } + } else { + logger.warn("Message not found in DynamoDB: " + msgId); + } + + } catch (DynamoDbException e) { + logger.error("Failed to retrieve message from DynamoDB: " + msgId, e); + } + + return row; + } + + /** + * Retrieves messages within a date range for chart data. + * Uses the StateIndex GSI for efficient querying. + * + * @param params map containing startDate and endDate + * @return list of message metadata within the date range + */ + public ArrayList> getDataCharts(HashMap params) { + ArrayList> rows = new ArrayList<>(); + + try { + String startDate = params.get("startDate"); + String endDate = params.get("endDate"); + + if (startDate == null || endDate == null) { + logger.error("startDate and endDate are required for getDataCharts"); + return rows; + } + + // Convert dates to timestamps for comparison + String startTimestamp = startDate + " 00:00:00.000"; + String endTimestamp = endDate + " 23:59:59.999"; + + // Use scan with filter expression for date range + // Note: For better performance, consider using GSI with state as partition key + Map expressionValues = new HashMap<>(); + expressionValues.put(":startDate", AttributeValue.builder().s(startTimestamp).build()); + expressionValues.put(":endDate", AttributeValue.builder().s(endTimestamp).build()); + + ScanEnhancedRequest scanRequest = ScanEnhancedRequest.builder() + .filterExpression(software.amazon.awssdk.enhanced.dynamodb.Expression.builder() + .expression("createDt BETWEEN :startDate AND :endDate") + .expressionValues(expressionValues) + .build()) + .limit(1000) + .build(); + + table.scan(scanRequest).items().forEach(metadata -> { + HashMap row = new HashMap<>(); + row.put(FIELDS.MSG_ID, metadata.getMsgId()); + row.put(FIELDS.STATE, metadata.getState()); + row.put(FIELDS.STATUS, metadata.getStatus()); + row.put(FIELDS.CREATE_DT, metadata.getCreateDt()); + rows.add(row); + }); + + if (logger.isDebugEnabled()) { + logger.debug("Retrieved " + rows.size() + " chart data records from DynamoDB"); + } + + } catch (Exception e) { + logger.error("Failed to retrieve chart data from DynamoDB", e); + } + + return rows; + } + + @Override + public boolean isRunning() { + return isRunning; + } + + @Override + public void start() throws OpenAS2Exception { + dbHandler = new DynamoDBHandler(); + dbHandler.start(null, + getParameter(PARAM_AWS_ACCESS_KEY, null), + getParameter(PARAM_AWS_SECRET_KEY, null), + getParameters()); + + // Initialize Enhanced Client + enhancedClient = DynamoDbEnhancedClient.builder() + .dynamoDbClient(dbHandler.getDynamoDbClient()) + .build(); + + // Get table reference + table = enhancedClient.table(tableName, TableSchema.fromBean(MessageMetadata.class)); + + isRunning = true; + logger.info("DynamoDB tracking module started with table: " + tableName); + } + + @Override + public void stop() { + if (dbHandler != null) { + dbHandler.stop(); + } + isRunning = false; + logger.info("DynamoDB tracking module stopped"); + } + + @Override + public boolean healthcheck(List failures) { + try { + // Simple healthcheck: try to describe the table + if (table == null || dbHandler == null) { + failures.add(this.getClass().getSimpleName() + " - DynamoDB client not initialized"); + return false; + } + + // Try a simple get operation to verify connectivity + table.describeTable(); + return true; + + } catch (Exception e) { + failures.add(this.getClass().getSimpleName() + + " - Failed to check DynamoDB tracking module: " + e.getMessage()); + return false; + } + } + + /** + * Retrieves a MessageMetadata object by message ID. + * Uses strongly consistent reads if configured (important for Global Tables). + * + * @param msgId the message ID + * @return MessageMetadata object or null if not found + */ + private MessageMetadata getMessageMetadata(String msgId) { + try { + Key key = Key.builder() + .partitionValue(msgId) + .build(); + + GetItemEnhancedRequest request = GetItemEnhancedRequest.builder() + .key(key) + .consistentRead(consistentRead) + .build(); + + return table.getItem(request); + } catch (Exception e) { + logger.trace("Message not found (this is normal for new records): " + msgId); + return null; + } + } + + /** + * Converts a map to MessageMetadata object. + * + * @param map the source map + * @param existingMetadata existing metadata for update operations + * @param isUpdate whether this is an update operation + * @return MessageMetadata object + */ + private MessageMetadata mapToMetadata(Map map, MessageMetadata existingMetadata, boolean isUpdate) { + MessageMetadata metadata = existingMetadata != null ? existingMetadata : new MessageMetadata(); + + // Set partition key + if (map.containsKey(FIELDS.MSG_ID)) { + metadata.setMsgId(map.get(FIELDS.MSG_ID)); + } + + // Set other fields only if they're present in the map + if (map.containsKey(FIELDS.PRIOR_MSG_ID)) { + metadata.setPriorMsgId(map.get(FIELDS.PRIOR_MSG_ID)); + } + if (map.containsKey(FIELDS.MDN_ID)) { + metadata.setMdnId(map.get(FIELDS.MDN_ID)); + } + if (map.containsKey(FIELDS.DIRECTION)) { + metadata.setDirection(map.get(FIELDS.DIRECTION)); + } + if (map.containsKey(FIELDS.IS_RESEND)) { + metadata.setIsResend(map.get(FIELDS.IS_RESEND)); + } + if (map.containsKey(FIELDS.RESEND_COUNT)) { + String resendCountStr = map.get(FIELDS.RESEND_COUNT); + if (resendCountStr != null && !resendCountStr.isEmpty()) { + try { + metadata.setResendCount(Integer.parseInt(resendCountStr)); + } catch (NumberFormatException e) { + logger.warn("Invalid resend_count value: " + resendCountStr); + } + } + } + if (map.containsKey(FIELDS.SENDER_ID)) { + metadata.setSenderId(map.get(FIELDS.SENDER_ID)); + } + if (map.containsKey(FIELDS.RECEIVER_ID)) { + metadata.setReceiverId(map.get(FIELDS.RECEIVER_ID)); + } + if (map.containsKey(FIELDS.STATUS)) { + metadata.setStatus(map.get(FIELDS.STATUS)); + } + if (map.containsKey(FIELDS.STATE)) { + metadata.setState(map.get(FIELDS.STATE)); + } + if (map.containsKey(FIELDS.STATE_MSG)) { + metadata.setStateMsg(map.get(FIELDS.STATE_MSG)); + } + if (map.containsKey(FIELDS.SIGNATURE_ALGORITHM)) { + metadata.setSignatureAlgorithm(map.get(FIELDS.SIGNATURE_ALGORITHM)); + } + if (map.containsKey(FIELDS.ENCRYPTION_ALGORITHM)) { + metadata.setEncryptionAlgorithm(map.get(FIELDS.ENCRYPTION_ALGORITHM)); + } + if (map.containsKey(FIELDS.COMPRESSION)) { + metadata.setCompression(map.get(FIELDS.COMPRESSION)); + } + if (map.containsKey(FIELDS.FILE_NAME)) { + metadata.setFileName(map.get(FIELDS.FILE_NAME)); + } + if (map.containsKey(FIELDS.SENT_FILE_NAME)) { + metadata.setSentFileName(map.get(FIELDS.SENT_FILE_NAME)); + } + if (map.containsKey(FIELDS.CONTENT_TYPE)) { + metadata.setContentType(map.get(FIELDS.CONTENT_TYPE)); + } + if (map.containsKey(FIELDS.CONTENT_TRANSFER_ENCODING)) { + metadata.setContentTransferEncoding(map.get(FIELDS.CONTENT_TRANSFER_ENCODING)); + } + if (map.containsKey(FIELDS.MDN_MODE)) { + metadata.setMdnMode(map.get(FIELDS.MDN_MODE)); + } + if (map.containsKey(FIELDS.MDN_RESPONSE)) { + metadata.setMdnResponse(map.get(FIELDS.MDN_RESPONSE)); + } + + // Handle timestamps + String currentTimestamp = DateUtil.getSqlTimestamp(); + + if (isUpdate) { + // Update timestamp + metadata.setUpdateDt(currentTimestamp); + // Preserve create timestamp + if (existingMetadata != null && existingMetadata.getCreateDt() != null) { + metadata.setCreateDt(existingMetadata.getCreateDt()); + } + } else { + // New record - set create timestamp + metadata.setCreateDt(currentTimestamp); + } + + return metadata; + } + + /** + * Converts MessageMetadata object to a map. + * + * @param metadata the MessageMetadata object + * @return map containing message metadata + */ + private HashMap metadataToMap(MessageMetadata metadata) { + HashMap map = new HashMap<>(); + + if (metadata.getMsgId() != null) { + map.put("MSG_ID", metadata.getMsgId()); + } + if (metadata.getPriorMsgId() != null) { + map.put("PRIOR_MSG_ID", metadata.getPriorMsgId()); + } + if (metadata.getMdnId() != null) { + map.put("MDN_ID", metadata.getMdnId()); + } + if (metadata.getDirection() != null) { + map.put("DIRECTION", metadata.getDirection()); + } + if (metadata.getIsResend() != null) { + map.put("IS_RESEND", metadata.getIsResend()); + } + if (metadata.getResendCount() != null) { + map.put("RESEND_COUNT", metadata.getResendCount().toString()); + } + if (metadata.getSenderId() != null) { + map.put("SENDER_ID", metadata.getSenderId()); + } + if (metadata.getReceiverId() != null) { + map.put("RECEIVER_ID", metadata.getReceiverId()); + } + if (metadata.getStatus() != null) { + map.put("STATUS", metadata.getStatus()); + } + if (metadata.getState() != null) { + map.put("STATE", metadata.getState()); + } + if (metadata.getStateMsg() != null) { + map.put("STATE_MSG", metadata.getStateMsg()); + } + if (metadata.getSignatureAlgorithm() != null) { + map.put("SIGNATURE_ALGORITHM", metadata.getSignatureAlgorithm()); + } + if (metadata.getEncryptionAlgorithm() != null) { + map.put("ENCRYPTION_ALGORITHM", metadata.getEncryptionAlgorithm()); + } + if (metadata.getCompression() != null) { + map.put("COMPRESSION", metadata.getCompression()); + } + if (metadata.getFileName() != null) { + map.put("FILE_NAME", metadata.getFileName()); + } + if (metadata.getSentFileName() != null) { + map.put("SENT_FILE_NAME", metadata.getSentFileName()); + } + if (metadata.getContentType() != null) { + map.put("CONTENT_TYPE", metadata.getContentType()); + } + if (metadata.getContentTransferEncoding() != null) { + map.put("CONTENT_TRANSFER_ENCODING", metadata.getContentTransferEncoding()); + } + if (metadata.getMdnMode() != null) { + map.put("MDN_MODE", metadata.getMdnMode()); + } + if (metadata.getMdnResponse() != null) { + map.put("MDN_RESPONSE", metadata.getMdnResponse()); + } + if (metadata.getCreateDt() != null) { + map.put("CREATE_DT", metadata.getCreateDt()); + } + if (metadata.getUpdateDt() != null) { + map.put("UPDATE_DT", metadata.getUpdateDt()); + } + + return map; + } +} diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDbTrackingModule.java deleted file mode 100644 index 640410b0..00000000 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDbTrackingModule.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.openas2.processor.msgtracking; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; -import com.amazonaws.endpoints.AccountIdEndpointMode; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import org.openas2.OpenAS2Exception; -import org.openas2.message.Message; - -import java.util.List; -import java.util.Map; - -public class DynamoDbTrackingModule extends BaseMsgTrackingModule implements TrackingModule { - - public DynamoDbTrackingModule() { - super(); - ClientConfiguration config = new ClientConfiguration().withAccountIdEndpointMode(AccountIdEndpointMode.DISABLED); - AWSCredentialsProvider credentialsProvider = new EnvironmentVariableCredentialsProvider(); - - AmazonDynamoDB dynamodb = AmazonDynamoDBClientBuilder.standard().withClientConfiguration(config).withCredentials(credentialsProvider).withRegion(Regions.US_EAST_1).build(); - } - - @Override - public boolean isRunning() { - return false; - } - - @Override - public void start() throws OpenAS2Exception { - - } - - @Override - public void stop() throws OpenAS2Exception { - - } - - @Override - public boolean healthcheck(List failures) { - return false; - } - - @Override - protected void persist(Message msg, Map map) { - - } -} diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java b/Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java new file mode 100644 index 00000000..e723950d --- /dev/null +++ b/Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java @@ -0,0 +1,239 @@ +/* Copyright Uhuru Technology 2016 https://www.uhurutechnology.com + * Distributed under the GPLv3 license or a commercial license must be acquired. + */ +package org.openas2.processor.msgtracking; + +import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; +import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; +import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; +import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; + +import java.time.Instant; + +/** + * DynamoDB data model for AS2 message metadata. + * Maps to the msg_metadata table structure. + */ +@DynamoDbBean +public class MessageMetadata { + + private String msgId; + private String priorMsgId; + private String mdnId; + private String direction; + private String isResend; + private Integer resendCount; + private String senderId; + private String receiverId; + private String status; + private String state; + private String stateMsg; + private String signatureAlgorithm; + private String encryptionAlgorithm; + private String compression; + private String fileName; + private String sentFileName; + private String contentType; + private String contentTransferEncoding; + private String mdnMode; + private String mdnResponse; + private String createDt; + private String updateDt; + + /** + * Default constructor required by DynamoDB Enhanced Client. + */ + public MessageMetadata() { + } + + @DynamoDbPartitionKey + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public String getPriorMsgId() { + return priorMsgId; + } + + public void setPriorMsgId(String priorMsgId) { + this.priorMsgId = priorMsgId; + } + + public String getMdnId() { + return mdnId; + } + + public void setMdnId(String mdnId) { + this.mdnId = mdnId; + } + + public String getDirection() { + return direction; + } + + public void setDirection(String direction) { + this.direction = direction; + } + + public String getIsResend() { + return isResend; + } + + public void setIsResend(String isResend) { + this.isResend = isResend; + } + + public Integer getResendCount() { + return resendCount; + } + + public void setResendCount(Integer resendCount) { + this.resendCount = resendCount; + } + + public String getSenderId() { + return senderId; + } + + public void setSenderId(String senderId) { + this.senderId = senderId; + } + + public String getReceiverId() { + return receiverId; + } + + public void setReceiverId(String receiverId) { + this.receiverId = receiverId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + @DynamoDbSecondaryPartitionKey(indexNames = "StateIndex") + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public String getStateMsg() { + return stateMsg; + } + + public void setStateMsg(String stateMsg) { + this.stateMsg = stateMsg; + } + + public String getSignatureAlgorithm() { + return signatureAlgorithm; + } + + public void setSignatureAlgorithm(String signatureAlgorithm) { + this.signatureAlgorithm = signatureAlgorithm; + } + + public String getEncryptionAlgorithm() { + return encryptionAlgorithm; + } + + public void setEncryptionAlgorithm(String encryptionAlgorithm) { + this.encryptionAlgorithm = encryptionAlgorithm; + } + + public String getCompression() { + return compression; + } + + public void setCompression(String compression) { + this.compression = compression; + } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public String getSentFileName() { + return sentFileName; + } + + public void setSentFileName(String sentFileName) { + this.sentFileName = sentFileName; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getContentTransferEncoding() { + return contentTransferEncoding; + } + + public void setContentTransferEncoding(String contentTransferEncoding) { + this.contentTransferEncoding = contentTransferEncoding; + } + + public String getMdnMode() { + return mdnMode; + } + + public void setMdnMode(String mdnMode) { + this.mdnMode = mdnMode; + } + + public String getMdnResponse() { + return mdnResponse; + } + + public void setMdnResponse(String mdnResponse) { + this.mdnResponse = mdnResponse; + } + + @DynamoDbSecondarySortKey(indexNames = "StateIndex") + public String getCreateDt() { + return createDt; + } + + public void setCreateDt(String createDt) { + this.createDt = createDt; + } + + public String getUpdateDt() { + return updateDt; + } + + public void setUpdateDt(String updateDt) { + this.updateDt = updateDt; + } + + @Override + public String toString() { + return "MessageMetadata{" + + "msgId='" + msgId + '\'' + + ", senderId='" + senderId + '\'' + + ", receiverId='" + receiverId + '\'' + + ", state='" + state + '\'' + + ", status='" + status + '\'' + + ", createDt='" + createDt + '\'' + + '}'; + } +} diff --git a/pom.xml b/pom.xml index 8c60b327..3989b16a 100644 --- a/pom.xml +++ b/pom.xml @@ -34,23 +34,28 @@ 11 - 1.12.791 + 2.29.40 - - com.amazonaws - aws-java-sdk-bom - ${aws.java.sdk.version} - pom - import - - - com.amazonaws - DynamoDBLocal - [1.11,2.0) - + + software.amazon.awssdk + bom + ${aws.java.sdk.version} + pom + import + + + software.amazon.awssdk + dynamodb + ${aws.java.sdk.version} + + + software.amazon.awssdk + dynamodb-enhanced + ${aws.java.sdk.version} + org.osgi org.osgi.core From ae3fd3eeec37f0de382968b12f94d4e2ffd7c01c Mon Sep 17 00:00:00 2001 From: Daniel M Date: Wed, 12 Nov 2025 19:14:56 +1100 Subject: [PATCH 5/7] wip --- .../msgtracking/DynamoDBTrackingModule.java | 272 ++++++------------ .../msgtracking/MessageMetadata.java | 5 + 2 files changed, 93 insertions(+), 184 deletions(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java index 408cb5a5..ba537ef7 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java @@ -15,15 +15,10 @@ import software.amazon.awssdk.enhanced.dynamodb.Key; import software.amazon.awssdk.enhanced.dynamodb.TableSchema; import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest; -import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional; -import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest; import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; @@ -67,7 +62,7 @@ public void init(Session session, Map options) throws OpenAS2Exc if (logger.isInfoEnabled()) { logger.info("DynamoDB tracking module initialized with consistent_read=" + consistentRead + - (consistentReadParam == null ? " (not specified, using DynamoDB default)" : " (explicitly configured)")); + (consistentReadParam == null ? " (not specified, using DynamoDB default)" : " (explicitly configured)")); } } @@ -77,25 +72,21 @@ protected void persist(Message msg, Map map) { // Check if record exists String msgId = map.get(FIELDS.MSG_ID); if (msgId == null || msgId.isEmpty()) { - logger.error("Cannot persist record without MSG_ID: " + map); + logger.error("Cannot persist record without MSG_ID: {}", map); return; } - MessageMetadata existingMetadata = getMessageMetadata(msgId); - boolean isUpdate = (existingMetadata != null); - - MessageMetadata metadata = mapToMetadata(map, existingMetadata, isUpdate); + MessageMetadata metadata = mapToMetadata(map, existingMetadata); // Put item to DynamoDB (will insert or update) table.putItem(metadata); if (logger.isDebugEnabled()) { - logger.debug((isUpdate ? "Updated" : "Created") + " tracking record in DynamoDB: " + metadata); + logger.debug("{} tracking record in DynamoDB: {}", existingMetadata != null ? "Updated" : "Created", metadata); } - } catch (Exception e) { msg.setLogMsg("Failed to persist tracking event to DynamoDB: " + - org.openas2.util.Logging.getExceptionMsg(e) + " ::: Data map: " + map); + org.openas2.util.Logging.getExceptionMsg(e) + " ::: Data map: " + map); logger.error(msg.getLogMsg(), e); } } @@ -107,17 +98,17 @@ protected void persist(Message msg, Map map) { * * @return list of message metadata records */ - public ArrayList> listMessages() { - ArrayList> rows = new ArrayList<>(); + public ArrayList> listMessages() { + ArrayList> rows = new ArrayList<>(); try { // Scan the table (consider using pagination for large datasets) ScanEnhancedRequest scanRequest = ScanEnhancedRequest.builder() - .limit(1000) // Limit to 1000 items - .build(); + .limit(1000) // Limit to 1000 items + .build(); table.scan(scanRequest).items().forEach(metadata -> { - HashMap row = metadataToMap(metadata); + Map row = metadataToMap(metadata); rows.add(row); }); @@ -139,18 +130,18 @@ public ArrayList> listMessages() { * @param msgId the message ID to retrieve * @return map containing message metadata, or empty map if not found */ - public HashMap showMessage(String msgId) { - HashMap row = new HashMap<>(); + public Map showMessage(String msgId) { + Map row = new HashMap<>(); try { Key key = Key.builder() - .partitionValue(msgId) - .build(); + .partitionValue(msgId) + .build(); GetItemEnhancedRequest request = GetItemEnhancedRequest.builder() - .key(key) - .consistentRead(consistentRead) - .build(); + .key(key) + .consistentRead(consistentRead) + .build(); MessageMetadata metadata = table.getItem(request); @@ -200,12 +191,12 @@ public ArrayList> getDataCharts(HashMap expressionValues.put(":endDate", AttributeValue.builder().s(endTimestamp).build()); ScanEnhancedRequest scanRequest = ScanEnhancedRequest.builder() - .filterExpression(software.amazon.awssdk.enhanced.dynamodb.Expression.builder() - .expression("createDt BETWEEN :startDate AND :endDate") - .expressionValues(expressionValues) - .build()) - .limit(1000) - .build(); + .filterExpression(software.amazon.awssdk.enhanced.dynamodb.Expression.builder() + .expression("createDt BETWEEN :startDate AND :endDate") + .expressionValues(expressionValues) + .build()) + .limit(1000) + .build(); table.scan(scanRequest).items().forEach(metadata -> { HashMap row = new HashMap<>(); @@ -217,7 +208,7 @@ public ArrayList> getDataCharts(HashMap }); if (logger.isDebugEnabled()) { - logger.debug("Retrieved " + rows.size() + " chart data records from DynamoDB"); + logger.debug("Retrieved {} chart data records from DynamoDB", rows.size()); } } catch (Exception e) { @@ -236,14 +227,14 @@ public boolean isRunning() { public void start() throws OpenAS2Exception { dbHandler = new DynamoDBHandler(); dbHandler.start(null, - getParameter(PARAM_AWS_ACCESS_KEY, null), - getParameter(PARAM_AWS_SECRET_KEY, null), - getParameters()); + getParameter(PARAM_AWS_ACCESS_KEY, null), + getParameter(PARAM_AWS_SECRET_KEY, null), + getParameters()); // Initialize Enhanced Client enhancedClient = DynamoDbEnhancedClient.builder() - .dynamoDbClient(dbHandler.getDynamoDbClient()) - .build(); + .dynamoDbClient(dbHandler.getDynamoDbClient()) + .build(); // Get table reference table = enhancedClient.table(tableName, TableSchema.fromBean(MessageMetadata.class)); @@ -276,7 +267,7 @@ public boolean healthcheck(List failures) { } catch (Exception e) { failures.add(this.getClass().getSimpleName() + - " - Failed to check DynamoDB tracking module: " + e.getMessage()); + " - Failed to check DynamoDB tracking module: " + e.getMessage()); return false; } } @@ -291,13 +282,13 @@ public boolean healthcheck(List failures) { private MessageMetadata getMessageMetadata(String msgId) { try { Key key = Key.builder() - .partitionValue(msgId) - .build(); + .partitionValue(msgId) + .build(); GetItemEnhancedRequest request = GetItemEnhancedRequest.builder() - .key(key) - .consistentRead(consistentRead) - .build(); + .key(key) + .consistentRead(consistentRead) + .build(); return table.getItem(request); } catch (Exception e) { @@ -309,93 +300,52 @@ private MessageMetadata getMessageMetadata(String msgId) { /** * Converts a map to MessageMetadata object. * - * @param map the source map + * @param map the source map * @param existingMetadata existing metadata for update operations - * @param isUpdate whether this is an update operation * @return MessageMetadata object */ - private MessageMetadata mapToMetadata(Map map, MessageMetadata existingMetadata, boolean isUpdate) { + private MessageMetadata mapToMetadata(Map map, MessageMetadata existingMetadata) { MessageMetadata metadata = existingMetadata != null ? existingMetadata : new MessageMetadata(); // Set partition key - if (map.containsKey(FIELDS.MSG_ID)) { - metadata.setMsgId(map.get(FIELDS.MSG_ID)); - } + metadata.setMsgId(map.get(FIELDS.MSG_ID)); // Set other fields only if they're present in the map - if (map.containsKey(FIELDS.PRIOR_MSG_ID)) { - metadata.setPriorMsgId(map.get(FIELDS.PRIOR_MSG_ID)); - } - if (map.containsKey(FIELDS.MDN_ID)) { - metadata.setMdnId(map.get(FIELDS.MDN_ID)); - } - if (map.containsKey(FIELDS.DIRECTION)) { - metadata.setDirection(map.get(FIELDS.DIRECTION)); - } - if (map.containsKey(FIELDS.IS_RESEND)) { - metadata.setIsResend(map.get(FIELDS.IS_RESEND)); - } - if (map.containsKey(FIELDS.RESEND_COUNT)) { - String resendCountStr = map.get(FIELDS.RESEND_COUNT); - if (resendCountStr != null && !resendCountStr.isEmpty()) { - try { - metadata.setResendCount(Integer.parseInt(resendCountStr)); - } catch (NumberFormatException e) { - logger.warn("Invalid resend_count value: " + resendCountStr); - } + metadata.setPriorMsgId(map.get(FIELDS.PRIOR_MSG_ID)); + metadata.setMdnId(map.get(FIELDS.MDN_ID)); + metadata.setDirection(map.get(FIELDS.DIRECTION)); + metadata.setIsResend(map.get(FIELDS.IS_RESEND)); + String resendCountStr = map.get(FIELDS.RESEND_COUNT); + if (resendCountStr != null && !resendCountStr.isEmpty()) { + try { + metadata.setResendCount(Integer.parseInt(resendCountStr)); + } catch (NumberFormatException e) { + logger.warn("Invalid resend_count value: {}", resendCountStr); } } - if (map.containsKey(FIELDS.SENDER_ID)) { - metadata.setSenderId(map.get(FIELDS.SENDER_ID)); - } - if (map.containsKey(FIELDS.RECEIVER_ID)) { - metadata.setReceiverId(map.get(FIELDS.RECEIVER_ID)); - } - if (map.containsKey(FIELDS.STATUS)) { - metadata.setStatus(map.get(FIELDS.STATUS)); - } - if (map.containsKey(FIELDS.STATE)) { - metadata.setState(map.get(FIELDS.STATE)); - } - if (map.containsKey(FIELDS.STATE_MSG)) { - metadata.setStateMsg(map.get(FIELDS.STATE_MSG)); - } - if (map.containsKey(FIELDS.SIGNATURE_ALGORITHM)) { - metadata.setSignatureAlgorithm(map.get(FIELDS.SIGNATURE_ALGORITHM)); - } - if (map.containsKey(FIELDS.ENCRYPTION_ALGORITHM)) { - metadata.setEncryptionAlgorithm(map.get(FIELDS.ENCRYPTION_ALGORITHM)); - } - if (map.containsKey(FIELDS.COMPRESSION)) { - metadata.setCompression(map.get(FIELDS.COMPRESSION)); - } - if (map.containsKey(FIELDS.FILE_NAME)) { - metadata.setFileName(map.get(FIELDS.FILE_NAME)); - } - if (map.containsKey(FIELDS.SENT_FILE_NAME)) { - metadata.setSentFileName(map.get(FIELDS.SENT_FILE_NAME)); - } - if (map.containsKey(FIELDS.CONTENT_TYPE)) { - metadata.setContentType(map.get(FIELDS.CONTENT_TYPE)); - } - if (map.containsKey(FIELDS.CONTENT_TRANSFER_ENCODING)) { - metadata.setContentTransferEncoding(map.get(FIELDS.CONTENT_TRANSFER_ENCODING)); - } - if (map.containsKey(FIELDS.MDN_MODE)) { - metadata.setMdnMode(map.get(FIELDS.MDN_MODE)); - } - if (map.containsKey(FIELDS.MDN_RESPONSE)) { - metadata.setMdnResponse(map.get(FIELDS.MDN_RESPONSE)); - } + metadata.setSenderId(map.get(FIELDS.SENDER_ID)); + metadata.setReceiverId(map.get(FIELDS.RECEIVER_ID)); + metadata.setStatus(map.get(FIELDS.STATUS)); + metadata.setState(map.get(FIELDS.STATE)); + metadata.setStateMsg(map.get(FIELDS.STATE_MSG)); + metadata.setSignatureAlgorithm(map.get(FIELDS.SIGNATURE_ALGORITHM)); + metadata.setEncryptionAlgorithm(map.get(FIELDS.ENCRYPTION_ALGORITHM)); + metadata.setCompression(map.get(FIELDS.COMPRESSION)); + metadata.setFileName(map.get(FIELDS.FILE_NAME)); + metadata.setSentFileName(map.get(FIELDS.SENT_FILE_NAME)); + metadata.setContentType(map.get(FIELDS.CONTENT_TYPE)); + metadata.setContentTransferEncoding(map.get(FIELDS.CONTENT_TRANSFER_ENCODING)); + metadata.setMdnMode(map.get(FIELDS.MDN_MODE)); + metadata.setMdnResponse(map.get(FIELDS.MDN_RESPONSE)); // Handle timestamps String currentTimestamp = DateUtil.getSqlTimestamp(); - if (isUpdate) { + if (existingMetadata != null) { // Update timestamp metadata.setUpdateDt(currentTimestamp); // Preserve create timestamp - if (existingMetadata != null && existingMetadata.getCreateDt() != null) { + if (existingMetadata.getCreateDt() != null) { metadata.setCreateDt(existingMetadata.getCreateDt()); } } else { @@ -412,76 +362,30 @@ private MessageMetadata mapToMetadata(Map map, MessageMetadata e * @param metadata the MessageMetadata object * @return map containing message metadata */ - private HashMap metadataToMap(MessageMetadata metadata) { - HashMap map = new HashMap<>(); - - if (metadata.getMsgId() != null) { - map.put("MSG_ID", metadata.getMsgId()); - } - if (metadata.getPriorMsgId() != null) { - map.put("PRIOR_MSG_ID", metadata.getPriorMsgId()); - } - if (metadata.getMdnId() != null) { - map.put("MDN_ID", metadata.getMdnId()); - } - if (metadata.getDirection() != null) { - map.put("DIRECTION", metadata.getDirection()); - } - if (metadata.getIsResend() != null) { - map.put("IS_RESEND", metadata.getIsResend()); - } - if (metadata.getResendCount() != null) { - map.put("RESEND_COUNT", metadata.getResendCount().toString()); - } - if (metadata.getSenderId() != null) { - map.put("SENDER_ID", metadata.getSenderId()); - } - if (metadata.getReceiverId() != null) { - map.put("RECEIVER_ID", metadata.getReceiverId()); - } - if (metadata.getStatus() != null) { - map.put("STATUS", metadata.getStatus()); - } - if (metadata.getState() != null) { - map.put("STATE", metadata.getState()); - } - if (metadata.getStateMsg() != null) { - map.put("STATE_MSG", metadata.getStateMsg()); - } - if (metadata.getSignatureAlgorithm() != null) { - map.put("SIGNATURE_ALGORITHM", metadata.getSignatureAlgorithm()); - } - if (metadata.getEncryptionAlgorithm() != null) { - map.put("ENCRYPTION_ALGORITHM", metadata.getEncryptionAlgorithm()); - } - if (metadata.getCompression() != null) { - map.put("COMPRESSION", metadata.getCompression()); - } - if (metadata.getFileName() != null) { - map.put("FILE_NAME", metadata.getFileName()); - } - if (metadata.getSentFileName() != null) { - map.put("SENT_FILE_NAME", metadata.getSentFileName()); - } - if (metadata.getContentType() != null) { - map.put("CONTENT_TYPE", metadata.getContentType()); - } - if (metadata.getContentTransferEncoding() != null) { - map.put("CONTENT_TRANSFER_ENCODING", metadata.getContentTransferEncoding()); - } - if (metadata.getMdnMode() != null) { - map.put("MDN_MODE", metadata.getMdnMode()); - } - if (metadata.getMdnResponse() != null) { - map.put("MDN_RESPONSE", metadata.getMdnResponse()); - } - if (metadata.getCreateDt() != null) { - map.put("CREATE_DT", metadata.getCreateDt()); - } - if (metadata.getUpdateDt() != null) { - map.put("UPDATE_DT", metadata.getUpdateDt()); - } - + private Map metadataToMap(MessageMetadata metadata) { + Map map = new HashMap<>(); + map.put(FIELDS.MSG_ID, metadata.getMsgId()); + map.put(FIELDS.PRIOR_MSG_ID, metadata.getPriorMsgId()); + map.put(FIELDS.MDN_ID, metadata.getMdnId()); + map.put(FIELDS.DIRECTION, metadata.getDirection()); + map.put(FIELDS.IS_RESEND, metadata.getIsResend()); + map.put(FIELDS.RESEND_COUNT, metadata.getResendCountStr()); + map.put(FIELDS.SENDER_ID, metadata.getSenderId()); + map.put(FIELDS.RECEIVER_ID, metadata.getReceiverId()); + map.put(FIELDS.STATUS, metadata.getStatus()); + map.put(FIELDS.STATE, metadata.getState()); + map.put(FIELDS.STATE_MSG, metadata.getStateMsg()); + map.put(FIELDS.SIGNATURE_ALGORITHM, metadata.getSignatureAlgorithm()); + map.put(FIELDS.ENCRYPTION_ALGORITHM, metadata.getEncryptionAlgorithm()); + map.put(FIELDS.COMPRESSION, metadata.getCompression()); + map.put(FIELDS.FILE_NAME, metadata.getFileName()); + map.put(FIELDS.SENT_FILE_NAME, metadata.getSentFileName()); + map.put(FIELDS.CONTENT_TYPE, metadata.getContentType()); + map.put(FIELDS.CONTENT_TRANSFER_ENCODING, metadata.getContentTransferEncoding()); + map.put(FIELDS.MDN_MODE, metadata.getMdnMode()); + map.put(FIELDS.MDN_RESPONSE, metadata.getMdnResponse()); + map.put(FIELDS.CREATE_DT, metadata.getCreateDt()); + map.put(FIELDS.UPDATE_DT, metadata.getUpdateDt()); return map; } } diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java b/Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java index e723950d..b753d79f 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/MessageMetadata.java @@ -3,12 +3,14 @@ */ package org.openas2.processor.msgtracking; +import org.openas2.util.DateUtil; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import java.time.Instant; +import java.util.Map; /** * DynamoDB data model for AS2 message metadata. @@ -90,6 +92,9 @@ public void setIsResend(String isResend) { public Integer getResendCount() { return resendCount; } + public String getResendCountStr() { + return resendCount == null ? null : resendCount.toString(); + } public void setResendCount(Integer resendCount) { this.resendCount = resendCount; From 4c9943b8fff2033a868faef20fd3f1d3010a4502 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Wed, 12 Nov 2025 19:27:32 +1100 Subject: [PATCH 6/7] wip --- .../msgtracking/DynamoDBHandler.java | 1 - .../msgtracking/DynamoDBTrackingModule.java | 22 +++++-------------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java index d5001ab7..c1b5bc54 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java @@ -8,7 +8,6 @@ import org.openas2.OpenAS2Exception; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java index ba537ef7..ee06e39e 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBTrackingModule.java @@ -31,20 +31,17 @@ */ public class DynamoDBTrackingModule extends BaseMsgTrackingModule { - public static final String PARAM_AWS_REGION = "aws_region"; public static final String PARAM_AWS_ACCESS_KEY = "aws_access_key_id"; public static final String PARAM_AWS_SECRET_KEY = "aws_secret_access_key"; public static final String PARAM_TABLE_NAME = "table_name"; - public static final String PARAM_DYNAMODB_ENDPOINT = "dynamodb_endpoint"; public static final String PARAM_CONSISTENT_READ = "consistent_read"; private static final Logger logger = LoggerFactory.getLogger(DynamoDBTrackingModule.class); private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private DynamoDBHandler dbHandler = null; - private DynamoDbEnhancedClient enhancedClient = null; private DynamoDbTable table = null; - private String tableName = "msg_metadata"; + private String tableName; private boolean consistentRead = false; // Optional: defaults to eventually consistent if not specified private boolean isRunning = false; @@ -59,11 +56,7 @@ public void init(Session session, Map options) throws OpenAS2Exc consistentRead = Boolean.parseBoolean(consistentReadParam); } // else: remains false (eventually consistent, DynamoDB default) - - if (logger.isInfoEnabled()) { - logger.info("DynamoDB tracking module initialized with consistent_read=" + consistentRead + - (consistentReadParam == null ? " (not specified, using DynamoDB default)" : " (explicitly configured)")); - } + logger.info("DynamoDB tracking module initialized with consistent_read_param={}, consistent_read={}", consistentReadParam, consistentRead); } @Override @@ -232,7 +225,7 @@ public void start() throws OpenAS2Exception { getParameters()); // Initialize Enhanced Client - enhancedClient = DynamoDbEnhancedClient.builder() + DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder() .dynamoDbClient(dbHandler.getDynamoDbClient()) .build(); @@ -240,7 +233,7 @@ public void start() throws OpenAS2Exception { table = enhancedClient.table(tableName, TableSchema.fromBean(MessageMetadata.class)); isRunning = true; - logger.info("DynamoDB tracking module started with table: " + tableName); + logger.info("DynamoDB tracking module started with table: {}", tableName); } @Override @@ -260,14 +253,11 @@ public boolean healthcheck(List failures) { failures.add(this.getClass().getSimpleName() + " - DynamoDB client not initialized"); return false; } - // Try a simple get operation to verify connectivity table.describeTable(); return true; - } catch (Exception e) { - failures.add(this.getClass().getSimpleName() + - " - Failed to check DynamoDB tracking module: " + e.getMessage()); + failures.add(this.getClass().getSimpleName() + " - Failed to check DynamoDB tracking module: " + e.getMessage()); return false; } } @@ -292,7 +282,7 @@ private MessageMetadata getMessageMetadata(String msgId) { return table.getItem(request); } catch (Exception e) { - logger.trace("Message not found (this is normal for new records): " + msgId); + logger.trace("Message not found (this is normal for new records): {}", msgId); return null; } } From 5e13e41c63f64e466a09b5d00c67c8a0b8b44f9d Mon Sep 17 00:00:00 2001 From: Daniel M Date: Wed, 12 Nov 2025 19:58:07 -0500 Subject: [PATCH 7/7] wip --- .../main/java/org/openas2/BaseComponent.java | 2 +- .../msgtracking/DynamoDBHandler.java | 47 +++++++------------ .../msgtracking/DynamoDBTrackingModule.java | 3 +- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/Server/src/main/java/org/openas2/BaseComponent.java b/Server/src/main/java/org/openas2/BaseComponent.java index 7a1d2b74..a521efdd 100644 --- a/Server/src/main/java/org/openas2/BaseComponent.java +++ b/Server/src/main/java/org/openas2/BaseComponent.java @@ -68,7 +68,7 @@ public int getParameterInt(String key, boolean required, int defaultValue) throw public Map getParameters() { if (parameters == null) { - parameters = new HashMap(); + parameters = new HashMap<>(); } return parameters; diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java index c1b5bc54..86459ac4 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DynamoDBHandler.java @@ -33,22 +33,20 @@ class DynamoDBHandler implements IDBHandler { @Nullable private DynamoDbClient dynamoDbClient = null; - private String tableName = null; private String awsRegion = null; /** * Creates DynamoDB client with provided credentials and configuration. * * @param connectString Not used for DynamoDB (table name passed in params) - * @param userName AWS Access Key ID (can be null to use default credentials) - * @param pwd AWS Secret Access Key (can be null to use default credentials) + * @param userName AWS Access Key ID (can be null to use default credentials) + * @param pwd AWS Secret Access Key (can be null to use default credentials) * @throws OpenAS2Exception if client creation fails */ @Override public void createConnectionPool(String connectString, String userName, String pwd) throws OpenAS2Exception { if (dynamoDbClient != null) { - throw new OpenAS2Exception( - "DynamoDB client already initialized. Cannot create a new client. Stop current one first."); + throw new OpenAS2Exception("DynamoDB client already initialized. Cannot create a new client. Stop current one first."); } try { @@ -74,7 +72,7 @@ public void createConnectionPool(String connectString, String userName, String p } dynamoDbClient = builder.build(); - logger.info("DynamoDB client initialized successfully for region: " + awsRegion); + logger.info("DynamoDB client initialized successfully for region: {}", awsRegion); } catch (Exception e) { throw new OpenAS2Exception("Failed to initialize DynamoDB client: " + e.getMessage(), e); @@ -85,9 +83,9 @@ public void createConnectionPool(String connectString, String userName, String p * Initializes the DynamoDB handler with configuration parameters. * * @param jdbcConnectString Not used for DynamoDB - * @param dbUser AWS Access Key ID (can be null) - * @param dbPwd AWS Secret Access Key (can be null) - * @param params Configuration parameters including table_name, aws_region, dynamodb_endpoint + * @param dbUser AWS Access Key ID (can be null) + * @param dbPwd AWS Secret Access Key (can be null) + * @param params Configuration parameters including table_name, aws_region, dynamodb_endpoint * @throws OpenAS2Exception if initialization fails */ @Override @@ -95,7 +93,7 @@ public void start(String jdbcConnectString, String dbUser, String dbPwd, Map options) throws OpenAS2Exception { super.init(session, options); - tableName = getParameter(PARAM_TABLE_NAME, "msg_metadata"); + tableName = getParameter(DbTrackingModule.PARAM_TABLE_NAME, "msg_metadata"); // Configure read consistency (optional - defaults to eventually consistent if not specified) String consistentReadParam = getParameter(PARAM_CONSISTENT_READ, null);