Event Routing

AgeDigitalTwins supports real-time event routing to various external systems when digital twins, relationships, or models are created, updated, or deleted. This enables you to build reactive systems, data pipelines, and analytics solutions.

Overview

Event routing in AgeDigitalTwins works similarly to Azure Digital Twins, with the following key features:

  • Real-time streaming: Events are captured and routed in near real-time using PostgreSQL logical replication
  • CloudEvents format: All events conform to the CloudEvents specification
  • Multiple sinks: Route events to Kafka, Azure Data Explorer (Kusto), MQTT, and more
  • Event filtering: Configure which events are routed to which sinks
  • Two event types: Event Notifications and Data History events

Supported Event Sinks

Kafka / Azure Event Hubs

Stream events to Apache Kafka or Azure Event Hubs for real-time processing and integration with downstream systems.

Azure Data Explorer (Kusto)

Send events directly to Azure Data Explorer for analytics and time-series analysis, bypassing the need for intermediate Event Hubs.

MQTT

Route events to MQTT brokers for lightweight messaging and IoT scenarios. Uses structured CloudEvents format over MQTT.

Configuration

Configure event routing in your application settings. The configuration uses the following structure:

events:
  config:
    eventSinks:
      kafka:
        - name: KafkaSink
          brokerList: your-eventhub-namespace.servicebus.windows.net
          topic: your-eventhub-name
          saslMechanism: OAUTHBEARER
      kusto:
        - name: AdxSink
          ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
          database: your-database
      mqtt:
        - name: MqttSink
          broker: your-mqtt-broker-host
          port: 1883
          topic: digitaltwins/events
          clientId: agedt-client
          username: your-username
          password: your-password

Federated Authentication for Event Sinks (Kusto & Event Hubs)

AgeDigitalTwins uses DefaultAzureCredential for authenticating to Azure services like Kusto (ADX) and Event Hubs (via Kafka). For secure, automated integration, we recommend using Azure AD Workload Identity Federation (OIDC) with a service principal.

1. Create a Service Principal in Your Azure Tenant

You (the customer) should create a service principal (app registration) in your own Azure AD tenant:

az ad app create --display-name "AgeDigitalTwins Event Sink"
az ad sp create --id <appId>

Or via the portal:

  • Go to Azure Active Directory → App registrations → New registration

2. Federate the Service Principal with AgeDigitalTwins

We will provide you with the OIDC issuer URL for our cluster (e.g., https://<your-agedt-cluster>/oidc).

Add a federated credential to your app registration:

  • Go to your app registration in Azure Portal
  • Select Certificates & secrets → Federated credentials → Add credential
  • Set the Issuer to the OIDC issuer URL we provide
  • Set the Subject to the workload identity you want to allow (e.g., system:serviceaccount:<namespace>:<serviceaccount>)
  • Save
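
The same federated credential can also be added from the CLI. A minimal sketch, assuming the az ad app federated-credential command group is available in your Azure CLI version; the credential name is a placeholder, and <appId>, the issuer, and the subject must be replaced with your values:

az ad app federated-credential create \
  --id <appId> \
  --parameters '{
    "name": "agedt-event-sink",
    "issuer": "https://<your-agedt-cluster>/oidc",
    "subject": "system:serviceaccount:<namespace>:<serviceaccount>",
    "audiences": ["api://AzureADTokenExchange"]
  }'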

3. Assign Roles to the Service Principal

  • For Kusto: Assign the service principal the Contributor or Ingestor role on your Kusto cluster/database.
  • For Event Hubs: Assign the service principal the Azure Event Hubs Data Sender role on the Event Hub namespace.

Example (Kusto):

# Requires the kusto CLI extension: az extension add -n kusto
az kusto database-principal-assignment create \
  --cluster-name <cluster> \
  --resource-group <rg> \
  --database-name <db> \
  --principal-assignment-name <assignmentName> \
  --principal-id <servicePrincipalObjectId> \
  --role Ingestor \
  --principal-type App

Example (Event Hubs):

az role assignment create \
  --assignee <servicePrincipalObjectId> \
  --role "Azure Event Hubs Data Sender" \
  --scope /subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.EventHub/namespaces/<namespace>

4. Configure AgeDigitalTwins

No secrets or credentials need to be stored in AgeDigitalTwins. The platform will use the federated identity to obtain tokens via DefaultAzureCredential.

Example event sink config:

eventSinks:
  kusto:
    - name: AdxSink
      ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
      database: your-database
  kafka:
    - name: KafkaSink
      brokerList: your-eventhub-namespace.servicebus.windows.net
      topic: your-eventhub-name
      saslMechanism: OAUTHBEARER

Event Routes

You can define event routes to control which events go to which sinks. Example:

eventRoutes:
  - eventType: TwinCreate
    sink: AdxSink
  - eventType: TwinUpdate
    sink: KafkaSink
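
Routes reference sinks by the name defined under eventSinks. As a sketch of a fuller routing table, assuming the remaining event type names follow the same pattern (TwinDelete, RelationshipCreate, and so on; the exact set of recognized names is an assumption here):

eventRoutes:
  - eventType: TwinCreate
    sink: AdxSink
  - eventType: TwinDelete
    sink: KafkaSink
  - eventType: RelationshipCreate
    sink: AdxSink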

Migration Note

If you previously used managed identities or connection strings with Azure Digital Twins, use federated credentials instead for secure, passwordless authentication.

Event Types

Event Notifications

Event notifications are fired whenever a digital twin, relationship, or model is created, updated, or deleted. These events contain the current state of the entity.

Twin Events

Twin Create:

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Twin.Create",
  "source": "https://your-adt-instance/",
  "id": "12345678-1234-1234-1234-123456789012",
  "time": "2023-01-01T12:00:00Z",
  "datacontenttype": "application/json",
  "subject": "myTwinId",
  "data": {
    "$dtId": "myTwinId",
    "$metadata": {
      "$model": "dtmi:example:Room;1"
    },
    "Temperature": 21.5,
    "Humidity": 45
  }
}

Twin Update:

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Twin.Update",
  "source": "https://your-adt-instance/",
  "id": "12345678-1234-1234-1234-123456789012",
  "time": "2023-01-01T12:00:00Z",
  "datacontenttype": "application/json",
  "subject": "myTwinId",
  "data": {
    "modelId": "dtmi:example:Room;1",
    "patch": [
      {
        "op": "replace",
        "path": "/Temperature",
        "value": 22.0
      }
    ]
  }
}
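
The patch array in an update event is a standard RFC 6902 JSON Patch, so a consumer can replay it against a locally cached copy of the twin. A minimal sketch in Python using the third-party jsonpatch package; the cached twin below is taken from the Twin Create example above:

import json
import jsonpatch  # pip install jsonpatch

# Locally cached twin state, e.g. captured from an earlier Twin.Create event
twin = {"$dtId": "myTwinId", "Temperature": 21.5, "Humidity": 45}

# The "data" field of a Konnektr.DigitalTwins.Twin.Update event
event_data = {
    "modelId": "dtmi:example:Room;1",
    "patch": [{"op": "replace", "path": "/Temperature", "value": 22.0}],
}

# Apply the RFC 6902 patch to bring the cached twin up to date
updated = jsonpatch.apply_patch(twin, event_data["patch"])
print(json.dumps(updated))  # {"$dtId": "myTwinId", "Temperature": 22.0, "Humidity": 45}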

Twin Delete:

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Twin.Delete",
  "source": "https://your-adt-instance/",
  "id": "12345678-1234-1234-1234-123456789012",
  "time": "2023-01-01T12:00:00Z",
  "datacontenttype": "application/json",
  "subject": "myTwinId",
  "data": {
    "$dtId": "myTwinId",
    "$metadata": {
      "$model": "dtmi:example:Room;1"
    },
    "Temperature": 21.5,
    "Humidity": 45
  }
}

Relationship Events

Relationship Create:

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Relationship.Create",
  "source": "https://your-adt-instance/",
  "id": "12345678-1234-1234-1234-123456789012",
  "time": "2023-01-01T12:00:00Z",
  "datacontenttype": "application/json",
  "subject": "myTwinId/relationships/myRelationshipId",
  "data": {
    "$relationshipId": "myRelationshipId",
    "$sourceId": "myTwinId",
    "$targetId": "myTargetTwinId",
    "$relationshipName": "contains",
    "relationshipProperty": "value"
  }
}

Relationship Update:

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Relationship.Update",
  "source": "https://your-adt-instance/",
  "id": "12345678-1234-1234-1234-123456789012",
  "time": "2023-01-01T12:00:00Z",
  "datacontenttype": "application/json",
  "subject": "myTwinId/relationships/myRelationshipId",
  "data": {
    "modelId": "dtmi:example:Relationship;1",
    "patch": [
      {
        "op": "replace",
        "path": "/relationshipProperty",
        "value": "newValue"
      }
    ]
  }
}

Data History Events

Data history events provide detailed property-level change tracking for analytics and auditing purposes.

Property Events

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Property.Event",
  "source": "https://your-adt-instance/",
  "id": "12345678-1234-1234-1234-123456789012",
  "time": "2023-01-01T12:00:00Z",
  "datacontenttype": "application/json",
  "subject": "myTwinId",
  "data": {
    "twinId": "myTwinId",
    "propertyName": "Temperature",
    "propertyValue": 22.0,
    "propertyType": "double",
    "operation": "Replace",
    "timeStamp": "2023-01-01T12:00:00Z",
    "serviceId": "https://your-adt-instance/",
    "modelId": "dtmi:example:Room;1"
  }
}

Lifecycle Events

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Twin.Lifecycle",
  "source": "https://your-adt-instance/",
  "id": "12345678-1234-1234-1234-123456789012",
  "time": "2023-01-01T12:00:00Z",
  "datacontenttype": "application/json",
  "subject": "myTwinId",
  "data": {
    "twinId": "myTwinId",
    "action": "Create",
    "timeStamp": "2023-01-01T12:00:00Z",
    "serviceId": "https://your-adt-instance/",
    "modelId": "dtmi:example:Room;1"
  }
}

CloudEvents vs Azure Digital Twins Format

AgeDigitalTwins uses the official CloudEvents specification for event formatting. This differs from Azure Digital Twins Event Hub events, particularly for Kafka/Event Hubs integration:

| Azure Digital Twins | AgeDigitalTwins CloudEvents   | Description                        |
|---------------------|-------------------------------|------------------------------------|
| cloudEvents:subject | ce_subject                    | Kafka header format only           |
| cloudEvents:type    | ce_type                       | Kafka header format only           |
| cloudEvents:source  | ce_source                     | Kafka header format only           |
| Custom properties   | Standard CloudEvents envelope | Consistent event structure         |
| Event Hub specific  | CloudEvents Kafka binding     | Uses official CloudEvents library  |

Key Differences for Kafka/Event Hubs:

  • CloudEvents properties are prefixed with ce_ in Kafka headers (e.g., ce_subject, ce_type, ce_source)
  • Uses binary content mode for Kafka (more efficient than structured)
  • Follows CloudEvents Kafka binding specification
  • Other sinks (Kusto, MQTT) use standard CloudEvents format without ce_ prefixes

Why the ce_ prefix? The CloudEvents Kafka binding uses the ce_ prefix for binary mode to distinguish CloudEvents attributes from regular Kafka headers, as defined in the CloudEvents Kafka Protocol Binding.
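
To illustrate what this looks like from the consumer side, here is a minimal sketch using the Python confluent-kafka client; broker, topic, and group id are placeholders, and any SASL settings would need to match your Kafka or Event Hubs setup:

import json
from confluent_kafka import Consumer  # pip install confluent-kafka

consumer = Consumer({
    "bootstrap.servers": "your-kafka-broker:9092",  # placeholder
    "group.id": "agedt-example",                    # placeholder
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["digitaltwins-events"])  # placeholder topic

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # Binary mode: CloudEvents attributes arrive as ce_-prefixed Kafka headers
    headers = {key: value.decode() for key, value in (msg.headers() or [])}
    event_type = headers.get("ce_type")    # e.g. Konnektr.DigitalTwins.Twin.Update
    subject = headers.get("ce_subject")    # e.g. myTwinId
    data = json.loads(msg.value())         # the event payload is the message body
    print(event_type, subject, data)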

Event Routing Configuration

Kafka Configuration

kafka:
  - name: KafkaSink
    brokerList: your-kafka-broker:9092  # Port 9093 will be auto-appended if not specified
    topic: digitaltwins-events
    saslMechanism: PLAIN  # or OAUTHBEARER for Azure Event Hubs
    saslUsername: your-username  # Required for PLAIN authentication
    saslPassword: your-password  # Required for PLAIN authentication

Notes:

  • For Azure Event Hubs, use OAUTHBEARER with Azure credentials
  • Port 9093 is automatically appended if not specified in brokerList
  • SASL/SSL is automatically configured for secure communication

Kusto Configuration

kusto:
  - name: AdxSink
    ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
    database: your-database
    propertyEventsTable: AdtPropertyEvents  # Default table name
    twinLifeCycleEventsTable: AdtTwinLifeCycleEvents  # Default table name
    relationshipLifeCycleEventsTable: AdtRelationshipLifeCycleEvents  # Default table name

Notes:

  • Uses queued ingestion for optimal performance
  • Automatically creates JSON ingestion mappings for each event type
  • Table names are optional and will use defaults if not specified
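
Once events are flowing, the default tables can be queried directly. A sketch using the azure-kusto-data Python client; note that queries go to the cluster query endpoint rather than the ingest- URI, the cluster and database names are placeholders, and authentication here assumes you are signed in with the Azure CLI:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
# pip install azure-kusto-data

# Query endpoint (no "ingest-" prefix), unlike the ingestionUri used above
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(
    "https://your-cluster.region.kusto.windows.net"  # placeholder
)
client = KustoClient(kcsb)

# Peek at recent property events (assumes the default table name)
response = client.execute("your-database", "AdtPropertyEvents | take 10")
for row in response.primary_results[0]:
    print(row)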

MQTT Configuration

mqtt:
  - name: MqttSink
    broker: your-mqtt-broker-host
    port: 1883  # or 8883 for TLS
    topic: digitaltwins/events
    clientId: agedt-client-001
    username: your-username
    password: your-password
    protocolVersion: "5.0.0"  # Supports 3.1.0, 3.1.1, or 5.0.0

Notes:

  • Uses structured CloudEvents format for MQTT messages
  • Supports MQTT v3.1.0, v3.1.1, and v5.0.0
  • Automatic reconnection on connection loss
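
On the receiving side, structured mode means each MQTT message body is a complete JSON CloudEvent envelope. A minimal subscriber sketch using the Python paho-mqtt package (version 2.x); broker host, port, and topic are placeholders:

import json
import paho.mqtt.client as mqtt  # pip install "paho-mqtt>=2.0"

def on_message(client, userdata, msg):
    # Structured mode: the whole CloudEvent envelope is the payload
    event = json.loads(msg.payload)
    print(event["type"], event["subject"], event["data"])

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_message = on_message
client.connect("your-mqtt-broker-host", 1883)  # placeholders
client.subscribe("digitaltwins/events")        # placeholder topic
client.loop_forever()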

Performance and Reliability

  • Batching: Events are processed in configurable batches (default 50 events) for optimal performance
  • PostgreSQL Logical Replication: Uses PostgreSQL's built-in logical replication for real-time event capture
  • Replication Slots: Managed replication slots ensure no event loss during restarts or failovers
  • Connection Management: Enhanced timeout settings and automatic reconnection for high-load scenarios
  • Health Monitoring: Built-in health checks monitor replication connection status via IsHealthy property
  • Graceful Degradation: Service continues operating even if some sinks are unavailable
  • Error Handling: Individual event failures don't stop batch processing
  • TCP Keep-Alive: Configured for reliable long-running connections (30-second intervals)

Event Processing Architecture

The event routing system consists of two main components:

  1. Replication Producer: Captures changes from PostgreSQL using logical replication and queues events
  2. Event Consumer: Processes queued events in batches and routes them to configured sinks

Events flow through the following stages:

  1. Database changes trigger logical replication messages
  2. Changes are converted to EventData objects and queued
  3. Consumer processes events in batches and converts to CloudEvents
  4. CloudEvents are routed to configured sinks based on event routes
  5. Each sink handles delivery with appropriate retry logic

See Also