Socle V004 – gRPC Inter-Socles

Socle V004 - gRPC Inter-Socles

31 – Communication gRPC Inter-Socles

Vue d’ensemble

Le module gRPC permet aux instances Socle V4 de communiquer entre elles via streaming bidirectionnel. Il offre :

  • Sessions : Gestion de sessions multi-participants avec TTL
  • Streaming bidirectionnel : Communication temps reel entre Socles
  • Pipeline de traitement : Transformation des messages via Janino et JDM
  • Fan-out : Routage broadcast ou cible vers les participants
  • Pool de connexions : Connexions persistantes vers les peers

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                         Socle A                                  │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │ SessionMgr  │  │  Pipeline   │  │    GrpcServerWorker     │  │
│  │             │  │  Executor   │  │    (port 9400)          │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
│         │               │                      │                 │
│         └───────────────┼──────────────────────┘                 │
│                         │                                        │
│              ┌──────────┴──────────┐                            │
│              │   gRPC Services     │                            │
│              │  - SocleComm        │                            │
│              │  - SessionService   │                            │
│              │  - DiscoveryService │                            │
│              └──────────┬──────────┘                            │
└─────────────────────────┼───────────────────────────────────────┘
                          │ gRPC/HTTP2
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                         Socle B                                  │
│              ┌──────────────────────┐                           │
│              │  PeerConnectionPool  │                           │
│              └──────────────────────┘                           │
└─────────────────────────────────────────────────────────────────┘

Configuration

application.yml

socle:
  grpc:
    # Activation du module
    enabled: ${GRPC_ENABLED:false}

    # Port du serveur gRPC
    port: ${GRPC_PORT:9400}

    # Identification du Socle
    socle-id: ${SOCLE_ID:${socle.app_name}}
    socle-version: ${socle.version}

    # Limites serveur
    max-inbound-message-size: ${GRPC_MAX_MESSAGE_SIZE:4194304}  # 4MB
    max-concurrent-calls-per-connection: ${GRPC_MAX_CONCURRENT_CALLS:100}

    # Sessions
    session:
      ttl-seconds: ${GRPC_SESSION_TTL:1800}           # 30 min
      max-participants: ${GRPC_MAX_PARTICIPANTS:100}
      persist-to-tech-db: ${GRPC_PERSIST_SESSIONS:true}
      cache-in-redis: ${GRPC_CACHE_REDIS:true}

    # Pipeline de traitement
    pipeline:
      enabled: ${GRPC_PIPELINE_ENABLED:true}
      config-cache-ttl-seconds: ${GRPC_PIPELINE_CACHE_TTL:300}

    # Connexions peer
    peer:
      max-channels-per-peer: ${GRPC_PEER_MAX_CHANNELS:4}
      connection-timeout-ms: ${GRPC_PEER_CONNECT_TIMEOUT:5000}
      idle-timeout-seconds: ${GRPC_PEER_IDLE_TIMEOUT:300}
      keep-alive-enabled: ${GRPC_PEER_KEEPALIVE:true}
      keep-alive-time-seconds: ${GRPC_PEER_KEEPALIVE_TIME:30}
      keep-alive-timeout-seconds: ${GRPC_PEER_KEEPALIVE_TIMEOUT:10}

Variables d’environnement

Variable Default Description
GRPC_ENABLED false Active le module gRPC
GRPC_PORT 9400 Port du serveur gRPC
SOCLE_ID ${app_name} Identifiant unique du Socle
GRPC_SESSION_TTL 1800 TTL des sessions en secondes
GRPC_MAX_PARTICIPANTS 100 Max participants par session
GRPC_PIPELINE_ENABLED true Active le pipeline de traitement

Services gRPC

DiscoveryService

Service de decouverte et health check.

service DiscoveryService {
    rpc GetCapabilities(CapabilitiesRequest) returns (CapabilitiesResponse);
    rpc Ping(PingRequest) returns (PingResponse);
}

Test avec grpcurl :

# Ping
grpcurl -plaintext localhost:9400 socle.DiscoveryService/Ping

# Capabilities
grpcurl -plaintext localhost:9400 socle.DiscoveryService/GetCapabilities

SessionService

Gestion du cycle de vie des sessions.

service SessionService {
    rpc CreateSession(CreateSessionRequest) returns (SessionInfo);
    rpc JoinSession(JoinSessionRequest) returns (JoinSessionResponse);
    rpc LeaveSession(LeaveSessionRequest) returns (LeaveSessionResponse);
    rpc GetSession(GetSessionRequest) returns (SessionInfo);
    rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);
}

Exemples :

# Creer une session
grpcurl -plaintext -d '{
  "session_type": "chat",
  "owner_id": "user1",
  "ttl_seconds": 3600
}' localhost:9400 socle.SessionService/CreateSession

