rest-to-kafka-to-db v1.0.0
REST-Endpoint → Kafka-Topic → Camel-K-Consumer (Log + optionaler Webhook)
Voraussetzungen
- tenant camel-k crd integrations.camel.apache.org
- tenant strimzi-kafka kafka events -n kafka
Parameter
-
tenant *Pflichtschema:
{"pattern": "^[a-z][a-z0-9-]{1,30}$", "type": "string"} -
instance *Pflichtschema:
{"pattern": "^[a-z][a-z0-9-]{1,30}$", "type": "string"} -
topic default:
ordersschema:{"pattern": "^[a-zA-Z0-9._-]{1,249}$", "type": "string"} -
webhook_url default:
https://httpbin.org/postschema:{"pattern": "^https?://.+", "type": "string"}
Komponenten (3)
Phase 2
tenant
declarative
kafka-topic
waitFor: kafka-topic-ready
Template anzeigen
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: ${TOPIC}
namespace: kafka
labels:
strimzi.io/cluster: events
${RECIPE_LABELS}
spec:
partitions: 3
replicas: 3
config:
retention.ms: 604800000
Phase 2
tenant
declarative
camel-producer
waitFor: integration-ready
Template anzeigen
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
name: ${INSTANCE}-producer
namespace: ipaas-flows
labels:
${RECIPE_LABELS}
spec:
traits:
service:
enabled: true
ingress:
enabled: false
flows:
- from:
uri: platform-http:/${INSTANCE}
parameters:
httpMethodRestrict: POST
steps:
- log:
message: "${INSTANCE} INBOUND: ${body}"
- to:
uri: kafka:${TOPIC}
parameters:
brokers: events-kafka-bootstrap.kafka.svc.cluster.local:9092
- setHeader:
name: Content-Type
simple: application/json
- setBody:
constant: '{"status":"queued","topic":"${TOPIC}","instance":"${INSTANCE}"}'
Phase 2
tenant
declarative
camel-consumer
waitFor: integration-ready
Template anzeigen
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
name: ${INSTANCE}-consumer
namespace: ipaas-flows
labels:
${RECIPE_LABELS}
spec:
traits:
ingress:
enabled: false
flows:
- from:
uri: kafka:${TOPIC}
parameters:
brokers: events-kafka-bootstrap.kafka.svc.cluster.local:9092
groupId: ${INSTANCE}-consumer
autoOffsetReset: earliest
steps:
- log:
message: "${INSTANCE} CONSUMED: ${body}"
- setHeader:
name: CamelHttpMethod
constant: POST
- setHeader:
name: Content-Type
constant: application/json
- to:
uri: "${WEBHOOK_URL}"
- log:
message: "${INSTANCE} WEBHOOK: status=${header.CamelHttpResponseCode}"
Deployed Instances (0)
Noch keine Instances deployed.