# pulsar-order
**Repository Path**: coderman_hero/pulsar-order
## Basic Information
- **Project Name**: pulsar-order
- **Description**: Apache Pulsar重试主题(延迟队列)自动关闭超时订单
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2023-07-04
- **Last Updated**: 2023-07-04
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Apache Pulsar重试(延迟)队列处理超时订单
### 常见的解决方案
- RabbitMQ
- Redis
- Kafka TimeWheel(时间轮)
- **Apache Pulsar**
### 超时订单处理思路
Pulsar是一个经常拿来和Kafka对比的一个消息队列和流处理平台。
**Pulsar原生支持重试(延迟)队列和死信队列,实现起来非常简单。**
自动关闭超时订单的问题,一般的处理思路如下图:

### 部署Pulsar
使用docker compose部署pulsar[1]和pulsar manager[2]
```yaml
version: '3'
networks:
pulsar:
services:
# Start zookeeper
zookeeper:
image: apachepulsar/pulsar:3.0.0
container_name: zookeeper
restart: on-failure
networks:
- pulsar
volumes:
- ./data/zookeeper:/pulsar/data/zookeeper
environment:
- metadataStoreUrl=zk:zookeeper:2181
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m
command: >
bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
exec bin/pulsar zookeeper"
healthcheck:
test: [ "CMD", "bin/pulsar-zookeeper-ruok.sh" ]
interval: 10s
timeout: 5s
retries: 30
# Init cluster metadata
pulsar-init:
container_name: pulsar-init
hostname: pulsar-init
image: apachepulsar/pulsar:3.0.0
networks:
- pulsar
command: >
bin/pulsar initialize-cluster-metadata \
--cluster cluster-a \
--zookeeper zookeeper:2181 \
--configuration-store zookeeper:2181 \
--web-service-url http://broker:8080 \
--broker-service-url pulsar://broker:6650
depends_on:
zookeeper:
condition: service_healthy
# Start bookie
bookie:
image: apachepulsar/pulsar:3.0.0
container_name: bookie
restart: on-failure
networks:
- pulsar
environment:
- clusterName=cluster-a
- zkServers=zookeeper:2181
- metadataServiceUri=metadata-store:zk:zookeeper:2181
# otherwise every time we run docker compose uo or down we fail to start due to Cookie
# See: https://github.com/apache/bookkeeper/blob/405e72acf42bb1104296447ea8840d805094c787/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java#L57-68
- advertisedAddress=bookie
- BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
depends_on:
zookeeper:
condition: service_healthy
pulsar-init:
condition: service_completed_successfully
# Map the local directory to the container to avoid bookie startup failure due to insufficient container disks.
volumes:
- ./data/bookkeeper:/pulsar/data/bookkeeper
command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"
# Start broker
broker:
image: apachepulsar/pulsar:3.0.0
container_name: broker
hostname: broker
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker
- advertisedListeners=external:pulsar://127.0.0.1:6650
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
ports:
- "6650:6650"
- "8080:8080"
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
pulsar-manager:
image: apachepulsar/pulsar-manager:v0.3.0
container_name: pulsar-manager
hostname: pulsar-manager
restart: on-failure
networks:
- pulsar
volumes:
- ./application.properties:/pulsar-manager/pulsar-manager/application.properties
environment:
- SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties
ports:
- "9527:9527"
- "7750:7750"
command: bash -c "pulsar-manager/bin/pulsar-manager --pulsar.peek.message=true"
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:7750" ]
interval: 10s
timeout: 5s
retries: 30
pulsar-manager-init:
container_name: pulsar-manager-init
hostname: pulsar-manager-init
image: apachepulsar/pulsar:3.0.0
networks:
- pulsar
command:
- bash
- -c
- |
set -ex
CSRF_TOKEN=$(curl http://pulsar-manager:7750/pulsar-manager/csrf-token)
curl \
-H 'X-XSRF-TOKEN: $${CSRF_TOKEN}' \
-H 'Cookie: XSRF-TOKEN=$${CSRF_TOKEN};' \
-H "Content-Type: application/json" \
-X PUT http://pulsar-manager:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
depends_on:
pulsar-manager:
condition: service_healthy
```
```properties
spring.cloud.refresh.refreshable=none
server.port=7750
# configuration log
logging.path=
logging.file=pulsar-manager.log
# DEBUG print execute sql
logging.level.org.apache=INFO
mybatis.type-aliases-package=org.apache.pulsar.manager
# database connection
# SQLLite
#spring.datasource.driver-class-name=org.sqlite.JDBC
#spring.datasource.url=jdbc:sqlite:pulsar_manager.db
#spring.datasource.initialization-mode=always
#spring.datasource.schema=classpath:/META-INF/sql/sqlite-schema.sql
#spring.datasource.username=
#spring.datasource.password=
#HerdDB JDBC Driver
spring.datasource.driver-class-name=herddb.jdbc.Driver
# HerdDB - local in memory-only
#spring.datasource.url=jdbc:herddb:local
# HerdDB - start embedded server, data persisted on local disk (directory 'dbdata'), listening on localhost:7000
spring.datasource.url=jdbc:herddb:server:localhost:7000?server.start=true&server.base.dir=dbdata
# HerdDB - start embedded server 'diskless-cluster' mode, WAL and Data persisted on Bookies, Metadata on ZooKeeper in '/herd', listening on localhost:7000
#spring.datasource.url=jdbc:herddb:zookeeper:localhost:2181?server.start=true&server.base.dir=dbdata&server.mode=diskless-cluster&server.node.id=localhost
# HerdDB - connect to standalone server at localhost:7000
#spring.datasource.url=jdbc:herddb:server:localhost:7000
# HerdDB - connect to cluster, uses ZooKeeper for service discovery
#spring.datasource.url=jdbc:herddb:zookeeper:localhost:2181/herd
spring.datasource.schema=classpath:/META-INF/sql/herddb-schema.sql
spring.datasource.username=sa
spring.datasource.password=hdb
spring.datasource.initialization-mode=always
# postgresql configuration
#spring.datasource.driver-class-name=org.postgresql.Driver
#spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/pulsar_manager
#spring.datasource.username=postgres
#spring.datasource.password=postgres
# zuul config
# https://cloud.spring.io/spring-cloud-static/Dalston.SR5/multi/multi__router_and_filter_zuul.html
# By Default Zuul adds Authorization to be dropped headers list. Below we are manually setting it
zuul.sensitive-headers=Cookie,Set-Cookie
zuul.routes.admin.path=/admin/**
zuul.routes.admin.url=http://localhost:8080/admin/
zuul.routes.lookup.path=/lookup/**
zuul.routes.lookup.url=http://localhost:8080/lookup/
# pagehelper plugin
#pagehelper.helperDialect=sqlite
# force 'mysql' for HerdDB, comment out for postgresql
pagehelper.helperDialect=mysql
backend.directRequestBroker=true
backend.directRequestHost=http://localhost:8080
backend.jwt.token=
backend.broker.pulsarAdmin.authPlugin=
backend.broker.pulsarAdmin.authParams=
backend.broker.pulsarAdmin.tlsAllowInsecureConnection=false
backend.broker.pulsarAdmin.tlsTrustCertsFilePath=
backend.broker.pulsarAdmin.tlsEnableHostnameVerification=false
jwt.secret=dab1c8ba-b01b-11e9-b384-186590e06885
jwt.sessionTime=2592000
# If user.management.enable is true, the following account and password will no longer be valid.
pulsar-manager.account=pulsar
pulsar-manager.password=pulsar
# If true, the database is used for user management
user.management.enable=true
# Optional -> SECRET, PRIVATE, default -> PRIVATE, empty -> disable auth
# SECRET mode -> bin/pulsar tokens create --secret-key file:///path/to/my-secret.key --subject test-user
# PRIVATE mode -> bin/pulsar tokens create --private-key file:///path/to/my-private.key --subject test-user
# Detail information: http://pulsar.apache.org/docs/en/security-token-admin/
jwt.broker.token.mode=
jwt.broker.secret.key=file:///path/broker-secret.key
jwt.broker.public.key=file:///path/pulsar/broker-public.key
jwt.broker.private.key=file:///path/broker-private.key
# bookie
bookie.host=http://localhost:8050
bookie.enable=false
redirect.scheme=http
redirect.host=localhost
redirect.port=9527
# Stats interval
# millisecond
insert.stats.interval=30000
# millisecond
clear.stats.interval=300000
init.delay.interval=0
# cluster data reload
cluster.cache.reload.interval.ms=60000
user.access.token.expire=604800
# thymeleaf configuration for third login.
spring.thymeleaf.cache=false
spring.thymeleaf.prefix=classpath:/templates/
spring.thymeleaf.check-template-location=true
spring.thymeleaf.suffix=.html
spring.thymeleaf.encoding=UTF-8
spring.thymeleaf.servlet.content-type=text/html
spring.thymeleaf.mode=HTML5
# default environment configuration
default.environment.name=pulsar
default.environment.service_url=http://broker:8080
default.environment.bookie_url=pulsar://broker:6650
# enable tls encryption
# keytool -import -alias test-keystore -keystore ca-certs -file certs/ca.cert.pem
tls.enabled=false
tls.keystore=keystore-file-path
tls.keystore.password=keystore-password
tls.hostname.verifier=false
tls.pulsar.admin.ca-certs=ca-client-path
# support peek message, default false
pulsar.peek.message=true
# swagger configuration
swagger.enabled=true
# casdoor configuration
casdoor.endpoint = http://localhost:8000
casdoor.clientId = 6ba06c1e1a30929fdda7
casdoor.clientSecret = df92bbf913225ebbae9af7ba8d41fe19507eb079
casdoor.certificate=\
-----BEGIN CERTIFICATE-----\n\
MIIE+TCCAuGgAwIBAgIDAeJAMA0GCSqGSIb3DQEBCwUAMDYxHTAbBgNVBAoTFENh\n\
c2Rvb3IgT3JnYW5pemF0aW9uMRUwEwYDVQQDEwxDYXNkb29yIENlcnQwHhcNMjEx\n\
MDE1MDgxMTUyWhcNNDExMDE1MDgxMTUyWjA2MR0wGwYDVQQKExRDYXNkb29yIE9y\n\
Z2FuaXphdGlvbjEVMBMGA1UEAxMMQ2FzZG9vciBDZXJ0MIICIjANBgkqhkiG9w0B\n\
AQEFAAOCAg8AMIICCgKCAgEAsInpb5E1/ym0f1RfSDSSE8IR7y+lw+RJjI74e5ej\n\
rq4b8zMYk7HeHCyZr/hmNEwEVXnhXu1P0mBeQ5ypp/QGo8vgEmjAETNmzkI1NjOQ\n\
CjCYwUrasO/f/MnI1C0j13vx6mV1kHZjSrKsMhYY1vaxTEP3+VB8Hjg3MHFWrb07\n\
uvFMCJe5W8+0rKErZCKTR8+9VB3janeBz//zQePFVh79bFZate/hLirPK0Go9P1g\n\
OvwIoC1A3sarHTP4Qm/LQRt0rHqZFybdySpyWAQvhNaDFE7mTstRSBb/wUjNCUBD\n\
PTSLVjC04WllSf6Nkfx0Z7KvmbPstSj+btvcqsvRAGtvdsB9h62Kptjs1Yn7GAuo\n\
I3qt/4zoKbiURYxkQJXIvwCQsEftUuk5ew5zuPSlDRLoLByQTLbx0JqLAFNfW3g/\n\
pzSDjgd/60d6HTmvbZni4SmjdyFhXCDb1Kn7N+xTojnfaNkwep2REV+RMc0fx4Gu\n\
hRsnLsmkmUDeyIZ9aBL9oj11YEQfM2JZEq+RVtUx+wB4y8K/tD1bcY+IfnG5rBpw\n\
IDpS262boq4SRSvb3Z7bB0w4ZxvOfJ/1VLoRftjPbLIf0bhfr/AeZMHpIKOXvfz4\n\
yE+hqzi68wdF0VR9xYc/RbSAf7323OsjYnjjEgInUtRohnRgCpjIk/Mt2Kt84Kb0\n\
wn8CAwEAAaMQMA4wDAYDVR0TAQH/BAIwADANBgkqhkiG9w0BAQsFAAOCAgEAn2lf\n\
DKkLX+F1vKRO/5gJ+Plr8P5NKuQkmwH97b8CS2gS1phDyNgIc4/LSdzuf4Awe6ve\n\
C06lVdWSIis8UPUPdjmT2uMPSNjwLxG3QsrimMURNwFlLTfRem/heJe0Zgur9J1M\n\
8haawdSdJjH2RgmFoDeE2r8NVRfhbR8KnCO1ddTJKuS1N0/irHz21W4jt4rxzCvl\n\
2nR42Fybap3O/g2JXMhNNROwZmNjgpsF7XVENCSuFO1jTywLaqjuXCg54IL7XVLG\n\
omKNNNcc8h1FCeKj/nnbGMhodnFWKDTsJcbNmcOPNHo6ixzqMy/Hqc+mWYv7maAG\n\
Jtevs3qgMZ8F9Qzr3HpUc6R3ZYYWDY/xxPisuKftOPZgtH979XC4mdf0WPnOBLqL\n\
2DJ1zaBmjiGJolvb7XNVKcUfDXYw85ZTZQ5b9clI4e+6bmyWqQItlwt+Ati/uFEV\n\
XzCj70B4lALX6xau1kLEpV9O1GERizYRz5P9NJNA7KoO5AVMp9w0DQTkt+LbXnZE\n\
HHnWKy8xHQKZF9sR7YBPGLs/Ac6tviv5Ua15OgJ/8dLRZ/veyFfGo2yZsI+hKVU5\n\
nCCJHBcAyFnm1hdvdwEdH33jDBjNB6ciotJZrf/3VYaIWSalADosHAgMWfXuWP+h\n\
8XKXmzlxuHbTMQYtZPDgspS5aK+S4Q9wb8RRAYo=\n\
-----END CERTIFICATE-----\n\
casdoor.organizationName = pulsar
casdoor.applicationName = app-pulsar
```
### 生产者
在pom.xml文件中添加依赖[3],然后创建生产者发送消息[4]。
```java
package onehour.pulsar.examples;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
public class OrderProducer {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer producer = client.newProducer(Schema.STRING)
.topic("my-order")
.create();
for (int i = 10000; i < 10020; i++) {
System.out.println("send order id :" + i);
producer.send(String.valueOf(i));
}
producer.close();
client.close();
}
}
```
### 重试(延迟)队列和死信队列

