How to Create Your own Messenger Transport

Once you have written your transport's sender and receiver, register a transport factory so that the transport can be configured through a DSN in your Symfony application.

Create your Transport Factory

FrameworkBundle needs a way to create your transport from a DSN. To provide one, write a transport factory:

use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class YourTransportFactory implements TransportFactoryInterface
{
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        return new YourTransport(/* ... */);
    }

    public function supports(string $dsn, array $options): bool
    {
        return str_starts_with($dsn, 'my-transport://');
    }
}
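
The factory above leaves the construction of the transport elided. As a minimal sketch of what a factory might do with the DSN before building the transport, the helper below splits a DSN into a host, a queue name and options. The DSN layout (`my-transport://host/queue?option=value`), the function name `parseMyTransportDsn()` and the `redeliver_timeout` option are all assumptions made for illustration; none of them are defined by Symfony.

```php
/**
 * Splits a hypothetical "my-transport://host/queue?option=value" DSN into
 * the pieces a factory could hand to its transport. The option names and
 * default values are illustrative assumptions.
 *
 * @return array{host: string, queue: string, options: array<string, mixed>}
 */
function parseMyTransportDsn(string $dsn, array $options = []): array
{
    $parts = parse_url($dsn);
    if (false === $parts || 'my-transport' !== ($parts['scheme'] ?? null)) {
        // The DSN itself is left out of the message because it may contain credentials
        throw new \InvalidArgumentException('The given DSN is not a valid "my-transport://" DSN.');
    }

    // Options embedded in the query string of the DSN (values are strings)
    parse_str($parts['query'] ?? '', $query);

    $queue = ltrim($parts['path'] ?? '', '/');

    return [
        'host' => $parts['host'] ?? 'localhost',
        'queue' => '' !== $queue ? $queue : 'my_queue',
        // Explicit options win over query-string options, which win over the defaults
        'options' => $options + $query + ['redeliver_timeout' => 300],
    ];
}

$config = parseMyTransportDsn('my-transport://db.example/jobs?redeliver_timeout=60');
```

In this sketch, values read from the query string stay strings, so a real factory would probably cast or validate them before use.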

The transport object needs to implement the TransportInterface (which combines the SenderInterface and ReceiverInterface). Here is a simplified example of a database transport:

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Uid\Uuid;

class YourTransport implements TransportInterface
{
    private SerializerInterface $serializer;

    /**
     * @param FakeDatabase $db is used for demo purposes. It is not a real class.
     */
    public function __construct(
        private FakeDatabase $db,
        ?SerializerInterface $serializer = null,
    ) {
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        // Get a message from "my_queue"
        $row = $this->db->createQuery(
                'SELECT *
                FROM my_queue
                WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
                AND handled = FALSE'
            )
            ->setParameter('redeliver_timeout', new \DateTimeImmutable('-5 minutes'))
            ->getOneOrNullResult();

        if (null === $row) {
            return [];
        }

        $envelope = $this->serializer->decode([
            'body' => $row['envelope'],
        ]);

        return [$envelope->with(new TransportMessageIdStamp($row['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Mark the message as "handled"
        $this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function reject(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Delete the message from the "my_queue" table
        $this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);
        $uuid = (string) Uuid::v4();
        // Add a message to the "my_queue" table
        $this->db->createQuery(
                'INSERT INTO my_queue (id, envelope, delivered_at, handled)
                VALUES (:id, :envelope, NULL, FALSE)'
            )
            ->setParameters([
                'id' => $uuid,
                'envelope' => $encodedMessage['body'],
            ])
            ->execute();

        return $envelope->with(new TransportMessageIdStamp($uuid));
    }
}

The implementation above is not runnable code, but it illustrates how a TransportInterface could be implemented. For real implementations, see InMemoryTransport and DoctrineReceiver.
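
To make the selection rule in `get()` easier to follow, here is a small sketch that restates its WHERE clause in plain PHP: a row qualifies when it is not handled and was either never delivered or delivered before the redelivery cutoff. The function name `isDeliverable()` and the sample rows are hypothetical. Note that the simplified `get()` above only reads `delivered_at`; a real transport would presumably also record the delivery time when it hands out a row, otherwise the timeout never comes into play.

```php
/**
 * Mirrors the WHERE clause of get(): a row may be delivered when it is not
 * yet handled and was either never delivered or delivered before the cutoff.
 *
 * @param array{delivered_at: ?\DateTimeImmutable, handled: bool} $row
 */
function isDeliverable(array $row, \DateTimeImmutable $redeliverCutoff): bool
{
    if ($row['handled']) {
        return false;
    }

    return null === $row['delivered_at'] || $row['delivered_at'] < $redeliverCutoff;
}

// A fixed "now" keeps the example deterministic
$now = new \DateTimeImmutable('2024-01-01 12:00:00');
$cutoff = $now->modify('-5 minutes');

$rows = [
    'new' => ['delivered_at' => null, 'handled' => false],
    'in_progress' => ['delivered_at' => $now->modify('-1 minute'), 'handled' => false],
    'stalled' => ['delivered_at' => $now->modify('-10 minutes'), 'handled' => false],
    'done' => ['delivered_at' => $now->modify('-10 minutes'), 'handled' => true],
];

// array_filter() preserves keys, so array_keys() yields the names of the matching rows
$deliverable = array_keys(array_filter(
    $rows,
    static fn (array $row): bool => isDeliverable($row, $cutoff)
));
// $deliverable === ['new', 'stalled']
```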

Register your Factory

Before using your factory, you must register it. If you're using the default services.yaml configuration, this is already done for you, thanks to autoconfiguration. Otherwise, add the following:

# config/services.yaml
services:
    Your\Transport\YourTransportFactory:
        tags: [messenger.transport_factory]

<?xml version="1.0" encoding="UTF-8" ?>
<!-- config/services.xml -->
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd">

    <services>
        <service id="Your\Transport\YourTransportFactory">
           <tag name="messenger.transport_factory"/>
        </service>
    </services>
</container>

// config/services.php
use Your\Transport\YourTransportFactory;

$container->register(YourTransportFactory::class)
    ->addTag('messenger.transport_factory');

Use your Transport

Within the framework.messenger.transports.* configuration, create your named transport using your own DSN:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            yours: 'my-transport://...'

<?xml version="1.0" encoding="UTF-8" ?>
<!-- config/packages/messenger.xml -->
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:framework="http://symfony.com/schema/dic/symfony"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd
        http://symfony.com/schema/dic/symfony
        https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

    <framework:config>
        <framework:messenger>
            <framework:transport name="yours" dsn="my-transport://..."/>
        </framework:messenger>
    </framework:config>
</container>

// config/packages/messenger.php
use Symfony\Config\FrameworkConfig;

return static function (FrameworkConfig $framework): void {
    $framework->messenger()
        ->transport('yours')
            ->dsn('my-transport://...')
    ;
};
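
Once the transport has a name, messages can be routed to it like to any other transport. The fragment below is a sketch of that in the PHP configuration format; `App\Message\YourMessage` is a hypothetical message class, and the DSN is left elided as in the examples above.

```php
// config/packages/messenger.php
use Symfony\Config\FrameworkConfig;

return static function (FrameworkConfig $framework): void {
    $messenger = $framework->messenger();

    $messenger->transport('yours')
        ->dsn('my-transport://...');

    // App\Message\YourMessage is a hypothetical message class used for illustration
    $messenger->routing('App\Message\YourMessage')
        ->senders(['yours']);
};
```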

In addition to being able to route your messages to the yours sender, this gives you access to the following services:

  1. messenger.sender.yours: the sender;
  2. messenger.receiver.yours: the receiver.

This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.