# Joindre une session
grpcurl -plaintext -d '{
  "session_id": "uuid-de-la-session",
  "participant_id": "user2",
  "display_name": "User 2"
}' localhost:9400 socle.SessionService/JoinSession

# Obtenir info session
grpcurl -plaintext -d '{
  "session_id": "uuid-de-la-session"
}' localhost:9400 socle.SessionService/GetSession

SocleComm

Streaming bidirectionnel pour l’echange de messages.

service SocleComm {
    rpc Exchange(stream SessionMessage) returns (stream SessionMessage);
}

Format des messages :

message SessionMessage {
    string session_id = 1;
    string sender_id = 2;
    MessageKind kind = 3;        // JOIN, LEAVE, DATA, REQUEST, RESPONSE, etc.
    repeated string target_ids = 4;  // Vide = broadcast
    string correlation_id = 5;
    int64 timestamp = 6;
    string payload = 7;          // JSON
    map<string, string> headers = 8;
}

Sessions

Cycle de vie

CREATE ──► ACTIVE ──► CLOSING ──► CLOSED
              │
              └──► EXPIRED (TTL depasse)

Stockage

Les sessions sont stockees dans :

  1. Cache memoire : Acces rapide, stream observers
  2. Redis (si KvBus en mode redis) : Partage entre instances
  3. TechDB : Audit et persistance

Tables TechDB

-- Sessions (audit)
CREATE TABLE grpc_sessions (
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    x_dateChanged TIMESTAMP WITH TIME ZONE,
    x_sub VARCHAR(255),
    x_partition VARCHAR(30),
    x_comment CLOB,
    session_id VARCHAR(36) NOT NULL UNIQUE,
    session_type VARCHAR(100) NOT NULL,
    owner_id VARCHAR(100) NOT NULL,
    status VARCHAR(20) DEFAULT 'ACTIVE',
    datas CLOB
);

-- Configuration pipeline par type de session
CREATE TABLE grpc_pipeline_configs (
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    x_dateChanged TIMESTAMP WITH TIME ZONE,
    x_sub VARCHAR(255),
    x_partition VARCHAR(30),
    x_comment CLOB,
    session_type VARCHAR(100) NOT NULL UNIQUE,
    janino_pre VARCHAR(255),
    jdm_rules VARCHAR(255),
    janino_post VARCHAR(255),
    default_targets VARCHAR(50) DEFAULT 'broadcast',
    enabled BOOLEAN DEFAULT TRUE,
    datas CLOB
);

Pipeline de traitement

Le pipeline permet de transformer les messages et determiner leur routage.

Etapes

Message entrant
      │
      ▼
┌─────────────────┐
│  1. Janino PRE  │  Transformation du payload
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  2. JDM Rules   │  Determination des cibles (routing)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  3. Janino POST │  Transformation finale
└────────┬────────┘
         │
         ▼
    Routage
   /        \
Broadcast   Cible

Configuration du pipeline

Inserez dans grpc_pipeline_configs :

INSERT INTO grpc_pipeline_configs
(session_type, janino_pre, jdm_rules, janino_post, default_targets, enabled)
VALUES
('chat', NULL, 'chat_routing', NULL, 'broadcast', TRUE);

Exemple script Janino PRE

// repository/scripts/java/grpc/chat_filter.java
public class ChatFilter {
    public Object execute(Map<String, Object> context) {
        Map<String, Object> payload = (Map<String, Object>) context.get("payload");
        String senderId = (String) context.get("senderId");

        // Ajouter metadata
        payload.put("processed_at", System.currentTimeMillis());
        payload.put("sender", senderId);

        return payload;
    }
}

Exemple modele JDM pour routing

{
  "name": "chat_routing",
  "nodes": [
    {
      "id": "input",
      "type": "inputNode",
      "content": {
        "fields": [
          {"field": "payload.target", "type": "string"},
          {"field": "payload.type", "type": "string"}
        ]
      }
    },
    {
      "id": "decision",
      "type": "decisionTableNode",
      "content": {
        "hitPolicy": "first",
        "inputs": [
          {"field": "payload.type"}
        ],
        "outputs": [
          {"field": "targets", "type": "string"},
          {"field": "broadcast", "type": "boolean"}
        ],
        "rules": [
          {"_input": ["private"], "_output": ["${payload.target}", false]},
          {"_input": ["broadcast"], "_output": ["", true]},
          {"_input": ["*"], "_output": ["", true]}
        ]
      }
    }
  ]
}

Connexions Peer

Ajouter un peer programmatiquement

@Autowired
private PeerConnectionPool peerPool;