消费者会自动订阅原始主题关联的重试队列(延迟队列),消费者通过调用`reconsumeLater`方法将消息放入重试队列[5]中,并设置多长时间后重试,超过最大充数次数,未成功消费的消息会被写入死信队列[6]。
```java
Consumer consumer = client.newConsumer(Schema.STRING)
.topic("my-order")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true) // 开启重试(延时)队列
.deadLetterPolicy(DeadLetterPolicy.builder()
.retryLetterTopic("retry-order") // 重试队列名称
.maxRedeliverCount(1) // 超过最大重试次数,将消息放入死信队列
.deadLetterTopic("dead-order") //死信队列名称
.initialSubscriptionName("dead-order-subscription")//订阅超时订单
.build())
.subscribe();
```
设置重试间隔时间(延迟时间),这里为了演示,设置为60秒。
```java
consumer.reconsumeLater(msg, 60, TimeUnit.SECONDS);
```
### 消费者
```java
package onehour.pulsar.examples;
import org.apache.pulsar.client.api.*;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class OrderConsumer {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer consumer = client.newConsumer(Schema.STRING)
.topic("my-order")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true) // 开启重试(延时)队列
.deadLetterPolicy(DeadLetterPolicy.builder()
.retryLetterTopic("retry-order") // 重试队列名称
.maxRedeliverCount(2) // 超过最大重试次数,将消息放入死信队列
.deadLetterTopic("dead-order") //死信队列名称
.initialSubscriptionName("dead-order-subscription")//处理超时订单
.build())
.subscribe();
while (true) {
Message msg = consumer.receive();
String orderId = (String) msg.getValue();
boolean isPaid = checkOrderStatus(orderId);
if (isPaid) {
System.out.println("order " + orderId + " is paid");
// 订单已支付,发送确认通知, 消息被成功消费
consumer.acknowledge(msg);
} else {
// 订单未支付,移入重试队列
System.out.println("order " + orderId + " is not paid");
consumer.reconsumeLater(msg, 60, TimeUnit.SECONDS);
// 消费者会自动订阅重试队列,60s后再次检查重试队列里的订单,若仍未支付,则消息被自动移入死信队列
}
}
}
private static boolean checkOrderStatus(String orderId) {
// read from database
return new Random().nextBoolean();
}
}
```
### 管理界面
浏览器打开 [http://localhost:9527](http://localhost:9527), 查看主题中的消息
用户名:`admin`, 密码:`apachepulsar`

### 自动关闭订单
```java
package onehour.pulsar.examples;
import org.apache.pulsar.client.api.*;
public class OrderCloseConsumer {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer consumer = client.newConsumer(Schema.STRING)
.topic("dead-order")
.subscriptionName("dead-order-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
while (true) {
Message msg = consumer.receive();
String orderId = (String) msg.getValue();
System.out.println("close order " + orderId);
}
}
}
```
参考文档:
[1] [https://pulsar.apache.org/docs/3.0.x/getting-started-docker-compose/](https://pulsar.apache.org/docs/3.0.x/getting-started-docker-compose/)
[2] [https://pulsar.apache.org/docs/3.0.x/administration-pulsar-manager/#configuration](https://pulsar.apache.org/docs/3.0.x/administration-pulsar-manager/#configuration)
[3] [https://pulsar.apache.org/docs/3.0.x/client-libraries-java-setup/](https://pulsar.apache.org/docs/3.0.x/client-libraries-java-setup/)
[4] [https://pulsar.apache.org/docs/3.0.x/client-libraries-java-use/](https://pulsar.apache.org/docs/3.0.x/client-libraries-java-use/)
[5] [https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic](https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic)
[6] [https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#dead-letter-topic](https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#dead-letter-topic)