Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions core/integration/trg/ctp_status_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2026 CERN and copyright holders of ALICE O².
* Author: Piotr Konopka <pkonopka@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package trg

import (
"context"
"fmt"

trgpb "github.com/AliceO2Group/Control/core/integration/trg/protos"
"github.com/segmentio/kafka-go"
"github.com/spf13/viper"
"google.golang.org/protobuf/proto"
)

// CtpStatusReader reads CTP status messages from Kafka.
// It embeds *kafka.Reader, so the reader's methods (Close, ReadMessage, ...)
// are promoted and callable directly on a CtpStatusReader.
type CtpStatusReader struct {
	*kafka.Reader
}

// NewCtpStatusReader creates a new reader for CTP status messages.
//
// topic is the Kafka topic carrying serialized trgpb.Status messages and
// groupID is the consumer group to join. The broker list is read from the
// "kafkaEndpoints" configuration key.
//
// It returns nil when no brokers are configured: kafka.NewReader panics on an
// empty broker list, and callers already treat a nil reader as "CTP status
// monitoring disabled", so returning nil degrades gracefully instead of
// crashing the process.
func NewCtpStatusReader(topic string, groupID string) *CtpStatusReader {
	brokers := viper.GetStringSlice("kafkaEndpoints")
	if len(brokers) == 0 {
		// without this guard kafka.NewReader would panic on invalid config
		return nil
	}

	cfg := kafka.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  groupID,
		MinBytes: 1,    // deliver messages as soon as any data is available
		MaxBytes: 10e7, // upper bound per fetch (100 MB)
	}

	return &CtpStatusReader{
		Reader: kafka.NewReader(cfg),
	}
}

// Next reads the next CTP status message from Kafka and unmarshals it.
// It blocks until a message arrives, ctx is cancelled, or the underlying
// reader is closed (in which case kafka-go reports io.EOF).
//
// Both a nil receiver and a nil embedded Reader are rejected with an error,
// so callers that hold a reader which could not be created don't panic when
// the promoted ReadMessage dereferences the embedded pointer.
func (r *CtpStatusReader) Next(ctx context.Context) (*trgpb.Status, error) {
	if r == nil || r.Reader == nil {
		return nil, fmt.Errorf("nil reader")
	}
	msg, err := r.ReadMessage(ctx)
	if err != nil {
		return nil, err
	}

	var status trgpb.Status
	if err := proto.Unmarshal(msg.Value, &status); err != nil {
		return nil, fmt.Errorf("failed to unmarshal CTP status message: %w", err)
	}
	return &status, nil
}
156 changes: 138 additions & 18 deletions core/integration/trg/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
*/

//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/ctpecs.proto
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/ctp_status.proto

// Package trg provides integration with the ALICE trigger system.
package trg

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -64,25 +67,37 @@ const (
TOPIC = topic.IntegratedService + topic.Separator + "trg"
)

var ctpStatusTopic topic.Topic = "ctp.status"

// Plugin integrates the ALICE trigger system (TRG) with AliECS. It keeps a
// gRPC client towards the CTP ECS service and a Kafka consumer for CTP status
// messages, and caches the combined state in cachedStatus.
//
// NOTE: this span contained diff residue (old and new field declarations
// duplicated, e.g. trgClient and cachedStatus appearing twice), which does
// not compile; the struct below is the reconstructed post-change version.
type Plugin struct {
	trgHost string // TRG service hostname, parsed from the plugin endpoint
	trgPort int    // TRG service port, parsed from the plugin endpoint

	trgClient       *RpcClient         // gRPC client towards the CTP ECS service
	trgClientCancel context.CancelFunc // stops the RunList polling goroutine

	kafkaReader       *CtpStatusReader   // consumer for CTP status messages, nil if disabled
	kafkaReaderCtx    context.Context    // lifetime of the Kafka consumer goroutine
	kafkaReaderCancel context.CancelFunc // stops the Kafka consumer goroutine

	pendingRunStops   map[string] /*envId*/ int64
	pendingRunUnloads map[string] /*envId*/ int64

	cachedStatus   *TrgStatus   // last known TRG + CTP state, guarded by cachedStatusMu
	cachedStatusMu sync.RWMutex // protects cachedStatus fields (written by two goroutines)
}

// TrgStatus aggregates the most recent state known about the trigger system.
// It is filled from two independent sources (periodic TRG RunList queries and
// CTP status messages consumed from Kafka) and is accessed under
// Plugin.cachedStatusMu.
type TrgStatus struct {
	// Fields from TRG RunList queries
	RunCount int `json:"runCount,omitempty"`
	Lines []string `json:"lines,omitempty"`
	Structured Runs `json:"structured,omitempty"`
	EnvMap map[uid.ID]Run `json:"envMap,omitempty"`

	// Fields from CTP status Kafka messages
	Clock string `json:"clock,omitempty"`
	DetectorsInHoldover []string `json:"detectorsInHoldover,omitempty"`
	ClockTransitionTimeExpected int64 `json:"clockTransitionTimeExpected,omitempty"` // nanoseconds since epoch, 0 if not expected
}

func NewPlugin(endpoint string) integration.Plugin {
Expand All @@ -102,6 +117,7 @@ func NewPlugin(endpoint string) integration.Plugin {
trgClient: nil,
pendingRunStops: make(map[string]int64),
pendingRunUnloads: make(map[string]int64),
cachedStatus: &TrgStatus{},
}
}

Expand Down Expand Up @@ -189,15 +205,11 @@ func (p *Plugin) queryRunList() {
}
}

out := &TrgStatus{
RunCount: int(runReply.Rc),
Lines: strings.Split(runReply.Msg, "\n"),
Structured: structured,
EnvMap: envMap,
}

p.cachedStatusMu.Lock()
p.cachedStatus = out
p.cachedStatus.RunCount = int(runReply.Rc)
p.cachedStatus.Lines = strings.Split(runReply.Msg, "\n")
p.cachedStatus.Structured = structured
p.cachedStatus.EnvMap = envMap
p.cachedStatusMu.Unlock()
}

Expand Down Expand Up @@ -231,10 +243,6 @@ func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string {
defer p.cachedStatusMu.RUnlock()

out := make(map[uid.ID]string)

if p.cachedStatus == nil {
return nil
}
envMap := p.cachedStatus.EnvMap
for _, envId := range envIds {
if run, ok := envMap[envId]; !ok {
Expand Down Expand Up @@ -267,7 +275,7 @@ func (p *Plugin) Init(instanceId string) error {
}

var ctx context.Context
ctx, p.cachedStatusCancelFunc = context.WithCancel(context.Background())
ctx, p.trgClientCancel = context.WithCancel(context.Background())

trgPollingInterval := viper.GetDuration("trgPollingInterval")

Expand All @@ -283,6 +291,21 @@ func (p *Plugin) Init(instanceId string) error {
}
}
}()

// Initialize CTP status reader
p.kafkaReaderCtx, p.kafkaReaderCancel = context.WithCancel(context.Background())
p.kafkaReader = NewCtpStatusReader(string(ctpStatusTopic), "o2-aliecs-core.trg")
if p.kafkaReader != nil {
log.WithField("level", infologger.IL_Devel).Info("TRG plugin: draining CTP status backlog")
p.drainCtpStatusBacklog(2 * time.Second)

// Start reading CTP status updates
go p.readCtpStatusUpdates()
} else {
log.WithField("level", infologger.IL_Support).
Warn("could not create CTP status reader, CTP status monitoring disabled")
}

return nil
}

Expand Down Expand Up @@ -1434,7 +1457,104 @@ func (p *Plugin) parseDetectors(ctsDetectorsParam string) (detectors string, err
return
}

// drainCtpStatusBacklog consumes any CTP status messages already queued on
// the Kafka topic and applies each to the cached status, giving up after the
// supplied timeout. Since every message overwrites the same cached fields,
// the cache ends up reflecting the newest message seen during the drain.
func (p *Plugin) drainCtpStatusBacklog(timeout time.Duration) {
	drainCtx, cancel := context.WithTimeout(p.kafkaReaderCtx, timeout)
	defer cancel()

	for {
		ctpStatus, err := p.kafkaReader.Next(drainCtx)
		switch {
		case errors.Is(err, context.DeadlineExceeded),
			errors.Is(err, io.EOF),
			errors.Is(err, context.Canceled):
			// backlog exhausted (timeout hit), reader closed, or shutdown
			return
		case err != nil:
			// transient error: back off briefly; the deadline bounds retries
			time.Sleep(50 * time.Millisecond)
			continue
		case ctpStatus == nil:
			continue
		}

		holdover := extractDetectorsInHoldover(ctpStatus)
		transitionAt := extractClockTransitionExpected(ctpStatus)
		clock := ctpStatus.Clock.String()

		p.cachedStatusMu.Lock()
		p.cachedStatus.Clock = clock
		p.cachedStatus.DetectorsInHoldover = holdover
		p.cachedStatus.ClockTransitionTimeExpected = transitionAt
		p.cachedStatusMu.Unlock()
	}
}