public void connectToPeer() {
    // Ajouter et connecter
    boolean connected = peerPool.addPeer("socle-b", "192.168.1.100", 9400);

    if (connected) {
        // Ouvrir un stream
        peerPool.openStream("socle-b", message -> {
            // Handler pour messages entrants
            System.out.println("Received: " + message.getPayload());
        });

        // Envoyer un message
        SessionMessage msg = SessionMessage.newBuilder()
            .setSessionId("...")
            .setSenderId("local-participant")
            .setKind(MessageKind.DATA)
            .setPayload("{\"text\":\"Hello\"}")
            .build();

        peerPool.sendToPeer("socle-b", msg);
    }
}

Broadcast vers tous les peers

SessionMessage msg = SessionMessage.newBuilder()
    .setSessionId(sessionId)
    .setSenderId(myId)
    .setKind(MessageKind.DATA)
    .setPayload(jsonPayload)
    .build();

int sent = peerPool.broadcast(msg, null);  // null = inclure tous

Worker gRPC

Priorites

Methode Valeur Description
getStartPriority() 800 Demarre apres Janino (25) et JDM (30)
getStopPriority() 10 S’arrete tot pour drain des connexions

Statistiques

@Autowired
private GrpcServerWorker grpcWorker;

Map<String, Object> stats = grpcWorker.getStats();
// {
//   "running": true,
//   "port": 9400,
//   "sessions": { "cached_sessions": 5, "active_streams": 12 },
//   "messaging": { "messages_received": 1234, "messages_sent": 5678 },
//   "peers": { "connected_peers": 2, "total_messages_sent": 100 }
// }

Types de sessions supportes

Type Description
chat Communication temps reel
sync Synchronisation de donnees
broadcast Diffusion un-vers-plusieurs
pipeline Traitement en chaine
custom Type personnalise

Securite

TLS (a venir)

Le support TLS sera ajoute dans une version future :

socle:
  grpc:
    tls:
      enabled: true
      cert-path: /path/to/server.crt
      key-path: /path/to/server.key
      ca-path: /path/to/ca.crt  # Pour mTLS

Authentification

L’authentification peut etre implementee via :

  • Metadata gRPC (tokens dans headers)
  • Intercepteurs personnalises
  • Integration avec le module Auth existant

Monitoring

Metriques exposees

Les metriques sont disponibles via l’endpoint Prometheus /actuator/prometheus :

  • grpc_sessions_active : Nombre de sessions actives
  • grpc_messages_received_total : Total messages recus
  • grpc_messages_sent_total : Total messages envoyes
  • grpc_peers_connected : Nombre de peers connectes

Logs

[grpc_server] Started on port 9400
[grpc_session] Session created: abc-123 (type=chat, owner=user1)
[grpc_session] Participant joined: user2 -> abc-123 (total: 2)
[grpc_comm] Message received: session=abc-123, sender=user1, kind=DATA
[grpc_pipeline] Executed for session abc-123 (targets=broadcast)

Troubleshooting

Le serveur ne demarre pas

  1. Verifiez que le port 9400 n’est pas utilise
  2. Verifiez GRPC_ENABLED=true
  3. Consultez les logs pour les erreurs d’initialisation

Sessions expirent trop vite

Augmentez le TTL :

socle:
  grpc:
    session:
      ttl-seconds: 7200  # 2 heures

Messages non delivres

  1. Verifiez que le destinataire a un stream actif
  2. Verifiez les logs du pipeline pour les erreurs
  3. Verifiez la configuration du routing dans grpc_pipeline_configs

Erreurs de connexion peer

  1. Verifiez la connectivite reseau
  2. Verifiez que le peer a gRPC active
  3. Augmentez le timeout de connexion :
socle:
  grpc:
    peer:
      connection-timeout-ms: 10000

Structure des fichiers

src/main/java/eu/lmvi/socle/grpc/
├── GrpcConfiguration.java        # Configuration Spring
├── GrpcServerWorker.java         # Worker principal
├── session/
│   ├── Session.java              # Record session
│   ├── Participant.java          # Record participant
│   ├── SessionStatus.java        # Enum ACTIVE/CLOSING/CLOSED/EXPIRED
│   ├── SessionManager.java       # Gestion sessions + streams
│   └── SessionRedisRepository.java  # Persistance Redis
├── pipeline/
│   ├── PipelineConfig.java       # Config pipeline
│   ├── PipelineResult.java       # Resultat pipeline
│   └── PipelineExecutor.java     # Execution Janino + JDM
├── peer/
│   ├── PeerConnection.java       # Connexion unique
│   └── PeerConnectionPool.java   # Pool avec reconnexion
└── service/
    ├── SocleCommService.java     # Streaming bidirectionnel
    ├── SessionServiceImpl.java   # CRUD sessions
    └── DiscoveryServiceImpl.java # Discovery + Ping

src/main/proto/
└── socle_comm.proto              # Definition Protocol Buffers

Commentaires

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *