31 – Inter-Socle gRPC Communication
Overview
The gRPC module lets Socle V4 instances communicate with each other over bidirectional streaming. It provides:
- Sessions: multi-participant session management with a TTL
- Bidirectional streaming: real-time communication between Socles
- Processing pipeline: message transformation via Janino and JDM
- Fan-out: broadcast or targeted routing to participants
- Connection pool: persistent connections to peers
Architecture
┌─────────────────────────────────────────────────────────────────┐
│                             Socle A                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │ SessionMgr  │  │  Pipeline   │  │    GrpcServerWorker     │  │
│  │             │  │  Executor   │  │       (port 9400)       │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
│         │               │                      │                │
│         └───────────────┼──────────────────────┘                │
│                         │                                       │
│              ┌──────────┴──────────┐                            │
│              │    gRPC Services    │                            │
│              │ - SocleComm         │                            │
│              │ - SessionService    │                            │
│              │ - DiscoveryService  │                            │
│              └──────────┬──────────┘                            │
└─────────────────────────┼───────────────────────────────────────┘
                          │ gRPC/HTTP2
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                             Socle B                             │
│              ┌──────────────────────┐                           │
│              │  PeerConnectionPool  │                           │
│              └──────────────────────┘                           │
└─────────────────────────────────────────────────────────────────┘
Configuration
application.yml
socle:
  grpc:
    # Enable the module
    enabled: ${GRPC_ENABLED:false}
    # gRPC server port
    port: ${GRPC_PORT:9400}
    # Socle identification
    socle-id: ${SOCLE_ID:${socle.app_name}}
    socle-version: ${socle.version}
    # Server limits
    max-inbound-message-size: ${GRPC_MAX_MESSAGE_SIZE:4194304}  # 4 MiB
    max-concurrent-calls-per-connection: ${GRPC_MAX_CONCURRENT_CALLS:100}
    # Sessions
    session:
      ttl-seconds: ${GRPC_SESSION_TTL:1800}  # 30 min
      max-participants: ${GRPC_MAX_PARTICIPANTS:100}
      persist-to-tech-db: ${GRPC_PERSIST_SESSIONS:true}
      cache-in-redis: ${GRPC_CACHE_REDIS:true}
    # Processing pipeline
    pipeline:
      enabled: ${GRPC_PIPELINE_ENABLED:true}
      config-cache-ttl-seconds: ${GRPC_PIPELINE_CACHE_TTL:300}
    # Peer connections
    peer:
      max-channels-per-peer: ${GRPC_PEER_MAX_CHANNELS:4}
      connection-timeout-ms: ${GRPC_PEER_CONNECT_TIMEOUT:5000}
      idle-timeout-seconds: ${GRPC_PEER_IDLE_TIMEOUT:300}
      keep-alive-enabled: ${GRPC_PEER_KEEPALIVE:true}
      keep-alive-time-seconds: ${GRPC_PEER_KEEPALIVE_TIME:30}
      keep-alive-timeout-seconds: ${GRPC_PEER_KEEPALIVE_TIMEOUT:10}
Environment variables
| Variable | Default | Description |
|---|---|---|
| GRPC_ENABLED | false | Enables the gRPC module |
| GRPC_PORT | 9400 | gRPC server port |
| SOCLE_ID | ${socle.app_name} | Unique identifier of the Socle |
| GRPC_SESSION_TTL | 1800 | Session TTL in seconds |
| GRPC_MAX_PARTICIPANTS | 100 | Maximum number of participants per session |
| GRPC_PIPELINE_ENABLED | true | Enables the processing pipeline |
gRPC services
DiscoveryService
Discovery and health-check service.
service DiscoveryService {
  rpc GetCapabilities(CapabilitiesRequest) returns (CapabilitiesResponse);
  rpc Ping(PingRequest) returns (PingResponse);
}
Testing with grpcurl:
# Ping
grpcurl -plaintext localhost:9400 socle.DiscoveryService/Ping
# Capabilities
grpcurl -plaintext localhost:9400 socle.DiscoveryService/GetCapabilities
SessionService
Manages the session lifecycle.
service SessionService {
  rpc CreateSession(CreateSessionRequest) returns (SessionInfo);
  rpc JoinSession(JoinSessionRequest) returns (JoinSessionResponse);
  rpc LeaveSession(LeaveSessionRequest) returns (LeaveSessionResponse);
  rpc GetSession(GetSessionRequest) returns (SessionInfo);
  rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);
}
Examples:
# Create a session
grpcurl -plaintext -d '{
  "session_type": "chat",
  "owner_id": "user1",
  "ttl_seconds": 3600
}' localhost:9400 socle.SessionService/CreateSession
# Join a session
grpcurl -plaintext -d '{
  "session_id": "uuid-de-la-session",
  "participant_id": "user2",
  "display_name": "User 2"
}' localhost:9400 socle.SessionService/JoinSession
# Get session info
grpcurl -plaintext -d '{
  "session_id": "uuid-de-la-session"
}' localhost:9400 socle.SessionService/GetSession
SocleComm
Bidirectional streaming for message exchange.
service SocleComm {
  rpc Exchange(stream SessionMessage) returns (stream SessionMessage);
}
Message format:
message SessionMessage {
  string session_id = 1;
  string sender_id = 2;
  MessageKind kind = 3;            // JOIN, LEAVE, DATA, REQUEST, RESPONSE, etc.
  repeated string target_ids = 4;  // Empty = broadcast
  string correlation_id = 5;
  int64 timestamp = 6;
  string payload = 7;              // JSON
  map<string, string> headers = 8;
}
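The target_ids rule in the message format (an empty list means broadcast) can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the module's actual code: the class FanOutSketch and the method resolveRecipients are hypothetical names, and both excluding the sender from a broadcast and silently dropping unknown targets are assumptions the source does not confirm.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the Socle API.
final class FanOutSketch {

    /**
     * Resolves the recipients of a message.
     * - Empty targetIds: broadcast to every participant except the sender (assumption).
     * - Non-empty targetIds: only the targets that are actually in the session (assumption).
     */
    static Set<String> resolveRecipients(Set<String> participants,
                                         String senderId,
                                         List<String> targetIds) {
        Set<String> recipients = new LinkedHashSet<>();
        if (targetIds.isEmpty()) {
            recipients.addAll(participants);
            recipients.remove(senderId);
        } else {
            for (String target : targetIds) {
                if (participants.contains(target)) {
                    recipients.add(target);
                }
            }
        }
        return recipients;
    }

    public static void main(String[] args) {
        Set<String> participants = new LinkedHashSet<>(List.of("user1", "user2", "user3"));
        // Broadcast: prints [user2, user3]
        System.out.println(resolveRecipients(participants, "user1", List.of()));
        // Targeted, unknown id dropped: prints [user3]
        System.out.println(resolveRecipients(participants, "user1", List.of("user3", "ghost")));
    }
}
```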
Sessions
Lifecycle
CREATE ──► ACTIVE ──► CLOSING ──► CLOSED
             │
             └──► EXPIRED (TTL exceeded)
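The lifecycle above can be read as a small state machine. The sketch below is a hypothetical illustration, not the module's SessionStatus implementation: it assumes that EXPIRED is reached from ACTIVE, that CLOSED and EXPIRED are terminal, and it leaves open whether the TTL is counted from creation or from the last activity (the `since` parameter stands for whichever applies).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; the real SessionStatus enum may behave differently.
final class SessionLifecycleSketch {

    enum Status {
        ACTIVE, CLOSING, CLOSED, EXPIRED;

        // Allowed transitions as read from the diagram (EXPIRED from ACTIVE is an assumption).
        Set<Status> next() {
            switch (this) {
                case ACTIVE:  return EnumSet.of(CLOSING, EXPIRED);
                case CLOSING: return EnumSet.of(CLOSED);
                default:      return EnumSet.noneOf(Status.class); // terminal states
            }
        }

        boolean canMoveTo(Status target) {
            return next().contains(target);
        }
    }

    /** True once the TTL has fully elapsed since the reference instant. */
    static boolean isExpired(Instant since, Duration ttl, Instant now) {
        return !now.isBefore(since.plus(ttl));
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2024-01-01T00:00:00Z");
        Duration ttl = Duration.ofSeconds(1800); // default GRPC_SESSION_TTL
        System.out.println(isExpired(created, ttl, created.plusSeconds(1799))); // false
        System.out.println(isExpired(created, ttl, created.plusSeconds(1800))); // true
        System.out.println(Status.CLOSED.canMoveTo(Status.ACTIVE));             // false
    }
}
```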
Storage
Sessions are stored in:
- In-memory cache: fast access, stream observers
- Redis (if KvBus is in redis mode): shared between instances
- TechDB: audit and persistence
TechDB tables
-- Sessions (audit)
CREATE TABLE grpc_sessions (
  x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  x_dateChanged TIMESTAMP WITH TIME ZONE,
  x_sub VARCHAR(255),
  x_partition VARCHAR(30),
  x_comment CLOB,
  session_id VARCHAR(36) NOT NULL UNIQUE,
  session_type VARCHAR(100) NOT NULL,
  owner_id VARCHAR(100) NOT NULL,
  status VARCHAR(20) DEFAULT 'ACTIVE',
  datas CLOB
);
-- Pipeline configuration per session type
CREATE TABLE grpc_pipeline_configs (
  x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  x_dateChanged TIMESTAMP WITH TIME ZONE,
  x_sub VARCHAR(255),
  x_partition VARCHAR(30),
  x_comment CLOB,
  session_type VARCHAR(100) NOT NULL UNIQUE,
  janino_pre VARCHAR(255),
  jdm_rules VARCHAR(255),
  janino_post VARCHAR(255),
  default_targets VARCHAR(50) DEFAULT 'broadcast',
  enabled BOOLEAN DEFAULT TRUE,
  datas CLOB
);
Processing pipeline
The pipeline transforms messages and determines how they are routed.
Stages
  Incoming message
         │
         ▼
┌─────────────────┐
│ 1. Janino PRE   │  Payload transformation
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ 2. JDM Rules    │  Target determination (routing)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ 3. Janino POST  │  Final transformation
└────────┬────────┘
         │
         ▼
      Routing
      /     \
 Broadcast  Targeted
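The three stages can be pictured as a chain of functions. The sketch below is an illustration under assumptions, not the actual PipelineExecutor: PipelineSketch and its nested Result class are hypothetical, each stage is modeled as a plain Java function instead of a Janino script or JDM model, and a null stage is treated as "skipped" on the assumption that this mirrors a NULL column in grpc_pipeline_configs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the PRE -> routing -> POST chain.
final class PipelineSketch {

    /** Outcome of a run: the transformed payload plus the routing decision. */
    static final class Result {
        final Map<String, Object> payload;
        final List<String> targets; // empty list = broadcast

        Result(Map<String, Object> payload, List<String> targets) {
            this.payload = payload;
            this.targets = targets;
        }
    }

    private final UnaryOperator<Map<String, Object>> pre;
    private final Function<Map<String, Object>, List<String>> routing;
    private final UnaryOperator<Map<String, Object>> post;

    PipelineSketch(UnaryOperator<Map<String, Object>> pre,
                   Function<Map<String, Object>, List<String>> routing,
                   UnaryOperator<Map<String, Object>> post) {
        this.pre = pre;
        this.routing = routing;
        this.post = post;
    }

    Result execute(Map<String, Object> payload) {
        // Work on a copy so the caller's map is never mutated.
        Map<String, Object> current = new HashMap<>(payload);
        if (pre != null) current = pre.apply(current);                 // 1. PRE transformation
        List<String> targets =
                (routing != null) ? routing.apply(current) : List.of(); // 2. routing decision
        if (post != null) current = post.apply(current);               // 3. POST transformation
        return new Result(current, targets);
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch(
                p -> { p.put("processed", true); return p; },
                p -> "private".equals(p.get("type"))
                        ? List.of(String.valueOf(p.get("target")))
                        : List.of(),
                null);
        Map<String, Object> in = new HashMap<>();
        in.put("type", "private");
        in.put("target", "user2");
        Result r = pipeline.execute(in);
        System.out.println(r.targets + " " + r.payload.get("processed")); // [user2] true
    }
}
```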
Pipeline configuration
Insert a row into grpc_pipeline_configs:
INSERT INTO grpc_pipeline_configs
  (session_type, janino_pre, jdm_rules, janino_post, default_targets, enabled)
VALUES
  ('chat', NULL, 'chat_routing', NULL, 'broadcast', TRUE);
Example Janino PRE script
// repository/scripts/java/grpc/chat_filter.java
import java.util.Map;

public class ChatFilter {
    @SuppressWarnings("unchecked")
    public Object execute(Map<String, Object> context) {
        Map<String, Object> payload = (Map<String, Object>) context.get("payload");
        String senderId = (String) context.get("senderId");
        // Add metadata to the payload
        payload.put("processed_at", System.currentTimeMillis());
        payload.put("sender", senderId);
        return payload;
    }
}
Example JDM model for routing
{
  "name": "chat_routing",
  "nodes": [
    {
      "id": "input",
      "type": "inputNode",
      "content": {
        "fields": [
          {"field": "payload.target", "type": "string"},
          {"field": "payload.type", "type": "string"}
        ]
      }
    },
    {
      "id": "decision",
      "type": "decisionTableNode",
      "content": {
        "hitPolicy": "first",
        "inputs": [
          {"field": "payload.type"}
        ],
        "outputs": [
          {"field": "targets", "type": "string"},
          {"field": "broadcast", "type": "boolean"}
        ],
        "rules": [
          {"_input": ["private"], "_output": ["${payload.target}", false]},
          {"_input": ["broadcast"], "_output": ["", true]},
          {"_input": ["*"], "_output": ["", true]}
        ]
      }
    }
  ]
}
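With hitPolicy set to "first", the rules are presumably evaluated top to bottom and the first matching row determines the outputs. The plain-Java sketch below mimics the three rows of the example to make that reading concrete. It does not use the JDM engine: FirstHitTableSketch, Rule and Decision are hypothetical names, and treating "*" as a wildcard and `${payload.target}` as a substitution of the payload's target field are assumptions drawn from the example itself.

```java
import java.util.List;
import java.util.Map;

// Hypothetical re-implementation of the example decision table, for illustration only.
final class FirstHitTableSketch {

    static final class Rule {
        final String input;       // expected payload.type, or "*" for any value
        final boolean useTarget;  // true when the output is ${payload.target}
        final boolean broadcast;

        Rule(String input, boolean useTarget, boolean broadcast) {
            this.input = input;
            this.useTarget = useTarget;
            this.broadcast = broadcast;
        }
    }

    static final class Decision {
        final String targets;
        final boolean broadcast;

        Decision(String targets, boolean broadcast) {
            this.targets = targets;
            this.broadcast = broadcast;
        }
    }

    // Same order as the "rules" array in the JSON model.
    static final List<Rule> RULES = List.of(
            new Rule("private", true, false),
            new Rule("broadcast", false, true),
            new Rule("*", false, true));

    static Decision decide(Map<String, String> payload) {
        String type = payload.get("type");
        for (Rule rule : RULES) {
            // First-hit policy: stop at the first row whose input matches.
            if (rule.input.equals("*") || rule.input.equals(type)) {
                String targets = rule.useTarget ? payload.getOrDefault("target", "") : "";
                return new Decision(targets, rule.broadcast);
            }
        }
        return new Decision("", true); // not reached while a "*" row exists
    }

    public static void main(String[] args) {
        Decision d = decide(Map.of("type", "private", "target", "user2"));
        System.out.println(d.targets + " " + d.broadcast); // user2 false
    }
}
```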
Peer connections
Adding a peer programmatically
@Autowired
private PeerConnectionPool peerPool;

public void connectToPeer() {
    // Register the peer and connect to it
    boolean connected = peerPool.addPeer("socle-b", "192.168.1.100", 9400);
    if (connected) {
        // Open a stream
        peerPool.openStream("socle-b", message -> {
            // Handler for incoming messages
            System.out.println("Received: " + message.getPayload());
        });
        // Send a message
        SessionMessage msg = SessionMessage.newBuilder()
            .setSessionId("...")
            .setSenderId("local-participant")
            .setKind(MessageKind.DATA)
            .setPayload("{\"text\":\"Hello\"}")
            .build();
        peerPool.sendToPeer("socle-b", msg);
    }
}
Broadcasting to all peers
SessionMessage msg = SessionMessage.newBuilder()
    .setSessionId(sessionId)
    .setSenderId(myId)
    .setKind(MessageKind.DATA)
    .setPayload(jsonPayload)
    .build();
int sent = peerPool.broadcast(msg, null); // null = include all peers
gRPC worker
Priorities
| Method | Value | Description |
|---|---|---|
| getStartPriority() | 800 | Starts after Janino (25) and JDM (30) |
| getStopPriority() | 10 | Stops early so connections can drain |
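The priority values suggest that workers are ordered by ascending priority, both at startup (25, 30, then 800) and at shutdown (10 stops early). A minimal sketch of that ordering follows. It rests on that assumption and is not the Socle supervisor's code: the Worker class is hypothetical, and the stop priorities given to Janino and JDM are invented purely for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of priority-based ordering; lower value = earlier (assumption).
final class WorkerOrderingSketch {

    static final class Worker {
        final String name;
        final int startPriority;
        final int stopPriority;

        Worker(String name, int startPriority, int stopPriority) {
            this.name = name;
            this.startPriority = startPriority;
            this.stopPriority = stopPriority;
        }
    }

    static List<String> startOrder(List<Worker> workers) {
        return workers.stream()
                .sorted(Comparator.comparingInt((Worker w) -> w.startPriority))
                .map(w -> w.name)
                .collect(Collectors.toList());
    }

    static List<String> stopOrder(List<Worker> workers) {
        return workers.stream()
                .sorted(Comparator.comparingInt((Worker w) -> w.stopPriority))
                .map(w -> w.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Worker> workers = List.of(
                new Worker("grpc", 800, 10),
                new Worker("janino", 25, 900), // stop priority 900 is illustrative only
                new Worker("jdm", 30, 890));   // stop priority 890 is illustrative only
        System.out.println(startOrder(workers)); // [janino, jdm, grpc]
        System.out.println(stopOrder(workers));  // [grpc, jdm, janino]
    }
}
```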
Statistics
@Autowired
private GrpcServerWorker grpcWorker;

Map<String, Object> stats = grpcWorker.getStats();
// {
//   "running": true,
//   "port": 9400,
//   "sessions": { "cached_sessions": 5, "active_streams": 12 },
//   "messaging": { "messages_received": 1234, "messages_sent": 5678 },
//   "peers": { "connected_peers": 2, "total_messages_sent": 100 }
// }
Supported session types
| Type | Description |
|---|---|
| chat | Real-time communication |
| sync | Data synchronization |
| broadcast | One-to-many distribution |
| pipeline | Chained processing |
| custom | Custom type |
Security
TLS (planned)
TLS support will be added in a future release:
socle:
  grpc:
    tls:
      enabled: true
      cert-path: /path/to/server.crt
      key-path: /path/to/server.key
      ca-path: /path/to/ca.crt  # For mTLS
Authentication
Authentication can be implemented via:
- gRPC metadata (tokens in headers)
- Custom interceptors
- Integration with the existing Auth module
Monitoring
Exposed metrics
Metrics are available through the Prometheus endpoint /actuator/prometheus:
- grpc_sessions_active: Number of active sessions
- grpc_messages_received_total: Total messages received
- grpc_messages_sent_total: Total messages sent
- grpc_peers_connected: Number of connected peers
Logs
[grpc_server] Started on port 9400
[grpc_session] Session created: abc-123 (type=chat, owner=user1)
[grpc_session] Participant joined: user2 -> abc-123 (total: 2)
[grpc_comm] Message received: session=abc-123, sender=user1, kind=DATA
[grpc_pipeline] Executed for session abc-123 (targets=broadcast)
Troubleshooting
The server does not start
- Check that port 9400 is not already in use
- Check that GRPC_ENABLED=true
- Check the logs for initialization errors
Sessions expire too quickly
Increase the TTL:
socle:
  grpc:
    session:
      ttl-seconds: 7200  # 2 hours
Messages are not delivered
- Check that the recipient has an active stream
- Check the pipeline logs for errors
- Check the routing configuration in grpc_pipeline_configs
Peer connection errors
- Check network connectivity
- Check that gRPC is enabled on the peer
- Increase the connection timeout:
socle:
  grpc:
    peer:
      connection-timeout-ms: 10000
File structure
src/main/java/eu/lmvi/socle/grpc/
├── GrpcConfiguration.java              # Spring configuration
├── GrpcServerWorker.java               # Main worker
├── session/
│   ├── Session.java                    # Session record
│   ├── Participant.java                # Participant record
│   ├── SessionStatus.java              # Enum ACTIVE/CLOSING/CLOSED/EXPIRED
│   ├── SessionManager.java             # Session + stream management
│   └── SessionRedisRepository.java     # Redis persistence
├── pipeline/
│   ├── PipelineConfig.java             # Pipeline config
│   ├── PipelineResult.java             # Pipeline result
│   └── PipelineExecutor.java           # Janino + JDM execution
├── peer/
│   ├── PeerConnection.java             # Single connection
│   └── PeerConnectionPool.java         # Pool with reconnection
└── service/
    ├── SocleCommService.java           # Bidirectional streaming
    ├── SessionServiceImpl.java         # Session CRUD
    └── DiscoveryServiceImpl.java       # Discovery + Ping
src/main/proto/
└── socle_comm.proto                    # Protocol Buffers definition