// extractClockTransitionExpected converts the relative "clock transition
// expected in N seconds" field of a CTP status message into an absolute
// timestamp in nanoseconds since epoch, anchored at the message's own
// TimestampNano. It returns 0 when no transition is expected.
func extractClockTransitionExpected(ctpStatus *trgpb.Status) int64 {
	if ctpStatus.ClockTransitionExpected <= 0 {
		return 0
	}
	return int64(ctpStatus.TimestampNano) + int64(ctpStatus.ClockTransitionExpected)*1e9
}

// extractDetectorsInHoldover returns the names of all detectors which the
// given CTP status message reports as having an ongoing holdover.
// The result is nil when no detector is in holdover.
func extractDetectorsInHoldover(ctpStatus *trgpb.Status) []string {
	var names []string
	for _, info := range ctpStatus.DetectorInfo {
		if !info.HoldoverOngoing {
			continue
		}
		names = append(names, info.Detector.String())
	}
	return names
}

// readCtpStatusUpdates is the long-running consumer loop for CTP status
// messages. For every message it refreshes the cached clock/holdover state
// and emits operator-facing log messages around expected and completed clock
// transitions. The loop exits when the reader is closed (io.EOF) or its
// context is cancelled.
func (p *Plugin) readCtpStatusUpdates() {
	for {
		ctpStatus, err := p.kafkaReader.Next(p.kafkaReaderCtx)
		if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
			// Reader closed or plugin shutting down. Without the
			// context.Canceled check, cancelling kafkaReaderCtx would send this
			// loop into the error branch below, logging spurious errors and
			// sleeping forever instead of terminating the goroutine.
			log.WithField(infologger.Level, infologger.IL_Support).
				Debug("received EOF from CTP status reader, likely reading was cancelled, exiting")
			break
		}
		if err != nil {
			log.WithField(infologger.Level, infologger.IL_Support).
				WithError(err).
				Error("error while reading CTP status from Kafka")
			time.Sleep(time.Second * 1) // throttle to avoid spamming infologger
			continue
		}
		if ctpStatus == nil {
			continue
		}

		detectorsInHoldover := extractDetectorsInHoldover(ctpStatus)
		clockTransitionTimeExpected := extractClockTransitionExpected(ctpStatus)
		currentClock := ctpStatus.Clock.String()

		// Update the shared cache; remember the previous expected transition
		// time so we can detect the "transition just happened" edge below.
		p.cachedStatusMu.Lock()
		previousClockTransitionTime := p.cachedStatus.ClockTransitionTimeExpected
		p.cachedStatus.Clock = currentClock
		p.cachedStatus.DetectorsInHoldover = detectorsInHoldover
		p.cachedStatus.ClockTransitionTimeExpected = clockTransitionTimeExpected
		p.cachedStatusMu.Unlock()

		if ctpStatus.ClockTransitionExpected > 0 && len(detectorsInHoldover) > 0 {
			detectorsStr := strings.Join(detectorsInHoldover, ", ")
			log.WithField(infologger.Level, infologger.IL_Ops).
				Warnf("CTP clock transition expected in less than %ds for the detectors: %s. Starting a triggered run with these detectors may cause instabilities",
					ctpStatus.ClockTransitionExpected, detectorsStr)
		} else if ctpStatus.ClockTransitionExpected == 0 && previousClockTransitionTime > 0 {
			log.WithField(infologger.Level, infologger.IL_Ops).
				Info("CTP clock transition has just been performed")
		}
	}
}

// Destroy tears down the plugin: it stops the TRG RunList polling goroutine,
// stops the CTP status consumer, closes the Kafka reader, and finally closes
// the TRG gRPC client.
//
// All fields are nil-guarded so Destroy is safe to call even if Init never
// ran or failed part-way through. (This span also contained diff residue:
// the removed p.cachedStatusCancelFunc() call appeared alongside its
// replacement p.trgClientCancel(); only the latter is kept.)
func (p *Plugin) Destroy() error {
	if p.trgClientCancel != nil {
		p.trgClientCancel()
	}

	if p.kafkaReaderCancel != nil {
		p.kafkaReaderCancel()
	}
	if p.kafkaReader != nil {
		// best effort: a Kafka close error at shutdown is not actionable
		_ = p.kafkaReader.Close()
	}

	if p.trgClient == nil {
		return nil
	}
	return p.trgClient.Close()
}
Loading